From 4e695eece7e1de874f3f8bb0a530a2ae05a0014a Mon Sep 17 00:00:00 2001 From: Alex Trotsenko Date: Tue, 24 Aug 2021 18:30:12 +0300 Subject: [PATCH] QWindowsPipeWriter: do not clear the buffer in thread pool callback In a blocking application, receiving the results of write operations must be synchronized with the waitFor...() functions. But, clearing the buffer in another thread can cause the code localsocket.write(...); QVERIFY(localsocket.bytesToWrite() > 0); to fail unexpectedly, if the socket has been disconnected between the calls. So, defer resetting the buffer until checkForWrite() is called. Change-Id: I8c21036aab6a4c56d02c0d9a18d4bbce52d724f4 Reviewed-by: Oswald Buddenhagen --- src/corelib/io/qprocess_win.cpp | 3 +- src/corelib/io/qwindowspipewriter.cpp | 126 +++++++++++------- src/corelib/io/qwindowspipewriter_p.h | 9 +- src/network/socket/qlocalsocket_win.cpp | 2 +- .../socket/qlocalsocket/tst_qlocalsocket.cpp | 27 ++++ 5 files changed, 117 insertions(+), 50 deletions(-) diff --git a/src/corelib/io/qprocess_win.cpp b/src/corelib/io/qprocess_win.cpp index d5b1bd6f6a..f935e4f491 100644 --- a/src/corelib/io/qprocess_win.cpp +++ b/src/corelib/io/qprocess_win.cpp @@ -737,7 +737,7 @@ bool QProcessPrivate::waitForBytesWritten(const QDeadlineTimer &deadline) // start with, in which case we fail immediately. Also, if the input // pipe goes down somewhere in the code below, we avoid waiting for // a full timeout. - if (pipeWriterBytesToWrite() == 0) + if (!stdinChannel.writer || !stdinChannel.writer->isWriteOperationActive()) return false; QProcessPoller poller(*this); @@ -747,7 +747,6 @@ bool QProcessPrivate::waitForBytesWritten(const QDeadlineTimer &deadline) if (ret == 0) break; - Q_ASSERT(stdinChannel.writer); if (stdinChannel.writer->checkForWrite()) return true; diff --git a/src/corelib/io/qwindowspipewriter.cpp b/src/corelib/io/qwindowspipewriter.cpp index 182c3e3c6f..843ff6a00b 100644 --- a/src/corelib/io/qwindowspipewriter.cpp +++ b/src/corelib/io/qwindowspipewriter.cpp @@ -41,6 +41,7 @@ #include "qwindowspipewriter_p.h" #include #include +#include QT_BEGIN_NAMESPACE @@ -52,6 +53,7 @@ QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent) waitObject(NULL), pendingBytesWrittenValue(0), lastError(ERROR_SUCCESS), + completionState(NoError), stopped(true), writeSequenceStarted(false), bytesWrittenPending(false), @@ -82,7 +84,7 @@ void QWindowsPipeWriter::setHandle(HANDLE hPipeWriteEnd) handle = hPipeWriteEnd; QMutexLocker locker(&mutex); - startAsyncWriteLocked(&locker); + startAsyncWriteHelper(&locker); } /*! @@ -123,12 +125,21 @@ qint64 QWindowsPipeWriter::bytesToWrite() const return writeBuffer.size() + pendingBytesWrittenValue; } +/*! + Returns \c true if async operation is in progress. +*/ +bool QWindowsPipeWriter::isWriteOperationActive() const +{ + return completionState == NoError && bytesToWrite() != 0; +} + /*! Writes a shallow copy of \a ba to the internal buffer. */ void QWindowsPipeWriter::write(const QByteArray &ba) { - writeImpl(ba); + if (completionState != WriteDisabled) + writeImpl(ba); } /*! @@ -136,7 +147,8 @@ void QWindowsPipeWriter::write(const QByteArray &ba) */ void QWindowsPipeWriter::write(const char *data, qint64 size) { - writeImpl(data, size); + if (completionState != WriteDisabled) + writeImpl(data, size); } template @@ -144,12 +156,9 @@ inline void QWindowsPipeWriter::writeImpl(Args... args) { QMutexLocker locker(&mutex); - if (lastError != ERROR_SUCCESS) - return; - writeBuffer.append(args...); - if (writeSequenceStarted) + if (writeSequenceStarted || (lastError != ERROR_SUCCESS)) return; stopped = false; @@ -157,13 +166,24 @@ inline void QWindowsPipeWriter::writeImpl(Args... args) // If we don't have an assigned handle yet, defer writing until // setHandle() is called. if (handle != INVALID_HANDLE_VALUE) - startAsyncWriteLocked(&locker); + startAsyncWriteHelper(&locker); +} + +void QWindowsPipeWriter::startAsyncWriteHelper(QMutexLocker *locker) +{ + startAsyncWriteLocked(); + + // Do not post the event, if the write operation will be completed asynchronously. + if (!bytesWrittenPending && lastError == ERROR_SUCCESS) + return; + + notifyCompleted(locker); } /*! Starts a new write sequence. */ -void QWindowsPipeWriter::startAsyncWriteLocked(QMutexLocker *locker) +void QWindowsPipeWriter::startAsyncWriteLocked() { while (!writeBuffer.isEmpty()) { // WriteFile() returns true, if the write operation completes synchronously. @@ -185,22 +205,6 @@ void QWindowsPipeWriter::startAsyncWriteLocked(QMutexLocker *locker) if (!writeCompleted(errorCode, numberOfBytesWritten)) break; } - - // Do not post the event, if the write operation will be completed asynchronously. - if (!bytesWrittenPending) - return; - - if (!winEventActPosted) { - winEventActPosted = true; - locker->unlock(); - QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct)); - } else { - locker->unlock(); - } - - // We set the event only after unlocking to avoid additional context - // switches due to the released thread immediately running into the lock. - SetEvent(syncHandle); } /*! @@ -232,16 +236,12 @@ void QWindowsPipeWriter::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID cont pipeWriter->writeSequenceStarted = false; - if (pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered)) { - pipeWriter->startAsyncWriteLocked(&locker); - } else { - // The write operation failed, so we must unblock the main thread, - // which can wait for the event. We set the event only after unlocking - // to avoid additional context switches due to the released thread - // immediately running into the lock. - locker.unlock(); - SetEvent(pipeWriter->syncHandle); - } + if (pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered)) + pipeWriter->startAsyncWriteLocked(); + + // We post the notification even if the write operation failed, + // to unblock the main thread, in case it is waiting for the event. + pipeWriter->notifyCompleted(&locker); } /*! @@ -258,7 +258,6 @@ bool QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWrit } lastError = errorCode; - writeBuffer.clear(); switch (errorCode) { case ERROR_PIPE_NOT_CONNECTED: // the other end has closed the pipe case ERROR_OPERATION_ABORTED: // the operation was canceled @@ -268,9 +267,29 @@ bool QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWrit qErrnoWarning(errorCode, "QWindowsPipeWriter: write failed."); break; } + // The buffer is not cleared here, because the write progress + // should appear on the main thread synchronously. return false; } +/*! + Posts a notification event to the main thread. + */ +void QWindowsPipeWriter::notifyCompleted(QMutexLocker *locker) +{ + if (!winEventActPosted) { + winEventActPosted = true; + locker->unlock(); + QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct)); + } else { + locker->unlock(); + } + + // We set the event only after unlocking to avoid additional context + // switches due to the released thread immediately running into the lock. + SetEvent(syncHandle); +} + /*! Receives notification that the write operation has completed. */ @@ -296,14 +315,13 @@ bool QWindowsPipeWriter::consumePendingAndEmit(bool allowWinActPosting) if (allowWinActPosting) winEventActPosted = false; - if (!bytesWrittenPending) - return false; - - // Reset the state even if we don't emit bytesWritten(). - // It's a defined behavior to not re-emit this signal recursively. - bytesWrittenPending = false; - qint64 numberOfBytesWritten = pendingBytesWrittenValue; - pendingBytesWrittenValue = 0; + const qint64 numberOfBytesWritten = pendingBytesWrittenValue; + const bool emitBytesWritten = bytesWrittenPending; + if (emitBytesWritten) { + bytesWrittenPending = false; + pendingBytesWrittenValue = 0; + } + const DWORD dwError = lastError; locker.unlock(); @@ -311,8 +329,24 @@ bool QWindowsPipeWriter::consumePendingAndEmit(bool allowWinActPosting) if (stopped) return false; - emit bytesWritten(numberOfBytesWritten); - return true; + // Trigger 'ErrorDetected' state only once. This state must be set before + // emitting the bytesWritten() signal. Otherwise, the write sequence will + // be considered not finished, and we may hang if a slot connected + // to bytesWritten() calls waitForBytesWritten(). + if (dwError != ERROR_SUCCESS && completionState == NoError) { + QPointer alive(this); + completionState = ErrorDetected; + if (emitBytesWritten) + emit bytesWritten(numberOfBytesWritten); + if (alive) { + writeBuffer.clear(); + completionState = WriteDisabled; + } + } else if (emitBytesWritten) { + emit bytesWritten(numberOfBytesWritten); + } + + return emitBytesWritten; } QT_END_NAMESPACE diff --git a/src/corelib/io/qwindowspipewriter_p.h b/src/corelib/io/qwindowspipewriter_p.h index ebd48260ba..81a586dc3d 100644 --- a/src/corelib/io/qwindowspipewriter_p.h +++ b/src/corelib/io/qwindowspipewriter_p.h @@ -73,6 +73,7 @@ public: void stop(); bool checkForWrite() { return consumePendingAndEmit(false); } qint64 bytesToWrite() const; + bool isWriteOperationActive() const; HANDLE syncEvent() const { return syncHandle; } Q_SIGNALS: @@ -82,13 +83,17 @@ protected: bool event(QEvent *e) override; private: + enum CompletionState { NoError, ErrorDetected, WriteDisabled }; + template inline void writeImpl(Args... args); - void startAsyncWriteLocked(QMutexLocker *locker); + void startAsyncWriteHelper(QMutexLocker *locker); + void startAsyncWriteLocked(); static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WAIT wait, TP_WAIT_RESULT waitResult); bool writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten); + void notifyCompleted(QMutexLocker *locker); bool consumePendingAndEmit(bool allowWinActPosting); HANDLE handle; @@ -100,6 +105,8 @@ private: qint64 pendingBytesWrittenValue; mutable QMutex mutex; DWORD lastError; + + CompletionState completionState; bool stopped; bool writeSequenceStarted; bool bytesWrittenPending; diff --git a/src/network/socket/qlocalsocket_win.cpp b/src/network/socket/qlocalsocket_win.cpp index e11c9e90be..c4da59c278 100644 --- a/src/network/socket/qlocalsocket_win.cpp +++ b/src/network/socket/qlocalsocket_win.cpp @@ -60,7 +60,7 @@ struct QSocketPoller QSocketPoller::QSocketPoller(const QLocalSocketPrivate &socket) { - if (socket.pipeWriter && socket.pipeWriter->bytesToWrite() != 0) { + if (socket.pipeWriter && socket.pipeWriter->isWriteOperationActive()) { handles[handleCount++] = socket.pipeWriter->syncEvent(); writePending = true; } diff --git a/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp b/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp index f537bd9157..519a7bfacb 100644 --- a/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp +++ b/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp @@ -132,6 +132,7 @@ private slots: void writeToClientAndDisconnect_data(); void writeToClientAndDisconnect(); + void writeToDisconnected(); void debug(); void bytesWrittenSignal(); @@ -1524,6 +1525,32 @@ void tst_QLocalSocket::writeToClientAndDisconnect() QCOMPARE(client.state(), QLocalSocket::UnconnectedState); } +void tst_QLocalSocket::writeToDisconnected() +{ + QLocalServer server; + QVERIFY(server.listen("writeToDisconnected")); + + QLocalSocket client; + client.connectToServer("writeToDisconnected"); + QVERIFY(client.waitForConnected(3000)); + QVERIFY(server.waitForNewConnection(3000)); + QLocalSocket *serverSocket = server.nextPendingConnection(); + QVERIFY(serverSocket); + serverSocket->abort(); + + QCOMPARE(client.state(), QLocalSocket::ConnectedState); + QVERIFY(client.putChar(0)); + +#ifdef Q_OS_WIN + // Ensure the asynchronous write operation is finished. + QTest::qSleep(250); +#endif + + QCOMPARE(client.bytesToWrite(), qint64(1)); + QVERIFY(!client.waitForBytesWritten()); + QCOMPARE(client.state(), QLocalSocket::UnconnectedState); +} + void tst_QLocalSocket::debug() { // Make sure this compiles