Make QWindowsPipeWriter thread-free.

Re-work QWindowsPipeWriter to not use a thread anymore but the
WriteFileEx API, similar to QWindowsPipeReader. This saves us a lot of
thread synchronization code and enables us to directly write data
without yet another buffering layer.
  Also, this fixes the dreaded deadlocks in the QWindowsPipeWriter
destructor that could occur when the reading end was closed before
the write was finished.

Task-number: QTBUG-23378
Task-number: QTBUG-38185
Change-Id: If0ae96dcd756f716ddf6fa38016080095bf3bd4e
Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@theqtcompany.com>
This commit is contained in:
Joerg Bornemann 2016-03-07 15:57:45 +01:00
parent 5c89e2eeee
commit 0307c008bf
5 changed files with 185 additions and 154 deletions

View File

@ -666,10 +666,7 @@ bool QProcessPrivate::waitForBytesWritten(int msecs)
QIncrementalSleepTimer timer(msecs);
forever {
// Check if we have any data pending: the pipe writer has
// bytes waiting to written, or it has written data since the
// last time we called stdinChannel.writer->waitForWrite().
bool pendingDataInPipe = stdinChannel.writer && (stdinChannel.writer->bytesToWrite() || stdinChannel.writer->hadWritten());
bool pendingDataInPipe = stdinChannel.writer && stdinChannel.writer->bytesToWrite();
// If we don't have pending data, and our write buffer is
// empty, we fail.
@ -797,7 +794,6 @@ qint64 QProcessPrivate::writeToStdin(const char *data, qint64 maxlen)
stdinChannel.writer = new QWindowsPipeWriter(stdinChannel.pipe[1], q);
QObjectPrivate::connect(stdinChannel.writer, &QWindowsPipeWriter::canWrite,
this, &QProcessPrivate::_q_canWrite);
stdinChannel.writer->start();
}
return stdinChannel.writer->write(data, maxlen);

View File

@ -65,7 +65,7 @@ QWindowsPipeReader::QWindowsPipeReader(QObject *parent)
this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection);
}
static bool qt_cancelIo(HANDLE handle, OVERLAPPED *overlapped)
bool qt_cancelIo(HANDLE handle, OVERLAPPED *overlapped)
{
typedef BOOL (WINAPI *PtrCancelIoEx)(HANDLE, LPOVERLAPPED);
static PtrCancelIoEx ptrCancelIoEx = 0;

View File

@ -32,141 +32,177 @@
****************************************************************************/
#include "qwindowspipewriter_p.h"
#include "qiodevice_p.h"
QT_BEGIN_NAMESPACE
#ifndef QT_NO_THREAD
extern bool qt_cancelIo(HANDLE handle, OVERLAPPED *overlapped); // from qwindowspipereader.cpp
QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipe, QObject * parent)
: QThread(parent),
writePipe(pipe),
quitNow(false),
hasWritten(false)
QWindowsPipeWriter::Overlapped::Overlapped(QWindowsPipeWriter *pipeWriter)
: pipeWriter(pipeWriter)
{
}
void QWindowsPipeWriter::Overlapped::clear()
{
ZeroMemory(this, sizeof(OVERLAPPED));
}
QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent)
: QObject(parent),
handle(pipeWriteEnd),
overlapped(this),
numberOfBytesToWrite(0),
pendingBytesWrittenValue(0),
stopped(true),
writeSequenceStarted(false),
notifiedCalled(false),
bytesWrittenPending(false),
inBytesWritten(false)
{
connect(this, &QWindowsPipeWriter::_q_queueBytesWritten,
this, &QWindowsPipeWriter::emitPendingBytesWrittenValue, Qt::QueuedConnection);
}
QWindowsPipeWriter::~QWindowsPipeWriter()
{
lock.lock();
quitNow = true;
waitCondition.wakeOne();
lock.unlock();
if (!wait(30000))
terminate();
stop();
}
bool QWindowsPipeWriter::waitForWrite(int msecs)
{
QMutexLocker locker(&lock);
bool hadWritten = hasWritten;
hasWritten = false;
if (hadWritten)
return true;
if (!waitCondition.wait(&lock, msecs))
if (!writeSequenceStarted)
return false;
hadWritten = hasWritten;
hasWritten = false;
return hadWritten;
if (bytesWrittenPending) {
if (!inBytesWritten)
emitPendingBytesWrittenValue();
return true;
}
if (!waitForNotification(msecs))
return false;
if (bytesWrittenPending) {
if (!inBytesWritten)
emitPendingBytesWrittenValue();
return true;
}
return false;
}
qint64 QWindowsPipeWriter::write(const char *ptr, qint64 maxlen)
qint64 QWindowsPipeWriter::bytesToWrite() const
{
if (!isRunning())
return -1;
QMutexLocker locker(&lock);
data.append(ptr, maxlen);
waitCondition.wakeOne();
return maxlen;
return numberOfBytesToWrite;
}
class QPipeWriterOverlapped
void QWindowsPipeWriter::emitPendingBytesWrittenValue()
{
public:
QPipeWriterOverlapped()
{
overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
}
if (bytesWrittenPending) {
bytesWrittenPending = false;
const qint64 bytes = pendingBytesWrittenValue;
pendingBytesWrittenValue = 0;
~QPipeWriterOverlapped()
{
CloseHandle(overlapped.hEvent);
}
void prepare()
{
const HANDLE hEvent = overlapped.hEvent;
ZeroMemory(&overlapped, sizeof overlapped);
overlapped.hEvent = hEvent;
}
OVERLAPPED *operator&()
{
return &overlapped;
}
private:
OVERLAPPED overlapped;
};
void QWindowsPipeWriter::run()
{
QPipeWriterOverlapped overl;
forever {
lock.lock();
while(data.isEmpty() && (!quitNow)) {
waitCondition.wakeOne();
waitCondition.wait(&lock);
}
if (quitNow) {
lock.unlock();
quitNow = false;
break;
}
QByteArray copy = data;
lock.unlock();
const char *ptrData = copy.data();
qint64 maxlen = copy.size();
qint64 totalWritten = 0;
overl.prepare();
while ((!quitNow) && totalWritten < maxlen) {
DWORD written = 0;
if (!WriteFile(writePipe, ptrData + totalWritten,
maxlen - totalWritten, &written, &overl)) {
const DWORD writeError = GetLastError();
if (writeError == 0xE8/*NT_STATUS_INVALID_USER_BUFFER*/) {
// give the os a rest
msleep(100);
continue;
}
if (writeError != ERROR_IO_PENDING) {
qErrnoWarning(writeError, "QWindowsPipeWriter: async WriteFile failed.");
return;
}
if (!GetOverlappedResult(writePipe, &overl, &written, TRUE)) {
qErrnoWarning(GetLastError(), "QWindowsPipeWriter: GetOverlappedResult failed.");
return;
}
}
totalWritten += written;
#if defined QPIPEWRITER_DEBUG
qDebug("QWindowsPipeWriter::run() wrote %d %d/%d bytes",
written, int(totalWritten), int(maxlen));
#endif
lock.lock();
data.remove(0, written);
hasWritten = true;
lock.unlock();
}
emit bytesWritten(totalWritten);
inBytesWritten = true;
emit bytesWritten(bytes);
inBytesWritten = false;
emit canWrite();
}
}
#endif //QT_NO_THREAD
void QWindowsPipeWriter::writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
OVERLAPPED *overlappedBase)
{
Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
overlapped->pipeWriter->notified(errorCode, numberOfBytesTransfered);
}
/*!
\internal
Will be called whenever the write operation completes.
*/
void QWindowsPipeWriter::notified(DWORD errorCode, DWORD numberOfBytesWritten)
{
notifiedCalled = true;
writeSequenceStarted = false;
numberOfBytesToWrite = 0;
switch (errorCode) {
case ERROR_SUCCESS:
break;
case ERROR_OPERATION_ABORTED:
if (stopped)
break;
// fall through
default:
qErrnoWarning(errorCode, "QWindowsPipeWriter: asynchronous write failed.");
break;
}
// After the writer was stopped, the only reason why this function can be called is the
// completion of a cancellation. No signals should be emitted, and no new write sequence should
// be started in this case.
if (stopped)
return;
pendingBytesWrittenValue += qint64(numberOfBytesWritten);
if (!bytesWrittenPending) {
bytesWrittenPending = true;
emit _q_queueBytesWritten(QWindowsPipeWriter::QPrivateSignal());
}
}
bool QWindowsPipeWriter::waitForNotification(int timeout)
{
QElapsedTimer t;
t.start();
notifiedCalled = false;
int msecs = timeout;
while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
if (notifiedCalled)
return true;
// Some other I/O completion routine was called. Wait some more.
msecs = qt_subtract_from_timeout(timeout, t.elapsed());
if (!msecs)
break;
}
return notifiedCalled;
}
qint64 QWindowsPipeWriter::write(const char *ptr, qint64 maxlen)
{
if (writeSequenceStarted)
return 0;
overlapped.clear();
numberOfBytesToWrite = maxlen;
stopped = false;
writeSequenceStarted = true;
if (!WriteFileEx(handle, ptr, maxlen, &overlapped, &writeFileCompleted)) {
writeSequenceStarted = false;
qErrnoWarning("QWindowsPipeWriter::write failed.");
}
return maxlen;
}
void QWindowsPipeWriter::stop()
{
stopped = true;
if (writeSequenceStarted) {
if (!qt_cancelIo(handle, &overlapped)) {
const DWORD dwError = GetLastError();
if (dwError != ERROR_NOT_FOUND) {
qErrnoWarning(dwError, "QWindowsPipeWriter: qt_cancelIo on handle %x failed.",
handle);
}
}
waitForNotification(-1);
}
}
QT_END_NAMESPACE

View File

@ -46,16 +46,11 @@
//
#include <qelapsedtimer.h>
#include <qthread.h>
#include <qmutex.h>
#include <qwaitcondition.h>
#include <qobject.h>
#include <qt_windows.h>
QT_BEGIN_NAMESPACE
#ifndef QT_NO_THREAD
#define SLEEPMIN 10
#define SLEEPMAX 500
@ -104,45 +99,50 @@ private:
int nextSleep;
};
class Q_CORE_EXPORT QWindowsPipeWriter : public QThread
class Q_CORE_EXPORT QWindowsPipeWriter : public QObject
{
Q_OBJECT
public:
explicit QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent = 0);
~QWindowsPipeWriter();
qint64 write(const char *data, qint64 maxlen);
void stop();
bool waitForWrite(int msecs);
qint64 bytesToWrite() const;
Q_SIGNALS:
void canWrite();
void bytesWritten(qint64 bytes);
public:
explicit QWindowsPipeWriter(HANDLE writePipe, QObject * parent = 0);
~QWindowsPipeWriter();
bool waitForWrite(int msecs);
qint64 write(const char *data, qint64 maxlen);
qint64 bytesToWrite() const
{
QMutexLocker locker(&lock);
return data.size();
}
bool hadWritten() const
{
return hasWritten;
}
protected:
void run();
void _q_queueBytesWritten(QPrivateSignal);
private:
QByteArray data;
QWaitCondition waitCondition;
mutable QMutex lock;
HANDLE writePipe;
volatile bool quitNow;
bool hasWritten;
};
static void CALLBACK writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
OVERLAPPED *overlappedBase);
void notified(DWORD errorCode, DWORD numberOfBytesWritten);
bool waitForNotification(int timeout);
void emitPendingBytesWrittenValue();
#endif //QT_NO_THREAD
class Overlapped : public OVERLAPPED
{
Q_DISABLE_COPY(Overlapped)
public:
explicit Overlapped(QWindowsPipeWriter *pipeWriter);
void clear();
QWindowsPipeWriter *pipeWriter;
};
HANDLE handle;
Overlapped overlapped;
qint64 numberOfBytesToWrite;
qint64 pendingBytesWrittenValue;
bool stopped;
bool writeSequenceStarted;
bool notifiedCalled;
bool bytesWrittenPending;
bool inBytesWritten;
};
QT_END_NAMESPACE

View File

@ -214,7 +214,6 @@ qint64 QLocalSocket::writeData(const char *data, qint64 maxSize)
d->pipeWriter = new QWindowsPipeWriter(d->handle, this);
connect(d->pipeWriter, SIGNAL(canWrite()), this, SLOT(_q_canWrite()));
connect(d->pipeWriter, SIGNAL(bytesWritten(qint64)), this, SIGNAL(bytesWritten(qint64)));
d->pipeWriter->start();
}
return d->pipeWriter->write(data, maxSize);
}