winrt: Change the way tcp packets are handled
Similar to the way datagrams are handled for udp sockets the worker now takes care of tcp data. Thus we avoid race conditions which stopped data processing. It could happen that data was read from the socket into the buffer and before readyRead was emitted the buffer was completely read. In this case readNotification is set to false and no new data is processed afterwards. Additionally the buffer was replaced by a vector of QByteArray. The buffer kept growing and was never cleared (and there is no obvious way for clearing the buffer), so that an overflow happened eventually. pendingReadOperations (and its mutex) could be removed as well. There is only one situation where they could clash and that's the initial read. Having two members is preferred over having a list of operations and a mutex. Task-number: QTBUG-56438 Change-Id: Idbad58e47785996023748c310530892163f24594 Reviewed-by: Maurice Kalinowski <maurice.kalinowski@qt.io>
This commit is contained in:
parent
6ae9dc3f37
commit
14ea8759da
@ -126,6 +126,33 @@ static HRESULT qt_winrt_try_create_thread_network_context(QString host, ComPtr<I
|
||||
}
|
||||
#endif // _MSC_VER >= 1900
|
||||
|
||||
typedef QHash<qintptr, IStreamSocket *> TcpSocketHash;
|
||||
|
||||
struct SocketHandler
|
||||
{
|
||||
SocketHandler() : socketCount(0) {}
|
||||
qintptr socketCount;
|
||||
TcpSocketHash pendingTcpSockets;
|
||||
};
|
||||
|
||||
Q_GLOBAL_STATIC(SocketHandler, gSocketHandler)
|
||||
|
||||
struct SocketGlobal
|
||||
{
|
||||
SocketGlobal()
|
||||
{
|
||||
HRESULT hr;
|
||||
hr = GetActivationFactory(HString::MakeReference(RuntimeClass_Windows_Storage_Streams_Buffer).Get(),
|
||||
&bufferFactory);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
}
|
||||
|
||||
ComPtr<IBufferFactory> bufferFactory;
|
||||
};
|
||||
Q_GLOBAL_STATIC(SocketGlobal, g)
|
||||
|
||||
#define READ_BUFFER_SIZE 65536
|
||||
|
||||
static inline QString qt_QStringFromHString(const HString &string)
|
||||
{
|
||||
UINT32 length;
|
||||
@ -136,8 +163,43 @@ static inline QString qt_QStringFromHString(const HString &string)
|
||||
class SocketEngineWorker : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
public:
|
||||
SocketEngineWorker(QNativeSocketEnginePrivate *engine)
|
||||
: enginePrivate(engine)
|
||||
{
|
||||
}
|
||||
|
||||
~SocketEngineWorker()
|
||||
{
|
||||
if (Q_UNLIKELY(initialReadOp)) {
|
||||
ComPtr<IAsyncInfo> info;
|
||||
HRESULT hr = initialReadOp.As(&info);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
if (info) {
|
||||
hr = info->Cancel();
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
hr = info->Close();
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
}
|
||||
}
|
||||
|
||||
if (readOp) {
|
||||
ComPtr<IAsyncInfo> info;
|
||||
HRESULT hr = readOp.As(&info);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
if (info) {
|
||||
hr = info->Cancel();
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
hr = info->Close();
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
signals:
|
||||
void newDatagramsReceived(const QList<WinRtDatagram> &datagram);
|
||||
void newDataReceived(const QVector<QByteArray> &data);
|
||||
void socketErrorOccured(QAbstractSocket::SocketError error);
|
||||
|
||||
public slots:
|
||||
Q_INVOKABLE void notifyAboutNewDatagrams()
|
||||
@ -148,7 +210,30 @@ public slots:
|
||||
emit newDatagramsReceived(datagrams);
|
||||
}
|
||||
|
||||
Q_INVOKABLE void notifyAboutNewData()
|
||||
{
|
||||
QMutexLocker locker(&mutex);
|
||||
const QVector<QByteArray> newData = std::move(pendingData);
|
||||
pendingData.clear();
|
||||
emit newDataReceived(newData);
|
||||
}
|
||||
|
||||
public:
|
||||
void startReading()
|
||||
{
|
||||
ComPtr<IBuffer> buffer;
|
||||
HRESULT hr = g->bufferFactory->Create(READ_BUFFER_SIZE, &buffer);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
ComPtr<IInputStream> stream;
|
||||
hr = tcpSocket->get_InputStream(&stream);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
hr = stream->ReadAsync(buffer.Get(), READ_BUFFER_SIZE, InputStreamOptions_Partial, initialReadOp.GetAddressOf());
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
enginePrivate->socketState = QAbstractSocket::ConnectedState;
|
||||
hr = initialReadOp->put_Completed(Callback<SocketReadCompletedHandler>(this, &SocketEngineWorker::onReadyRead).Get());
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
}
|
||||
|
||||
HRESULT OnNewDatagramReceived(IDatagramSocket *, IDatagramSocketMessageReceivedEventArgs *args)
|
||||
{
|
||||
WinRtDatagram datagram;
|
||||
@ -184,9 +269,127 @@ public:
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
HRESULT onReadyRead(IAsyncBufferOperation *asyncInfo, AsyncStatus status)
|
||||
{
|
||||
if (asyncInfo == initialReadOp.Get()) {
|
||||
initialReadOp.Reset();
|
||||
} else if (asyncInfo == readOp.Get()) {
|
||||
readOp.Reset();
|
||||
} else {
|
||||
Q_ASSERT(false);
|
||||
}
|
||||
|
||||
// A read in UnconnectedState will close the socket and return -1 and thus tell the caller,
|
||||
// that the connection was closed. The socket cannot be closed here, as the subsequent read
|
||||
// might fail then.
|
||||
if (status == Error || status == Canceled) {
|
||||
emit socketErrorOccured(QAbstractSocket::RemoteHostClosedError);
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
ComPtr<IBuffer> buffer;
|
||||
HRESULT hr = asyncInfo->GetResults(&buffer);
|
||||
if (FAILED(hr)) {
|
||||
qErrnoWarning(hr, "Failed to get read results buffer");
|
||||
emit socketErrorOccured(QAbstractSocket::UnknownSocketError);
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
UINT32 bufferLength;
|
||||
hr = buffer->get_Length(&bufferLength);
|
||||
if (FAILED(hr)) {
|
||||
qErrnoWarning(hr, "Failed to get buffer length");
|
||||
emit socketErrorOccured(QAbstractSocket::UnknownSocketError);
|
||||
return S_OK;
|
||||
}
|
||||
// A zero sized buffer length signals, that the remote host closed the connection. The socket
|
||||
// cannot be closed though, as the following read might have socket descriptor -1 and thus and
|
||||
// the closing of the socket won't be communicated to the caller. So only the error is set. The
|
||||
// actual socket close happens inside of read.
|
||||
if (!bufferLength) {
|
||||
emit socketErrorOccured(QAbstractSocket::RemoteHostClosedError);
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
ComPtr<Windows::Storage::Streams::IBufferByteAccess> byteArrayAccess;
|
||||
hr = buffer.As(&byteArrayAccess);
|
||||
if (FAILED(hr)) {
|
||||
qErrnoWarning(hr, "Failed to get cast buffer");
|
||||
emit socketErrorOccured(QAbstractSocket::UnknownSocketError);
|
||||
return S_OK;
|
||||
}
|
||||
byte *data;
|
||||
hr = byteArrayAccess->Buffer(&data);
|
||||
if (FAILED(hr)) {
|
||||
qErrnoWarning(hr, "Failed to access buffer data");
|
||||
emit socketErrorOccured(QAbstractSocket::UnknownSocketError);
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
QByteArray newData(reinterpret_cast<const char*>(data), qint64(bufferLength));
|
||||
QMutexLocker readLocker(&mutex);
|
||||
if (pendingData.isEmpty())
|
||||
QMetaObject::invokeMethod(this, "notifyAboutNewData", Qt::QueuedConnection);
|
||||
pendingData << newData;
|
||||
readLocker.unlock();
|
||||
|
||||
hr = QEventDispatcherWinRT::runOnXamlThread([buffer, this]() {
|
||||
UINT32 readBufferLength;
|
||||
ComPtr<IInputStream> stream;
|
||||
HRESULT hr = tcpSocket->get_InputStream(&stream);
|
||||
if (FAILED(hr)) {
|
||||
qErrnoWarning(hr, "Failed to obtain input stream");
|
||||
emit socketErrorOccured(QAbstractSocket::UnknownSocketError);
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
// Reuse the stream buffer
|
||||
hr = buffer->get_Capacity(&readBufferLength);
|
||||
if (FAILED(hr)) {
|
||||
qErrnoWarning(hr, "Failed to get buffer capacity");
|
||||
emit socketErrorOccured(QAbstractSocket::UnknownSocketError);
|
||||
return S_OK;
|
||||
}
|
||||
hr = buffer->put_Length(0);
|
||||
if (FAILED(hr)) {
|
||||
qErrnoWarning(hr, "Failed to set buffer length");
|
||||
emit socketErrorOccured(QAbstractSocket::UnknownSocketError);
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
hr = stream->ReadAsync(buffer.Get(), readBufferLength, InputStreamOptions_Partial, &readOp);
|
||||
if (FAILED(hr)) {
|
||||
qErrnoWarning(hr, "onReadyRead(): Could not read into socket stream buffer.");
|
||||
emit socketErrorOccured(QAbstractSocket::UnknownSocketError);
|
||||
return S_OK;
|
||||
}
|
||||
hr = readOp->put_Completed(Callback<SocketReadCompletedHandler>(this, &SocketEngineWorker::onReadyRead).Get());
|
||||
if (FAILED(hr)) {
|
||||
qErrnoWarning(hr, "onReadyRead(): Failed to set socket read callback.");
|
||||
emit socketErrorOccured(QAbstractSocket::UnknownSocketError);
|
||||
return S_OK;
|
||||
}
|
||||
return S_OK;
|
||||
});
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
void setTcpSocket(ComPtr<IStreamSocket> socket) { tcpSocket = socket; }
|
||||
|
||||
private:
|
||||
ComPtr<IStreamSocket> tcpSocket;
|
||||
|
||||
QList<WinRtDatagram> pendingDatagrams;
|
||||
QVector<QByteArray> pendingData;
|
||||
|
||||
// Protects pendingData/pendingDatagrams which are accessed from native callbacks
|
||||
QMutex mutex;
|
||||
|
||||
ComPtr<IAsyncOperationWithProgress<IBuffer *, UINT32>> initialReadOp;
|
||||
ComPtr<IAsyncOperationWithProgress<IBuffer *, UINT32>> readOp;
|
||||
|
||||
QNativeSocketEnginePrivate *enginePrivate;
|
||||
};
|
||||
|
||||
static QByteArray socketDescription(const QAbstractSocketEngine *s)
|
||||
@ -239,33 +442,6 @@ static QByteArray socketDescription(const QAbstractSocketEngine *s)
|
||||
} } while (0)
|
||||
#define Q_TR(a) QT_TRANSLATE_NOOP(QNativeSocketEngine, a)
|
||||
|
||||
typedef QHash<qintptr, IStreamSocket *> TcpSocketHash;
|
||||
|
||||
struct SocketHandler
|
||||
{
|
||||
SocketHandler() : socketCount(0) {}
|
||||
qintptr socketCount;
|
||||
TcpSocketHash pendingTcpSockets;
|
||||
};
|
||||
|
||||
Q_GLOBAL_STATIC(SocketHandler, gSocketHandler)
|
||||
|
||||
struct SocketGlobal
|
||||
{
|
||||
SocketGlobal()
|
||||
{
|
||||
HRESULT hr;
|
||||
hr = GetActivationFactory(HString::MakeReference(RuntimeClass_Windows_Storage_Streams_Buffer).Get(),
|
||||
&bufferFactory);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
}
|
||||
|
||||
ComPtr<IBufferFactory> bufferFactory;
|
||||
};
|
||||
Q_GLOBAL_STATIC(SocketGlobal, g)
|
||||
|
||||
#define READ_BUFFER_SIZE 65536
|
||||
|
||||
template <typename T>
|
||||
static AsyncStatus opStatus(const ComPtr<T> &op)
|
||||
{
|
||||
@ -315,6 +491,10 @@ QNativeSocketEngine::QNativeSocketEngine(QObject *parent)
|
||||
connect(this, SIGNAL(readReady()), SLOT(readNotification()), Qt::QueuedConnection);
|
||||
connect(this, SIGNAL(writeReady()), SLOT(writeNotification()), Qt::QueuedConnection);
|
||||
connect(d->worker, &SocketEngineWorker::newDatagramsReceived, this, &QNativeSocketEngine::handleNewDatagrams, Qt::QueuedConnection);
|
||||
connect(d->worker, &SocketEngineWorker::newDataReceived,
|
||||
this, &QNativeSocketEngine::handleNewData, Qt::QueuedConnection);
|
||||
connect(d->worker, &SocketEngineWorker::socketErrorOccured,
|
||||
this, &QNativeSocketEngine::handleTcpError, Qt::QueuedConnection);
|
||||
}
|
||||
|
||||
QNativeSocketEngine::~QNativeSocketEngine()
|
||||
@ -358,23 +538,9 @@ bool QNativeSocketEngine::initialize(qintptr socketDescriptor, QAbstractSocket::
|
||||
|
||||
// Start processing incoming data
|
||||
if (d->socketType == QAbstractSocket::TcpSocket) {
|
||||
HRESULT hr = QEventDispatcherWinRT::runOnXamlThread([d, socket, socketState, this]() {
|
||||
ComPtr<IBuffer> buffer;
|
||||
HRESULT hr = g->bufferFactory->Create(READ_BUFFER_SIZE, &buffer);
|
||||
RETURN_HR_IF_FAILED("initialize(): Could not create buffer");
|
||||
ComPtr<IInputStream> stream;
|
||||
hr = socket->get_InputStream(&stream);
|
||||
RETURN_HR_IF_FAILED("initialize(): Could not obtain input stream");
|
||||
ComPtr<IAsyncBufferOperation> readOp;
|
||||
hr = stream->ReadAsync(buffer.Get(), READ_BUFFER_SIZE, InputStreamOptions_Partial, readOp.GetAddressOf());
|
||||
RETURN_HR_IF_FAILED_WITH_ARGS("initialize(): Failed to read from the socket buffer (%s).",
|
||||
socketDescription(this).constData());
|
||||
QMutexLocker locker(&d->readOperationsMutex);
|
||||
d->pendingReadOps.append(readOp);
|
||||
d->socketState = socketState;
|
||||
hr = readOp->put_Completed(Callback<SocketReadCompletedHandler>(d, &QNativeSocketEnginePrivate::handleReadyRead).Get());
|
||||
RETURN_HR_IF_FAILED_WITH_ARGS("initialize(): Failed to set socket read callback (%s).",
|
||||
socketDescription(this).constData());
|
||||
HRESULT hr = QEventDispatcherWinRT::runOnXamlThread([d, socket, this]() {
|
||||
d->worker->setTcpSocket(socket);
|
||||
d->worker->startReading();
|
||||
return S_OK;
|
||||
});
|
||||
if (FAILED(hr))
|
||||
@ -639,20 +805,6 @@ void QNativeSocketEngine::close()
|
||||
}
|
||||
#endif // _MSC_VER >= 1900
|
||||
|
||||
QMutexLocker locker(&d->readOperationsMutex);
|
||||
for (ComPtr<IAsyncBufferOperation> readOp : d->pendingReadOps) {
|
||||
ComPtr<IAsyncInfo> info;
|
||||
hr = readOp.As(&info);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
if (info) {
|
||||
hr = info->Cancel();
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
hr = info->Close();
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
}
|
||||
}
|
||||
locker.unlock();
|
||||
|
||||
if (d->socketDescriptor != -1) {
|
||||
ComPtr<IClosable> socket;
|
||||
if (d->socketType == QAbstractSocket::TcpSocket) {
|
||||
@ -730,14 +882,32 @@ qint64 QNativeSocketEngine::read(char *data, qint64 maxlen)
|
||||
// happens and there isn't anything left in the buffer, we have to return -1 in order to signal
|
||||
// the closing of the socket.
|
||||
QMutexLocker mutexLocker(&d->readMutex);
|
||||
if (d->readBytes.pos() == d->readBytes.size() && d->socketState != QAbstractSocket::ConnectedState) {
|
||||
if (d->pendingData.isEmpty() && d->socketState != QAbstractSocket::ConnectedState) {
|
||||
close();
|
||||
return -1;
|
||||
}
|
||||
|
||||
qint64 b = d->readBytes.read(data, maxlen);
|
||||
d->bytesAvailable = d->readBytes.size() - d->readBytes.pos();
|
||||
return b;
|
||||
QByteArray readData;
|
||||
qint64 leftToMaxLen = maxlen;
|
||||
while (leftToMaxLen > 0 && !d->pendingData.isEmpty()) {
|
||||
QByteArray pendingData = d->pendingData.takeFirst();
|
||||
// Do not read the whole data. Put the rest of it back into the "queue"
|
||||
if (leftToMaxLen < pendingData.length()) {
|
||||
readData += pendingData.left(leftToMaxLen);
|
||||
pendingData = pendingData.remove(0, maxlen);
|
||||
d->pendingData.prepend(pendingData);
|
||||
break;
|
||||
} else {
|
||||
readData += pendingData;
|
||||
leftToMaxLen -= pendingData.length();
|
||||
}
|
||||
}
|
||||
const int copyLength = qMin(maxlen, qint64(readData.length()));
|
||||
d->bytesAvailable -= copyLength;
|
||||
mutexLocker.unlock();
|
||||
|
||||
memcpy(data, readData, copyLength);
|
||||
return copyLength;
|
||||
}
|
||||
|
||||
qint64 QNativeSocketEngine::write(const char *data, qint64 len)
|
||||
@ -913,7 +1083,7 @@ bool QNativeSocketEngine::waitForRead(int msecs, bool *timedOut)
|
||||
|
||||
// If we are a client, we are ready to read if our buffer has data
|
||||
QMutexLocker locker(&d->readMutex);
|
||||
if (!d->readBytes.atEnd())
|
||||
if (!d->pendingData.isEmpty())
|
||||
return true;
|
||||
|
||||
// Nothing to do, wait for more events
|
||||
@ -1001,21 +1171,8 @@ void QNativeSocketEngine::establishRead()
|
||||
|
||||
HRESULT hr;
|
||||
hr = QEventDispatcherWinRT::runOnXamlThread([d]() {
|
||||
ComPtr<IInputStream> stream;
|
||||
HRESULT hr = d->tcpSocket()->get_InputStream(&stream);
|
||||
RETURN_HR_IF_FAILED("establishRead(): Failed to get socket input stream");
|
||||
|
||||
ComPtr<IBuffer> buffer;
|
||||
hr = g->bufferFactory->Create(READ_BUFFER_SIZE, &buffer);
|
||||
RETURN_HR_IF_FAILED("establishRead(): Failed to create buffer");
|
||||
|
||||
ComPtr<IAsyncBufferOperation> readOp;
|
||||
hr = stream->ReadAsync(buffer.Get(), READ_BUFFER_SIZE, InputStreamOptions_Partial, readOp.GetAddressOf());
|
||||
RETURN_HR_IF_FAILED("establishRead(): Failed to initiate socket read");
|
||||
QMutexLocker locker(&d->readOperationsMutex);
|
||||
d->pendingReadOps.append(readOp);
|
||||
hr = readOp->put_Completed(Callback<SocketReadCompletedHandler>(d, &QNativeSocketEnginePrivate::handleReadyRead).Get());
|
||||
RETURN_HR_IF_FAILED("establishRead(): Failed to register read callback");
|
||||
d->worker->setTcpSocket(d->tcpSocket());
|
||||
d->worker->startReading();
|
||||
return S_OK;
|
||||
});
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
@ -1032,6 +1189,32 @@ void QNativeSocketEngine::handleNewDatagrams(const QList<WinRtDatagram> &datagra
|
||||
emit readReady();
|
||||
}
|
||||
|
||||
void QNativeSocketEngine::handleNewData(const QVector<QByteArray> &data)
|
||||
{
|
||||
// Defer putting the data into the list until the next event loop iteration
|
||||
// (where the readyRead signal is emitted as well)
|
||||
QMetaObject::invokeMethod(this, "putIntoPendingData", Qt::QueuedConnection,
|
||||
Q_ARG(QVector<QByteArray>, data));
|
||||
}
|
||||
|
||||
void QNativeSocketEngine::handleTcpError(QAbstractSocket::SocketError error)
|
||||
{
|
||||
Q_D(QNativeSocketEngine);
|
||||
QNativeSocketEnginePrivate::ErrorString errorString;
|
||||
switch (error) {
|
||||
case QAbstractSocket::RemoteHostClosedError:
|
||||
errorString = QNativeSocketEnginePrivate::RemoteHostClosedErrorString;
|
||||
break;
|
||||
default:
|
||||
errorString = QNativeSocketEnginePrivate::UnknownSocketErrorString;
|
||||
}
|
||||
|
||||
d->setError(error, errorString);
|
||||
d->socketState = QAbstractSocket::UnconnectedState;
|
||||
if (d->notifyOnRead)
|
||||
emit readReady();
|
||||
}
|
||||
|
||||
void QNativeSocketEngine::putIntoPendingDatagramsList(const QList<WinRtDatagram> &datagrams)
|
||||
{
|
||||
Q_D(QNativeSocketEngine);
|
||||
@ -1039,6 +1222,18 @@ void QNativeSocketEngine::putIntoPendingDatagramsList(const QList<WinRtDatagram>
|
||||
d->pendingDatagrams.append(datagrams);
|
||||
}
|
||||
|
||||
void QNativeSocketEngine::putIntoPendingData(const QVector<QByteArray> &data)
|
||||
{
|
||||
Q_D(QNativeSocketEngine);
|
||||
QMutexLocker locker(&d->readMutex);
|
||||
d->pendingData.append(data);
|
||||
for (const QByteArray &newData : data)
|
||||
d->bytesAvailable += newData.length();
|
||||
locker.unlock();
|
||||
if (d->notifyOnRead)
|
||||
readNotification();
|
||||
}
|
||||
|
||||
bool QNativeSocketEnginePrivate::createNewSocket(QAbstractSocket::SocketType socketType, QAbstractSocket::NetworkLayerProtocol &socketProtocol)
|
||||
{
|
||||
Q_UNUSED(socketProtocol);
|
||||
@ -1093,7 +1288,7 @@ QNativeSocketEnginePrivate::QNativeSocketEnginePrivate()
|
||||
, notifyOnException(false)
|
||||
, closingDown(false)
|
||||
, socketDescriptor(-1)
|
||||
, worker(new SocketEngineWorker)
|
||||
, worker(new SocketEngineWorker(this))
|
||||
, sslSocket(Q_NULLPTR)
|
||||
, connectionToken( { -1 } )
|
||||
{
|
||||
@ -1481,109 +1676,6 @@ HRESULT QNativeSocketEnginePrivate::handleConnectOpFinished(IAsyncAction *action
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
HRESULT QNativeSocketEnginePrivate::handleReadyRead(IAsyncBufferOperation *asyncInfo, AsyncStatus status)
|
||||
{
|
||||
if (closingDown || wasDeleted || isDeletingChildren
|
||||
|| socketState == QAbstractSocket::UnconnectedState) {
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
Q_Q(QNativeSocketEngine);
|
||||
QMutexLocker locker(&readOperationsMutex);
|
||||
for (int i = 0; i < pendingReadOps.count(); ++i) {
|
||||
if (pendingReadOps.at(i).Get() == asyncInfo) {
|
||||
pendingReadOps.takeAt(i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
locker.unlock();
|
||||
|
||||
// A read in UnconnectedState will close the socket and return -1 and thus tell the caller,
|
||||
// that the connection was closed. The socket cannot be closed here, as the subsequent read
|
||||
// might fail then.
|
||||
if (status == Error || status == Canceled) {
|
||||
setError(QAbstractSocket::RemoteHostClosedError, RemoteHostClosedErrorString);
|
||||
socketState = QAbstractSocket::UnconnectedState;
|
||||
if (notifyOnRead)
|
||||
emit q->readReady();
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
ComPtr<IBuffer> buffer;
|
||||
HRESULT hr = asyncInfo->GetResults(&buffer);
|
||||
RETURN_OK_IF_FAILED("Failed to get read results buffer");
|
||||
|
||||
UINT32 bufferLength;
|
||||
hr = buffer->get_Length(&bufferLength);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
// A zero sized buffer length signals, that the remote host closed the connection. The socket
|
||||
// cannot be closed though, as the following read might have socket descriptor -1 and thus and
|
||||
// the closing of the socket won't be communicated to the caller. So only the error is set. The
|
||||
// actual socket close happens inside of read.
|
||||
if (!bufferLength) {
|
||||
setError(QAbstractSocket::RemoteHostClosedError, RemoteHostClosedErrorString);
|
||||
socketState = QAbstractSocket::UnconnectedState;
|
||||
if (notifyOnRead)
|
||||
emit q->readReady();
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
ComPtr<Windows::Storage::Streams::IBufferByteAccess> byteArrayAccess;
|
||||
hr = buffer.As(&byteArrayAccess);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
byte *data;
|
||||
hr = byteArrayAccess->Buffer(&data);
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
|
||||
QMutexLocker readLocker(&readMutex);
|
||||
if (readBytes.atEnd()) // Everything has been read; the buffer is safe to reset
|
||||
readBytes.close();
|
||||
if (!readBytes.isOpen())
|
||||
readBytes.open(QBuffer::ReadWrite|QBuffer::Truncate);
|
||||
qint64 readPos = readBytes.pos();
|
||||
readBytes.seek(readBytes.size());
|
||||
Q_ASSERT(readBytes.atEnd());
|
||||
readBytes.write(reinterpret_cast<const char*>(data), qint64(bufferLength));
|
||||
readBytes.seek(readPos);
|
||||
bytesAvailable = readBytes.size() - readBytes.pos();
|
||||
readLocker.unlock();
|
||||
|
||||
if (notifyOnRead)
|
||||
emit q->readReady();
|
||||
|
||||
hr = QEventDispatcherWinRT::runOnXamlThread([buffer, q, this]() {
|
||||
UINT32 readBufferLength;
|
||||
ComPtr<IInputStream> stream;
|
||||
HRESULT hr = tcpSocket()->get_InputStream(&stream);
|
||||
RETURN_HR_IF_FAILED("handleReadyRead(): Could not obtain input stream");
|
||||
|
||||
// Reuse the stream buffer
|
||||
hr = buffer->get_Capacity(&readBufferLength);
|
||||
RETURN_HR_IF_FAILED("handleReadyRead(): Could not obtain buffer capacity");
|
||||
hr = buffer->put_Length(0);
|
||||
RETURN_HR_IF_FAILED("handleReadyRead(): Could not set buffer length");
|
||||
|
||||
ComPtr<IAsyncBufferOperation> readOp;
|
||||
hr = stream->ReadAsync(buffer.Get(), readBufferLength, InputStreamOptions_Partial, &readOp);
|
||||
if (FAILED(hr)) {
|
||||
qErrnoWarning(hr, "handleReadyRead(): Could not read into socket stream buffer (%s).",
|
||||
socketDescription(q).constData());
|
||||
return S_OK;
|
||||
}
|
||||
QMutexLocker locker(&readOperationsMutex);
|
||||
pendingReadOps.append(readOp);
|
||||
hr = readOp->put_Completed(Callback<SocketReadCompletedHandler>(this, &QNativeSocketEnginePrivate::handleReadyRead).Get());
|
||||
if (FAILED(hr)) {
|
||||
qErrnoWarning(hr, "handleReadyRead(): Failed to set socket read callback (%s).",
|
||||
socketDescription(q).constData());
|
||||
return S_OK;
|
||||
}
|
||||
return S_OK;
|
||||
});
|
||||
Q_ASSERT_SUCCEEDED(hr);
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
HRESULT QNativeSocketEnginePrivate::handleNewDatagram(IDatagramSocket *socket, IDatagramSocketMessageReceivedEventArgs *args)
|
||||
{
|
||||
Q_Q(QNativeSocketEngine);
|
||||
|
@ -144,9 +144,12 @@ signals:
|
||||
private slots:
|
||||
void establishRead();
|
||||
void handleNewDatagrams(const QList<WinRtDatagram> &datagram);
|
||||
void handleNewData(const QVector<QByteArray> &data);
|
||||
void handleTcpError(QAbstractSocket::SocketError error);
|
||||
|
||||
private:
|
||||
Q_INVOKABLE void putIntoPendingDatagramsList(const QList<WinRtDatagram> &datagrams);
|
||||
Q_INVOKABLE void putIntoPendingData(const QVector<QByteArray> &data);
|
||||
|
||||
Q_DECLARE_PRIVATE(QNativeSocketEngine)
|
||||
Q_DISABLE_COPY(QNativeSocketEngine)
|
||||
@ -215,23 +218,17 @@ private:
|
||||
Microsoft::WRL::ComPtr<ABI::Windows::Networking::Sockets::IStreamSocketListener> tcpListener;
|
||||
Microsoft::WRL::ComPtr<ABI::Windows::Foundation::IAsyncAction> connectOp;
|
||||
|
||||
// Protected by readOperationsMutex. Written in handleReadyRead (native callback)
|
||||
QVector<Microsoft::WRL::ComPtr<ABI::Windows::Foundation::IAsyncOperationWithProgress<ABI::Windows::Storage::Streams::IBuffer *, UINT32>>> pendingReadOps;
|
||||
|
||||
// Protected by readMutex. Written in handleReadyRead (native callback)
|
||||
QBuffer readBytes;
|
||||
|
||||
// In case of TCP readMutex protects readBytes and bytesAvailable. In case of UDP it is
|
||||
// pendingDatagrams. They are written inside native callbacks (handleReadyRead and
|
||||
// handleNewDatagrams/putIntoPendingDatagramsList)
|
||||
mutable QMutex readMutex;
|
||||
|
||||
// As pendingReadOps is changed inside handleReadyRead(native callback) it has to be protected
|
||||
QMutex readOperationsMutex;
|
||||
|
||||
// Protected by readMutex. Written in handleReadyRead (native callback)
|
||||
QAtomicInteger<int> bytesAvailable;
|
||||
|
||||
// Protected by readMutex. Written in handleNewData/putIntoPendingData (native callback)
|
||||
QVector<QByteArray> pendingData;
|
||||
|
||||
// Protected by readMutex. Written in handleNewDatagrams/putIntoPendingDatagramsList
|
||||
QList<WinRtDatagram> pendingDatagrams;
|
||||
|
||||
@ -246,7 +243,6 @@ private:
|
||||
HRESULT handleClientConnection(ABI::Windows::Networking::Sockets::IStreamSocketListener *tcpListener,
|
||||
ABI::Windows::Networking::Sockets::IStreamSocketListenerConnectionReceivedEventArgs *args);
|
||||
HRESULT handleConnectOpFinished(ABI::Windows::Foundation::IAsyncAction *, ABI::Windows::Foundation::AsyncStatus);
|
||||
HRESULT handleReadyRead(ABI::Windows::Foundation::IAsyncOperationWithProgress<ABI::Windows::Storage::Streams::IBuffer *, UINT32> *asyncInfo, ABI::Windows::Foundation::AsyncStatus);
|
||||
};
|
||||
|
||||
QT_END_NAMESPACE
|
||||
|
Loading…
Reference in New Issue
Block a user