QIODevice: add infrastructure for multistreaming

Some sequential devices allow data to be partitioned into several
channels that have the property of independently sequenced delivery.
Supporting such devices uniformly requires a unified API which provides
the user with a uniform concept of multistreaming.

This patch is based on QProcess's multiplexing model and introduces
the following features:

 - ability to get the number of channels;
 - multiple internal read/write buffers;
 - channel selection functions;
 - notification signals on channel activity.

To keep the source code compatible with single-channel implementations,
introduce a private class that references the current read buffer and
hides multistreaming internals from the user.

Bump the TypeInformationVersion field in qtHookData, to notify the
Qt Creator developers that the offset of QFilePrivate::fileName was
changed and dumpers should be adapted.

[ChangeLog][QtCore] Added multistreaming to QIODevice.

Change-Id: Idcaa6a618927c101c4c7284d2a633913be6a6ee2
Reviewed-by: Allan Sandfeld Jensen <allan.jensen@theqtcompany.com>
Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@theqtcompany.com>
This commit is contained in:
Alex Trotsenko 2015-04-24 19:02:02 +03:00
parent acbd79996d
commit 1c0494e63b
6 changed files with 264 additions and 14 deletions

View File

@ -67,7 +67,7 @@ quintptr Q_CORE_EXPORT qtHookData[] = {
// The required sizes and offsets are tested in tests/auto/other/toolsupport.
// When this fails and the change was intentional, adjust the test and
// adjust this value here.
1
2
};
Q_STATIC_ASSERT(QHooks::LastHookIndex == sizeof(qtHookData) / sizeof(qtHookData[0]));

View File

@ -148,8 +148,12 @@ static void checkWarnMessage(const QIODevice *device, const char *function, cons
\internal
*/
QIODevicePrivate::QIODevicePrivate()
: openMode(QIODevice::NotOpen), buffer(QIODEVICE_BUFFERSIZE),
: openMode(QIODevice::NotOpen),
pos(0), devicePos(0),
readChannelCount(0),
writeChannelCount(0),
currentReadChannel(0),
currentWriteChannel(0),
transactionPos(0),
transactionStarted(false)
, baseReadLineDataCalled(false)
@ -278,6 +282,15 @@ QIODevicePrivate::~QIODevicePrivate()
mechanism implemented by QIODevice. See startTransaction() and related
functions for more details.
Some sequential devices support communicating via multiple channels. These
channels represent separate streams of data that have the property of
independently sequenced delivery. Once the device is opened, you can
determine the number of channels by calling the readChannelCount() and
writeChannelCount() functions. To switch between channels, call
setCurrentReadChannel() and setCurrentWriteChannel(), respectively.
QIODevice also provides additional signals to handle asynchronous
communication on a per-channel basis.
\sa QBuffer, QFile, QTcpSocket
*/
@ -315,8 +328,8 @@ QIODevicePrivate::~QIODevicePrivate()
/*! \fn QIODevice::bytesWritten(qint64 bytes)
This signal is emitted every time a payload of data has been
written to the device. The \a bytes argument is set to the number
of bytes that were written in this payload.
written to the device's current write channel. The \a bytes argument is
set to the number of bytes that were written in this payload.
bytesWritten() is not emitted recursively; if you reenter the event loop
or call waitForBytesWritten() inside a slot connected to the
@ -326,13 +339,29 @@ QIODevicePrivate::~QIODevicePrivate()
\sa readyRead()
*/
/*!
\fn QIODevice::channelBytesWritten(int channel, qint64 bytes)
\since 5.7
This signal is emitted every time a payload of data has been written to
the device. The \a bytes argument is set to the number of bytes that were
written in this payload, while \a channel is the channel they were written
to. Unlike bytesWritten(), it is emitted regardless of the
\l{currentWriteChannel()}{current write channel}.
channelBytesWritten() can be emitted recursively - even for the same
channel.
\sa bytesWritten(), channelReadyRead()
*/
/*!
\fn QIODevice::readyRead()
This signal is emitted once every time new data is available for
reading from the device. It will only be emitted again once new
data is available, such as when a new payload of network data has
arrived on your network socket, or when a new block of data has
reading from the device's current read channel. It will only be emitted
again once new data is available, such as when a new payload of network
data has arrived on your network socket, or when a new block of data has
been appended to your device.
readyRead() is not emitted recursively; if you reenter the event loop or
@ -348,6 +377,20 @@ QIODevicePrivate::~QIODevicePrivate()
\sa bytesWritten()
*/
/*!
\fn QIODevice::channelReadyRead(int channel)
\since 5.7
This signal is emitted when new data is available for reading from the
device. The \a channel argument is set to the index of the read channel on
which the data has arrived. Unlike readyRead(), it is emitted regardless of
the \l{currentReadChannel()}{current read channel}.
channelReadyRead() can be emitted recursively - even for the same channel.
\sa readyRead(), channelBytesWritten()
*/
/*! \fn QIODevice::aboutToClose()
This signal is emitted when the device is about to close. Connect
@ -483,8 +526,8 @@ void QIODevice::setOpenMode(OpenMode openMode)
#endif
d->openMode = openMode;
d->accessMode = QIODevicePrivate::Unset;
if (!isReadable())
d->buffer.clear();
d->setReadChannelCount(isReadable() ? qMax(d->readChannelCount, 1) : 0);
d->setWriteChannelCount(isWritable() ? qMax(d->writeChannelCount, 1) : 0);
}
/*!
@ -560,6 +603,135 @@ bool QIODevice::isWritable() const
return (openMode() & WriteOnly) != 0;
}
/*!
\since 5.7
Returns the number of available read channels if the device is open;
otherwise returns 0.
\sa writeChannelCount(), QProcess
*/
int QIODevice::readChannelCount() const
{
return d_func()->readChannelCount;
}
/*!
\since 5.7
Returns the number of available write channels if the device is open;
otherwise returns 0.
\sa readChannelCount()
*/
int QIODevice::writeChannelCount() const
{
return d_func()->writeChannelCount;
}
/*!
\since 5.7
Returns the index of the current read channel.
\sa setCurrentReadChannel(), readChannelCount(), QProcess
*/
int QIODevice::currentReadChannel() const
{
return d_func()->currentReadChannel;
}
/*!
\since 5.7
Sets the current read channel of the QIODevice to the given \a
channel. The current input channel is used by the functions
read(), readAll(), readLine(), and getChar(). It also determines
which channel triggers QIODevice to emit readyRead().
\sa currentReadChannel(), readChannelCount(), QProcess
*/
void QIODevice::setCurrentReadChannel(int channel)
{
Q_D(QIODevice);
if (d->transactionStarted) {
checkWarnMessage(this, "setReadChannel", "Failed due to read transaction being in progress");
return;
}
#if defined QIODEVICE_DEBUG
qDebug("%p QIODevice::setCurrentReadChannel(%d), d->currentReadChannel = %d, d->readChannelCount = %d\n",
this, channel, d->currentReadChannel, d->readChannelCount);
#endif
d->setCurrentReadChannel(channel);
}
/*!
\internal
*/
void QIODevicePrivate::setReadChannelCount(int count)
{
if (count > readBuffers.size()) {
readBuffers.insert(readBuffers.end(), count - readBuffers.size(),
QRingBuffer(QIODEVICE_BUFFERSIZE));
} else {
readBuffers.resize(count);
}
readChannelCount = count;
setCurrentReadChannel(currentReadChannel);
}
/*!
\since 5.7
Returns the the index of the current write channel.
\sa setCurrentWriteChannel(), writeChannelCount()
*/
int QIODevice::currentWriteChannel() const
{
return d_func()->currentWriteChannel;
}
/*!
\since 5.7
Sets the current write channel of the QIODevice to the given \a
channel. The current output channel is used by the functions
write(), putChar(). It also determines which channel triggers
QIODevice to emit bytesWritten().
\sa currentWriteChannel(), writeChannelCount()
*/
void QIODevice::setCurrentWriteChannel(int channel)
{
Q_D(QIODevice);
#if defined QIODEVICE_DEBUG
qDebug("%p QIODevice::setCurrentWriteChannel(%d), d->currentWriteChannel = %d, d->writeChannelCount = %d\n",
this, channel, d->currentWriteChannel, d->writeChannelCount);
#endif
d->setCurrentWriteChannel(channel);
}
/*!
\internal
*/
void QIODevicePrivate::setWriteChannelCount(int count)
{
if (count > writeBuffers.size()) {
writeBuffers.insert(writeBuffers.end(), count - writeBuffers.size(),
QRingBuffer(QIODEVICE_BUFFERSIZE));
} else {
writeBuffers.resize(count);
}
writeChannelCount = count;
setCurrentWriteChannel(currentWriteChannel);
}
/*!
Opens the device and sets its OpenMode to \a mode. Returns \c true if successful;
otherwise returns \c false. This function should be called from any
@ -572,8 +744,11 @@ bool QIODevice::open(OpenMode mode)
Q_D(QIODevice);
d->openMode = mode;
d->pos = (mode & Append) ? size() : qint64(0);
d->buffer.clear();
d->accessMode = QIODevicePrivate::Unset;
d->readBuffers.clear();
d->writeBuffers.clear();
d->setReadChannelCount(isReadable() ? 1 : 0);
d->setWriteChannelCount(isWritable() ? 1 : 0);
#if defined QIODEVICE_DEBUG
printf("%p QIODevice::open(0x%x)\n", this, quint32(mode));
#endif
@ -604,7 +779,9 @@ void QIODevice::close()
d->pos = 0;
d->transactionStarted = false;
d->transactionPos = 0;
d->buffer.clear();
d->setReadChannelCount(0);
// Do not clear write buffers to allow delayed close in sockets
d->writeChannelCount = 0;
}
/*!
@ -771,11 +948,14 @@ qint64 QIODevice::bytesAvailable() const
waiting to be written. For devices with no buffer, this function
returns 0.
Subclasses that reimplement this function must call the base
implementation in order to include the size of the buffer of QIODevice.
\sa bytesAvailable(), bytesWritten(), isSequential()
*/
qint64 QIODevice::bytesToWrite() const
{
return qint64(0);
return d_func()->writeBuffer.size();
}
/*!

View File

@ -96,6 +96,13 @@ public:
bool isWritable() const;
virtual bool isSequential() const;
int readChannelCount() const;
int writeChannelCount() const;
int currentReadChannel() const;
void setCurrentReadChannel(int channel);
int currentWriteChannel() const;
void setCurrentWriteChannel(int channel);
virtual bool open(OpenMode mode);
virtual void close();
@ -142,7 +149,9 @@ public:
#ifndef QT_NO_QOBJECT
Q_SIGNALS:
void readyRead();
void channelReadyRead(int channel);
void bytesWritten(qint64 bytes);
void channelBytesWritten(int channel, qint64 bytes);
void aboutToClose();
void readChannelFinished();
#endif

View File

@ -56,6 +56,7 @@
#include "QtCore/qobjectdefs.h"
#include "QtCore/qstring.h"
#include "private/qringbuffer_p.h"
#include "QtCore/qvector.h"
#ifndef QT_NO_QOBJECT
#include "private/qobject_p.h"
#endif
@ -82,9 +83,48 @@ public:
QIODevice::OpenMode openMode;
QString errorString;
QRingBuffer buffer;
QVector<QRingBuffer> readBuffers;
QVector<QRingBuffer> writeBuffers;
class QRingBufferRef {
QRingBuffer *m_buf;
inline QRingBufferRef() : m_buf(Q_NULLPTR) { }
friend class QIODevicePrivate;
public:
// wrap functions from QRingBuffer
inline qint64 nextDataBlockSize() const { return (m_buf ? m_buf->nextDataBlockSize() : Q_INT64_C(0)); }
inline const char *readPointer() const { return (m_buf ? m_buf->readPointer() : Q_NULLPTR); }
inline const char *readPointerAtPosition(qint64 pos, qint64 &length) const { Q_ASSERT(m_buf); return m_buf->readPointerAtPosition(pos, length); }
inline void free(qint64 bytes) { Q_ASSERT(m_buf); m_buf->free(bytes); }
inline char *reserve(qint64 bytes) { Q_ASSERT(m_buf); return m_buf->reserve(bytes); }
inline char *reserveFront(qint64 bytes) { Q_ASSERT(m_buf); return m_buf->reserveFront(bytes); }
inline void truncate(qint64 pos) { Q_ASSERT(m_buf); m_buf->truncate(pos); }
inline void chop(qint64 bytes) { Q_ASSERT(m_buf); m_buf->chop(bytes); }
inline bool isEmpty() const { return !m_buf || m_buf->isEmpty(); }
inline int getChar() { return (m_buf ? m_buf->getChar() : -1); }
inline void putChar(char c) { Q_ASSERT(m_buf); m_buf->putChar(c); }
inline void ungetChar(char c) { Q_ASSERT(m_buf); m_buf->ungetChar(c); }
inline qint64 size() const { return (m_buf ? m_buf->size() : Q_INT64_C(0)); }
inline void clear() { if (m_buf) m_buf->clear(); }
inline qint64 indexOf(char c) const { return (m_buf ? m_buf->indexOf(c, m_buf->size()) : Q_INT64_C(-1)); }
inline qint64 indexOf(char c, qint64 maxLength, qint64 pos = 0) const { return (m_buf ? m_buf->indexOf(c, maxLength, pos) : Q_INT64_C(-1)); }
inline qint64 read(char *data, qint64 maxLength) { return (m_buf ? m_buf->read(data, maxLength) : Q_INT64_C(0)); }
inline QByteArray read() { return (m_buf ? m_buf->read() : QByteArray()); }
inline qint64 peek(char *data, qint64 maxLength, qint64 pos = 0) const { return (m_buf ? m_buf->peek(data, maxLength, pos) : Q_INT64_C(0)); }
inline void append(const QByteArray &qba) { Q_ASSERT(m_buf); m_buf->append(qba); }
inline qint64 skip(qint64 length) { return (m_buf ? m_buf->skip(length) : Q_INT64_C(0)); }
inline qint64 readLine(char *data, qint64 maxLength) { return (m_buf ? m_buf->readLine(data, maxLength) : Q_INT64_C(-1)); }
inline bool canReadLine() const { return m_buf && m_buf->canReadLine(); }
};
QRingBufferRef buffer;
QRingBufferRef writeBuffer;
qint64 pos;
qint64 devicePos;
int readChannelCount;
int writeChannelCount;
int currentReadChannel;
int currentWriteChannel;
qint64 transactionPos;
bool transactionStarted;
bool baseReadLineDataCalled;
@ -111,6 +151,19 @@ public:
}
void seekBuffer(qint64 newPos);
inline void setCurrentReadChannel(int channel)
{
buffer.m_buf = (channel < readBuffers.size() ? &readBuffers[channel] : nullptr);
currentReadChannel = channel;
}
inline void setCurrentWriteChannel(int channel)
{
writeBuffer.m_buf = (channel < writeBuffers.size() ? &writeBuffers[channel] : nullptr);
currentWriteChannel = channel;
}
void setReadChannelCount(int count);
void setWriteChannelCount(int count);
virtual qint64 peek(char *data, qint64 maxSize);
virtual QByteArray peek(qint64 maxSize);

View File

@ -114,6 +114,8 @@ void tst_QIODevice::constructing_QTcpSocket()
socket.connectToHost(QtNetworkSettings::serverName(), 143);
QVERIFY(socket.waitForConnected(30000));
QVERIFY(device->isOpen());
QCOMPARE(device->readChannelCount(), 1);
QCOMPARE(device->writeChannelCount(), 1);
while (!device->canReadLine())
QVERIFY(device->waitForReadyRead(30000));
@ -125,6 +127,8 @@ void tst_QIODevice::constructing_QTcpSocket()
QCOMPARE(socket.pos(), qlonglong(0));
socket.close();
QCOMPARE(socket.readChannelCount(), 0);
QCOMPARE(socket.writeChannelCount(), 0);
socket.connectToHost(QtNetworkSettings::serverName(), 143);
QVERIFY(socket.waitForConnected(30000));
QVERIFY(device->isOpen());
@ -158,6 +162,8 @@ void tst_QIODevice::constructing_QFile()
QVERIFY(file.open(QFile::ReadOnly));
QVERIFY(device->isOpen());
QCOMPARE((int) device->openMode(), (int) QFile::ReadOnly);
QCOMPARE(device->readChannelCount(), 1);
QCOMPARE(device->writeChannelCount(), 0);
char buf[1024];
memset(buf, 0, sizeof(buf));
@ -576,6 +582,8 @@ void tst_QIODevice::readAllKeepPosition()
buffer.open(QIODevice::ReadOnly);
char c;
QCOMPARE(buffer.readChannelCount(), 1);
QCOMPARE(buffer.writeChannelCount(), 0);
QVERIFY(buffer.getChar(&c));
QCOMPARE(buffer.pos(), qint64(0));
buffer.ungetChar(c);

View File

@ -124,7 +124,7 @@ void tst_toolsupport::offsets_data()
{
QTestData &data = QTest::newRow("QFilePrivate::fileName")
<< pmm_to_offsetof(&QFilePrivate::fileName);
data << 184 << 256;
data << 188 << 272;
}
#endif