Allow QWindowsPipe{Reader|Writer} to work with foreign event loops, take 2

When a foreign event loop that does not enter an alertable wait state
is running (which is also the case when a native dialog window is
modal), pipe handlers would freeze temporarily due to their APC
callbacks not being invoked.

We address this problem by moving the I/O callbacks to the Windows
thread pool, and only posting completion events to the main loop
from there. That makes the actual I/O completely independent from
any main loop, while the signal delivery works also with foreign
loops (because Qt event delivery uses Windows messages, which foreign
loops typically handle correctly).

As a nice side effect, performance (and in particular scalability)
is improved.

Several other approaches have been tried:
1) Using QWinEventNotifier was about a quarter slower and scaled much
   worse. Additionally, it also required a rather egregious hack to
   handle the (pathological) case of a single thread talking to both
   ends of a QLocalSocket synchronously.
2) Queuing APCs from the thread pool to the main thread and also
   posting wake-up events to its event loop, and handling I/O on the
   main thread; this performed roughly like this solution, but scaled
   half as well, and the separate wake-up path was still deemed hacky.
3) Only posting wake-up events to the main thread from the thread pool,
   and still handling I/O on the main thread; this still performed
   comparably to 2), and the pathological case was not handled at all.
4) Using this approach for reads and that of 3) for writes was slightly
   faster with big amounts of data, but scaled slightly worse, and the
   diverging implementations were deemed not desirable.

Fixes: QTBUG-64443
Change-Id: I66443c3021d6ba98639a214c3e768be97d2cf14b
Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@gmx.de>
This commit is contained in:
Alex Trotsenko 2021-01-23 13:00:22 +02:00
parent d316ae8e83
commit f265c87e01
9 changed files with 895 additions and 404 deletions

View File

@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@ -38,58 +39,63 @@
****************************************************************************/
#include "qwindowspipereader_p.h"
#include "qiodevice_p.h"
#include <qelapsedtimer.h>
#include <qscopedvaluerollback.h>
#include <qcoreapplication.h>
#include <QMutexLocker>
QT_BEGIN_NAMESPACE
static const DWORD minReadBufferSize = 4096;
QWindowsPipeReader::Overlapped::Overlapped(QWindowsPipeReader *reader)
: pipeReader(reader)
{
}
void QWindowsPipeReader::Overlapped::clear()
{
ZeroMemory(this, sizeof(OVERLAPPED));
}
QWindowsPipeReader::QWindowsPipeReader(QObject *parent)
: QObject(parent),
handle(INVALID_HANDLE_VALUE),
overlapped(this),
eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
syncHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
waitObject(NULL),
readBufferMaxSize(0),
actualReadBufferSize(0),
bytesPending(0),
pendingReadBytes(0),
lastError(ERROR_SUCCESS),
state(Stopped),
readSequenceStarted(false),
notifiedCalled(false),
pipeBroken(false),
readyReadPending(false),
winEventActPosted(false),
inReadyRead(false)
{
connect(this, &QWindowsPipeReader::_q_queueReadyRead,
this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection);
ZeroMemory(&overlapped, sizeof(OVERLAPPED));
overlapped.hEvent = eventHandle;
waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
if (waitObject == NULL)
qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed.");
}
QWindowsPipeReader::~QWindowsPipeReader()
{
stop();
// Wait for thread pool callback to complete, as it can be still
// executing some completion code.
WaitForThreadpoolWaitCallbacks(waitObject, FALSE);
CloseThreadpoolWait(waitObject);
CloseHandle(eventHandle);
CloseHandle(syncHandle);
}
/*!
Sets the handle to read from. The handle must be valid.
Do not call this function while the pipe is running.
*/
void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
{
readBuffer.clear();
actualReadBufferSize = 0;
bytesPending = 0;
readyReadPending = false;
pendingReadBytes = 0;
handle = hPipeReadEnd;
pipeBroken = false;
lastError = ERROR_SUCCESS;
}
/*!
@ -98,8 +104,7 @@ void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
*/
void QWindowsPipeReader::stop()
{
state = Stopped;
cancelAsyncRead();
cancelAsyncRead(Stopped);
}
/*!
@ -108,16 +113,27 @@ void QWindowsPipeReader::stop()
*/
void QWindowsPipeReader::drainAndStop()
{
state = Draining;
cancelAsyncRead();
cancelAsyncRead(Draining);
// Note that signals are not emitted in the call below, as the caller
// is expected to do that synchronously.
consumePending();
}
/*!
Stops the asynchronous read sequence.
*/
void QWindowsPipeReader::cancelAsyncRead()
void QWindowsPipeReader::cancelAsyncRead(State newState)
{
if (state != Running)
return;
QMutexLocker locker(&mutex);
state = newState;
if (readSequenceStarted) {
// This can legitimately fail due to the GetOverlappedResult()
// in the callback not being locked. We ignore ERROR_NOT_FOUND
// in this case.
if (!CancelIoEx(handle, &overlapped)) {
const DWORD dwError = GetLastError();
if (dwError != ERROR_NOT_FOUND) {
@ -125,10 +141,36 @@ void QWindowsPipeReader::cancelAsyncRead()
handle);
}
}
waitForNotification(-1);
// Wait for callback to complete.
do {
locker.unlock();
waitForNotification(QDeadlineTimer(-1));
locker.relock();
} while (readSequenceStarted);
}
}
/*!
Sets the size of internal read buffer.
*/
void QWindowsPipeReader::setMaxReadBufferSize(qint64 size)
{
QMutexLocker locker(&mutex);
readBufferMaxSize = size;
}
/*!
Returns \c true if async operation is in progress, there is
pending data to read, or a read error is pending.
*/
bool QWindowsPipeReader::isReadOperationActive() const
{
QMutexLocker locker(&mutex);
return readSequenceStarted || readyReadPending
|| (lastError != ERROR_SUCCESS && !pipeBroken);
}
/*!
Returns the number of bytes we've read so far.
*/
@ -145,6 +187,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
if (pipeBroken && actualReadBufferSize == 0)
return 0; // signal EOF
mutex.lock();
qint64 readSoFar;
// If startAsyncRead() has read data, copy it to its destination.
if (maxlen == 1 && actualReadBufferSize > 0) {
@ -155,6 +198,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen));
actualReadBufferSize -= readSoFar;
}
mutex.unlock();
if (!pipeBroken) {
if (state == Running)
@ -166,197 +210,268 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
return readSoFar;
}
/*!
Returns \c true if a complete line of data can be read from the buffer.
*/
bool QWindowsPipeReader::canReadLine() const
{
QMutexLocker locker(&mutex);
return readBuffer.indexOf('\n', actualReadBufferSize) >= 0;
}
/*!
\internal
Will be called whenever the read operation completes.
*/
void QWindowsPipeReader::notified(DWORD errorCode, DWORD numberOfBytesRead)
{
notifiedCalled = true;
readSequenceStarted = false;
switch (errorCode) {
case ERROR_SUCCESS:
break;
case ERROR_MORE_DATA:
// This is not an error. We're connected to a message mode
// pipe and the message didn't fit into the pipe's system
// buffer. We will read the remaining data in the next call.
break;
case ERROR_BROKEN_PIPE:
case ERROR_PIPE_NOT_CONNECTED:
pipeBroken = true;
break;
case ERROR_OPERATION_ABORTED:
if (state != Running)
break;
Q_FALLTHROUGH();
default:
emit winError(errorCode, QLatin1String("QWindowsPipeReader::notified"));
pipeBroken = true;
break;
}
// After the reader was stopped, the only reason why this function can be called is the
// completion of a cancellation. No signals should be emitted, and no new read sequence should
// be started in this case.
if (state == Stopped)
return;
if (pipeBroken) {
emitPipeClosed();
return;
}
actualReadBufferSize += numberOfBytesRead;
readBuffer.truncate(actualReadBufferSize);
// Read all pending data from the pipe's buffer in 'Draining' state.
if (state == Draining) {
// Determine the number of pending bytes on the first iteration.
if (bytesPending == 0)
bytesPending = checkPipeState();
else
bytesPending -= numberOfBytesRead;
if (bytesPending == 0) // all data received
return; // unblock waitForNotification() in cancelAsyncRead()
startAsyncReadHelper(bytesPending);
if (readSequenceStarted)
notifiedCalled = false; // wait for more data
return;
}
startAsyncRead();
if (!readyReadPending) {
readyReadPending = true;
emit _q_queueReadyRead(QWindowsPipeReader::QPrivateSignal());
}
}
/*!
\internal
Starts an asynchronous read sequence on the pipe.
*/
void QWindowsPipeReader::startAsyncRead()
{
if (readSequenceStarted)
QMutexLocker locker(&mutex);
if (readSequenceStarted || lastError != ERROR_SUCCESS)
return;
state = Running;
startAsyncReadHelper(qMax(checkPipeState(), minReadBufferSize));
}
startAsyncReadLocked();
/*!
\internal
Starts a new read sequence.
*/
void QWindowsPipeReader::startAsyncReadHelper(qint64 bytesToRead)
{
Q_ASSERT(bytesToRead != 0);
if (pipeBroken)
// Do not post the event, if the read operation will be completed asynchronously.
if (!readyReadPending && lastError == ERROR_SUCCESS)
return;
if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
bytesToRead = readBufferMaxSize - readBuffer.size();
if (bytesToRead <= 0) {
// Buffer is full. User must read data from the buffer
// before we can read more from the pipe.
if (!winEventActPosted) {
winEventActPosted = true;
locker.unlock();
QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
} else {
locker.unlock();
}
SetEvent(syncHandle);
}
/*!
Starts a new read sequence. Thread-safety should be ensured
by the caller.
*/
void QWindowsPipeReader::startAsyncReadLocked()
{
// Determine the number of bytes to read.
qint64 bytesToRead = qMax(checkPipeState(), state == Running ? minReadBufferSize : 0);
// This can happen only while draining; just do nothing in this case.
if (bytesToRead == 0)
return;
while (lastError == ERROR_SUCCESS) {
if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
bytesToRead = readBufferMaxSize - readBuffer.size();
if (bytesToRead <= 0) {
// Buffer is full. User must read data from the buffer
// before we can read more from the pipe.
return;
}
}
char *ptr = readBuffer.reserve(bytesToRead);
// ReadFile() returns true, if the read operation completes synchronously.
// We don't need to call GetOverlappedResult() additionally, because
// 'numberOfBytesRead' is valid in this case.
DWORD numberOfBytesRead;
DWORD errorCode = ERROR_SUCCESS;
if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped)) {
errorCode = GetLastError();
if (errorCode == ERROR_IO_PENDING) {
Q_ASSERT(state == Running);
// Operation has been queued and will complete in the future.
readSequenceStarted = true;
SetThreadpoolWait(waitObject, eventHandle, NULL);
return;
}
}
if (!readCompleted(errorCode, numberOfBytesRead))
return;
// In the 'Draining' state, we have to get all the data with one call
// to ReadFile(). Note that message mode pipes are not supported here.
if (state == Draining) {
Q_ASSERT(bytesToRead == qint64(numberOfBytesRead));
return;
}
}
char *ptr = readBuffer.reserve(bytesToRead);
readSequenceStarted = true;
overlapped.clear();
if (!ReadFileEx(handle, ptr, bytesToRead, &overlapped, &readFileCompleted)) {
readSequenceStarted = false;
const DWORD dwError = GetLastError();
switch (dwError) {
case ERROR_BROKEN_PIPE:
case ERROR_PIPE_NOT_CONNECTED:
// It may happen, that the other side closes the connection directly
// after writing data. Then we must set the appropriate socket state.
pipeBroken = true;
emit pipeClosed();
break;
default:
emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead"));
break;
}
// We need to loop until all pending data has been read and an
// operation is queued for asynchronous completion.
// If the pipe is configured to work in message mode, we read
// the data in chunks.
bytesToRead = qMax(checkPipeState(), minReadBufferSize);
}
}
/*!
\internal
Called when ReadFileEx finished the read operation.
Thread pool callback procedure.
*/
void QWindowsPipeReader::readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
OVERLAPPED *overlappedBase)
void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
PTP_WAIT wait, TP_WAIT_RESULT waitResult)
{
Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
overlapped->pipeReader->notified(errorCode, numberOfBytesTransfered);
Q_UNUSED(instance);
Q_UNUSED(wait);
Q_UNUSED(waitResult);
QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context);
// Get the result of the asynchronous operation.
DWORD numberOfBytesTransfered = 0;
DWORD errorCode = ERROR_SUCCESS;
if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped,
&numberOfBytesTransfered, FALSE))
errorCode = GetLastError();
pipeReader->mutex.lock();
pipeReader->readSequenceStarted = false;
// Do not overwrite error code, if error has been detected by
// checkPipeState() in waitForPipeClosed(). Also, if the reader was
// stopped, the only reason why this function can be called is the
// completion of a cancellation. No signals should be emitted, and
// no new read sequence should be started in this case.
if (pipeReader->lastError == ERROR_SUCCESS && pipeReader->state != Stopped) {
// Ignore ERROR_OPERATION_ABORTED. We have canceled the I/O operation
// specifically for flushing the pipe.
if (pipeReader->state == Draining && errorCode == ERROR_OPERATION_ABORTED)
errorCode = ERROR_SUCCESS;
if (pipeReader->readCompleted(errorCode, numberOfBytesTransfered))
pipeReader->startAsyncReadLocked();
if (pipeReader->state == Running && !pipeReader->winEventActPosted) {
pipeReader->winEventActPosted = true;
pipeReader->mutex.unlock();
QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct));
} else {
pipeReader->mutex.unlock();
}
} else {
pipeReader->mutex.unlock();
}
// We set the event only after unlocking to avoid additional context
// switches due to the released thread immediately running into the lock.
SetEvent(pipeReader->syncHandle);
}
/*!
Will be called whenever the read operation completes. Returns \c true if
no error occurred; otherwise returns \c false.
*/
bool QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead)
{
// ERROR_MORE_DATA is not an error. We're connected to a message mode
// pipe and the message didn't fit into the pipe's system
// buffer. We will read the remaining data in the next call.
if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) {
readyReadPending = true;
pendingReadBytes += numberOfBytesRead;
readBuffer.truncate(actualReadBufferSize + pendingReadBytes);
return true;
}
lastError = errorCode;
return false;
}
/*!
Receives notification that the read operation has completed.
*/
bool QWindowsPipeReader::event(QEvent *e)
{
if (e->type() == QEvent::WinEventAct) {
consumePendingAndEmit(true);
return true;
}
return QObject::event(e);
}
/*!
Updates the read buffer size and emits pending signals in the main thread.
Returns \c true, if readyRead() was emitted.
*/
bool QWindowsPipeReader::consumePendingAndEmit(bool allowWinActPosting)
{
mutex.lock();
// Enable QEvent::WinEventAct posting.
if (allowWinActPosting)
winEventActPosted = false;
const bool emitReadyRead = consumePending();
const DWORD dwError = lastError;
mutex.unlock();
// Disable any further processing, if the pipe was stopped.
// We are not allowed to emit signals in either 'Stopped'
// or 'Draining' state.
if (state != Running)
return false;
if (emitReadyRead && !inReadyRead) {
QScopedValueRollback<bool> guard(inReadyRead, true);
emit readyRead();
}
// Trigger 'pipeBroken' only once.
if (dwError != ERROR_SUCCESS && !pipeBroken) {
pipeBroken = true;
if (dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED)
emit winError(dwError, QLatin1String("QWindowsPipeReader::consumePendingAndEmit"));
emit pipeClosed();
}
return emitReadyRead;
}
/*!
Updates the read buffer size. Returns \c true, if readyRead()
should be emitted. Thread-safety should be ensured by the caller.
*/
bool QWindowsPipeReader::consumePending()
{
if (readyReadPending) {
readyReadPending = false;
actualReadBufferSize += pendingReadBytes;
pendingReadBytes = 0;
return true;
}
return false;
}
/*!
\internal
Returns the number of available bytes in the pipe.
Sets QWindowsPipeReader::pipeBroken to true if the connection is broken.
*/
DWORD QWindowsPipeReader::checkPipeState()
{
DWORD bytes;
if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr))
return bytes;
if (!pipeBroken) {
pipeBroken = true;
emitPipeClosed();
}
lastError = GetLastError();
return 0;
}
bool QWindowsPipeReader::waitForNotification(int timeout)
bool QWindowsPipeReader::waitForNotification(const QDeadlineTimer &deadline)
{
QElapsedTimer t;
t.start();
notifiedCalled = false;
int msecs = timeout;
while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
if (notifiedCalled)
do {
DWORD waitRet = WaitForSingleObjectEx(syncHandle, deadline.remainingTime(), TRUE);
if (waitRet == WAIT_OBJECT_0)
return true;
// Some other I/O completion routine was called. Wait some more.
msecs = qt_subtract_from_timeout(timeout, t.elapsed());
if (!msecs)
break;
}
return notifiedCalled;
}
if (waitRet != WAIT_IO_COMPLETION)
return false;
void QWindowsPipeReader::emitPendingReadyRead()
{
if (readyReadPending) {
readyReadPending = false;
QScopedValueRollback<bool> guard(inReadyRead, true);
emit readyRead();
}
}
// Some I/O completion routine was called. Wait some more.
} while (!deadline.hasExpired());
void QWindowsPipeReader::emitPipeClosed()
{
// We are not allowed to emit signals in either 'Stopped'
// or 'Draining' state.
if (state == Running)
emit pipeClosed();
return false;
}
/*!
@ -366,22 +481,12 @@ void QWindowsPipeReader::emitPipeClosed()
*/
bool QWindowsPipeReader::waitForReadyRead(int msecs)
{
if (readyReadPending) {
if (!inReadyRead)
emitPendingReadyRead();
return true;
}
QDeadlineTimer timer(msecs);
if (!readSequenceStarted)
return false;
if (!waitForNotification(msecs))
return false;
if (readyReadPending) {
if (!inReadyRead)
emitPendingReadyRead();
return true;
// Make sure that 'syncHandle' was triggered by the thread pool callback.
while (isReadOperationActive() && waitForNotification(timer)) {
if (consumePendingAndEmit(false))
return true;
}
return false;
@ -393,15 +498,26 @@ bool QWindowsPipeReader::waitForReadyRead(int msecs)
bool QWindowsPipeReader::waitForPipeClosed(int msecs)
{
const int sleepTime = 10;
QElapsedTimer stopWatch;
stopWatch.start();
QDeadlineTimer timer(msecs);
while (waitForReadyRead(timer.remainingTime())) {}
if (pipeBroken)
return true;
if (timer.hasExpired())
return false;
// When the read buffer is full, the read sequence is not running,
// so we need to peek the pipe to detect disconnection.
forever {
waitForReadyRead(0);
checkPipeState();
consumePendingAndEmit(false);
if (pipeBroken)
return true;
if (stopWatch.hasExpired(msecs - sleepTime))
if (timer.hasExpired())
return false;
Sleep(sleepTime);
}
}

View File

@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@ -52,6 +53,8 @@
//
#include <qobject.h>
#include <qdeadlinetimer.h>
#include <qmutex.h>
#include <private/qringbuffer_p.h>
#include <qt_windows.h>
@ -70,7 +73,7 @@ public:
void stop();
void drainAndStop();
void setMaxReadBufferSize(qint64 size) { readBufferMaxSize = size; }
void setMaxReadBufferSize(qint64 size);
qint64 maxReadBufferSize() const { return readBufferMaxSize; }
bool isPipeClosed() const { return pipeBroken; }
@ -80,46 +83,46 @@ public:
bool waitForReadyRead(int msecs);
bool waitForPipeClosed(int msecs);
bool isReadOperationActive() const { return readSequenceStarted; }
bool isReadOperationActive() const;
Q_SIGNALS:
void winError(ulong, const QString &);
void readyRead();
void pipeClosed();
void _q_queueReadyRead(QPrivateSignal);
protected:
bool event(QEvent *e) override;
private:
void startAsyncReadHelper(qint64 bytesToRead);
void cancelAsyncRead();
static void CALLBACK readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
OVERLAPPED *overlappedBase);
void notified(DWORD errorCode, DWORD numberOfBytesRead);
DWORD checkPipeState();
bool waitForNotification(int timeout);
void emitPendingReadyRead();
void emitPipeClosed();
enum State { Stopped, Running, Draining };
class Overlapped : public OVERLAPPED
{
Q_DISABLE_COPY_MOVE(Overlapped)
public:
explicit Overlapped(QWindowsPipeReader *reader);
void clear();
QWindowsPipeReader *pipeReader;
};
void startAsyncReadLocked();
void cancelAsyncRead(State newState);
static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
PTP_WAIT wait, TP_WAIT_RESULT waitResult);
bool readCompleted(DWORD errorCode, DWORD numberOfBytesRead);
DWORD checkPipeState();
bool waitForNotification(const QDeadlineTimer &deadline);
bool consumePendingAndEmit(bool allowWinActPosting);
bool consumePending();
HANDLE handle;
Overlapped overlapped;
HANDLE eventHandle;
HANDLE syncHandle;
PTP_WAIT waitObject;
OVERLAPPED overlapped;
qint64 readBufferMaxSize;
QRingBuffer readBuffer;
qint64 actualReadBufferSize;
qint64 bytesPending;
qint64 pendingReadBytes;
mutable QMutex mutex;
DWORD lastError;
enum State { Stopped, Running, Draining } state;
State state;
bool readSequenceStarted;
bool notifiedCalled;
bool pipeBroken;
bool readyReadPending;
bool winEventActPosted;
bool inReadyRead;
};

View File

@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@ -38,180 +39,56 @@
****************************************************************************/
#include "qwindowspipewriter_p.h"
#include "qiodevice_p.h"
#include <qscopedvaluerollback.h>
#include <qcoreapplication.h>
#include <QMutexLocker>
QT_BEGIN_NAMESPACE
QWindowsPipeWriter::Overlapped::Overlapped(QWindowsPipeWriter *pipeWriter)
: pipeWriter(pipeWriter)
{
}
void QWindowsPipeWriter::Overlapped::clear()
{
ZeroMemory(this, sizeof(OVERLAPPED));
}
QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent)
: QObject(parent),
handle(pipeWriteEnd),
overlapped(this),
eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
syncHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
waitObject(NULL),
pendingBytesWrittenValue(0),
lastError(ERROR_SUCCESS),
stopped(true),
writeSequenceStarted(false),
notifiedCalled(false),
bytesWrittenPending(false),
winEventActPosted(false),
inBytesWritten(false)
{
connect(this, &QWindowsPipeWriter::_q_queueBytesWritten,
this, &QWindowsPipeWriter::emitPendingBytesWrittenValue, Qt::QueuedConnection);
ZeroMemory(&overlapped, sizeof(OVERLAPPED));
overlapped.hEvent = eventHandle;
waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
if (waitObject == NULL)
qErrnoWarning("QWindowsPipeWriter: CreateThreadpollWait failed.");
}
QWindowsPipeWriter::~QWindowsPipeWriter()
{
stop();
}
bool QWindowsPipeWriter::waitForWrite(int msecs)
{
if (bytesWrittenPending) {
emitPendingBytesWrittenValue();
return true;
}
if (!writeSequenceStarted)
return false;
if (!waitForNotification(msecs))
return false;
if (bytesWrittenPending) {
emitPendingBytesWrittenValue();
return true;
}
return false;
}
qint64 QWindowsPipeWriter::bytesToWrite() const
{
return buffer.size() + pendingBytesWrittenValue;
}
void QWindowsPipeWriter::emitPendingBytesWrittenValue()
{
if (bytesWrittenPending) {
// Reset the state even if we don't emit bytesWritten().
// It's a defined behavior to not re-emit this signal recursively.
bytesWrittenPending = false;
const qint64 bytes = pendingBytesWrittenValue;
pendingBytesWrittenValue = 0;
emit canWrite();
if (!inBytesWritten) {
QScopedValueRollback<bool> guard(inBytesWritten, true);
emit bytesWritten(bytes);
}
}
}
void QWindowsPipeWriter::writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
OVERLAPPED *overlappedBase)
{
Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
overlapped->pipeWriter->notified(errorCode, numberOfBytesTransfered);
CloseThreadpoolWait(waitObject);
CloseHandle(eventHandle);
CloseHandle(syncHandle);
}
/*!
\internal
Will be called whenever the write operation completes.
Stops the asynchronous write sequence.
If the write sequence is running then the I/O operation is canceled.
*/
void QWindowsPipeWriter::notified(DWORD errorCode, DWORD numberOfBytesWritten)
void QWindowsPipeWriter::stop()
{
notifiedCalled = true;
writeSequenceStarted = false;
Q_ASSERT(errorCode != ERROR_SUCCESS || numberOfBytesWritten == DWORD(buffer.size()));
buffer.clear();
switch (errorCode) {
case ERROR_SUCCESS:
break;
case ERROR_OPERATION_ABORTED:
if (stopped)
break;
Q_FALLTHROUGH();
default:
qErrnoWarning(errorCode, "QWindowsPipeWriter: asynchronous write failed.");
break;
}
// After the writer was stopped, the only reason why this function can be called is the
// completion of a cancellation. No signals should be emitted, and no new write sequence should
// be started in this case.
if (stopped)
return;
pendingBytesWrittenValue += qint64(numberOfBytesWritten);
if (!bytesWrittenPending) {
bytesWrittenPending = true;
emit _q_queueBytesWritten(QWindowsPipeWriter::QPrivateSignal());
}
}
bool QWindowsPipeWriter::waitForNotification(int timeout)
{
QElapsedTimer t;
t.start();
notifiedCalled = false;
int msecs = timeout;
while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
if (notifiedCalled)
return true;
// Some other I/O completion routine was called. Wait some more.
msecs = qt_subtract_from_timeout(timeout, t.elapsed());
if (!msecs)
break;
}
return notifiedCalled;
}
bool QWindowsPipeWriter::write(const QByteArray &ba)
{
if (writeSequenceStarted)
return false;
overlapped.clear();
buffer = ba;
stopped = false;
writeSequenceStarted = true;
if (!WriteFileEx(handle, buffer.constData(), buffer.size(),
&overlapped, &writeFileCompleted)) {
writeSequenceStarted = false;
buffer.clear();
const DWORD errorCode = GetLastError();
switch (errorCode) {
case ERROR_NO_DATA: // "The pipe is being closed."
// The other end has closed the pipe. This can happen in QLocalSocket. Do not warn.
break;
default:
qErrnoWarning(errorCode, "QWindowsPipeWriter::write failed.");
}
return false;
}
return true;
}
void QWindowsPipeWriter::stop()
{
mutex.lock();
stopped = true;
bytesWrittenPending = false;
pendingBytesWrittenValue = 0;
if (writeSequenceStarted) {
// Trying to disable callback before canceling the operation.
// Callback invocation is unnecessary here.
SetThreadpoolWait(waitObject, NULL, NULL);
if (!CancelIoEx(handle, &overlapped)) {
const DWORD dwError = GetLastError();
if (dwError != ERROR_NOT_FOUND) {
@ -219,8 +96,239 @@ void QWindowsPipeWriter::stop()
handle);
}
}
waitForNotification(-1);
writeSequenceStarted = false;
}
mutex.unlock();
WaitForThreadpoolWaitCallbacks(waitObject, TRUE);
}
/*!
Returns \c true if async operation is in progress or a bytesWritten
signal is pending.
*/
bool QWindowsPipeWriter::isWriteOperationActive() const
{
QMutexLocker locker(&mutex);
return writeSequenceStarted || bytesWrittenPending;
}
/*!
Returns the number of bytes that are waiting to be written.
*/
qint64 QWindowsPipeWriter::bytesToWrite() const
{
QMutexLocker locker(&mutex);
return writeBuffer.size() + pendingBytesWrittenValue;
}
/*!
Writes data to the pipe.
*/
bool QWindowsPipeWriter::write(const QByteArray &ba)
{
QMutexLocker locker(&mutex);
if (lastError != ERROR_SUCCESS)
return false;
writeBuffer.append(ba);
if (writeSequenceStarted)
return true;
stopped = false;
startAsyncWriteLocked();
// Do not post the event, if the write operation will be completed asynchronously.
if (!bytesWrittenPending)
return true;
if (!winEventActPosted) {
winEventActPosted = true;
locker.unlock();
QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
} else {
locker.unlock();
}
SetEvent(syncHandle);
return true;
}
/*!
Starts a new write sequence. Thread-safety should be ensured by the caller.
*/
void QWindowsPipeWriter::startAsyncWriteLocked()
{
while (!writeBuffer.isEmpty()) {
// WriteFile() returns true, if the write operation completes synchronously.
// We don't need to call GetOverlappedResult() additionally, because
// 'numberOfBytesWritten' is valid in this case.
DWORD numberOfBytesWritten;
DWORD errorCode = ERROR_SUCCESS;
if (!WriteFile(handle, writeBuffer.readPointer(), writeBuffer.nextDataBlockSize(),
&numberOfBytesWritten, &overlapped)) {
errorCode = GetLastError();
if (errorCode == ERROR_IO_PENDING) {
// Operation has been queued and will complete in the future.
writeSequenceStarted = true;
SetThreadpoolWait(waitObject, eventHandle, NULL);
return;
}
}
if (!writeCompleted(errorCode, numberOfBytesWritten))
return;
}
}
/*!
Thread pool callback procedure.
*/
void QWindowsPipeWriter::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
PTP_WAIT wait, TP_WAIT_RESULT waitResult)
{
Q_UNUSED(instance);
Q_UNUSED(wait);
Q_UNUSED(waitResult);
QWindowsPipeWriter *pipeWriter = reinterpret_cast<QWindowsPipeWriter *>(context);
// Get the result of the asynchronous operation.
DWORD numberOfBytesTransfered = 0;
DWORD errorCode = ERROR_SUCCESS;
if (!GetOverlappedResult(pipeWriter->handle, &pipeWriter->overlapped,
&numberOfBytesTransfered, FALSE))
errorCode = GetLastError();
QMutexLocker locker(&pipeWriter->mutex);
// After the writer was stopped, the only reason why this function can be called is the
// completion of a cancellation. No signals should be emitted, and no new write sequence
// should be started in this case.
if (pipeWriter->stopped)
return;
pipeWriter->writeSequenceStarted = false;
if (pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered))
pipeWriter->startAsyncWriteLocked();
if (pipeWriter->lastError == ERROR_SUCCESS && !pipeWriter->winEventActPosted) {
pipeWriter->winEventActPosted = true;
locker.unlock();
QCoreApplication::postEvent(pipeWriter, new QEvent(QEvent::WinEventAct));
} else {
locker.unlock();
}
// We set the event only after unlocking to avoid additional context
// switches due to the released thread immediately running into the lock.
SetEvent(pipeWriter->syncHandle);
}
/*!
Will be called whenever the write operation completes. Returns \c true if
no error occurred; otherwise returns \c false.
*/
bool QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten)
{
if (errorCode == ERROR_SUCCESS) {
Q_ASSERT(numberOfBytesWritten == DWORD(writeBuffer.nextDataBlockSize()));
bytesWrittenPending = true;
pendingBytesWrittenValue += numberOfBytesWritten;
writeBuffer.free(numberOfBytesWritten);
return true;
}
lastError = errorCode;
writeBuffer.clear();
// The other end has closed the pipe. This can happen in QLocalSocket. Do not warn.
if (errorCode != ERROR_OPERATION_ABORTED && errorCode != ERROR_NO_DATA)
qErrnoWarning(errorCode, "QWindowsPipeWriter: write failed.");
return false;
}
/*!
Receives notification that the write operation has completed.
*/
bool QWindowsPipeWriter::event(QEvent *e)
{
if (e->type() == QEvent::WinEventAct) {
consumePendingAndEmit(true);
return true;
}
return QObject::event(e);
}
/*!
Updates the state and emits pending signals in the main thread.
Returns \c true, if bytesWritten() was emitted.
*/
bool QWindowsPipeWriter::consumePendingAndEmit(bool allowWinActPosting)
{
QMutexLocker locker(&mutex);
// Enable QEvent::WinEventAct posting.
if (allowWinActPosting)
winEventActPosted = false;
if (!bytesWrittenPending)
return false;
// Reset the state even if we don't emit bytesWritten().
// It's a defined behavior to not re-emit this signal recursively.
bytesWrittenPending = false;
qint64 numberOfBytesWritten = pendingBytesWrittenValue;
pendingBytesWrittenValue = 0;
locker.unlock();
// Disable any further processing, if the pipe was stopped.
if (stopped)
return false;
emit canWrite();
if (!inBytesWritten) {
QScopedValueRollback<bool> guard(inBytesWritten, true);
emit bytesWritten(numberOfBytesWritten);
}
return true;
}
bool QWindowsPipeWriter::waitForNotification(const QDeadlineTimer &deadline)
{
do {
DWORD waitRet = WaitForSingleObjectEx(syncHandle, deadline.remainingTime(), TRUE);
if (waitRet == WAIT_OBJECT_0)
return true;
if (waitRet != WAIT_IO_COMPLETION)
return false;
// Some I/O completion routine was called. Wait some more.
} while (!deadline.hasExpired());
return false;
}
/*!
Waits for the completion of the asynchronous write operation.
Returns \c true, if we've emitted the bytesWritten signal (non-recursive case)
or bytesWritten will be emitted by the event loop (recursive case).
*/
bool QWindowsPipeWriter::waitForWrite(int msecs)
{
QDeadlineTimer timer(msecs);
// Make sure that 'syncHandle' was triggered by the thread pool callback.
while (isWriteOperationActive() && waitForNotification(timer)) {
if (consumePendingAndEmit(false))
return true;
}
return false;
}
QT_END_NAMESPACE

View File

@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@ -54,7 +55,10 @@
#include <QtCore/private/qglobal_p.h>
#include <qelapsedtimer.h>
#include <qobject.h>
#include <qbytearray.h>
#include <qdeadlinetimer.h>
#include <qmutex.h>
#include <private/qringbuffer_p.h>
#include <qt_windows.h>
QT_BEGIN_NAMESPACE
@ -117,39 +121,37 @@ public:
bool write(const QByteArray &ba);
void stop();
bool waitForWrite(int msecs);
bool isWriteOperationActive() const { return writeSequenceStarted; }
bool isWriteOperationActive() const;
qint64 bytesToWrite() const;
Q_SIGNALS:
void canWrite();
void bytesWritten(qint64 bytes);
void _q_queueBytesWritten(QPrivateSignal);
protected:
bool event(QEvent *e) override;
private:
static void CALLBACK writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
OVERLAPPED *overlappedBase);
void notified(DWORD errorCode, DWORD numberOfBytesWritten);
bool waitForNotification(int timeout);
void emitPendingBytesWrittenValue();
class Overlapped : public OVERLAPPED
{
Q_DISABLE_COPY_MOVE(Overlapped)
public:
explicit Overlapped(QWindowsPipeWriter *pipeWriter);
void clear();
QWindowsPipeWriter *pipeWriter;
};
void startAsyncWriteLocked();
static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
PTP_WAIT wait, TP_WAIT_RESULT waitResult);
bool writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten);
bool waitForNotification(const QDeadlineTimer &deadline);
bool consumePendingAndEmit(bool allowWinActPosting);
HANDLE handle;
Overlapped overlapped;
QByteArray buffer;
HANDLE eventHandle;
HANDLE syncHandle;
PTP_WAIT waitObject;
OVERLAPPED overlapped;
QRingBuffer writeBuffer;
qint64 pendingBytesWrittenValue;
mutable QMutex mutex;
DWORD lastError;
bool stopped;
bool writeSequenceStarted;
bool notifiedCalled;
bool bytesWrittenPending;
bool winEventActPosted;
bool inBytesWritten;
};

View File

@ -29,12 +29,15 @@
#include <QTest>
#include <QEvent>
#include <QtTest/QSignalSpy>
#include <QtCore/qthread.h>
#include <QtGui/qguiapplication.h>
#include <QtGui/qpainter.h>
#include <QtGui/qrasterwindow.h>
#include <QtNetwork/qtcpserver.h>
#include <QtNetwork/qtcpsocket.h>
#include <QtNetwork/qlocalserver.h>
#include <QtNetwork/qlocalsocket.h>
#include <QtCore/qelapsedtimer.h>
#include <QtCore/qtimer.h>
#include <QtCore/qwineventnotifier.h>
@ -51,6 +54,7 @@ class tst_NoQtEventLoop : public QObject
private slots:
void consumeMouseEvents();
void consumeSocketEvents();
void consumeLocalSocketEvents();
void consumeWinEvents_data();
void consumeWinEvents();
void deliverEventsInLivelock();
@ -318,6 +322,44 @@ void tst_NoQtEventLoop::consumeSocketEvents()
QVERIFY(server.hasPendingConnections());
}
void tst_NoQtEventLoop::consumeLocalSocketEvents()
{
int argc = 1;
char *argv[] = { const_cast<char *>("test"), 0 };
QGuiApplication app(argc, argv);
QLocalServer server;
QLocalSocket client;
QSignalSpy readyReadSpy(&client, &QIODevice::readyRead);
QVERIFY(server.listen("consumeLocalSocketEvents"));
client.connectToServer("consumeLocalSocketEvents");
QVERIFY(client.waitForConnected(200));
QVERIFY(server.waitForNewConnection(200));
QLocalSocket *clientSocket = server.nextPendingConnection();
QVERIFY(clientSocket);
QSignalSpy bytesWrittenSpy(clientSocket, &QIODevice::bytesWritten);
server.close();
bool timeExpired = false;
QTimer::singleShot(3000, Qt::CoarseTimer, [&timeExpired]() {
timeExpired = true;
});
QVERIFY(clientSocket->putChar(0));
// Exec own message loop
MSG msg;
while (::GetMessage(&msg, NULL, 0, 0)) {
::TranslateMessage(&msg);
::DispatchMessage(&msg);
if (timeExpired || readyReadSpy.count() != 0)
break;
}
QVERIFY(!timeExpired);
QCOMPARE(bytesWrittenSpy.count(), 1);
QCOMPARE(readyReadSpy.count(), 1);
}
void tst_NoQtEventLoop::consumeWinEvents_data()
{
QTest::addColumn<bool>("peeking");

View File

@ -639,26 +639,6 @@ void tst_QLocalSocket::readBufferOverflow()
QCOMPARE(client.read(buffer, readBufferSize), qint64(readBufferSize));
// no more bytes available
QCOMPARE(client.bytesAvailable(), 0);
#ifdef Q_OS_WIN
serverSocket->write(buffer, readBufferSize);
QVERIFY(serverSocket->waitForBytesWritten());
// ensure the read completion routine is called
SleepEx(100, true);
QVERIFY(client.waitForReadyRead());
QCOMPARE(client.read(buffer, readBufferSize), qint64(readBufferSize));
// Test overflow caused by an asynchronous pipe operation.
client.setReadBufferSize(1);
serverSocket->write(buffer, 2);
QVERIFY(client.waitForReadyRead());
// socket disconnects, if there any error on pipe
QCOMPARE(client.state(), QLocalSocket::ConnectedState);
QCOMPARE(client.bytesAvailable(), qint64(2));
QCOMPARE(client.read(buffer, 2), qint64(2));
#endif
}
static qint64 writeCommand(const QVariant &command, QIODevice *device, int commandCounter)

View File

@ -1,4 +1,5 @@
# Generated from socket.pro.
add_subdirectory(qlocalsocket)
add_subdirectory(qtcpserver)
add_subdirectory(qudpsocket)

View File

@ -0,0 +1,14 @@
#####################################################################
## tst_bench_qlocalsocket Binary:
#####################################################################
qt_internal_add_benchmark(tst_bench_qlocalsocket
SOURCES
tst_qlocalsocket.cpp
PUBLIC_LIBRARIES
Qt::Network
Qt::Test
)
#### Keys ignored in scope 1:.:.:qlocalsocket.pro:<TRUE>:
# TEMPLATE = "app"

View File

@ -0,0 +1,225 @@
/****************************************************************************
**
** Copyright (C) 2021 The Qt Company Ltd.
** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the test suite of the Qt Toolkit.
**
** $QT_BEGIN_LICENSE:GPL-EXCEPT$
** Commercial License Usage
** Licensees holding valid commercial Qt licenses may use this file in
** accordance with the commercial license agreement provided with the
** Software or, alternatively, in accordance with the terms contained in
** a written agreement between you and The Qt Company. For licensing terms
** and conditions see https://www.qt.io/terms-conditions. For further
** information use the contact form at https://www.qt.io/contact-us.
**
** GNU General Public License Usage
** Alternatively, this file may be used under the terms of the GNU
** General Public License version 3 as published by the Free Software
** Foundation with exceptions as appearing in the file LICENSE.GPL3-EXCEPT
** included in the packaging of this file. Please review the following
** information to ensure the GNU General Public License requirements will
** be met: https://www.gnu.org/licenses/gpl-3.0.html.
**
** $QT_END_LICENSE$
**
****************************************************************************/
#include <QTest>
#include <QtCore/qglobal.h>
#include <QtCore/qthread.h>
#include <QtCore/qsemaphore.h>
#include <QtCore/qbytearray.h>
#include <QtCore/qeventloop.h>
#include <QtCore/qvector.h>
#include <QtCore/qelapsedtimer.h>
#include <QtNetwork/qlocalsocket.h>
#include <QtNetwork/qlocalserver.h>
class tst_QLocalSocket : public QObject
{
Q_OBJECT
private slots:
void pingPong_data();
void pingPong();
void dataExchange_data();
void dataExchange();
};
class ServerThread : public QThread
{
public:
QSemaphore running;
explicit ServerThread(int chunkSize)
{
buffer.resize(chunkSize);
}
void run() override
{
QLocalServer server;
connect(&server, &QLocalServer::newConnection, [this, &server]() {
auto socket = server.nextPendingConnection();
connect(socket, &QLocalSocket::readyRead, [this, socket]() {
const qint64 bytesAvailable = socket->bytesAvailable();
Q_ASSERT(bytesAvailable <= this->buffer.size());
QCOMPARE(socket->read(this->buffer.data(), bytesAvailable), bytesAvailable);
QCOMPARE(socket->write(this->buffer.data(), bytesAvailable), bytesAvailable);
});
});
QVERIFY(server.listen("foo"));
running.release();
exec();
}
protected:
QByteArray buffer;
};
class SocketFactory : public QObject
{
Q_OBJECT
public:
bool stopped = false;
explicit SocketFactory(int chunkSize, int connections)
{
buffer.resize(chunkSize);
for (int i = 0; i < connections; ++i) {
QLocalSocket *socket = new QLocalSocket(this);
Q_CHECK_PTR(socket);
connect(this, &SocketFactory::start, [this, socket]() {
QCOMPARE(socket->write(this->buffer), this->buffer.size());
});
connect(socket, &QLocalSocket::readyRead, [i, this, socket]() {
const qint64 bytesAvailable = socket->bytesAvailable();
Q_ASSERT(bytesAvailable <= this->buffer.size());
QCOMPARE(socket->read(this->buffer.data(), bytesAvailable), bytesAvailable);
emit this->bytesReceived(i, bytesAvailable);
if (!this->stopped)
QCOMPARE(socket->write(this->buffer.data(), bytesAvailable), bytesAvailable);
});
socket->connectToServer("foo");
QCOMPARE(socket->state(), QLocalSocket::ConnectedState);
}
}
signals:
void start();
void bytesReceived(int channel, qint64 bytes);
protected:
QByteArray buffer;
};
void tst_QLocalSocket::pingPong_data()
{
QTest::addColumn<int>("connections");
for (int value : {10, 50, 100, 1000, 5000})
QTest::addRow("connections: %d", value) << value;
}
void tst_QLocalSocket::pingPong()
{
QFETCH(int, connections);
const int iterations = 100000;
Q_ASSERT(iterations >= connections && connections > 0);
ServerThread serverThread(1);
serverThread.start();
// Wait for server to start.
QVERIFY(serverThread.running.tryAcquire(1, 3000));
SocketFactory factory(1, connections);
QEventLoop eventLoop;
QVector<qint64> bytesToRead;
QElapsedTimer timer;
bytesToRead.fill(iterations / connections, connections);
connect(&factory, &SocketFactory::bytesReceived,
[&bytesToRead, &connections, &factory, &eventLoop](int channel, qint64 bytes) {
Q_UNUSED(bytes);
if (--bytesToRead[channel] == 0 && --connections == 0) {
factory.stopped = true;
eventLoop.quit();
}
});
timer.start();
emit factory.start();
eventLoop.exec();
qDebug("Elapsed time: %.1f s", timer.elapsed() / 1000.0);
serverThread.quit();
serverThread.wait();
}
void tst_QLocalSocket::dataExchange_data()
{
QTest::addColumn<int>("connections");
QTest::addColumn<int>("chunkSize");
for (int connections : {1, 5, 10}) {
for (int chunkSize : {100, 1000, 10000, 100000}) {
QTest::addRow("connections: %d, chunk size: %d",
connections, chunkSize) << connections << chunkSize;
}
}
}
void tst_QLocalSocket::dataExchange()
{
QFETCH(int, connections);
QFETCH(int, chunkSize);
Q_ASSERT(chunkSize > 0 && connections > 0);
const qint64 timeToTest = 5000;
ServerThread serverThread(chunkSize);
serverThread.start();
// Wait for server to start.
QVERIFY(serverThread.running.tryAcquire(1, 3000));
SocketFactory factory(chunkSize, connections);
QEventLoop eventLoop;
qint64 totalReceived = 0;
QElapsedTimer timer;
connect(&factory, &SocketFactory::bytesReceived,
[&totalReceived, &timer, timeToTest, &factory, &eventLoop](int channel, qint64 bytes) {
Q_UNUSED(channel);
totalReceived += bytes;
if (timer.elapsed() >= timeToTest) {
factory.stopped = true;
eventLoop.quit();
}
});
timer.start();
emit factory.start();
eventLoop.exec();
qDebug("Transfer rate: %.1f MB/s", totalReceived / 1048.576 / timer.elapsed());
serverThread.quit();
serverThread.wait();
}
QTEST_MAIN(tst_QLocalSocket)
#include "tst_qlocalsocket.moc"