QWindowsPipeWriter: do not clear the buffer in thread pool callback
In a blocking application, receiving the results of write operations must be synchronized with the waitFor...() functions. But, clearing the buffer in another thread can cause the code localsocket.write(...); QVERIFY(localsocket.bytesToWrite() > 0); to fail unexpectedly, if the socket has been disconnected between the calls. So, defer resetting the buffer until checkForWrite() is called. Change-Id: I8c21036aab6a4c56d02c0d9a18d4bbce52d724f4 Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@gmx.de>
This commit is contained in:
parent
51e28dc54f
commit
4e695eece7
@ -737,7 +737,7 @@ bool QProcessPrivate::waitForBytesWritten(const QDeadlineTimer &deadline)
|
||||
// start with, in which case we fail immediately. Also, if the input
|
||||
// pipe goes down somewhere in the code below, we avoid waiting for
|
||||
// a full timeout.
|
||||
if (pipeWriterBytesToWrite() == 0)
|
||||
if (!stdinChannel.writer || !stdinChannel.writer->isWriteOperationActive())
|
||||
return false;
|
||||
|
||||
QProcessPoller poller(*this);
|
||||
@ -747,7 +747,6 @@ bool QProcessPrivate::waitForBytesWritten(const QDeadlineTimer &deadline)
|
||||
if (ret == 0)
|
||||
break;
|
||||
|
||||
Q_ASSERT(stdinChannel.writer);
|
||||
if (stdinChannel.writer->checkForWrite())
|
||||
return true;
|
||||
|
||||
|
@ -41,6 +41,7 @@
|
||||
#include "qwindowspipewriter_p.h"
|
||||
#include <qcoreapplication.h>
|
||||
#include <QMutexLocker>
|
||||
#include <QPointer>
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
|
||||
@ -52,6 +53,7 @@ QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent)
|
||||
waitObject(NULL),
|
||||
pendingBytesWrittenValue(0),
|
||||
lastError(ERROR_SUCCESS),
|
||||
completionState(NoError),
|
||||
stopped(true),
|
||||
writeSequenceStarted(false),
|
||||
bytesWrittenPending(false),
|
||||
@ -82,7 +84,7 @@ void QWindowsPipeWriter::setHandle(HANDLE hPipeWriteEnd)
|
||||
|
||||
handle = hPipeWriteEnd;
|
||||
QMutexLocker locker(&mutex);
|
||||
startAsyncWriteLocked(&locker);
|
||||
startAsyncWriteHelper(&locker);
|
||||
}
|
||||
|
||||
/*!
|
||||
@ -123,12 +125,21 @@ qint64 QWindowsPipeWriter::bytesToWrite() const
|
||||
return writeBuffer.size() + pendingBytesWrittenValue;
|
||||
}
|
||||
|
||||
/*!
|
||||
Returns \c true if async operation is in progress.
|
||||
*/
|
||||
bool QWindowsPipeWriter::isWriteOperationActive() const
|
||||
{
|
||||
return completionState == NoError && bytesToWrite() != 0;
|
||||
}
|
||||
|
||||
/*!
|
||||
Writes a shallow copy of \a ba to the internal buffer.
|
||||
*/
|
||||
void QWindowsPipeWriter::write(const QByteArray &ba)
|
||||
{
|
||||
writeImpl(ba);
|
||||
if (completionState != WriteDisabled)
|
||||
writeImpl(ba);
|
||||
}
|
||||
|
||||
/*!
|
||||
@ -136,7 +147,8 @@ void QWindowsPipeWriter::write(const QByteArray &ba)
|
||||
*/
|
||||
void QWindowsPipeWriter::write(const char *data, qint64 size)
|
||||
{
|
||||
writeImpl(data, size);
|
||||
if (completionState != WriteDisabled)
|
||||
writeImpl(data, size);
|
||||
}
|
||||
|
||||
template <typename... Args>
|
||||
@ -144,12 +156,9 @@ inline void QWindowsPipeWriter::writeImpl(Args... args)
|
||||
{
|
||||
QMutexLocker locker(&mutex);
|
||||
|
||||
if (lastError != ERROR_SUCCESS)
|
||||
return;
|
||||
|
||||
writeBuffer.append(args...);
|
||||
|
||||
if (writeSequenceStarted)
|
||||
if (writeSequenceStarted || (lastError != ERROR_SUCCESS))
|
||||
return;
|
||||
|
||||
stopped = false;
|
||||
@ -157,13 +166,24 @@ inline void QWindowsPipeWriter::writeImpl(Args... args)
|
||||
// If we don't have an assigned handle yet, defer writing until
|
||||
// setHandle() is called.
|
||||
if (handle != INVALID_HANDLE_VALUE)
|
||||
startAsyncWriteLocked(&locker);
|
||||
startAsyncWriteHelper(&locker);
|
||||
}
|
||||
|
||||
void QWindowsPipeWriter::startAsyncWriteHelper(QMutexLocker<QMutex> *locker)
|
||||
{
|
||||
startAsyncWriteLocked();
|
||||
|
||||
// Do not post the event, if the write operation will be completed asynchronously.
|
||||
if (!bytesWrittenPending && lastError == ERROR_SUCCESS)
|
||||
return;
|
||||
|
||||
notifyCompleted(locker);
|
||||
}
|
||||
|
||||
/*!
|
||||
Starts a new write sequence.
|
||||
*/
|
||||
void QWindowsPipeWriter::startAsyncWriteLocked(QMutexLocker<QMutex> *locker)
|
||||
void QWindowsPipeWriter::startAsyncWriteLocked()
|
||||
{
|
||||
while (!writeBuffer.isEmpty()) {
|
||||
// WriteFile() returns true, if the write operation completes synchronously.
|
||||
@ -185,22 +205,6 @@ void QWindowsPipeWriter::startAsyncWriteLocked(QMutexLocker<QMutex> *locker)
|
||||
if (!writeCompleted(errorCode, numberOfBytesWritten))
|
||||
break;
|
||||
}
|
||||
|
||||
// Do not post the event, if the write operation will be completed asynchronously.
|
||||
if (!bytesWrittenPending)
|
||||
return;
|
||||
|
||||
if (!winEventActPosted) {
|
||||
winEventActPosted = true;
|
||||
locker->unlock();
|
||||
QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
|
||||
} else {
|
||||
locker->unlock();
|
||||
}
|
||||
|
||||
// We set the event only after unlocking to avoid additional context
|
||||
// switches due to the released thread immediately running into the lock.
|
||||
SetEvent(syncHandle);
|
||||
}
|
||||
|
||||
/*!
|
||||
@ -232,16 +236,12 @@ void QWindowsPipeWriter::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID cont
|
||||
|
||||
pipeWriter->writeSequenceStarted = false;
|
||||
|
||||
if (pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered)) {
|
||||
pipeWriter->startAsyncWriteLocked(&locker);
|
||||
} else {
|
||||
// The write operation failed, so we must unblock the main thread,
|
||||
// which can wait for the event. We set the event only after unlocking
|
||||
// to avoid additional context switches due to the released thread
|
||||
// immediately running into the lock.
|
||||
locker.unlock();
|
||||
SetEvent(pipeWriter->syncHandle);
|
||||
}
|
||||
if (pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered))
|
||||
pipeWriter->startAsyncWriteLocked();
|
||||
|
||||
// We post the notification even if the write operation failed,
|
||||
// to unblock the main thread, in case it is waiting for the event.
|
||||
pipeWriter->notifyCompleted(&locker);
|
||||
}
|
||||
|
||||
/*!
|
||||
@ -258,7 +258,6 @@ bool QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWrit
|
||||
}
|
||||
|
||||
lastError = errorCode;
|
||||
writeBuffer.clear();
|
||||
switch (errorCode) {
|
||||
case ERROR_PIPE_NOT_CONNECTED: // the other end has closed the pipe
|
||||
case ERROR_OPERATION_ABORTED: // the operation was canceled
|
||||
@ -268,9 +267,29 @@ bool QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWrit
|
||||
qErrnoWarning(errorCode, "QWindowsPipeWriter: write failed.");
|
||||
break;
|
||||
}
|
||||
// The buffer is not cleared here, because the write progress
|
||||
// should appear on the main thread synchronously.
|
||||
return false;
|
||||
}
|
||||
|
||||
/*!
|
||||
Posts a notification event to the main thread.
|
||||
*/
|
||||
void QWindowsPipeWriter::notifyCompleted(QMutexLocker<QMutex> *locker)
|
||||
{
|
||||
if (!winEventActPosted) {
|
||||
winEventActPosted = true;
|
||||
locker->unlock();
|
||||
QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
|
||||
} else {
|
||||
locker->unlock();
|
||||
}
|
||||
|
||||
// We set the event only after unlocking to avoid additional context
|
||||
// switches due to the released thread immediately running into the lock.
|
||||
SetEvent(syncHandle);
|
||||
}
|
||||
|
||||
/*!
|
||||
Receives notification that the write operation has completed.
|
||||
*/
|
||||
@ -296,14 +315,13 @@ bool QWindowsPipeWriter::consumePendingAndEmit(bool allowWinActPosting)
|
||||
if (allowWinActPosting)
|
||||
winEventActPosted = false;
|
||||
|
||||
if (!bytesWrittenPending)
|
||||
return false;
|
||||
|
||||
// Reset the state even if we don't emit bytesWritten().
|
||||
// It's a defined behavior to not re-emit this signal recursively.
|
||||
bytesWrittenPending = false;
|
||||
qint64 numberOfBytesWritten = pendingBytesWrittenValue;
|
||||
pendingBytesWrittenValue = 0;
|
||||
const qint64 numberOfBytesWritten = pendingBytesWrittenValue;
|
||||
const bool emitBytesWritten = bytesWrittenPending;
|
||||
if (emitBytesWritten) {
|
||||
bytesWrittenPending = false;
|
||||
pendingBytesWrittenValue = 0;
|
||||
}
|
||||
const DWORD dwError = lastError;
|
||||
|
||||
locker.unlock();
|
||||
|
||||
@ -311,8 +329,24 @@ bool QWindowsPipeWriter::consumePendingAndEmit(bool allowWinActPosting)
|
||||
if (stopped)
|
||||
return false;
|
||||
|
||||
emit bytesWritten(numberOfBytesWritten);
|
||||
return true;
|
||||
// Trigger 'ErrorDetected' state only once. This state must be set before
|
||||
// emitting the bytesWritten() signal. Otherwise, the write sequence will
|
||||
// be considered not finished, and we may hang if a slot connected
|
||||
// to bytesWritten() calls waitForBytesWritten().
|
||||
if (dwError != ERROR_SUCCESS && completionState == NoError) {
|
||||
QPointer<QWindowsPipeWriter> alive(this);
|
||||
completionState = ErrorDetected;
|
||||
if (emitBytesWritten)
|
||||
emit bytesWritten(numberOfBytesWritten);
|
||||
if (alive) {
|
||||
writeBuffer.clear();
|
||||
completionState = WriteDisabled;
|
||||
}
|
||||
} else if (emitBytesWritten) {
|
||||
emit bytesWritten(numberOfBytesWritten);
|
||||
}
|
||||
|
||||
return emitBytesWritten;
|
||||
}
|
||||
|
||||
QT_END_NAMESPACE
|
||||
|
@ -73,6 +73,7 @@ public:
|
||||
void stop();
|
||||
bool checkForWrite() { return consumePendingAndEmit(false); }
|
||||
qint64 bytesToWrite() const;
|
||||
bool isWriteOperationActive() const;
|
||||
HANDLE syncEvent() const { return syncHandle; }
|
||||
|
||||
Q_SIGNALS:
|
||||
@ -82,13 +83,17 @@ protected:
|
||||
bool event(QEvent *e) override;
|
||||
|
||||
private:
|
||||
enum CompletionState { NoError, ErrorDetected, WriteDisabled };
|
||||
|
||||
template <typename... Args>
|
||||
inline void writeImpl(Args... args);
|
||||
|
||||
void startAsyncWriteLocked(QMutexLocker<QMutex> *locker);
|
||||
void startAsyncWriteHelper(QMutexLocker<QMutex> *locker);
|
||||
void startAsyncWriteLocked();
|
||||
static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
|
||||
PTP_WAIT wait, TP_WAIT_RESULT waitResult);
|
||||
bool writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten);
|
||||
void notifyCompleted(QMutexLocker<QMutex> *locker);
|
||||
bool consumePendingAndEmit(bool allowWinActPosting);
|
||||
|
||||
HANDLE handle;
|
||||
@ -100,6 +105,8 @@ private:
|
||||
qint64 pendingBytesWrittenValue;
|
||||
mutable QMutex mutex;
|
||||
DWORD lastError;
|
||||
|
||||
CompletionState completionState;
|
||||
bool stopped;
|
||||
bool writeSequenceStarted;
|
||||
bool bytesWrittenPending;
|
||||
|
@ -60,7 +60,7 @@ struct QSocketPoller
|
||||
|
||||
QSocketPoller::QSocketPoller(const QLocalSocketPrivate &socket)
|
||||
{
|
||||
if (socket.pipeWriter && socket.pipeWriter->bytesToWrite() != 0) {
|
||||
if (socket.pipeWriter && socket.pipeWriter->isWriteOperationActive()) {
|
||||
handles[handleCount++] = socket.pipeWriter->syncEvent();
|
||||
writePending = true;
|
||||
}
|
||||
|
@ -132,6 +132,7 @@ private slots:
|
||||
|
||||
void writeToClientAndDisconnect_data();
|
||||
void writeToClientAndDisconnect();
|
||||
void writeToDisconnected();
|
||||
|
||||
void debug();
|
||||
void bytesWrittenSignal();
|
||||
@ -1524,6 +1525,32 @@ void tst_QLocalSocket::writeToClientAndDisconnect()
|
||||
QCOMPARE(client.state(), QLocalSocket::UnconnectedState);
|
||||
}
|
||||
|
||||
void tst_QLocalSocket::writeToDisconnected()
|
||||
{
|
||||
QLocalServer server;
|
||||
QVERIFY(server.listen("writeToDisconnected"));
|
||||
|
||||
QLocalSocket client;
|
||||
client.connectToServer("writeToDisconnected");
|
||||
QVERIFY(client.waitForConnected(3000));
|
||||
QVERIFY(server.waitForNewConnection(3000));
|
||||
QLocalSocket *serverSocket = server.nextPendingConnection();
|
||||
QVERIFY(serverSocket);
|
||||
serverSocket->abort();
|
||||
|
||||
QCOMPARE(client.state(), QLocalSocket::ConnectedState);
|
||||
QVERIFY(client.putChar(0));
|
||||
|
||||
#ifdef Q_OS_WIN
|
||||
// Ensure the asynchronous write operation is finished.
|
||||
QTest::qSleep(250);
|
||||
#endif
|
||||
|
||||
QCOMPARE(client.bytesToWrite(), qint64(1));
|
||||
QVERIFY(!client.waitForBytesWritten());
|
||||
QCOMPARE(client.state(), QLocalSocket::UnconnectedState);
|
||||
}
|
||||
|
||||
void tst_QLocalSocket::debug()
|
||||
{
|
||||
// Make sure this compiles
|
||||
|
Loading…
Reference in New Issue
Block a user