WinRT: Fix TCP socket reads

All read calls are now pulled from an intermediate buffer which is
populated from the asynchronous callback (this was a TODO previously, and
was breaking downloads of large requests). As a side-benefit, the use of
only async callbacks ensures fewer first-chance exceptions appear in the
debug output.

Task-number: QTBUG-30196
Change-Id: I5653742d8d94934a4b4a4227298865d20518bc4c
Reviewed-by: Oliver Wolff <oliver.wolff@digia.com>
This commit is contained in:
Andrew Knight 2014-04-22 12:43:28 +03:00 committed by The Qt Project
parent 13e3f269fd
commit 078c71ac8f
2 changed files with 224 additions and 131 deletions

View File

@ -74,6 +74,7 @@ typedef ITypedEventHandler<StreamSocketListener *, StreamSocketListenerConnectio
typedef ITypedEventHandler<DatagramSocket *, DatagramSocketMessageReceivedEventArgs *> DatagramReceivedHandler; typedef ITypedEventHandler<DatagramSocket *, DatagramSocketMessageReceivedEventArgs *> DatagramReceivedHandler;
typedef IAsyncOperationWithProgressCompletedHandler<IBuffer *, UINT32> SocketReadCompletedHandler; typedef IAsyncOperationWithProgressCompletedHandler<IBuffer *, UINT32> SocketReadCompletedHandler;
typedef IAsyncOperationWithProgressCompletedHandler<UINT32, UINT32> SocketWriteCompletedHandler; typedef IAsyncOperationWithProgressCompletedHandler<UINT32, UINT32> SocketWriteCompletedHandler;
typedef IAsyncOperationWithProgress<IBuffer *, UINT32> IAsyncBufferOperation;
QT_BEGIN_NAMESPACE QT_BEGIN_NAMESPACE
@ -130,6 +131,8 @@ QString qt_QStringFromHSTRING(HSTRING string)
return QString::fromWCharArray(rawString, length); return QString::fromWCharArray(rawString, length);
} }
#define READ_BUFFER_SIZE 8192
class ByteArrayBuffer : public Microsoft::WRL::RuntimeClass<RuntimeClassFlags<WinRtClassicComMix>, class ByteArrayBuffer : public Microsoft::WRL::RuntimeClass<RuntimeClassFlags<WinRtClassicComMix>,
IBuffer, Windows::Storage::Streams::IBufferByteAccess> IBuffer, Windows::Storage::Streams::IBufferByteAccess>
{ {
@ -167,16 +170,6 @@ public:
return S_OK; return S_OK;
} }
QNativeSocketEngine *engine() const
{
return m_engine;
}
void setEngine(QNativeSocketEngine *engine)
{
m_engine = engine;
}
ComPtr<IInputStream> inputStream() const ComPtr<IInputStream> inputStream() const
{ {
return m_stream; return m_stream;
@ -190,13 +183,33 @@ public:
private: private:
QByteArray m_bytes; QByteArray m_bytes;
UINT32 m_length; UINT32 m_length;
QPointer<QNativeSocketEngine> m_engine;
ComPtr<IInputStream> m_stream; ComPtr<IInputStream> m_stream;
}; };
template <typename T>
static AsyncStatus opStatus(const ComPtr<T> &op)
{
ComPtr<IAsyncInfo> info;
HRESULT hr = op.As(&info);
if (FAILED(hr)) {
qErrnoWarning(hr, "Failed to cast op to IAsyncInfo.");
return Error;
}
AsyncStatus status;
hr = info->get_Status(&status);
if (FAILED(hr)) {
qErrnoWarning(hr, "Failed to get AsyncStatus.");
return Error;
}
return status;
}
QNativeSocketEngine::QNativeSocketEngine(QObject *parent) QNativeSocketEngine::QNativeSocketEngine(QObject *parent)
: QAbstractSocketEngine(*new QNativeSocketEnginePrivate(), parent) : QAbstractSocketEngine(*new QNativeSocketEnginePrivate(), parent)
{ {
connect(this, SIGNAL(connectionReady()), SLOT(connectionNotification()), Qt::QueuedConnection);
connect(this, SIGNAL(readReady()), SLOT(readNotification()), Qt::QueuedConnection);
connect(this, SIGNAL(writeReady()), SLOT(writeNotification()), Qt::QueuedConnection);
} }
QNativeSocketEngine::~QNativeSocketEngine() QNativeSocketEngine::~QNativeSocketEngine()
@ -230,7 +243,7 @@ bool QNativeSocketEngine::initialize(qintptr socketDescriptor, QAbstractSocket::
// Currently, only TCP sockets are initialized this way. // Currently, only TCP sockets are initialized this way.
SocketHandler *handler = gSocketHandler(); SocketHandler *handler = gSocketHandler();
d->tcp = handler->pendingTcpSockets.value(socketDescriptor, Q_NULLPTR); d->tcp = handler->pendingTcpSockets.take(socketDescriptor);
d->socketType = QAbstractSocket::TcpSocket; d->socketType = QAbstractSocket::TcpSocket;
if (!d->tcp || !d->fetchConnectionParameters()) if (!d->tcp || !d->fetchConnectionParameters())
@ -271,23 +284,33 @@ bool QNativeSocketEngine::connectToHostByName(const QString &name, quint16 port)
return false; return false;
} }
ComPtr<IAsyncAction> op;
const QString portString = QString::number(port); const QString portString = QString::number(port);
HStringReference portReference(reinterpret_cast<LPCWSTR>(portString.utf16())); HStringReference portReference(reinterpret_cast<LPCWSTR>(portString.utf16()));
ComPtr<IAsyncAction> action;
HRESULT hr = E_FAIL; HRESULT hr = E_FAIL;
if (d->socketType == QAbstractSocket::TcpSocket) if (d->socketType == QAbstractSocket::TcpSocket)
hr = d->tcp->ConnectAsync(remoteHost.Get(), portReference.Get(), &action); hr = d->tcp->ConnectAsync(remoteHost.Get(), portReference.Get(), &op);
else if (d->socketType == QAbstractSocket::UdpSocket) else if (d->socketType == QAbstractSocket::UdpSocket)
hr = d->udp->ConnectAsync(remoteHost.Get(), portReference.Get(), &action); hr = d->udp->ConnectAsync(remoteHost.Get(), portReference.Get(), &op);
if (FAILED(hr)) { if (FAILED(hr)) {
qWarning("QNativeSocketEnginePrivate::nativeConnect:: Could not obtain connect action"); qWarning("QNativeSocketEnginePrivate::nativeConnect:: Could not obtain connect action");
return false; return false;
} }
action->put_Completed(Callback<IAsyncActionCompletedHandler>(&QNativeSocketEnginePrivate::interruptEventDispatcher).Get()); hr = op->put_Completed(Callback<IAsyncActionCompletedHandler>(
hr = action->GetResults(); d, &QNativeSocketEnginePrivate::handleConnectToHost).Get());
while ((hr = action->GetResults()) == E_ILLEGAL_METHOD_CALL) if (FAILED(hr)) {
QCoreApplication::processEvents(QEventLoop::ExcludeUserInputEvents | QEventLoop::WaitForMoreEvents); qErrnoWarning(hr, "Unable to set host connection callback.");
return false;
}
d->socketState = QAbstractSocket::ConnectingState;
while (opStatus(op) == Started)
d->eventLoop.processEvents();
AsyncStatus status = opStatus(op);
if (status == Error || status == Canceled)
return false;
if (hr == 0x8007274c) { // A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond. if (hr == 0x8007274c) { // A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.
d->setError(QAbstractSocket::NetworkError, d->ConnectionTimeOutErrorString); d->setError(QAbstractSocket::NetworkError, d->ConnectionTimeOutErrorString);
d->socketState = QAbstractSocket::UnconnectedState; d->socketState = QAbstractSocket::UnconnectedState;
@ -305,25 +328,22 @@ bool QNativeSocketEngine::connectToHostByName(const QString &name, quint16 port)
} }
if (d->socketType == QAbstractSocket::TcpSocket) { if (d->socketType == QAbstractSocket::TcpSocket) {
UINT32 capacity;
hr = d->inputBuffer->get_Capacity(&capacity);
if (FAILED(hr))
return false;
IInputStream *stream; IInputStream *stream;
hr = d->tcp->get_InputStream(&stream); hr = d->tcp->get_InputStream(&stream);
if (FAILED(hr)) if (FAILED(hr))
return false; return false;
ByteArrayBuffer *buffer = static_cast<ByteArrayBuffer *>(d->inputBuffer.Get()); ByteArrayBuffer *buffer = static_cast<ByteArrayBuffer *>(d->readBuffer.Get());
buffer->setEngine(this);
buffer->setInputStream(stream); buffer->setInputStream(stream);
ComPtr<IAsyncOperationWithProgress<IBuffer *, UINT32>> op; ComPtr<IAsyncBufferOperation> op;
hr = stream->ReadAsync(buffer, capacity, InputStreamOptions_Partial, &op); hr = stream->ReadAsync(buffer, READ_BUFFER_SIZE, InputStreamOptions_Partial, &op);
if (FAILED(hr)) if (FAILED(hr))
return false; return false;
hr = op->put_Completed(Callback<SocketReadCompletedHandler>(&QNativeSocketEnginePrivate::handleReadyRead).Get()); hr = op->put_Completed(Callback<SocketReadCompletedHandler>(d, &QNativeSocketEnginePrivate::handleReadyRead).Get());
if (FAILED(hr)) if (FAILED(hr)) {
qErrnoWarning(hr, "Failed to set socket read callback.");
return false; return false;
} }
}
d->socketState = QAbstractSocket::ConnectedState; d->socketState = QAbstractSocket::ConnectedState;
return true; return true;
} }
@ -358,21 +378,35 @@ bool QNativeSocketEngine::bind(const QHostAddress &address, quint16 port)
d->tcpListener->add_ConnectionReceived(Callback<ClientConnectedHandler>(d, &QNativeSocketEnginePrivate::handleClientConnection).Get(), &token); d->tcpListener->add_ConnectionReceived(Callback<ClientConnectedHandler>(d, &QNativeSocketEnginePrivate::handleClientConnection).Get(), &token);
hr = d->tcpListener->BindEndpointAsync(hostAddress.Get(), portString.Get(), &op); hr = d->tcpListener->BindEndpointAsync(hostAddress.Get(), portString.Get(), &op);
if (FAILED(hr)) { if (FAILED(hr)) {
qWarning("Unable to bind"); // ### Set error message qErrnoWarning(hr, "Unable to bind socket."); // ### Set error message
return false; return false;
} }
} else if (d->socketType == QAbstractSocket::UdpSocket) { } else if (d->socketType == QAbstractSocket::UdpSocket) {
hr = d->udp->BindEndpointAsync(hostAddress.Get(), portString.Get(), &op); hr = d->udp->BindEndpointAsync(hostAddress.Get(), portString.Get(), &op);
if (FAILED(hr)) { if (FAILED(hr)) {
qWarning("unable to bind"); // ### Set error message qErrnoWarning(hr, "Unable to bind socket."); // ### Set error message
return false;
}
hr = op->put_Completed(Callback<IAsyncActionCompletedHandler>(d, &QNativeSocketEnginePrivate::handleBindCompleted).Get());
if (FAILED(hr)) {
qErrnoWarning(hr, "Unable to set bind callback.");
return false; return false;
} }
} }
if (op) { if (op) {
// Wait for connection to enter bound state - TODO: timeout, check result while (opStatus(op) == Started)
while ((hr = op->GetResults()) == E_ILLEGAL_METHOD_CALL) d->eventLoop.processEvents();
QCoreApplication::processEvents();
AsyncStatus status = opStatus(op);
if (status == Error || status == Canceled)
return false;
hr = op->GetResults();
if (FAILED(hr)) {
qErrnoWarning(hr, "Failed to bind socket");
return false;
}
d->socketState = QAbstractSocket::BoundState; d->socketState = QAbstractSocket::BoundState;
d->fetchConnectionParameters(); d->fetchConnectionParameters();
@ -410,17 +444,22 @@ int QNativeSocketEngine::accept()
if (d->socketType == QAbstractSocket::TcpSocket) { if (d->socketType == QAbstractSocket::TcpSocket) {
IStreamSocket *socket = d->pendingConnections.takeFirst(); IStreamSocket *socket = d->pendingConnections.takeFirst();
UINT32 capacity;
d->inputBuffer->get_Capacity(&capacity);
IInputStream *stream; IInputStream *stream;
socket->get_InputStream(&stream); socket->get_InputStream(&stream);
// TODO: delete buffer and stream on socket close // TODO: delete buffer and stream on socket close
ByteArrayBuffer *buffer = static_cast<ByteArrayBuffer *>(d->inputBuffer.Get()); ByteArrayBuffer *buffer = static_cast<ByteArrayBuffer *>(d->readBuffer.Get());
buffer->setEngine(this);
buffer->setInputStream(stream); buffer->setInputStream(stream);
ComPtr<IAsyncOperationWithProgress<IBuffer *, UINT32>> op; ComPtr<IAsyncBufferOperation> op;
stream->ReadAsync(buffer, capacity, InputStreamOptions_Partial, &op); HRESULT hr = stream->ReadAsync(buffer, READ_BUFFER_SIZE, InputStreamOptions_Partial, &op);
op->put_Completed(Callback<SocketReadCompletedHandler>(&QNativeSocketEnginePrivate::handleReadyRead).Get()); if (FAILED(hr)) {
qErrnoWarning(hr, "Faild to read from the socket buffer.");
return -1;
}
hr = op->put_Completed(Callback<SocketReadCompletedHandler>(d, &QNativeSocketEnginePrivate::handleReadyRead).Get());
if (FAILED(hr)) {
qErrnoWarning(hr, "Failed to set socket read callback.");
return -1;
}
d->currentConnections.append(socket); d->currentConnections.append(socket);
SocketHandler *handler = gSocketHandler(); SocketHandler *handler = gSocketHandler();
@ -445,7 +484,6 @@ void QNativeSocketEngine::close()
d->closingDown = true; d->closingDown = true;
socket->Close(); socket->Close();
socket->Release(); socket->Release();
closeNotification();
d->socketDescriptor = -1; d->socketDescriptor = -1;
} }
d->socketDescriptor = -1; d->socketDescriptor = -1;
@ -493,13 +531,7 @@ qint64 QNativeSocketEngine::bytesAvailable() const
if (d->socketType != QAbstractSocket::TcpSocket) if (d->socketType != QAbstractSocket::TcpSocket)
return -1; return -1;
if (d->inputBuffer) { return d->readBytes.size() - d->readBytes.pos();
UINT32 len;
d->inputBuffer->get_Length(&len);
return len;
}
return -1;
} }
qint64 QNativeSocketEngine::read(char *data, qint64 maxlen) qint64 QNativeSocketEngine::read(char *data, qint64 maxlen)
@ -508,54 +540,56 @@ qint64 QNativeSocketEngine::read(char *data, qint64 maxlen)
if (d->socketType != QAbstractSocket::TcpSocket) if (d->socketType != QAbstractSocket::TcpSocket)
return -1; return -1;
ComPtr<IDataReaderStatics> dataReaderStatics; QMutexLocker mutexLocker(&d->readMutex);
GetActivationFactory(HString::MakeReference(RuntimeClass_Windows_Storage_Streams_DataReader).Get(), &dataReaderStatics); return d->readBytes.read(data, maxlen);
ComPtr<IDataReader> reader;
dataReaderStatics->FromBuffer(d->inputBuffer.Get(), &reader);
UINT32 bufferCapacity;
d->inputBuffer->get_Capacity(&bufferCapacity);
qint64 lengthToRead = maxlen < bufferCapacity ? maxlen : bufferCapacity;
UINT32 bufferLength;
d->inputBuffer->get_Length(&bufferLength);
lengthToRead = bufferLength < lengthToRead ? bufferLength : lengthToRead;
reader->ReadBytes(lengthToRead, (unsigned char*)data);
return lengthToRead;
}
template <typename T>
static qint64 nativeWrite(T *socket, const char *data, qint64 len)
{
ComPtr<IOutputStream> stream;
HRESULT hr = socket->get_OutputStream(&stream);
if (FAILED(hr))
return -1;
ComPtr<ByteArrayBuffer> buffer = Make<ByteArrayBuffer>(data, len);
ComPtr<IAsyncOperationWithProgress<UINT32, UINT32>> op;
hr = stream->WriteAsync(buffer.Get(), &op);
if (FAILED(hr))
return -1;
UINT32 bytesWritten;
while ((hr = op->GetResults(&bytesWritten)) == E_ILLEGAL_METHOD_CALL)
QCoreApplication::processEvents(QEventLoop::ExcludeUserInputEvents);
return bytesWritten;
} }
qint64 QNativeSocketEngine::write(const char *data, qint64 len) qint64 QNativeSocketEngine::write(const char *data, qint64 len)
{ {
Q_D(QNativeSocketEngine); Q_D(QNativeSocketEngine);
qint64 bytesWritten = -1; HRESULT hr = E_FAIL;
ComPtr<IOutputStream> stream;
if (d->socketType == QAbstractSocket::TcpSocket) if (d->socketType == QAbstractSocket::TcpSocket)
bytesWritten = nativeWrite(d->tcp, data, len); hr = d->tcp->get_OutputStream(&stream);
else if (d->socketType == QAbstractSocket::UdpSocket) else if (d->socketType == QAbstractSocket::UdpSocket)
bytesWritten = nativeWrite(d->udp, data, len); hr = d->udp->get_OutputStream(&stream);
if (bytesWritten != -1 && d->notifyOnWrite) if (FAILED(hr)) {
writeNotification(); qErrnoWarning(hr, "Failed to get output stream to socket.");
return bytesWritten; return -1;
}
ComPtr<ByteArrayBuffer> buffer = Make<ByteArrayBuffer>(data, len);
ComPtr<IAsyncOperationWithProgress<UINT32, UINT32>> op;
hr = stream->WriteAsync(buffer.Get(), &op);
if (FAILED(hr)) {
qErrnoWarning(hr, "Failed to write to socket.");
return -1;
}
hr = op->put_Completed(Callback<IAsyncOperationWithProgressCompletedHandler<UINT32, UINT32>>(
d, &QNativeSocketEnginePrivate::handleWriteCompleted).Get());
if (FAILED(hr)) {
qErrnoWarning(hr, "Failed to set socket write callback.");
return -1;
}
while (opStatus(op) == Started)
d->eventLoop.processEvents();
AsyncStatus status = opStatus(op);
if (status == Error || status == Canceled)
return -1;
UINT32 bytesWritten;
hr = op->GetResults(&bytesWritten);
if (FAILED(hr)) {
qErrnoWarning(hr, "Failed to get written socket length.");
return -1;
}
if (bytesWritten && d->notifyOnWrite)
emit writeReady();
return bytesWritten;
} }
qint64 QNativeSocketEngine::readDatagram(char *data, qint64 maxlen, QHostAddress *addr, quint16 *port) qint64 QNativeSocketEngine::readDatagram(char *data, qint64 maxlen, QHostAddress *addr, quint16 *port)
@ -698,7 +732,7 @@ bool QNativeSocketEngine::setOption(QAbstractSocketEngine::SocketOption option,
bool QNativeSocketEngine::waitForRead(int msecs, bool *timedOut) bool QNativeSocketEngine::waitForRead(int msecs, bool *timedOut)
{ {
Q_D(const QNativeSocketEngine); Q_D(QNativeSocketEngine);
Q_CHECK_VALID_SOCKETLAYER(QNativeSocketEngine::waitForRead(), false); Q_CHECK_VALID_SOCKETLAYER(QNativeSocketEngine::waitForRead(), false);
Q_CHECK_NOT_STATE(QNativeSocketEngine::waitForRead(), Q_CHECK_NOT_STATE(QNativeSocketEngine::waitForRead(),
QAbstractSocket::UnconnectedState, false); QAbstractSocket::UnconnectedState, false);
@ -714,14 +748,12 @@ bool QNativeSocketEngine::waitForRead(int msecs, bool *timedOut)
return true; return true;
// If we are a client, we are ready to read if our buffer has data // If we are a client, we are ready to read if our buffer has data
UINT32 length; QMutexLocker locker(&d->readMutex);
if (FAILED(d->inputBuffer->get_Length(&length))) if (!d->readBytes.atEnd())
return false;
if (length)
return true; return true;
// Nothing to do, wait for more events // Nothing to do, wait for more events
QCoreApplication::processEvents(QEventLoop::ExcludeUserInputEvents|QEventLoop::WaitForMoreEvents); d->eventLoop.processEvents();
} }
d->setError(QAbstractSocket::SocketTimeoutError, d->setError(QAbstractSocket::SocketTimeoutError,
@ -832,8 +864,8 @@ QNativeSocketEnginePrivate::QNativeSocketEnginePrivate()
, closingDown(false) , closingDown(false)
, socketDescriptor(-1) , socketDescriptor(-1)
{ {
ComPtr<ByteArrayBuffer> buffer = Make<ByteArrayBuffer>(8192); ComPtr<ByteArrayBuffer> buffer = Make<ByteArrayBuffer>(READ_BUFFER_SIZE);
inputBuffer = buffer; readBuffer = buffer;
} }
QNativeSocketEnginePrivate::~QNativeSocketEnginePrivate() QNativeSocketEnginePrivate::~QNativeSocketEnginePrivate()
@ -1119,6 +1151,11 @@ bool QNativeSocketEnginePrivate::fetchConnectionParameters()
return true; return true;
} }
HRESULT QNativeSocketEnginePrivate::handleBindCompleted(IAsyncAction *, AsyncStatus)
{
return S_OK;
}
HRESULT QNativeSocketEnginePrivate::handleClientConnection(IStreamSocketListener *listener, IStreamSocketListenerConnectionReceivedEventArgs *args) HRESULT QNativeSocketEnginePrivate::handleClientConnection(IStreamSocketListener *listener, IStreamSocketListenerConnectionReceivedEventArgs *args)
{ {
Q_Q(QNativeSocketEngine); Q_Q(QNativeSocketEngine);
@ -1126,47 +1163,91 @@ HRESULT QNativeSocketEnginePrivate::handleClientConnection(IStreamSocketListener
IStreamSocket *socket; IStreamSocket *socket;
args->get_Socket(&socket); args->get_Socket(&socket);
pendingConnections.append(socket); pendingConnections.append(socket);
q->connectionNotification(); emit q->connectionReady();
q->readNotification(); emit q->readReady();
return interruptEventDispatcher(0, Completed);
}
HRESULT QNativeSocketEnginePrivate::interruptEventDispatcher(IAsyncAction *, AsyncStatus)
{
if (QThread *thread = QThread::currentThread()) {
if (QAbstractEventDispatcher *dispatcher = thread->eventDispatcher())
dispatcher->interrupt();
}
return S_OK; return S_OK;
} }
HRESULT QNativeSocketEnginePrivate::handleReadyRead(IAsyncOperationWithProgress<IBuffer *, UINT32> *asyncInfo, AsyncStatus) HRESULT QNativeSocketEnginePrivate::handleConnectToHost(ABI::Windows::Foundation::IAsyncAction *, ABI::Windows::Foundation::AsyncStatus)
{ {
return S_OK;
}
HRESULT QNativeSocketEnginePrivate::handleReadyRead(IAsyncBufferOperation *asyncInfo, AsyncStatus status)
{
Q_Q(QNativeSocketEngine);
if (wasDeleted || isDeletingChildren)
return S_OK;
if (status == Error || status == Canceled)
return S_OK;
ByteArrayBuffer *buffer = 0; ByteArrayBuffer *buffer = 0;
HRESULT hr = asyncInfo->GetResults((IBuffer **)&buffer); HRESULT hr = asyncInfo->GetResults((IBuffer **)&buffer);
if (FAILED(hr)) if (FAILED(hr)) {
return hr; qErrnoWarning(hr, "Failed to get ready read results.");
return S_OK;
}
UINT32 len; UINT32 len;
buffer->get_Length(&len); buffer->get_Length(&len);
QNativeSocketEngine *q = buffer->engine(); if (!len) {
if (!q) if (q->isReadNotificationEnabled())
emit q->readReady();
return S_OK; return S_OK;
if (len > 0 && q->isReadNotificationEnabled()) {
q->readNotification();
} }
// Continue reading ### TODO: read into offset!!! byte *data;
UINT32 capacity; buffer->Buffer(&data);
buffer->get_Capacity(&capacity);
ComPtr<IAsyncOperationWithProgress<IBuffer *, UINT32>> op; readMutex.lock();
if (SUCCEEDED(buffer->inputStream()->ReadAsync(buffer, capacity, InputStreamOptions_Partial, &op))) { if (readBytes.atEnd()) // Everything has been read; the buffer is safe to reset
if (q) readBytes.close();
return op->put_Completed(Callback<SocketReadCompletedHandler>(&QNativeSocketEnginePrivate::handleReadyRead).Get()); if (!readBytes.isOpen())
else readBytes.open(QBuffer::ReadWrite|QBuffer::Truncate);
return op->put_Completed(nullptr); qint64 readPos = readBytes.pos();
readBytes.seek(readBytes.size());
Q_ASSERT(readBytes.atEnd());
readBytes.write(reinterpret_cast<const char*>(data), qint64(len));
readBytes.seek(readPos);
readMutex.unlock();
if (q->isReadNotificationEnabled())
emit q->readReady();
ComPtr<IAsyncBufferOperation> op;
hr = buffer->inputStream()->ReadAsync(buffer, READ_BUFFER_SIZE, InputStreamOptions_Partial, &op);
if (FAILED(hr)) {
qErrnoWarning(hr, "Could not read into socket stream buffer.");
return S_OK;
}
hr = op->put_Completed(Callback<SocketReadCompletedHandler>(this, &QNativeSocketEnginePrivate::handleReadyRead).Get());
if (FAILED(hr)) {
qErrnoWarning(hr, "Failed to set socket read callback.");
return S_OK;
}
return S_OK;
} }
return E_FAIL; HRESULT QNativeSocketEnginePrivate::handleWriteCompleted(IAsyncOperationWithProgress<UINT32, UINT32> *op, AsyncStatus status)
{
if (status == Error) {
ComPtr<IAsyncInfo> info;
HRESULT hr = op->QueryInterface(IID_PPV_ARGS(&info));
if (FAILED(hr)) {
qErrnoWarning(hr, "Failed to cast operation.");
return S_OK;
}
HRESULT errorCode;
hr = info->get_ErrorCode(&errorCode);
if (FAILED(hr)) {
qErrnoWarning(hr, "Failed to get error code.");
return S_OK;
}
qErrnoWarning(errorCode, "A socket error occurred.");
return S_OK;
}
return S_OK;
} }
HRESULT QNativeSocketEnginePrivate::handleNewDatagram(IDatagramSocket *socket, IDatagramSocketMessageReceivedEventArgs *args) HRESULT QNativeSocketEnginePrivate::handleNewDatagram(IDatagramSocket *socket, IDatagramSocketMessageReceivedEventArgs *args)
@ -1174,7 +1255,7 @@ HRESULT QNativeSocketEnginePrivate::handleNewDatagram(IDatagramSocket *socket, I
Q_Q(QNativeSocketEngine); Q_Q(QNativeSocketEngine);
Q_ASSERT(udp == socket); Q_ASSERT(udp == socket);
pendingDatagrams.append(args); pendingDatagrams.append(args);
q->readNotification(); emit q->readReady();
return S_OK; return S_OK;
} }

View File

@ -52,6 +52,8 @@
// //
// We mean it. // We mean it.
// //
#include <QtCore/QEventLoop>
#include <QtCore/QBuffer>
#include "QtNetwork/qhostaddress.h" #include "QtNetwork/qhostaddress.h"
#include "private/qabstractsocketengine_p.h" #include "private/qabstractsocketengine_p.h"
#include <wrl.h> #include <wrl.h>
@ -127,6 +129,11 @@ public:
bool isExceptionNotificationEnabled() const; bool isExceptionNotificationEnabled() const;
void setExceptionNotificationEnabled(bool enable); void setExceptionNotificationEnabled(bool enable);
signals:
void connectionReady();
void readReady();
void writeReady();
private: private:
Q_DECLARE_PRIVATE(QNativeSocketEngine) Q_DECLARE_PRIVATE(QNativeSocketEngine)
Q_DISABLE_COPY(QNativeSocketEngine) Q_DISABLE_COPY(QNativeSocketEngine)
@ -191,17 +198,22 @@ private:
ABI::Windows::Networking::Sockets::IDatagramSocket *udp; ABI::Windows::Networking::Sockets::IDatagramSocket *udp;
}; };
Microsoft::WRL::ComPtr<ABI::Windows::Networking::Sockets::IStreamSocketListener> tcpListener; Microsoft::WRL::ComPtr<ABI::Windows::Networking::Sockets::IStreamSocketListener> tcpListener;
Microsoft::WRL::ComPtr<ABI::Windows::Storage::Streams::IBuffer> inputBuffer; Microsoft::WRL::ComPtr<ABI::Windows::Storage::Streams::IBuffer> readBuffer;
QBuffer readBytes;
QMutex readMutex;
QList<ABI::Windows::Networking::Sockets::IDatagramSocketMessageReceivedEventArgs *> pendingDatagrams; QList<ABI::Windows::Networking::Sockets::IDatagramSocketMessageReceivedEventArgs *> pendingDatagrams;
QList<ABI::Windows::Networking::Sockets::IStreamSocket *> pendingConnections; QList<ABI::Windows::Networking::Sockets::IStreamSocket *> pendingConnections;
QList<ABI::Windows::Networking::Sockets::IStreamSocket *> currentConnections; QList<ABI::Windows::Networking::Sockets::IStreamSocket *> currentConnections;
QEventLoop eventLoop;
HRESULT handleBindCompleted(ABI::Windows::Foundation::IAsyncAction *, ABI::Windows::Foundation::AsyncStatus);
HRESULT handleNewDatagram(ABI::Windows::Networking::Sockets::IDatagramSocket *socket, HRESULT handleNewDatagram(ABI::Windows::Networking::Sockets::IDatagramSocket *socket,
ABI::Windows::Networking::Sockets::IDatagramSocketMessageReceivedEventArgs *args); ABI::Windows::Networking::Sockets::IDatagramSocketMessageReceivedEventArgs *args);
HRESULT handleClientConnection(ABI::Windows::Networking::Sockets::IStreamSocketListener *tcpListener, HRESULT handleClientConnection(ABI::Windows::Networking::Sockets::IStreamSocketListener *tcpListener,
ABI::Windows::Networking::Sockets::IStreamSocketListenerConnectionReceivedEventArgs *args); ABI::Windows::Networking::Sockets::IStreamSocketListenerConnectionReceivedEventArgs *args);
static HRESULT interruptEventDispatcher(ABI::Windows::Foundation::IAsyncAction *, ABI::Windows::Foundation::AsyncStatus); HRESULT handleConnectToHost(ABI::Windows::Foundation::IAsyncAction *, ABI::Windows::Foundation::AsyncStatus);
static HRESULT handleReadyRead(ABI::Windows::Foundation::IAsyncOperationWithProgress<ABI::Windows::Storage::Streams::IBuffer *, UINT32> *asyncInfo, ABI::Windows::Foundation::AsyncStatus); HRESULT handleReadyRead(ABI::Windows::Foundation::IAsyncOperationWithProgress<ABI::Windows::Storage::Streams::IBuffer *, UINT32> *asyncInfo, ABI::Windows::Foundation::AsyncStatus);
HRESULT handleWriteCompleted(ABI::Windows::Foundation::IAsyncOperationWithProgress<UINT32, UINT32> *, ABI::Windows::Foundation::AsyncStatus);
}; };
QT_END_NAMESPACE QT_END_NAMESPACE