QWindowsPipeWriter: ensure validity of the write buffer
QWindowsPipeWriter uses asynchronous API to perform writing. Once a cycle has been started, the write buffer must remain valid until the write operation is completed. To avoid data corruption and possibly undefined behavior, this patch makes QWindowsPipeWriter::write() take a QByteArray, which it keeps alive for the duration of the write cycle. Autotest-by: Thomas Hartmann Task-number: QTBUG-52401 Change-Id: Ia35faee735c4e684267daa1f6bd689512b670cd2 Reviewed-by: Joerg Bornemann <joerg.bornemann@theqtcompany.com>
This commit is contained in:
parent
c9f9f54d3f
commit
a4d26cf522
@ -1051,7 +1051,6 @@ bool QProcessPrivate::_q_canReadStandardError()
|
||||
*/
|
||||
bool QProcessPrivate::_q_canWrite()
|
||||
{
|
||||
Q_Q(QProcess);
|
||||
if (stdinChannel.notifier)
|
||||
stdinChannel.notifier->setEnabled(false);
|
||||
|
||||
@ -1062,31 +1061,13 @@ bool QProcessPrivate::_q_canWrite()
|
||||
return false;
|
||||
}
|
||||
|
||||
qint64 written = writeToStdin(stdinChannel.buffer.readPointer(),
|
||||
stdinChannel.buffer.nextDataBlockSize());
|
||||
if (written < 0) {
|
||||
closeChannel(&stdinChannel);
|
||||
setErrorAndEmit(QProcess::WriteError);
|
||||
return false;
|
||||
}
|
||||
const bool writeSucceeded = writeToStdin();
|
||||
|
||||
#if defined QPROCESS_DEBUG
|
||||
qDebug("QProcessPrivate::canWrite(), wrote %d bytes to the process input", int(written));
|
||||
#endif
|
||||
|
||||
if (written != 0) {
|
||||
stdinChannel.buffer.free(written);
|
||||
if (!emittedBytesWritten) {
|
||||
emittedBytesWritten = true;
|
||||
emit q->bytesWritten(written);
|
||||
emittedBytesWritten = false;
|
||||
}
|
||||
}
|
||||
if (stdinChannel.notifier && !stdinChannel.buffer.isEmpty())
|
||||
stdinChannel.notifier->setEnabled(true);
|
||||
if (stdinChannel.buffer.isEmpty() && stdinChannel.closed)
|
||||
closeWriteChannel();
|
||||
return true;
|
||||
return writeSucceeded;
|
||||
}
|
||||
|
||||
/*!
|
||||
|
@ -375,7 +375,7 @@ public:
|
||||
|
||||
qint64 bytesAvailableInChannel(const Channel *channel) const;
|
||||
qint64 readFromChannel(const Channel *channel, char *data, qint64 maxlen);
|
||||
qint64 writeToStdin(const char *data, qint64 maxlen);
|
||||
bool writeToStdin();
|
||||
|
||||
void cleanup();
|
||||
void setError(QProcess::ProcessError error, const QString &description = QString());
|
||||
|
@ -620,21 +620,36 @@ qint64 QProcessPrivate::readFromChannel(const Channel *channel, char *data, qint
|
||||
return bytesRead;
|
||||
}
|
||||
|
||||
qint64 QProcessPrivate::writeToStdin(const char *data, qint64 maxlen)
|
||||
bool QProcessPrivate::writeToStdin()
|
||||
{
|
||||
qint64 written = qt_safe_write_nosignal(stdinChannel.pipe[1], data, maxlen);
|
||||
const char *data = stdinChannel.buffer.readPointer();
|
||||
const qint64 bytesToWrite = stdinChannel.buffer.nextDataBlockSize();
|
||||
|
||||
qint64 written = qt_safe_write_nosignal(stdinChannel.pipe[1], data, bytesToWrite);
|
||||
#if defined QPROCESS_DEBUG
|
||||
qDebug("QProcessPrivate::writeToStdin(%p \"%s\", %lld) == %lld",
|
||||
data, qt_prettyDebug(data, maxlen, 16).constData(), maxlen, written);
|
||||
qDebug("QProcessPrivate::writeToStdin(), write(%p \"%s\", %lld) == %lld",
|
||||
data, qt_prettyDebug(data, bytesToWrite, 16).constData(), bytesToWrite, written);
|
||||
if (written == -1)
|
||||
qDebug("QProcessPrivate::writeToStdin(), failed to write (%s)", qPrintable(qt_error_string(errno)));
|
||||
#endif
|
||||
// If the O_NONBLOCK flag is set and If some data can be written without blocking
|
||||
// the process, write() will transfer what it can and return the number of bytes written.
|
||||
// Otherwise, it will return -1 and set errno to EAGAIN
|
||||
if (written == -1 && errno == EAGAIN)
|
||||
written = 0;
|
||||
return written;
|
||||
if (written == -1) {
|
||||
// If the O_NONBLOCK flag is set and If some data can be written without blocking
|
||||
// the process, write() will transfer what it can and return the number of bytes written.
|
||||
// Otherwise, it will return -1 and set errno to EAGAIN
|
||||
if (errno == EAGAIN)
|
||||
return true;
|
||||
|
||||
closeChannel(&stdinChannel);
|
||||
setErrorAndEmit(QProcess::WriteError);
|
||||
return false;
|
||||
}
|
||||
stdinChannel.buffer.free(written);
|
||||
if (!emittedBytesWritten && written != 0) {
|
||||
emittedBytesWritten = true;
|
||||
emit q_func()->bytesWritten(written);
|
||||
emittedBytesWritten = false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void QProcessPrivate::terminateProcess()
|
||||
|
@ -787,17 +787,23 @@ qint64 QProcessPrivate::pipeWriterBytesToWrite() const
|
||||
return stdinChannel.writer ? stdinChannel.writer->bytesToWrite() : qint64(0);
|
||||
}
|
||||
|
||||
qint64 QProcessPrivate::writeToStdin(const char *data, qint64 maxlen)
|
||||
bool QProcessPrivate::writeToStdin()
|
||||
{
|
||||
Q_Q(QProcess);
|
||||
|
||||
if (!stdinChannel.writer) {
|
||||
stdinChannel.writer = new QWindowsPipeWriter(stdinChannel.pipe[1], q);
|
||||
QObject::connect(stdinChannel.writer, &QWindowsPipeWriter::bytesWritten,
|
||||
q, &QProcess::bytesWritten);
|
||||
QObjectPrivate::connect(stdinChannel.writer, &QWindowsPipeWriter::canWrite,
|
||||
this, &QProcessPrivate::_q_canWrite);
|
||||
} else {
|
||||
if (stdinChannel.writer->isWriteOperationActive())
|
||||
return true;
|
||||
}
|
||||
|
||||
return stdinChannel.writer->write(data, maxlen);
|
||||
stdinChannel.writer->write(stdinChannel.buffer.read());
|
||||
return true;
|
||||
}
|
||||
|
||||
bool QProcessPrivate::waitForWrite(int msecs)
|
||||
|
@ -265,11 +265,9 @@ qint64 QProcessPrivate::pipeWriterBytesToWrite() const
|
||||
return 0;
|
||||
}
|
||||
|
||||
qint64 QProcessPrivate::writeToStdin(const char *data, qint64 maxlen)
|
||||
bool QProcessPrivate::writeToStdin()
|
||||
{
|
||||
Q_UNUSED(data);
|
||||
Q_UNUSED(maxlen);
|
||||
return -1;
|
||||
return false;
|
||||
}
|
||||
|
||||
bool QProcessPrivate::waitForWrite(int msecs)
|
||||
|
@ -73,21 +73,19 @@ QWindowsPipeWriter::~QWindowsPipeWriter()
|
||||
|
||||
bool QWindowsPipeWriter::waitForWrite(int msecs)
|
||||
{
|
||||
if (!writeSequenceStarted)
|
||||
return false;
|
||||
|
||||
if (bytesWrittenPending) {
|
||||
if (!inBytesWritten)
|
||||
emitPendingBytesWrittenValue();
|
||||
emitPendingBytesWrittenValue();
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!writeSequenceStarted)
|
||||
return false;
|
||||
|
||||
if (!waitForNotification(msecs))
|
||||
return false;
|
||||
|
||||
if (bytesWrittenPending) {
|
||||
if (!inBytesWritten)
|
||||
emitPendingBytesWrittenValue();
|
||||
emitPendingBytesWrittenValue();
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -96,20 +94,24 @@ bool QWindowsPipeWriter::waitForWrite(int msecs)
|
||||
|
||||
qint64 QWindowsPipeWriter::bytesToWrite() const
|
||||
{
|
||||
return numberOfBytesToWrite;
|
||||
return numberOfBytesToWrite + pendingBytesWrittenValue;
|
||||
}
|
||||
|
||||
void QWindowsPipeWriter::emitPendingBytesWrittenValue()
|
||||
{
|
||||
if (bytesWrittenPending) {
|
||||
// Reset the state even if we don't emit bytesWritten().
|
||||
// It's a defined behavior to not re-emit this signal recursively.
|
||||
bytesWrittenPending = false;
|
||||
const qint64 bytes = pendingBytesWrittenValue;
|
||||
pendingBytesWrittenValue = 0;
|
||||
|
||||
inBytesWritten = true;
|
||||
emit bytesWritten(bytes);
|
||||
inBytesWritten = false;
|
||||
emit canWrite();
|
||||
if (!inBytesWritten) {
|
||||
inBytesWritten = true;
|
||||
emit bytesWritten(bytes);
|
||||
inBytesWritten = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,6 +131,8 @@ void QWindowsPipeWriter::notified(DWORD errorCode, DWORD numberOfBytesWritten)
|
||||
notifiedCalled = true;
|
||||
writeSequenceStarted = false;
|
||||
numberOfBytesToWrite = 0;
|
||||
Q_ASSERT(errorCode != ERROR_SUCCESS || numberOfBytesWritten == buffer.size());
|
||||
buffer.clear();
|
||||
|
||||
switch (errorCode) {
|
||||
case ERROR_SUCCESS:
|
||||
@ -173,21 +177,26 @@ bool QWindowsPipeWriter::waitForNotification(int timeout)
|
||||
return notifiedCalled;
|
||||
}
|
||||
|
||||
qint64 QWindowsPipeWriter::write(const char *ptr, qint64 maxlen)
|
||||
bool QWindowsPipeWriter::write(const QByteArray &ba)
|
||||
{
|
||||
if (writeSequenceStarted)
|
||||
return 0;
|
||||
return false;
|
||||
|
||||
overlapped.clear();
|
||||
numberOfBytesToWrite = maxlen;
|
||||
buffer = ba;
|
||||
numberOfBytesToWrite = buffer.size();
|
||||
stopped = false;
|
||||
writeSequenceStarted = true;
|
||||
if (!WriteFileEx(handle, ptr, maxlen, &overlapped, &writeFileCompleted)) {
|
||||
if (!WriteFileEx(handle, buffer.constData(), numberOfBytesToWrite,
|
||||
&overlapped, &writeFileCompleted)) {
|
||||
writeSequenceStarted = false;
|
||||
numberOfBytesToWrite = 0;
|
||||
buffer.clear();
|
||||
qErrnoWarning("QWindowsPipeWriter::write failed.");
|
||||
return false;
|
||||
}
|
||||
|
||||
return maxlen;
|
||||
return true;
|
||||
}
|
||||
|
||||
void QWindowsPipeWriter::stop()
|
||||
|
@ -47,6 +47,7 @@
|
||||
|
||||
#include <qelapsedtimer.h>
|
||||
#include <qobject.h>
|
||||
#include <qbytearray.h>
|
||||
#include <qt_windows.h>
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
@ -106,7 +107,7 @@ public:
|
||||
explicit QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent = 0);
|
||||
~QWindowsPipeWriter();
|
||||
|
||||
qint64 write(const char *data, qint64 maxlen);
|
||||
bool write(const QByteArray &ba);
|
||||
void stop();
|
||||
bool waitForWrite(int msecs);
|
||||
bool isWriteOperationActive() const { return writeSequenceStarted; }
|
||||
@ -136,6 +137,7 @@ private:
|
||||
|
||||
HANDLE handle;
|
||||
Overlapped overlapped;
|
||||
QByteArray buffer;
|
||||
qint64 numberOfBytesToWrite;
|
||||
qint64 pendingBytesWrittenValue;
|
||||
bool stopped;
|
||||
|
@ -124,7 +124,7 @@ private:
|
||||
Q_PRIVATE_SLOT(d_func(), void _q_stateChanged(QAbstractSocket::SocketState))
|
||||
Q_PRIVATE_SLOT(d_func(), void _q_error(QAbstractSocket::SocketError))
|
||||
#elif defined(Q_OS_WIN)
|
||||
Q_PRIVATE_SLOT(d_func(), void _q_bytesWritten(qint64))
|
||||
Q_PRIVATE_SLOT(d_func(), void _q_canWrite())
|
||||
Q_PRIVATE_SLOT(d_func(), void _q_pipeClosed())
|
||||
Q_PRIVATE_SLOT(d_func(), void _q_winError(ulong, const QString &))
|
||||
#else
|
||||
|
@ -124,8 +124,7 @@ public:
|
||||
~QLocalSocketPrivate();
|
||||
void destroyPipeHandles();
|
||||
void setErrorString(const QString &function);
|
||||
void startNextWrite();
|
||||
void _q_bytesWritten(qint64 bytes);
|
||||
void _q_canWrite();
|
||||
void _q_pipeClosed();
|
||||
void _q_winError(ulong windowsError, const QString &function);
|
||||
HANDLE handle;
|
||||
|
@ -212,11 +212,12 @@ qint64 QLocalSocket::writeData(const char *data, qint64 len)
|
||||
memcpy(dest, data, len);
|
||||
if (!d->pipeWriter) {
|
||||
d->pipeWriter = new QWindowsPipeWriter(d->handle, this);
|
||||
QObjectPrivate::connect(d->pipeWriter, &QWindowsPipeWriter::bytesWritten,
|
||||
d, &QLocalSocketPrivate::_q_bytesWritten);
|
||||
connect(d->pipeWriter, &QWindowsPipeWriter::bytesWritten,
|
||||
this, &QLocalSocket::bytesWritten);
|
||||
QObjectPrivate::connect(d->pipeWriter, &QWindowsPipeWriter::canWrite,
|
||||
d, &QLocalSocketPrivate::_q_canWrite);
|
||||
}
|
||||
if (!d->pipeWriter->isWriteOperationActive())
|
||||
d->startNextWrite();
|
||||
d->_q_canWrite();
|
||||
return len;
|
||||
}
|
||||
|
||||
@ -269,7 +270,7 @@ qint64 QLocalSocket::bytesAvailable() const
|
||||
qint64 QLocalSocket::bytesToWrite() const
|
||||
{
|
||||
Q_D(const QLocalSocket);
|
||||
return d->writeBuffer.size();
|
||||
return d->writeBuffer.size() + (d->pipeWriter ? d->pipeWriter->bytesToWrite() : 0);
|
||||
}
|
||||
|
||||
bool QLocalSocket::canReadLine() const
|
||||
@ -352,7 +353,7 @@ bool QLocalSocket::setSocketDescriptor(qintptr socketDescriptor,
|
||||
return true;
|
||||
}
|
||||
|
||||
void QLocalSocketPrivate::startNextWrite()
|
||||
void QLocalSocketPrivate::_q_canWrite()
|
||||
{
|
||||
Q_Q(QLocalSocket);
|
||||
if (writeBuffer.isEmpty()) {
|
||||
@ -360,18 +361,11 @@ void QLocalSocketPrivate::startNextWrite()
|
||||
q->close();
|
||||
} else {
|
||||
Q_ASSERT(pipeWriter);
|
||||
pipeWriter->write(writeBuffer.readPointer(), writeBuffer.nextDataBlockSize());
|
||||
if (!pipeWriter->isWriteOperationActive())
|
||||
pipeWriter->write(writeBuffer.read());
|
||||
}
|
||||
}
|
||||
|
||||
void QLocalSocketPrivate::_q_bytesWritten(qint64 bytes)
|
||||
{
|
||||
Q_Q(QLocalSocket);
|
||||
writeBuffer.free(bytes);
|
||||
startNextWrite();
|
||||
emit q->bytesWritten(bytes);
|
||||
}
|
||||
|
||||
qintptr QLocalSocket::socketDescriptor() const
|
||||
{
|
||||
Q_D(const QLocalSocket);
|
||||
|
@ -35,6 +35,7 @@
|
||||
#include <QtTest/QtTest>
|
||||
|
||||
#include <qtextstream.h>
|
||||
#include <qdatastream.h>
|
||||
#include <QtNetwork/qlocalsocket.h>
|
||||
#include <QtNetwork/qlocalserver.h>
|
||||
|
||||
@ -78,6 +79,9 @@ private slots:
|
||||
|
||||
void readBufferOverflow();
|
||||
|
||||
void simpleCommandProtocol1();
|
||||
void simpleCommandProtocol2();
|
||||
|
||||
void fullPath();
|
||||
|
||||
void hitMaximumConnections_data();
|
||||
@ -630,6 +634,110 @@ void tst_QLocalSocket::readBufferOverflow()
|
||||
QCOMPARE(client.bytesAvailable(), 0);
|
||||
}
|
||||
|
||||
static qint64 writeCommand(const QVariant &command, QIODevice *device, int commandCounter)
|
||||
{
|
||||
QByteArray block;
|
||||
QDataStream out(&block, QIODevice::WriteOnly);
|
||||
out << qint64(0);
|
||||
out << commandCounter;
|
||||
out << command;
|
||||
out.device()->seek(0);
|
||||
out << qint64(block.size() - sizeof(qint64));
|
||||
return device->write(block);
|
||||
}
|
||||
|
||||
static QVariant readCommand(QIODevice *ioDevice, int *readCommandCounter, bool readSize = true)
|
||||
{
|
||||
QDataStream in(ioDevice);
|
||||
qint64 blockSize;
|
||||
int commandCounter;
|
||||
if (readSize)
|
||||
in >> blockSize;
|
||||
in >> commandCounter;
|
||||
*readCommandCounter = commandCounter;
|
||||
|
||||
QVariant command;
|
||||
in >> command;
|
||||
|
||||
return command;
|
||||
}
|
||||
|
||||
void tst_QLocalSocket::simpleCommandProtocol1()
|
||||
{
|
||||
QLocalServer server;
|
||||
server.listen(QStringLiteral("simpleProtocol"));
|
||||
|
||||
QLocalSocket localSocketWrite;
|
||||
localSocketWrite.connectToServer(server.serverName());
|
||||
QVERIFY(server.waitForNewConnection());
|
||||
QLocalSocket *localSocketRead = server.nextPendingConnection();
|
||||
QVERIFY(localSocketRead);
|
||||
|
||||
int readCounter = 0;
|
||||
for (int i = 0; i < 2000; ++i) {
|
||||
const QVariant command(QRect(readCounter, i, 10, 10));
|
||||
const qint64 blockSize = writeCommand(command, &localSocketWrite, i);
|
||||
while (localSocketWrite.bytesToWrite())
|
||||
QVERIFY(localSocketWrite.waitForBytesWritten());
|
||||
while (localSocketRead->bytesAvailable() < blockSize) {
|
||||
QVERIFY(localSocketRead->waitForReadyRead(1000));
|
||||
}
|
||||
const QVariant variant = readCommand(localSocketRead, &readCounter);
|
||||
QCOMPARE(readCounter, i);
|
||||
QCOMPARE(variant, command);
|
||||
}
|
||||
}
|
||||
|
||||
void tst_QLocalSocket::simpleCommandProtocol2()
|
||||
{
|
||||
QLocalServer server;
|
||||
server.listen(QStringLiteral("simpleProtocol"));
|
||||
|
||||
QLocalSocket localSocketWrite;
|
||||
localSocketWrite.connectToServer(server.serverName());
|
||||
QVERIFY(server.waitForNewConnection());
|
||||
QLocalSocket* localSocketRead = server.nextPendingConnection();
|
||||
QVERIFY(localSocketRead);
|
||||
|
||||
int readCounter = 0;
|
||||
qint64 writtenBlockSize = 0;
|
||||
qint64 blockSize = 0;
|
||||
|
||||
QObject::connect(localSocketRead, &QLocalSocket::readyRead, [&] {
|
||||
forever {
|
||||
if (localSocketRead->bytesAvailable() < sizeof(qint64))
|
||||
return;
|
||||
|
||||
if (blockSize == 0) {
|
||||
QDataStream in(localSocketRead);
|
||||
in >> blockSize;
|
||||
}
|
||||
|
||||
if (localSocketRead->bytesAvailable() < blockSize)
|
||||
return;
|
||||
|
||||
int commandNumber = 0;
|
||||
const QVariant variant = readCommand(localSocketRead, &commandNumber, false);
|
||||
QCOMPARE(writtenBlockSize, blockSize);
|
||||
QCOMPARE(readCounter, commandNumber);
|
||||
QCOMPARE(variant.userType(), (int)QMetaType::QRect);
|
||||
|
||||
readCounter++;
|
||||
blockSize = 0;
|
||||
}
|
||||
});
|
||||
|
||||
for (int i = 0; i < 500; ++i) {
|
||||
const QVariant command(QRect(readCounter, i, 10, 10));
|
||||
writtenBlockSize = writeCommand(command, &localSocketWrite, i) - sizeof(qint64);
|
||||
if (i % 10 == 0)
|
||||
QTest::qWait(1);
|
||||
}
|
||||
|
||||
localSocketWrite.abort();
|
||||
QVERIFY(localSocketRead->waitForDisconnected(1000));
|
||||
}
|
||||
|
||||
// QLocalSocket/Server can take a name or path, check that it works as expected
|
||||
void tst_QLocalSocket::fullPath()
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user