qt5base-lts/tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp
Alex Trotsenko f265c87e01 Allow QWindowsPipe{Reader|Writer} to work with foreign event loops, take 2
When a foreign event loop that does not enter an alertable wait state
is running (which is also the case when a native dialog window is
modal), pipe handlers would freeze temporarily due to their APC
callbacks not being invoked.

We address this problem by moving the I/O callbacks to the Windows
thread pool, and only posting completion events to the main loop
from there. That makes the actual I/O completely independent from
any main loop, while the signal delivery works also with foreign
loops (because Qt event delivery uses Windows messages, which foreign
loops typically handle correctly).

As a nice side effect, performance (and in particular scalability)
is improved.

Several other approaches have been tried:
1) Using QWinEventNotifier was about a quarter slower and scaled much
   worse. Additionally, it also required a rather egregious hack to
   handle the (pathological) case of a single thread talking to both
   ends of a QLocalSocket synchronously.
2) Queuing APCs from the thread pool to the main thread and also
   posting wake-up events to its event loop, and handling I/O on the
   main thread; this performed roughly like this solution, but scaled
   half as well, and the separate wake-up path was still deemed hacky.
3) Only posting wake-up events to the main thread from the thread pool,
   and still handling I/O on the main thread; this still performed
   comparably to 2), and the pathological case was not handled at all.
4) Using this approach for reads and that of 3) for writes was slightly
   faster with big amounts of data, but scaled slightly worse, and the
   diverging implementations were deemed not desirable.

Fixes: QTBUG-64443
Change-Id: I66443c3021d6ba98639a214c3e768be97d2cf14b
Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@gmx.de>
2021-03-02 22:53:06 +02:00

226 lines
6.5 KiB
C++

/****************************************************************************
**
** Copyright (C) 2021 The Qt Company Ltd.
** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the test suite of the Qt Toolkit.
**
** $QT_BEGIN_LICENSE:GPL-EXCEPT$
** Commercial License Usage
** Licensees holding valid commercial Qt licenses may use this file in
** accordance with the commercial license agreement provided with the
** Software or, alternatively, in accordance with the terms contained in
** a written agreement between you and The Qt Company. For licensing terms
** and conditions see https://www.qt.io/terms-conditions. For further
** information use the contact form at https://www.qt.io/contact-us.
**
** GNU General Public License Usage
** Alternatively, this file may be used under the terms of the GNU
** General Public License version 3 as published by the Free Software
** Foundation with exceptions as appearing in the file LICENSE.GPL3-EXCEPT
** included in the packaging of this file. Please review the following
** information to ensure the GNU General Public License requirements will
** be met: https://www.gnu.org/licenses/gpl-3.0.html.
**
** $QT_END_LICENSE$
**
****************************************************************************/
#include <QTest>
#include <QtCore/qglobal.h>
#include <QtCore/qthread.h>
#include <QtCore/qsemaphore.h>
#include <QtCore/qbytearray.h>
#include <QtCore/qeventloop.h>
#include <QtCore/qvector.h>
#include <QtCore/qelapsedtimer.h>
#include <QtNetwork/qlocalsocket.h>
#include <QtNetwork/qlocalserver.h>
class tst_QLocalSocket : public QObject
{
Q_OBJECT
private slots:
void pingPong_data();
void pingPong();
void dataExchange_data();
void dataExchange();
};
class ServerThread : public QThread
{
public:
QSemaphore running;
explicit ServerThread(int chunkSize)
{
buffer.resize(chunkSize);
}
void run() override
{
QLocalServer server;
connect(&server, &QLocalServer::newConnection, [this, &server]() {
auto socket = server.nextPendingConnection();
connect(socket, &QLocalSocket::readyRead, [this, socket]() {
const qint64 bytesAvailable = socket->bytesAvailable();
Q_ASSERT(bytesAvailable <= this->buffer.size());
QCOMPARE(socket->read(this->buffer.data(), bytesAvailable), bytesAvailable);
QCOMPARE(socket->write(this->buffer.data(), bytesAvailable), bytesAvailable);
});
});
QVERIFY(server.listen("foo"));
running.release();
exec();
}
protected:
QByteArray buffer;
};
class SocketFactory : public QObject
{
Q_OBJECT
public:
bool stopped = false;
explicit SocketFactory(int chunkSize, int connections)
{
buffer.resize(chunkSize);
for (int i = 0; i < connections; ++i) {
QLocalSocket *socket = new QLocalSocket(this);
Q_CHECK_PTR(socket);
connect(this, &SocketFactory::start, [this, socket]() {
QCOMPARE(socket->write(this->buffer), this->buffer.size());
});
connect(socket, &QLocalSocket::readyRead, [i, this, socket]() {
const qint64 bytesAvailable = socket->bytesAvailable();
Q_ASSERT(bytesAvailable <= this->buffer.size());
QCOMPARE(socket->read(this->buffer.data(), bytesAvailable), bytesAvailable);
emit this->bytesReceived(i, bytesAvailable);
if (!this->stopped)
QCOMPARE(socket->write(this->buffer.data(), bytesAvailable), bytesAvailable);
});
socket->connectToServer("foo");
QCOMPARE(socket->state(), QLocalSocket::ConnectedState);
}
}
signals:
void start();
void bytesReceived(int channel, qint64 bytes);
protected:
QByteArray buffer;
};
void tst_QLocalSocket::pingPong_data()
{
QTest::addColumn<int>("connections");
for (int value : {10, 50, 100, 1000, 5000})
QTest::addRow("connections: %d", value) << value;
}
void tst_QLocalSocket::pingPong()
{
QFETCH(int, connections);
const int iterations = 100000;
Q_ASSERT(iterations >= connections && connections > 0);
ServerThread serverThread(1);
serverThread.start();
// Wait for server to start.
QVERIFY(serverThread.running.tryAcquire(1, 3000));
SocketFactory factory(1, connections);
QEventLoop eventLoop;
QVector<qint64> bytesToRead;
QElapsedTimer timer;
bytesToRead.fill(iterations / connections, connections);
connect(&factory, &SocketFactory::bytesReceived,
[&bytesToRead, &connections, &factory, &eventLoop](int channel, qint64 bytes) {
Q_UNUSED(bytes);
if (--bytesToRead[channel] == 0 && --connections == 0) {
factory.stopped = true;
eventLoop.quit();
}
});
timer.start();
emit factory.start();
eventLoop.exec();
qDebug("Elapsed time: %.1f s", timer.elapsed() / 1000.0);
serverThread.quit();
serverThread.wait();
}
void tst_QLocalSocket::dataExchange_data()
{
QTest::addColumn<int>("connections");
QTest::addColumn<int>("chunkSize");
for (int connections : {1, 5, 10}) {
for (int chunkSize : {100, 1000, 10000, 100000}) {
QTest::addRow("connections: %d, chunk size: %d",
connections, chunkSize) << connections << chunkSize;
}
}
}
void tst_QLocalSocket::dataExchange()
{
QFETCH(int, connections);
QFETCH(int, chunkSize);
Q_ASSERT(chunkSize > 0 && connections > 0);
const qint64 timeToTest = 5000;
ServerThread serverThread(chunkSize);
serverThread.start();
// Wait for server to start.
QVERIFY(serverThread.running.tryAcquire(1, 3000));
SocketFactory factory(chunkSize, connections);
QEventLoop eventLoop;
qint64 totalReceived = 0;
QElapsedTimer timer;
connect(&factory, &SocketFactory::bytesReceived,
[&totalReceived, &timer, timeToTest, &factory, &eventLoop](int channel, qint64 bytes) {
Q_UNUSED(channel);
totalReceived += bytes;
if (timer.elapsed() >= timeToTest) {
factory.stopped = true;
eventLoop.quit();
}
});
timer.start();
emit factory.start();
eventLoop.exec();
qDebug("Transfer rate: %.1f MB/s", totalReceived / 1048.576 / timer.elapsed());
serverThread.quit();
serverThread.wait();
}
QTEST_MAIN(tst_QLocalSocket)
#include "tst_qlocalsocket.moc"