[+] Net: TCP servers can now be multi-threaded

[+] Net: Added missing UDP send datagram
[*] IO bug fixes
This commit is contained in:
Reece Wilson 2022-11-17 20:56:41 +00:00
parent 898c0ced37
commit f86665fd36
17 changed files with 316 additions and 58 deletions

View File

@ -36,6 +36,7 @@ namespace Aurora::IO::Net
AuSPtr<ISocketServerDriver> pDriver;
AuUInt uMaxConnections {};
AuUInt uMaxAcceptBacklog {};
bool bMultiThreadTCP {};
};
struct NetSocketBindEx : NetSocketBind

View File

@ -312,7 +312,7 @@ namespace Aurora::Async
&& (asyncLoop->WaitAny(0))
)
{
PollInternal(block);
PollInternal(false);
success = true;
}
else
@ -438,6 +438,13 @@ namespace Aurora::Async
break;
}
if (state->pendingWorkItems.empty() && (
(this->GetThreadState()->asyncLoop->GetSourceCount() > 1) ||
this->GetThreadState()->asyncLoop->CommitPending())) //(this->ToKernelWorkQueue()->IsSignaledPeek()))
{
return false;
}
} while (state->pendingWorkItems.empty());
if (group->workQueue.empty())
@ -640,7 +647,7 @@ namespace Aurora::Async
worker->shuttingdown = true;
}
if (groupId != 0)
if (!group->IsSysThread())
{
worker->threadObject->SendExitSignal();
threads.push_back(worker->threadObject);
@ -941,11 +948,18 @@ namespace Aurora::Async
}
auto threadState = AuMakeShared<ThreadState>();
if (!threadState)
{
SysPushErrorMem();
return {};
}
threadState->parent = group;
threadState->running = AuThreadPrimitives::EventUnique(true, false, true);
threadState->syncSema = AuThreadPrimitives::SemaphoreUnique(0);
threadState->id = workerId;
threadState->asyncLoop = AuStaticCast<AsyncLoop>(AuLoop::NewLoopQueue());
threadState->asyncLoop = AuMakeShared<AsyncLoop>();
threadState->asyncLoop->pParent = threadState.get();
threadState->rateLimiter.SetNextStep(1'000'000); // 1MS in nanoseconds
threadState->runMode = ERunMode::eEfficient;

View File

@ -27,7 +27,7 @@ namespace Aurora::Async
auto ret = LoopQueue::AddCallback(source, subscriber);
if (ret)
{
this->commitPending_ = true;
Schedule();
}
return ret;
}
@ -37,7 +37,7 @@ namespace Aurora::Async
auto ret = LoopQueue::AddCallbackEx(source, subscriber);
if (ret)
{
this->commitPending_ = true;
Schedule();
}
return ret;
}
@ -47,14 +47,29 @@ namespace Aurora::Async
auto ret = LoopQueue::AddCallback(subscriber);
if (ret)
{
this->commitPending_ = true;
Schedule();
}
return ret;
}
bool AsyncLoop::Commit()
void AsyncLoop::Schedule()
{
this->commitPending_ = true;
if (AuThreads::GetThread() != this->pParent->threadObject.get())
{
this->pParent->parent.lock()->cvVariable->Broadcast();
}
}
bool AsyncLoop::CommitPending()
{
return this->commitPending_;
}
bool AsyncLoop::Commit()
{
Schedule();
return true;
}
}

View File

@ -14,6 +14,7 @@ namespace Aurora::Async
{
struct AsyncLoop : AuLoop::LoopQueue
{
ThreadState *pParent;
void OnFrame();
virtual bool AddCallback (const AuSPtr<AuLoop::ILoopSource> &source, const AuSPtr<AuLoop::ILoopSourceSubscriber> &subscriber) override;
@ -21,6 +22,9 @@ namespace Aurora::Async
virtual bool AddCallback (const AuSPtr<AuLoop::ILoopSourceSubscriber> &subscriber) override;
virtual bool Commit () override;
void Schedule();
bool CommitPending();
private:
bool commitPending_ {};
};

View File

@ -19,12 +19,14 @@ namespace Aurora::IO
IOProcessor::IOProcessor(AuUInt threadId,
bool bTickOnly,
AuAsync::WorkerPId_t worker,
const AuSPtr<AuLoop::ILoopQueue> &pLoopQueue) :
const AuSPtr<AuLoop::ILoopQueue> &pLoopQueue,
bool bIsNoQueue) :
mutliplexIOAndTimer(!bTickOnly),
pLoopQueue(pLoopQueue),
asyncWorker(worker),
threadId(threadId),
streamProcessors(this)
streamProcessors(this),
bIsNoQueue(bIsNoQueue)
{
}
@ -54,6 +56,21 @@ namespace Aurora::IO
}
this->ToQueue()->SourceAdd(this->items.cvEvent);
if (!this->bIsNoQueue)
{
auto pAS = AuMakeShared<AuLoop::ILoopSourceSubscriberFunctional>([pThat = AuSharedFromThis()](const AuSPtr<AuLoop::ILoopSource> &pSource) -> bool
{
pThat->ManualTick();
return false;
});
if (!this->ToQueue()->AddCallback(this->items.cvEvent, pAS))
{
return {};
}
}
this->ToQueue()->Commit();
return true;
}
@ -461,7 +478,9 @@ namespace Aurora::IO
{
AU_LOCK_GUARD(this->items.mutex); // < critical section / reentrant | can nest submission
this->items.cvEvent->Set();
return AuTryInsert(this->workUnits, work);
auto bRet = AuTryInsert(this->workUnits, work);
//this->pLoopQueue->Commit(); // required to wake up async threads
return bRet;
}
bool IOProcessor::AddEventListener(const AuSPtr<IIOProcessorEventListener> &eventListener)
@ -659,6 +678,12 @@ namespace Aurora::IO
}
auto item = AuMakeShared<IOProcessorItem>();
if (!item)
{
SysPushErrorMemory();
return {};
}
item->pParent = this;
item->pListener = pListener;
item->pItem = pItem;
@ -850,9 +875,27 @@ namespace Aurora::IO
return this->threadId == AuThreads::GetThreadId();
}
static AuSPtr<IIOProcessor> NewIOProcessorEx(bool tickOnly, const AuSPtr<AuLoop::ILoopQueue> &queue, bool bIsNoQueue)
{
auto processor = AuMakeShared<IOProcessor>(0, tickOnly, AuAsync::WorkerPId_t {}, queue, bIsNoQueue);
if (!processor)
{
SysPushErrorMem();
return {};
}
if (!processor->Init())
{
SysPushErrorNested();
return {};
}
return processor;
}
AUKN_SYM AuSPtr<IIOProcessor> NewIOProcessor(bool tickOnly, const AuSPtr<AuLoop::ILoopQueue> &queue)
{
auto processor = AuMakeShared<IOProcessor>(0, tickOnly, AuAsync::WorkerPId_t {}, queue);
auto processor = AuMakeShared<IOProcessor>(0, tickOnly, AuAsync::WorkerPId_t {}, queue, false);
if (!processor)
{
SysPushErrorMem();
@ -889,7 +932,7 @@ namespace Aurora::IO
return {};
}
auto processor = AuMakeShared<IOProcessor>(AuUInt(thread.get()), tickOnly, id, queue);
auto processor = AuMakeShared<IOProcessor>(AuUInt(thread.get()), tickOnly, id, queue, false);
if (!processor)
{
SysPushErrorMem();
@ -914,6 +957,6 @@ namespace Aurora::IO
return {};
}
return NewIOProcessor(tickOnly, loop);
return NewIOProcessorEx(tickOnly, loop, true);
}
}

View File

@ -16,7 +16,7 @@ namespace Aurora::IO
struct IOProcessor : IIOProcessor, AuEnableSharedFromThis<IOProcessor>, AuAsync::IWorkItemHandler
{
~IOProcessor();
IOProcessor(AuUInt threadId, bool tickOnly, AuAsync::WorkerPId_t worker, const AuSPtr<AuLoop::ILoopQueue> &loop);
IOProcessor(AuUInt threadId, bool tickOnly, AuAsync::WorkerPId_t worker, const AuSPtr<AuLoop::ILoopQueue> &loop, bool bIsNoQueue);
bool Init();
@ -91,6 +91,7 @@ namespace Aurora::IO
AuUInt64 refreshRateNs {};
AuUInt64 minFrameDeltaNs {};
bool bIsNoQueue;
void UpdateTimers();

View File

@ -24,14 +24,15 @@ namespace Aurora::IO::Loop
}
#endif
AUKN_SYM AuSPtr<ILoopSource> NewLSFile(const AuSPtr<AuIO::IAsyncTransaction> &fileTransaction)
AUKN_SYM AuSPtr<ILoopSource> NewLSFile(const AuSPtr<AuIO::IAsyncTransaction> &pFileTransaction)
{
if (!fileTransaction)
if (!pFileTransaction)
{
SysPushErrorArg();
return {};
}
return fileTransaction->NewLoopSource();
return pFileTransaction->NewLoopSource();
}
AUKN_SYM AuSPtr<ILoopSource> NewStdIn()

View File

@ -21,7 +21,8 @@ namespace Aurora::IO::Net
pWorker,
{},
{},
4096),
4096,
true),
Socket(pInterface, pWorker, {}, -1)
{
}

View File

@ -8,6 +8,7 @@
#include "Networking.hpp"
#include "AuNetDatagramServer.hpp"
#include "AuNetEndpoint.hpp"
#include "AuNetError.hpp"
#if defined(AURORA_IS_MODERNNT_DERIVED)
#include "AuNetStream.NT.hpp"
@ -24,7 +25,8 @@ namespace Aurora::IO::Net
pWorker,
{},
{},
4096),
4096,
true),
pDriver_(pDriver)
{
@ -47,8 +49,55 @@ namespace Aurora::IO::Net
}
}
void DatagramServer::SendPacket(const NetEndpoint &destination, const AuSPtr<AuMemoryViewRead> &pPacketData, const AuSPtr<AuAsync::PromiseCallback<AuNullS, NetError>> &pCalback)
void DatagramServer::SendPacket(const NetEndpoint &destination,
const AuSPtr<AuMemoryViewRead> &pPacketData,
const AuSPtr<AuAsync::PromiseCallback<AuNullS, NetError>> &pCalback2)
{
AuSPtr<AuIO::IAsyncFinishedSubscriberFunctional> pCallback;
if (pCalback2)
{
pCallback = AuMakeShared<AuIO::IAsyncFinishedSubscriberFunctional>([uSourceLength = pPacketData->length,
pCalback2 = AuSPtr<AuAsync::PromiseCallback<AuNullS, NetError>>(pCalback2)](AuUInt64 uOffset, AuUInt32 uLength)
{
if (uSourceLength != uLength)
{
AuNet::NetError er;
er.netError = AuNet::ENetworkError::eSocketBufferOverflow;
pCalback2->OnFailure(&er);
return;
}
pCalback2->OnSuccess((void *)nullptr);
});
}
auto pTransaction = AuMakeShared<NtAsyncNetworkTransaction>(this);
if (!pTransaction || (pCalback2 && !pCallback))
{
SysPushErrorMemory("Dropping datagram");
if (pCalback2)
{
AuNet::NetError er;
er.netError = AuNet::ENetworkError::eResourceConstraint;
pCalback2->OnFailure(&er);
}
return;
}
pTransaction->bDatagramMode = true;
AuMemcpy(&pTransaction->netEndpoint, &destination, sizeof(NetEndpoint));
pTransaction->SetCallback(pCallback);
if (!pTransaction->StartWrite(0, pPacketData))
{
NetError error;
NetError_SetCurrent(error);
pCalback2->OnFailure(&error);
}
}
AuSPtr<IDatagramDriver> DatagramServer::GetDatagramDriver()
@ -63,14 +112,14 @@ namespace Aurora::IO::Net
DeoptimizeEndpoint(endpoint);
endpoint.transportProtocol = ETransportProtocol::eProtocolUDP;
auto pByteByffer = this->socketChannel_.AsReadableByteBuffer();
auto pEndRead = pByteByffer->writePtr;
auto pByteBuffer = this->socketChannel_.AsReadableByteBuffer();
auto pEndRead = pByteBuffer->writePtr;
if (this->pDriver_)
{
try
{
this->pDriver_->OnPacket(endpoint, pByteByffer);
this->pDriver_->OnPacket(endpoint, pByteBuffer);
}
catch (...)
{
@ -83,7 +132,7 @@ namespace Aurora::IO::Net
}
}
pByteByffer->readPtr = pEndRead;
pByteBuffer->readPtr = pEndRead;
#endif
}

View File

@ -16,12 +16,14 @@ namespace Aurora::IO::Net
NetWorker *pWorker,
const AuSPtr<ISocketServerDriver> &pDriver,
const AuSPtr<ISocketDriverFactory> &pSocketDriverFactory,
AuUInt32 maxConnections) :
AuUInt32 maxConnections,
bool bMultiThreadTCP) :
SocketServer(pInterface,
pWorker,
pDriver,
pSocketDriverFactory,
maxConnections),
maxConnections,
bMultiThreadTCP),
Socket(pInterface,
pWorker,
AuSPtr<ISocketDriver>{},

View File

@ -17,7 +17,8 @@ namespace Aurora::IO::Net
struct NetWorker *pWorker,
const AuSPtr<ISocketServerDriver> &pDriver,
const AuSPtr<ISocketDriverFactory> &pSocketDriverFactory,
AuUInt32 maxConnections);
AuUInt32 maxConnections,
bool bMultiThreadTCP);
virtual void DoNonblockingReadTick() override;
@ -31,6 +32,5 @@ namespace Aurora::IO::Net
protected:
SocketServerAcceptReadOperation acceptOperation_;
AuSPtr<ISocketDriverFactory> pSocketDriverFactory_;
};
}

View File

@ -17,14 +17,16 @@ namespace Aurora::IO::Net
NetWorker *pWorker,
const AuSPtr<ISocketServerDriver> &pDriver,
const AuSPtr<ISocketDriverFactory> &pFactory,
AuUInt32 maxConnections)
AuUInt32 maxConnections,
bool bMultiThreadTCP)
: Socket(pInterface,
pWorker,
AuSPtr<ISocketDriver>{},
-1),
pDriver_(pDriver),
pFactory_(pFactory),
uMaxConnections_(maxConnections)
uMaxConnections_(maxConnections),
bMultiThreadTCP(bMultiThreadTCP)
{
}

View File

@ -23,7 +23,8 @@ namespace Aurora::IO::Net
struct NetWorker *pWorker,
const AuSPtr<ISocketServerDriver> &pDriver,
const AuSPtr<ISocketDriverFactory> &pFactory,
AuUInt32 maxConnections);
AuUInt32 maxConnections,
bool bMultiThreadTCP);
void Init(const NetEndpoint &localAddress);
void Listen(const NetEndpoint &localAddress, bool bBind = true, bool bListen = true);
@ -46,6 +47,8 @@ namespace Aurora::IO::Net
virtual bool BeginAcceptLoop() = 0;
virtual void DetroyServer() = 0;
const bool bMultiThreadTCP;
protected:
// INTERFACE: base os socket

View File

@ -9,6 +9,9 @@
#include "AuNetSocketServer.hpp"
#include "AuNetSocket.hpp"
#include "AuNetEndpoint.hpp"
#include "AuNetInterface.hpp"
#include "AuNetWorker.hpp"
#include "AuNetError.hpp"
namespace Aurora::IO::Net
{
@ -32,19 +35,66 @@ namespace Aurora::IO::Net
void SocketServerAcceptReadOperation::OnOverlappedComplete()
{
if (!this->nextSocketPtr)
{
return;
}
SOCKET hListenHandle = (SOCKET)this->pParent_->ToPlatformHandle();
int ret = ::setsockopt(this->nextSocket,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char *)&hListenHandle,
sizeof(SOCKET));
int error = WSAGetLastError();
//AuLogDbg("Error {}", error);
SysAssert(ret != -1);
auto dwError = WSAGetLastError();
if (ret == -1)
{
SysPushErrorNet("Couldn't enable socket after overlapped accept");
NetError error;
error.osError = dwError;
this->nextSocketPtr->SendErrorNoStream(error);
return;
}
UpdateNextSocketAddresses();
if (this->pParent_->bMultiThreadTCP)
{
auto pCallback = AuMakeShared<IIOProcessorWorkUnitFunctional>([socket = this->nextSocketPtr]()
{
if (!socket->SendPreestablish())
{
SysPushErrorNet("Couldn't preestablish next socket");
socket->SendErrorNoStream({});
return;
}
socket->DoMain();
},
[]()
{
});
if (!pCallback)
{
SysPushErrorMemory();
SysPushErrorNet("Memory");
this->nextSocketPtr->SendErrorNoStream({});
return;
}
if (!this->nextSocketPtr->ToWorker()->ToProcessor()->SubmitIOWorkItem(pCallback))
{
SysPushErrorMemory();
SysPushErrorNet("couldnt schedule read tick on thread");
this->nextSocketPtr->SendErrorNoStream({});
}
}
else
{
this->nextSocketPtr->DoMain();
}
// accept next
this->pParent_->ScheduleAcceptTick(); // We **cannot** readd the current event in the trigger callback
@ -137,6 +187,12 @@ namespace Aurora::IO::Net
WSA_FLAG_OVERLAPPED
);
if (nextSocket == INVALID_SOCKET)
{
SysPushErrorNet("No socket");
return false;
}
auto pFactory = this->pParent_->GetFactory();
if (!bool(pFactory))
{
@ -151,8 +207,18 @@ namespace Aurora::IO::Net
return false;
}
NetWorker *pWorker;
if (this->pParent_->bMultiThreadTCP)
{
pWorker = this->pInterface_->TryScheduleEx().get();
}
else
{
pWorker = this->pParent_->ToWorkerEx();
}
nextSocketPtr = AuMakeShared<Socket>(this->pInterface_,
this->pParent_->ToWorkerEx(),
pWorker,
pNewDriver,
(AuUInt)nextSocket);
if (!bool(nextSocketPtr))
@ -162,13 +228,20 @@ namespace Aurora::IO::Net
return false;
}
if (this->pParent_->bMultiThreadTCP)
{
// Defer SendPreestablish until we're done to minimize RPCs
}
else
{
if (!nextSocketPtr->SendPreestablish())
{
SysPushErrorNet("Couldn't preestablish next socket");
return false;
}
}
return nextSocket != INVALID_SOCKET;
return true;
}
void SocketServerAcceptReadOperation::UpdateNextSocketAddresses()

View File

@ -80,9 +80,19 @@ namespace Aurora::IO::Net
return false;
}
NetWorker *pWorker;
if (this->pParent_->bMultiThreadTCP)
{
pWorker = this->pInterface_->TryScheduleEx().get();
}
else
{
pWorker = this->pParent_->ToWorkerEx();
}
// Create socket
this->nextSocketPtr = AuMakeShared<Socket>(this->pInterface_,
this->pParent_->ToWorkerEx(),
pWorker,
pNewDriver,
(AuUInt)this->nextSocket);
if (!bool(this->nextSocketPtr))
@ -93,13 +103,6 @@ namespace Aurora::IO::Net
return false;
}
// Preinit
if (!nextSocketPtr->SendPreestablish())
{
SysPushErrorNet("Couldn't preestablish next socket");
return false;
}
// brrrrrrrrrr
AuMemcpy(this->nextSocketPtr->remoteEndpoint_.hint, addressRemote, addressLength);
this->nextSocketPtr->UpdateLocalEndpoint();
@ -108,8 +111,52 @@ namespace Aurora::IO::Net
// Deoptimize (sockaddr -> binary) endpoints
UpdateNextSocketAddresses();
if (this->pParent_->bMultiThreadTCP)
{
auto pCallback = AuMakeShared<IIOProcessorWorkUnitFunctional>([=]()
{
// Preinit
if (!nextSocketPtr->SendPreestablish())
{
SysPushErrorNet("Couldn't establish TCP socket on alternative thread");
nextSocketPtr->CloseSocket();
return;
}
// Start the channels
this->nextSocketPtr->DoMain();
},
[]()
{
});
if (!pCallback)
{
SysPushErrorMemory();
SysPushErrorNet("Memory");
return false;
}
if (!pWorker->ToProcessor()->SubmitIOWorkItem(pCallback))
{
SysPushErrorMemory();
SysPushErrorNet("couldnt schedule accept on thread");
return false;
}
}
else
{
// Preinit
if (!nextSocketPtr->SendPreestablish())
{
SysPushErrorNet("Couldn't preestablish next socket");
return false;
}
// Start the channels
this->nextSocketPtr->DoMain();
}
// And we don't need this reference anymore...
this->nextSocketPtr.reset();

View File

@ -135,7 +135,8 @@ namespace Aurora::IO::Net
pWorker.get(),
netBind.pDriver,
netBind.pFactory,
uMaxSockets);
uMaxSockets,
netBind.bMultiThreadTCP);
if (!pSocket)
{
SysPushErrorNet("No Memory");
@ -146,7 +147,6 @@ namespace Aurora::IO::Net
{
pSocket->FinishConstructAsync();
NetEndpoint endpoint;
endpoint.ip = netBind.ip;
endpoint.uPort = netBind.uPort;

View File

@ -135,7 +135,8 @@ namespace Aurora::Threading::Primitives
}
}
}
while (AuAtomicCompareExchange(&this->state_, iCurState + 1, iCurState) != iCurState);
while (iCurState == -1 ||
AuAtomicCompareExchange(&this->state_, iCurState + 1, iCurState) != iCurState);
#endif
return true;
@ -203,7 +204,7 @@ namespace Aurora::Threading::Primitives
return;
}
auto val = --this->state_;
auto val = AuAtomicSub(&this->state_, 1);
if ((val == 1) && (this->bElevaterPending_))
{
@ -221,6 +222,7 @@ namespace Aurora::Threading::Primitives
AU_LOCK_GUARD(this->mutex_);
this->state_ = 0;
this->condition_->Broadcast();
this->reentrantWriteLockHandle_ = 0;
}
bool RWLockImpl::UpgradeReadToWrite(AuUInt64 timeout)
@ -252,8 +254,8 @@ namespace Aurora::Threading::Primitives
return false;
}
this->condition_->Broadcast();
this->state_ = 1;
this->condition_->Broadcast();
return true;
}