From bac7b8d098f2a3e676d1d0e54a7d8cbc5b68c8cc Mon Sep 17 00:00:00 2001 From: Jamie Reece Wilson Date: Wed, 2 Oct 2024 00:24:34 +0100 Subject: [PATCH] [*] QOL / Hardening / Optimizations / Bug fixes (but not really. this is just some patchwork) --- Include/Aurora/IO/Net/INetSrvDatagram.hpp | 4 +- Include/Aurora/IO/Net/ISocketChannel.hpp | 46 ++- Include/Aurora/Memory/ByteBuffer.hpp | 55 ++- Include/Aurora/Memory/ByteBuffer_Memory.inl | 2 +- Include/Aurora/Memory/ByteBuffer_Utils.inl | 23 +- Include/Aurora/RuntimeConfig.hpp | 5 + Source/Compression/AuCompression.hpp | 46 ++- Source/Compression/StreamCompression.cpp | 32 +- Source/Crypto/ECC/ECCX25519Private.cpp | 6 +- Source/Crypto/ECC/ECCX25519Public.cpp | 1 + Source/Crypto/ECC/PrivateECCImpl.cpp | 5 +- Source/Crypto/ECC/PublicECCImpl.cpp | 3 +- Source/Crypto/RSA/RSAPrivate.cpp | 4 +- Source/Crypto/RSA/RSAPrivate.hpp | 2 +- Source/Crypto/RSA/RSAPublic.cpp | 4 +- Source/Crypto/RSA/RSAPublic.hpp | 3 +- Source/IO/FS/FSPlatformDevices.NT.cpp | 2 +- Source/IO/Net/AuNetSocketChannel.cpp | 79 +++- Source/IO/Net/AuNetSocketChannel.hpp | 3 + Source/IO/Net/AuNetSocketChannelInput.cpp | 9 + Source/IO/Net/AuNetSocketChannelOutput.cpp | 25 +- Source/IO/Net/AuNetSocketChannelOutput.hpp | 1 + Source/IO/Net/AuNetSrvSockets.cpp | 8 +- .../AuNetDatagramSocketServerChannel.cpp | 374 ++++++++++++++++-- .../AuNetDatagramSocketServerChannel.hpp | 9 +- .../AuNetDatagramSocketServerSession.cpp | 83 +++- Source/IO/Protocol/AuProtocolStack.cpp | 27 +- 27 files changed, 729 insertions(+), 132 deletions(-) diff --git a/Include/Aurora/IO/Net/INetSrvDatagram.hpp b/Include/Aurora/IO/Net/INetSrvDatagram.hpp index bc647787..09bf6893 100644 --- a/Include/Aurora/IO/Net/INetSrvDatagram.hpp +++ b/Include/Aurora/IO/Net/INetSrvDatagram.hpp @@ -15,8 +15,8 @@ namespace Aurora::IO::Net struct NetDatagramBind { IPAddress ip; - AuUInt16 uPort; - AuUInt32 uDefaultInputStreamSize; + AuUInt16 uPort {}; + AuUInt32 uDefaultInputStreamSize {}; AuSPtr pDriver; }; diff --git a/Include/Aurora/IO/Net/ISocketChannel.hpp b/Include/Aurora/IO/Net/ISocketChannel.hpp index 69e7104a..a9f8cf47 100644 --- a/Include/Aurora/IO/Net/ISocketChannel.hpp +++ b/Include/Aurora/IO/Net/ISocketChannel.hpp @@ -76,16 +76,16 @@ namespace Aurora::IO::Net * @param pSendProtocol */ virtual void SpecifySendProtocol(const AuSPtr &pSendProtocol) = 0; - + /** - * @brief - * @param uNextFrameSize + * @brief + * @param uNextFrameSize */ virtual void SetNextFrameTargetLength(AuUInt uNextFrameSize) = 0; /** - * @brief - * @return + * @brief + * @return */ virtual AuUInt GetNextFrameTargetLength() = 0; @@ -208,6 +208,42 @@ namespace Aurora::IO::Net */ virtual AuUInt GetOutputBufferSize() = 0; + /** + * Recommended: 0, consume everything within the constraints of GetInputBufferSize/SpecifyInputBufferSize. + * use SingleFrame stack pieces to recursively tick a stack of protocol handlers. + * use DynamicBuffer protocol stack pieces to dynamically scale the output size. + * + * When not zero, this is the packet length per io processor frame. + * The io read is capped to segment of this size (uPageSize out of GetInputBufferSize/SpecifyInputBufferSize). + * In theory it is possible to boost max connection throughput (fair bandwidth between higher socket count) at the cost of lower bandwidth, increased tickrate, and higher cpu usage. + * + * Let say you can afford to buffer 10 frames. + * Should an aggressively written application dequeue of all of that at once, you would need to: + * >allocate x10 the amount of memory in a single io tick, + * >need to worry about how much cpu work time a single request or batch of requests take, + * >peak memory usage requirements for decompression and encryption handlers (you need x10 the peak memory usage). + * uPageSize can retard connections down into keeping their pending to-read memory elsewhere in the network stack. + * + * if you know the MTU is 32k, SpecifyPageLength(32k) for 1:1 op/tick. + * if you know the target is one tcp frame tick, SpecifyPageLength(64k) for 1:1 op/tick. + * udp may need splitting up across 576byte frames. + * you may want to bulk read multiple frames from kernel allocated network packets, over hammering tickrate. + * on the otherhand, you may want to limit how many segments you dequeue in a single io processor tick and ::read/ReadFromEx/ReadFileEx/io_submit([pread])(uNBytes) to a multiplier of your max expected packet size. + */ + virtual void SpecifyPageLength(AuUInt uPageSize) = 0; + + /** + * @brief + * @return + */ + virtual AuUInt SpecifyPageLength() = 0; + + + // + // BREAK! Ending preestablishment bias + // BREAK! Ending preestablishment bias + // + /** * @brief * @return diff --git a/Include/Aurora/Memory/ByteBuffer.hpp b/Include/Aurora/Memory/ByteBuffer.hpp index bec55fdb..10c2f875 100644 --- a/Include/Aurora/Memory/ByteBuffer.hpp +++ b/Include/Aurora/Memory/ByteBuffer.hpp @@ -88,6 +88,7 @@ namespace Aurora::Memory AuUInt8 flagNoRealloc : 1; /// Prevents a subset of free options, specifically realloc, operations AuUInt8 flagAlwaysExpandable : 1; /// Internal flag. Do not use. AuUInt8 flagReserveA : 1; /// Placeholder + AuUInt8 flagReservedB; /////////////////////////////////////////////////////////////////////// // Special flags/values @@ -465,18 +466,7 @@ namespace Aurora::Memory { AuSPtr memory; - inline SharedByteBuffer(AuSPtr pWriteView) : ByteBuffer() - { - this->allocSize = 0; - this->base = (AuUInt8 *)pWriteView->ptr; - this->length = pWriteView->length; - this->readPtr = this->base; - this->writePtr = this->base + this->length; - this->flagNoFree = true; - this->memory = pWriteView; - } - - inline SharedByteBuffer(AuSPtr pRAIIParentOwner, MemoryViewWrite view) : + inline SharedByteBuffer(const MemoryViewWrite &view, AuSPtr pRAIIParentOwner) : ByteBuffer(), __view(view) { @@ -484,11 +474,50 @@ namespace Aurora::Memory this->base = (AuUInt8 *)view.ptr; this->length = view.length; this->readPtr = this->base; - this->writePtr = this->base + this->length; + this->writePtr = this->base; this->flagNoFree = true; this->memory = pRAIIParentOwner; } + inline SharedByteBuffer(const MemoryViewWrite &view) : + ByteBuffer(), + __view(view) + { + this->allocSize = 0; + this->base = (AuUInt8 *)view.ptr; + this->length = view.length; + this->readPtr = this->base; + this->writePtr = this->base; + this->flagNoFree = true; + } + + inline SharedByteBuffer(const MemoryViewRead &view, AuSPtr pRAIIParentOwner) : + ByteBuffer(), + __view(*(MemoryViewWrite *)&view) + { + this->allocSize = 0; + this->base = (AuUInt8 *)view.ptr; + this->length = view.length; + this->readPtr = this->base; + this->writePtr = this->base + this->length; + this->flagNoFree = true; + this->flagWriteError = true; + this->memory = pRAIIParentOwner; + } + + inline SharedByteBuffer(const MemoryViewRead &view) : + ByteBuffer(), + __view(*(MemoryViewWrite *)&view) + { + this->allocSize = 0; + this->base = (AuUInt8 *)view.ptr; + this->length = view.length; + this->readPtr = this->base; + this->writePtr = this->base + this->length; + this->flagNoFree = true; + this->flagWriteError = true; + } + private: MemoryViewWrite __view; }; diff --git a/Include/Aurora/Memory/ByteBuffer_Memory.inl b/Include/Aurora/Memory/ByteBuffer_Memory.inl index 0462e7b0..88568bb7 100644 --- a/Include/Aurora/Memory/ByteBuffer_Memory.inl +++ b/Include/Aurora/Memory/ByteBuffer_Memory.inl @@ -274,7 +274,7 @@ namespace Aurora::Memory return true; } - auto newLength = AuMax(length, AuPageRoundUp(this->allocSize + (this->allocSize / 3), AuUInt(64))); + auto newLength = AuMax(length, AuPageRoundUp(this->allocSize + (this->allocSize / 3), AuUInt(128))); AuUInt8 *pNext {}; if (this->alignment) diff --git a/Include/Aurora/Memory/ByteBuffer_Utils.inl b/Include/Aurora/Memory/ByteBuffer_Utils.inl index a061a1b0..080e1f95 100644 --- a/Include/Aurora/Memory/ByteBuffer_Utils.inl +++ b/Include/Aurora/Memory/ByteBuffer_Utils.inl @@ -197,7 +197,7 @@ namespace Aurora::Memory ByteBuffer::ByteBuffer(ByteBuffer &&buffer) : flagCircular {}, flagExpandable {}, flagReadError {}, flagWriteError {}, - flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {} + flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA{}, flagReservedB {} { this->base = buffer.base; this->length = buffer.length; @@ -210,6 +210,7 @@ namespace Aurora::Memory this->flagReadError = buffer.flagReadError; this->flagWriteError = buffer.flagWriteError; this->alignment = buffer.alignment; + this->flagReservedB = buffer.flagReservedB; buffer.base = {}; buffer.length = {}; buffer.allocSize = {}; @@ -223,7 +224,7 @@ namespace Aurora::Memory } ByteBuffer::ByteBuffer(const ByteBuffer &buffer, bool preservePointers) : - flagCircular {}, flagExpandable { }, flagReadError {}, flagWriteError {}, + flagCircular {}, flagExpandable { }, flagReadError {}, flagWriteError {}, flagReservedB {}, flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {} { if (buffer.length) @@ -262,7 +263,7 @@ namespace Aurora::Memory } ByteBuffer::ByteBuffer(const void *in, AuUInt length, bool circular, bool expandable) : - flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, + flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {}, flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0) { this->scaleSize = kBufferInitialPower; @@ -285,7 +286,7 @@ namespace Aurora::Memory } ByteBuffer::ByteBuffer(const MemoryViewRead &readView, bool circular, bool expandable) : - flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, + flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {}, flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0) { this->scaleSize = kBufferInitialPower; @@ -303,7 +304,7 @@ namespace Aurora::Memory } ByteBuffer::ByteBuffer(const AuList &vector, bool circular, bool expandable) : - flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, + flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {}, flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0) { this->scaleSize = kBufferInitialPower; @@ -321,7 +322,7 @@ namespace Aurora::Memory } ByteBuffer::ByteBuffer(const MemoryViewRead &readView, AuUInt uAlignment, bool circular, bool expandable) : - flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, + flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {}, flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0) { this->scaleSize = kBufferInitialPower; @@ -344,7 +345,7 @@ namespace Aurora::Memory } ByteBuffer::ByteBuffer(AuUInt length, bool circular, bool expandable) : - flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, + flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {}, flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0) { this->scaleSize = kBufferInitialPower; @@ -366,7 +367,7 @@ namespace Aurora::Memory } ByteBuffer::ByteBuffer(AuUInt length, AuUInt alignment, bool circular, bool expandable) : - flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, + flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {}, flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0) { if (!length) @@ -390,7 +391,7 @@ namespace Aurora::Memory template ByteBuffer::ByteBuffer(T *base, T *end, bool circular, bool expandable) : - flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, + flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {}, flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0) { if (!base) @@ -419,7 +420,7 @@ namespace Aurora::Memory } ByteBuffer::ByteBuffer() : - flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, + flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {}, flagCircular(0), flagExpandable(true), flagReadError(0), flagWriteError(0) { this->base = {}; @@ -444,6 +445,7 @@ namespace Aurora::Memory this->flagNoRealloc = other.flagNoRealloc; this->flagNoFree = other.flagNoFree; this->scaleSize = other.scaleSize; + this->flagReservedB = other.flagReservedB; other.base = {}; other.length = {}; other.allocSize = {}; @@ -483,6 +485,7 @@ namespace Aurora::Memory this->flagCircular = buffer.flagCircular; this->flagExpandable = buffer.flagExpandable; this->flagAlwaysExpandable = buffer.flagAlwaysExpandable; + this->flagReservedB = buffer.flagReservedB; return *this; } diff --git a/Include/Aurora/RuntimeConfig.hpp b/Include/Aurora/RuntimeConfig.hpp index 61ce9cdf..405e2b8a 100644 --- a/Include/Aurora/RuntimeConfig.hpp +++ b/Include/Aurora/RuntimeConfig.hpp @@ -277,7 +277,12 @@ namespace Aurora struct IOConfig { AuUInt32 uProtocolStackDefaultBufferSize { 64 * 1024 }; + + #if defined(AURORA_IS_SERVER) + bool bIsVeryLargeIOApplication { true }; + #else bool bIsVeryLargeIOApplication { false }; + #endif // On Win32, NewLSTimer has very bad resolution. // On linux, timerfd is good enough. diff --git a/Source/Compression/AuCompression.hpp b/Source/Compression/AuCompression.hpp index 1ac1a920..fc9d5ed9 100644 --- a/Source/Compression/AuCompression.hpp +++ b/Source/Compression/AuCompression.hpp @@ -9,7 +9,51 @@ namespace Aurora::Compression { - static const AuUInt64 kChunkSize = 128 * 1024; +#if defined(AURORA_IS_SERVER) || defined(AURORA_16_KIB_IN_OUT_BUFFERS_32K) + // servers can have a multiple of these per client, if streams are not to be recycled. + // XMPP, for example, does not reset. zlib and others expect genuine compression streams. + // HTTP, libdeflate, and others expect flush framesm and then for the stateless stream to be recreated each IO tick. + // let's assume we are a server; + // most of our IO is done under protocol stacks that'll handle almost all of the buffering, + // changing this value by 2-8 will apply to the law of large numbers on a server, + // changing this value by an order of magnitude is a cost of 10x memory per client (tho you can afford the memory these days), + // the underlying compressor object may not even be accessing these temp[kChunkSize] optimizations - some can directly read from AuByteBuffers, + // some compression libraries are internally alloc heavy, which delgates the usage area to malloc/free callbacks... + // ...these, like zstd, demand almost zero buffered data. a frame fragment is almost nothing. + // therefore, it makes sense to keep these values down as much as possible + static const AuUInt64 kChunkSize = 16 * 1024; +#elif defined(AURORA_IS_CLIENT) + // for builds explicitly earmarked to go out to end users, 2x the usual buffer size + static const AuUInt64 kChunkSize = 256 * 1024; +#else + // and general purpose. + static const AuUInt64 kChunkSize = 128 * 1024; + + // also note: zlib docs state up to 256k is efficient, but you can start at around 16k + // https://www.zlib.net/zlib_how.html Last modified 8 February 2023. approx 30 Oct 2004 - 11 Dec 2005. + // general infrastructure and requirements havent changed in 2 decades. + // in 2001 and 2021 you can afford to buy a server with a few tens of gigabytes of ram + // in 2001 and 2021 you can afford to buy a gigabit switch + // in 2001 and 2021 you're still running on deflate algorithms + // in 2004 and 2024 you're still struggling to parallelize compression without N x tar balls on N x cores (N ~= 2-6 would be reasonable) + // in 2012-2014 you could get yourself an ivybridge or a sandbridge, in 2024 you could get yourself an AMD page aligned atomic acker or Oxide Inside 14th gen. + // with ~4th gen being, in theory, multiple single digits slower than 14th gen. and, yes, comparing 4th gen to 9th gen compilation times will result in 4-8x boosts, + // on a very fundamental level, we havent moved that much in terms of single threaded performance. + // that also applies to compression. it's stil the same old slow task calculating bit patterns, looking through big precomputed (or not) datasets, and making in-order & non-parallelizable changes + // With hardware not progressing **that** much, the fundamental algorithms staying the exact same, and the same libraries being used, call me a bit insane to think maybe an article from 2005 isnt that far off from reality. + // 16k*128k (clients) ~= 2gb (x2 for local in/out? [= 4GiB] x4 for additional in/out stream pairing? [= 8GiB]) of ram for a heavyweight server (plus many other user implementation overheads) + // 16k per stream / 256MB on an embedded tablet or games console + // 16k per stream / 256 - 2048MB per app / 1024 - 4096 per home appliance mid 2000s (laptop, pc, etc) + // 16k per stream / 256 - 2048MB per app / 4096 - 32768 per home appliance mid 2020s (laptop, pc, etc) + // 64k (2^16) IPv4 UDP ports, including system region and others + // 2x IPv6 address = 128k + // AAA software can see peaks of hundreds of thousands to millions per clients per platform, so x2, x10, x20, x100 this (more responsive stateful, proprietary, xmpp, etc protocols) + // WebShid "technology" works on stateless packets, so your memory requirements are basically just this in the network stream buffers, and with the rest being temporary allocations (more expensive & wasteful for stream-like connections). + // Seems fair...? seems like the world we should be still living in...? maybe we can 10x or 100x it no matter the era for increased throughput...? would zlib even care...? +#endif + + // sometimes on stack + static const AuUInt64 kChunkSize2 = 128 * 1024; /// * compression type + bits -> internal zlib windowBits static bool CompressionLevelFromExternalApi(const DecompressInfo &info, AuInt8 &out) diff --git a/Source/Compression/StreamCompression.cpp b/Source/Compression/StreamCompression.cpp index c4af7d45..bae80483 100644 --- a/Source/Compression/StreamCompression.cpp +++ b/Source/Compression/StreamCompression.cpp @@ -256,8 +256,8 @@ namespace Aurora::Compression { int ret, flush; z_stream strm {}; - unsigned char in[kChunkSize]; - unsigned char out[kChunkSize]; + unsigned char in[kChunkSize2]; + unsigned char out[kChunkSize2]; if (!stream.pWritePipe) { @@ -348,8 +348,8 @@ namespace Aurora::Compression { int ret; z_stream strm {}; - unsigned char in[kChunkSize]; - unsigned char out[kChunkSize]; + unsigned char in[kChunkSize2]; + unsigned char out[kChunkSize2]; if (!stream.pWritePipe) { @@ -437,8 +437,8 @@ namespace Aurora::Compression #if defined(_AUHAS_BZIP2) int ret, flush; bz_stream strm {}; - char in[kChunkSize]; - char out[kChunkSize]; + char in[kChunkSize2]; + char out[kChunkSize2]; if (!stream.pWritePipe) { @@ -462,7 +462,7 @@ namespace Aurora::Compression do { AuIO::EStreamError error; - AuUInt read{ kChunkSize }; + AuUInt read{ kChunkSize2 }; if (((error = stream.pReadPipe->Read(AuMemory::MemoryViewStreamWrite(in, read))) != AuIO::EStreamError::eErrorNone) && (error != AuIO::EStreamError::eErrorEndOfStream)) { @@ -528,8 +528,8 @@ namespace Aurora::Compression #if defined(_AUHAS_BZIP2) int ret; bz_stream strm {}; - char in[kChunkSize]; - char out[kChunkSize]; + char in[kChunkSize2]; + char out[kChunkSize2]; if (!stream.pWritePipe) { @@ -832,7 +832,7 @@ namespace Aurora::Compression } DecompressInfo meta3 = meta2; - meta3.uInternalStreamSize = kChunkSize * 10; // stupid, but required for BaseStream::Write to not blow up. not optimizing old apis. + meta3.uInternalStreamSize = kChunkSize2 * 10; // stupid, but required for BaseStream::Write to not blow up. not optimizing old apis. auto pCompressor = DecompressorUnique(stream.pReadPipe, meta3); if (!pCompressor) { @@ -840,13 +840,13 @@ namespace Aurora::Compression return false; } - AuByteBuffer buffer(kChunkSize * 2, true); + AuByteBuffer buffer(kChunkSize2 * 2, true); AuStreamReadWrittenPair_t pair; do { auto view = buffer.GetNextLinearRead(); - view.length = AuMin(view.length, kChunkSize / 2); + view.length = AuMin(view.length, kChunkSize2 / 2); auto pair = pCompressor->ReadEx(AuMemory::MemoryViewWrite(buffer), true); while (pair.first || pair.second) @@ -914,7 +914,7 @@ namespace Aurora::Compression } CompressInfo meta3 = meta2; - meta3.uInternalStreamSize = kChunkSize / 2; + meta3.uInternalStreamSize = kChunkSize2 / 2; auto pCompressor = CompressorUnique(stream.pReadPipe, meta3); if (!pCompressor) { @@ -922,13 +922,13 @@ namespace Aurora::Compression return false; } - AuByteBuffer buffer(kChunkSize / 2, true); + AuByteBuffer buffer(kChunkSize2 / 2, true); AuStreamReadWrittenPair_t pair; do { auto view = buffer.GetNextLinearRead(); - view.length = AuMin(view.length, kChunkSize / 2); + view.length = AuMin(view.length, kChunkSize2 / 2); auto pair = pCompressor->ReadEx(AuMemory::MemoryViewWrite(buffer), true); while (pair.first || pair.second) @@ -968,7 +968,7 @@ namespace Aurora::Compression } auto view = buffer.GetNextLinearRead(); - view.length = AuMin(view.length, kChunkSize / 2); + view.length = AuMin(view.length, kChunkSize2 / 2); auto pair = pCompressor->ReadEx(AuMemory::MemoryViewWrite(buffer), false); while (pair.first || pair.second) diff --git a/Source/Crypto/ECC/ECCX25519Private.cpp b/Source/Crypto/ECC/ECCX25519Private.cpp index 2678cded..8f64a6b2 100644 --- a/Source/Crypto/ECC/ECCX25519Private.cpp +++ b/Source/Crypto/ECC/ECCX25519Private.cpp @@ -12,14 +12,16 @@ namespace Aurora::Crypto::ECC { - PrivateCurve25519Impl::PrivateCurve25519Impl(bool isX25519, curve25519_key &&key) : bIsX25519_(isX25519), key_(key) + PrivateCurve25519Impl::PrivateCurve25519Impl(bool isX25519, curve25519_key &&key) : + bIsX25519_(isX25519), + key_(key) { } PrivateCurve25519Impl::~PrivateCurve25519Impl() { - + AuMemset(&this->key_, 'N', AuSizeOf(this->key_)); } bool PrivateCurve25519Impl::Sign(const Memory::MemoryViewRead &plainText, diff --git a/Source/Crypto/ECC/ECCX25519Public.cpp b/Source/Crypto/ECC/ECCX25519Public.cpp index 38619447..55da6679 100644 --- a/Source/Crypto/ECC/ECCX25519Public.cpp +++ b/Source/Crypto/ECC/ECCX25519Public.cpp @@ -18,6 +18,7 @@ namespace Aurora::Crypto::ECC PublicCurve25519Impl::~PublicCurve25519Impl() { + AuMemset(&this->key_, 'N', AuSizeOf(this->key_)); } bool PublicCurve25519Impl::Verify(const Memory::MemoryViewRead &hash, diff --git a/Source/Crypto/ECC/PrivateECCImpl.cpp b/Source/Crypto/ECC/PrivateECCImpl.cpp index 0187f89a..b7f90f09 100644 --- a/Source/Crypto/ECC/PrivateECCImpl.cpp +++ b/Source/Crypto/ECC/PrivateECCImpl.cpp @@ -17,7 +17,9 @@ namespace Aurora::Crypto::ECC { - PrivateECCImpl::PrivateECCImpl(EECCCurve type, ecc_key &key) : _key(key), _type(type) + PrivateECCImpl::PrivateECCImpl(EECCCurve type, ecc_key &key) : + _key(key), + _type(type) { } @@ -25,6 +27,7 @@ namespace Aurora::Crypto::ECC PrivateECCImpl::~PrivateECCImpl() { ecc_free(&this->_key); + AuMemset(&this->_key, 'N', AuSizeOf(this->_key)); } EECCCurve PrivateECCImpl::GetType() diff --git a/Source/Crypto/ECC/PublicECCImpl.cpp b/Source/Crypto/ECC/PublicECCImpl.cpp index 757d97fb..a262368b 100644 --- a/Source/Crypto/ECC/PublicECCImpl.cpp +++ b/Source/Crypto/ECC/PublicECCImpl.cpp @@ -23,7 +23,8 @@ namespace Aurora::Crypto::ECC PublicECCImpl::~PublicECCImpl() { - ecc_free(&_key); + ecc_free(&this->_key); + AuMemset(&this->_key, 'N', AuSizeOf(this->_key)); } EECCCurve PublicECCImpl::GetType() diff --git a/Source/Crypto/RSA/RSAPrivate.cpp b/Source/Crypto/RSA/RSAPrivate.cpp index 7eb1b134..f68589df 100644 --- a/Source/Crypto/RSA/RSAPrivate.cpp +++ b/Source/Crypto/RSA/RSAPrivate.cpp @@ -13,7 +13,8 @@ namespace Aurora::Crypto::RSA { - PrivateRSA::PrivateRSA(rsa_key &key) : key_(key) + PrivateRSA::PrivateRSA(const rsa_key &key) : + key_(key) { } @@ -21,6 +22,7 @@ namespace Aurora::Crypto::RSA PrivateRSA::~PrivateRSA() { ::rsa_free(&this->key_); + AuMemset(&this->key_, 'N', AuSizeOf(this->key_)); } bool PrivateRSA::Sign(const Memory::MemoryViewRead & payload, diff --git a/Source/Crypto/RSA/RSAPrivate.hpp b/Source/Crypto/RSA/RSAPrivate.hpp index bb4e60f0..f617f932 100644 --- a/Source/Crypto/RSA/RSAPrivate.hpp +++ b/Source/Crypto/RSA/RSAPrivate.hpp @@ -11,7 +11,7 @@ namespace Aurora::Crypto::RSA { struct PrivateRSA : IRSAPrivate { - PrivateRSA(rsa_key &key); + PrivateRSA(const rsa_key &key); ~PrivateRSA(); bool Sign(const Memory::MemoryViewRead & payload, diff --git a/Source/Crypto/RSA/RSAPublic.cpp b/Source/Crypto/RSA/RSAPublic.cpp index d0c1e484..4a9a3825 100644 --- a/Source/Crypto/RSA/RSAPublic.cpp +++ b/Source/Crypto/RSA/RSAPublic.cpp @@ -12,7 +12,8 @@ namespace Aurora::Crypto::RSA { - PublicRSA::PublicRSA(rsa_key &key) : key_(key) + PublicRSA::PublicRSA(const rsa_key &key) : + key_(key) { } @@ -20,6 +21,7 @@ namespace Aurora::Crypto::RSA PublicRSA::~PublicRSA() { ::rsa_free(&this->key_); + AuMemset(&this->key_, 'N', AuSizeOf(this->key_)); } bool PublicRSA::Verify(const AuMemoryViewRead &payload, diff --git a/Source/Crypto/RSA/RSAPublic.hpp b/Source/Crypto/RSA/RSAPublic.hpp index 4fe9ab50..26251bb0 100644 --- a/Source/Crypto/RSA/RSAPublic.hpp +++ b/Source/Crypto/RSA/RSAPublic.hpp @@ -12,7 +12,7 @@ namespace Aurora::Crypto::RSA { struct PublicRSA : IRSAPublic { - PublicRSA(rsa_key &key); + PublicRSA(const rsa_key &key); ~PublicRSA(); bool Verify(const Memory::MemoryViewRead & payload, @@ -33,6 +33,5 @@ namespace Aurora::Crypto::RSA private: rsa_key key_; - bool bOwned_; }; } \ No newline at end of file diff --git a/Source/IO/FS/FSPlatformDevices.NT.cpp b/Source/IO/FS/FSPlatformDevices.NT.cpp index a113e782..13dc0a4a 100644 --- a/Source/IO/FS/FSPlatformDevices.NT.cpp +++ b/Source/IO/FS/FSPlatformDevices.NT.cpp @@ -338,7 +338,7 @@ namespace Aurora::IO::FS } } - AUKN_SYM AuUInt64 GetDeviceSizeInBytes(const AuString &physicalDevicePath) + AUKN_SYM AuUInt64 GetDeviceSizeInBytes(const AuROString &physicalDevicePath) { DISK_GEOMETRY_EX geo {}; DWORD cbBytesReturned; diff --git a/Source/IO/Net/AuNetSocketChannel.cpp b/Source/IO/Net/AuNetSocketChannel.cpp index 9747f331..06c07bfb 100644 --- a/Source/IO/Net/AuNetSocketChannel.cpp +++ b/Source/IO/Net/AuNetSocketChannel.cpp @@ -271,6 +271,16 @@ namespace Aurora::IO::Net return pProtocol; } + void SocketChannel::SpecifyPageLength(AuUInt uPageSize) + { + this->uBytesPerFrame = uPageSize; + } + + AuUInt SocketChannel::SpecifyPageLength() + { + return this->uBytesPerFrame; + } + void SocketChannel::SpecifyRecvProtocol(const AuSPtr &pRecvProtocol) { this->pRecvProtocol = pRecvProtocol; @@ -629,7 +639,14 @@ namespace Aurora::IO::Net if (pCallbackOptional) { - pCallbackOptional->OnSuccess((void *)nullptr); + try + { + pCallbackOptional->OnSuccess((void *)nullptr); + } + catch (...) + { + SysPushErrorCatch(); + } } return; @@ -651,10 +668,14 @@ namespace Aurora::IO::Net if (bHasCurrentSuccess) { - if (pCallbackOptional) + try { pCallbackOptional->OnSuccess((void *)nullptr); } + catch (...) + { + SysPushErrorCatch(); + } return; } @@ -668,7 +689,14 @@ namespace Aurora::IO::Net { if (auto pOutput = AuExchange(this->pRetargetOutput, {})) { - pOutput->OnFailure((void *)nullptr); + try + { + pOutput->OnFailure((void *)nullptr); + } + catch (...) + { + SysPushErrorCatch(); + } if (this->pRetargetInput == pOutput) { AuResetMember(this->pRetargetInput); @@ -686,7 +714,14 @@ namespace Aurora::IO::Net { if (auto pInput = AuExchange(this->pRetargetInput, {})) { - pInput->OnFailure((void *)nullptr); + try + { + pInput->OnFailure((void *)nullptr); + } + catch (...) + { + SysPushErrorCatch(); + } if (this->pRetargetOutput == pInput) { AuResetMember(this->pRetargetOutput); @@ -715,7 +750,14 @@ namespace Aurora::IO::Net if (auto pOutput = AuExchange(this->pRetargetOutput, {})) { - pOutput->OnFailure((void *)nullptr); + try + { + pOutput->OnFailure((void *)nullptr); + } + catch (...) + { + SysPushErrorCatch(); + } if (this->pRetargetInput == pOutput) { AuResetMember(this->pRetargetInput); @@ -730,7 +772,14 @@ namespace Aurora::IO::Net { if (this->pRetargetInput != pOutput) { - pOutput->OnSuccess((void *)nullptr); + try + { + pOutput->OnSuccess((void *)nullptr); + } + catch (...) + { + SysPushErrorCatch(); + } } } } @@ -750,7 +799,14 @@ namespace Aurora::IO::Net if (auto pInput = AuExchange(this->pRetargetInput, {})) { - pInput->OnFailure((void *)nullptr); + try + { + pInput->OnFailure((void *)nullptr); + } + catch (...) + { + SysPushErrorCatch(); + } if (this->pRetargetOutput == pInput) { @@ -767,7 +823,14 @@ namespace Aurora::IO::Net { if (this->pRetargetOutput != pInput) { - pInput->OnSuccess((void *)nullptr); + try + { + pInput->OnSuccess((void *)nullptr); + } + catch (...) + { + SysPushErrorCatch(); + } } } } diff --git a/Source/IO/Net/AuNetSocketChannel.hpp b/Source/IO/Net/AuNetSocketChannel.hpp index 5d134021..778df1e9 100644 --- a/Source/IO/Net/AuNetSocketChannel.hpp +++ b/Source/IO/Net/AuNetSocketChannel.hpp @@ -69,6 +69,8 @@ namespace Aurora::IO::Net AuUInt GetNextFrameTargetLength() override; + void SpecifyPageLength(AuUInt uPageSize) override; + AuUInt SpecifyPageLength() override; void SpecifyRecvProtocol(const AuSPtr &pRecvProtocol) override; @@ -106,6 +108,7 @@ namespace Aurora::IO::Net AuUInt uBytesToFlip { 0 }; AuUInt uBytesInputBuffer { 0 }; AuUInt uBytesOutputBuffer { 0 }; + AuUInt uBytesPerFrame { 0 }; AuUInt uBytesInputBufferRetarget { 0 }; AuSPtr> pRetargetInput; diff --git a/Source/IO/Net/AuNetSocketChannelInput.cpp b/Source/IO/Net/AuNetSocketChannelInput.cpp index 3622d554..5320ebf2 100644 --- a/Source/IO/Net/AuNetSocketChannelInput.cpp +++ b/Source/IO/Net/AuNetSocketChannelInput.cpp @@ -86,6 +86,15 @@ namespace Aurora::IO::Net req.bIsStream = true; req.pListener = sharedThis; req.uBufferLengthOrZero = AuStaticCast(this->pParent_->ToChannel())->uBytesInputBuffer; + if (auto uFrame = AuStaticCast(this->pParent_->ToChannel())->uBytesPerFrame) + { + req.uPageLengthOrZero = uFrame; + req.bReadEntireAllocation = false; + } + else + { + req.bReadEntireAllocation = true; + } this->pNetReader = this->pParent_->ToWorker()->ToProcessor()->ToPipeProcessor()->NewAIOPipe(req); } diff --git a/Source/IO/Net/AuNetSocketChannelOutput.cpp b/Source/IO/Net/AuNetSocketChannelOutput.cpp index d1891f56..126abf0c 100644 --- a/Source/IO/Net/AuNetSocketChannelOutput.cpp +++ b/Source/IO/Net/AuNetSocketChannelOutput.cpp @@ -126,10 +126,12 @@ namespace Aurora::IO::Net auto pBase = (AuUInt8 *)this->pOutSendPointer_; auto uLen = (AuUInt8 *)this->outputBuffer_.base + this->outputBuffer_.length - pBase; + // do not lock! auto mem = AuMemoryViewRead { AuMemoryViewRead { pBase, (AuUInt)uLen }, this->pParent_->SharedFromThis() }; this->pOutSendPointer_ = (AuUInt8 *)this->pOutSendPointer_ + uLen; this->outputWriteQueue_.Push(mem); + AuAtomicAdd(&this->outputBuffer_.uInUseCounter, 1u); } else @@ -138,10 +140,12 @@ namespace Aurora::IO::Net auto pWriteHead = (AuUInt8 *)this->outputBuffer_.writePtr; auto uLen = pWriteHead - pBase; + // do not lock! auto mem = AuMemoryViewRead { AuMemoryViewRead { pBase, (AuUInt)uLen }, this->pParent_->SharedFromThis() }; this->pOutSendPointer_ = pWriteHead; this->outputWriteQueue_.Push(mem); + AuAtomicAdd(&this->outputBuffer_.uInUseCounter, 1u); } } } @@ -149,9 +153,10 @@ namespace Aurora::IO::Net void SocketChannelOutput::WriteTick() { bool bShouldShutdown {}; + { AU_LOCK_GUARD(this->lock_); - bShutdownOnComplete = WriteTickLocked(); + bShouldShutdown = WriteTickLocked(); } if (bShouldShutdown) @@ -187,19 +192,16 @@ namespace Aurora::IO::Net } } - if (this->outputWriteQueue_.IsEmpty()) + if (this->CanResize() && + this->pParent_) { - if (this->pParent_)// && - ///this->outputBuffer_.outputChannel.AsWritableByteBuffer()->GetNextLinearRead().length == 0) - { - this->pParent_->socketChannel_.DoReallocWriteTick(); - } + this->pParent_->socketChannel_.DoReallocWriteTick(); } // do not forcefully flush preemptive hello packets until the socket has properly connected if (!this->pParent_->bHasConnected_) { - return true; + return false; } if (auto pFrameToSend = this->outputWriteQueue_.Dequeue()) @@ -222,6 +224,8 @@ namespace Aurora::IO::Net this->pParent_->SendErrorBeginShutdown({}); return false; } + + AuAtomicAdd(&this->uCompleteCounter_, 1u); } } else @@ -256,11 +260,14 @@ namespace Aurora::IO::Net bool SocketChannelOutput::CanResize() { - return this->outputWriteQueue_.IsEmpty(); + return this->outputWriteQueue_.IsEmpty() && AuAtomicLoad(&this->uCompleteCounter_) == 0; } void SocketChannelOutput::OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length) { + AuAtomicSub(&this->outputBuffer_.uInUseCounter, 1u); + AuAtomicSub(&this->uCompleteCounter_, 1u); + if (!this->pParent_) { return; diff --git a/Source/IO/Net/AuNetSocketChannelOutput.hpp b/Source/IO/Net/AuNetSocketChannelOutput.hpp index f684478f..d6f88d33 100644 --- a/Source/IO/Net/AuNetSocketChannelOutput.hpp +++ b/Source/IO/Net/AuNetSocketChannelOutput.hpp @@ -45,6 +45,7 @@ namespace Aurora::IO::Net void * pOutSendPointer_ {}; AuByteBuffer outputBuffer_; NetWriteQueue outputWriteQueue_; + AuUInt32 uCompleteCounter_ {}; AuThreadPrimitives::SpinLock lock_; }; } \ No newline at end of file diff --git a/Source/IO/Net/AuNetSrvSockets.cpp b/Source/IO/Net/AuNetSrvSockets.cpp index 9aabb64c..40e92944 100644 --- a/Source/IO/Net/AuNetSrvSockets.cpp +++ b/Source/IO/Net/AuNetSrvSockets.cpp @@ -143,6 +143,7 @@ namespace Aurora::IO::Net AuSPtr NetSrvSockets::NewServer(const NetSocketBind &netBind) { auto uMaxSockets = netBind.uMaxConnections ? netBind.uMaxConnections : 512; + auto uMaxDatagramSize = netBind.uDefaultInputStreamSize ? netBind.uDefaultInputStreamSize : kDefaultStreamSize; auto pWorker = this->pParent_->TryScheduleEx(); if (!pWorker) @@ -162,7 +163,7 @@ namespace Aurora::IO::Net AuSToMS(60), netBind.bMultiThreaded, false, - kDefaultStreamSize); + uMaxDatagramSize); if (!pSocket) { SysPushErrorNet("No Memory"); @@ -235,7 +236,8 @@ namespace Aurora::IO::Net AuSPtr NetSrvSockets::NewServerEx(const NetSocketBindEx &netBindEx) { - auto uMaxSockets = netBindEx.uMaxConnections ? netBindEx.uMaxConnections : 512; + auto uMaxSockets = netBindEx.uMaxConnections ? netBindEx.uMaxConnections : 512; + auto uMaxDatagramSize = netBindEx.uDefaultInputStreamSize ? netBindEx.uDefaultInputStreamSize : kDefaultStreamSize; auto pWorker = this->pParent_->TryScheduleEx(); if (!pWorker) @@ -255,7 +257,7 @@ namespace Aurora::IO::Net AuSToMS(netBindEx.uUDPTimeoutMs), netBindEx.bMultiThreaded, false, - kDefaultStreamSize); + uMaxDatagramSize); if (!pSocket) { SysPushErrorNet("No Memory"); diff --git a/Source/IO/Net/SocketOverDatagram/AuNetDatagramSocketServerChannel.cpp b/Source/IO/Net/SocketOverDatagram/AuNetDatagramSocketServerChannel.cpp index debaa3a9..28060219 100644 --- a/Source/IO/Net/SocketOverDatagram/AuNetDatagramSocketServerChannel.cpp +++ b/Source/IO/Net/SocketOverDatagram/AuNetDatagramSocketServerChannel.cpp @@ -21,6 +21,15 @@ static const auto kDefaultBufferSize = 64 * 1024; namespace Aurora::IO::Net { + static bool ForceResize(AuByteBuffer &buffer, AuUInt uLength) + { + auto old = buffer.flagNoRealloc; + buffer.flagNoRealloc = false; + bool bRet = buffer.Resize(uLength); + buffer.flagNoRealloc = old; + return bRet; + } + NetDatagramSocketServerChannel::NetDatagramSocketServerChannel(NetDatagramSocketServerSession *pParent) : pParent_(pParent), outputBuffer(kDefaultBufferSize) @@ -81,8 +90,10 @@ namespace Aurora::IO::Net } } - void NetDatagramSocketServerChannel::OnComplete() + void NetDatagramSocketServerChannel::OnComplete(AuUInt32 uNBytes) { + AuAtomicSub(&this->outputBuffer.uInUseCounter, 1u); + if (AuAtomicSub(&this->uFence_, 1u)) { if (this->bRateLimitPerTick) @@ -109,21 +120,36 @@ namespace Aurora::IO::Net { if (this->outputBuffer.Resize(this->uNextOutputLengthRealloc_)) { - if (this->pCallbackOptional_) + try { - this->pCallbackOptional_->OnSuccess((void *)nullptr); + if (this->pCallbackOptional_) + { + this->pCallbackOptional_->OnSuccess((void *)nullptr); + } + } + catch (...) + { + SysPushErrorCatch(); } } else { - if (this->pCallbackOptional_) + try { - this->pCallbackOptional_->OnFailure((void *)nullptr); + if (this->pCallbackOptional_) + { + this->pCallbackOptional_->OnFailure((void *)nullptr); + } + } + catch (...) + { + SysPushErrorCatch(); } } AuResetMember(this->pCallbackOptional_); AuResetMember(this->uNextOutputLengthRealloc_); + AuResetMember(this->uNextInputLengthRealloc_); } if (this->bRateLimitPerTick) @@ -149,13 +175,9 @@ namespace Aurora::IO::Net auto pWriteTransaction = this->ToWriteTransaction(); - struct DatagramMemory : AuMemoryViewRead - { - AuSPtr pHold; - }; - if (auto uDelta = uOffset - uStartOffset) { + // do not lock! auto mem = AuMemoryViewRead { AuMemoryViewRead { this->outputBuffer.readPtr, uDelta }, this->pParent_->SharedFromThis() }; this->sendStats_.AddBytes(uDelta); @@ -171,12 +193,13 @@ namespace Aurora::IO::Net pWriteTransaction->SetCallback(AuMakeSharedThrow([=](AuUInt64 uOffset, AuUInt32 uLength) { - this->OnComplete(); + this->OnComplete(uLength); })); if (pWriteTransaction->StartWrite(0, mem)) { AuAtomicAdd(&this->uFence_, 1u); + AuAtomicAdd(&this->outputBuffer.uInUseCounter, 1u); } else { @@ -229,12 +252,77 @@ namespace Aurora::IO::Net bool NetDatagramSocketServerChannel::SpecifyBufferSize(AuUInt uBytes, const AuSPtr> &pCallbackOptional) { - return {}; + if (!this->pParent_->bIsReady) + { + this->uBytesOutputBuffer = uBytes; + this->uBytesInputBuffer = uBytes; + + if (pCallbackOptional) + { + try + { + pCallbackOptional->OnSuccess((void *)nullptr); + } + catch (...) + { + SysPushErrorCatch(); + } + } + return true; + } + + auto pWorker = this->pParent_->pWorker; + if (!pWorker) + { + return false; + } + + if (pWorker->IsOnThread()) + { + this->DoBufferResizeOnThread(true, + true, + uBytes, + pCallbackOptional); + return true; + } + + auto pThat = AuSPtr(this->pParent_->SharedFromThis(), this); + + if (!pWorker->TryScheduleInternalTemplate([=](const AuSPtr> &info) + { + pThat->DoBufferResizeOnThread(true, + true, + uBytes, + pCallbackOptional); + }, AuSPtr>{})) + { + return false; + } + + return true; } bool NetDatagramSocketServerChannel::SpecifyOutputBufferSize(AuUInt uBytes, const AuSPtr> &pCallbackOptional) { + if (!this->pParent_->bIsReady) + { + this->uBytesOutputBuffer = uBytes; + + if (pCallbackOptional) + { + try + { + pCallbackOptional->OnSuccess((void *)nullptr); + } + catch (...) + { + SysPushErrorCatch(); + } + } + return true; + } + auto pWorker = this->pParent_->pWorker; if (!pWorker) { @@ -255,9 +343,9 @@ namespace Aurora::IO::Net if (!pWorker->TryScheduleInternalTemplate([=](const AuSPtr> &info) { pThat->DoBufferResizeOnThread(false, - true, - uBytes, - pCallbackOptional); + true, + uBytes, + pCallbackOptional); }, AuSPtr>{})) { return false; @@ -269,7 +357,53 @@ namespace Aurora::IO::Net bool NetDatagramSocketServerChannel::SpecifyInputBufferSize(AuUInt uBytes, const AuSPtr> &pCallbackOptional) { - return {}; + if (!this->pParent_->bIsReady) + { + this->uBytesInputBuffer = uBytes; + + if (pCallbackOptional) + { + try + { + pCallbackOptional->OnSuccess((void *)nullptr); + } + catch (...) + { + SysPushErrorCatch(); + } + } + return true; + } + + auto pWorker = this->pParent_->pWorker; + if (!pWorker) + { + return false; + } + + if (pWorker->IsOnThread()) + { + this->DoBufferResizeOnThread(true, + false, + uBytes, + pCallbackOptional); + return true; + } + + auto pThat = AuSPtr(this->pParent_->SharedFromThis(), this); + + if (!pWorker->TryScheduleInternalTemplate([=](const AuSPtr> &info) + { + pThat->DoBufferResizeOnThread(true, + false, + uBytes, + pCallbackOptional); + }, AuSPtr>{})) + { + return false; + } + + return true; } bool NetDatagramSocketServerChannel::SpecifyManualWrite(bool bEnableDirectAIOWrite) @@ -292,6 +426,16 @@ namespace Aurora::IO::Net return true /*technically not nodelay. if users care to ask for nagle, no, you're sending frames. */; } + void NetDatagramSocketServerChannel::SpecifyPageLength(AuUInt uPageSize) + { + + } + + AuUInt NetDatagramSocketServerChannel::SpecifyPageLength() + { + return this->pParent_->ToParent()->uDefaultPacketSize; + } + AuUInt NetDatagramSocketServerChannel::GetInputBufferSize() { if (this->pBuffer) @@ -452,7 +596,7 @@ namespace Aurora::IO::Net } - void NetDatagramSocketServerChannel::ExpandRecv(const AuSPtr &pBuffer, + void NetDatagramSocketServerChannel::ExpandRecv(const AuSPtr &pBuffer, bool bOwn) { this->recvStats_.AddBytes(pBuffer->RemainingBytes()); @@ -470,7 +614,10 @@ namespace Aurora::IO::Net if (!this->bOwnsBuffer) { AuSPtr pNewBuffer; - if (!((pNewBuffer = AuMakeShared(*pBuffer)) && (pNewBuffer->IsValid()))) + + pNewBuffer = AuMakeShared(*pBuffer); + + if (!pNewBuffer || !pNewBuffer->IsValid()) { SysPushErrorMemory(); this->pParent_->OnError(AuNet::ENetworkError::eResourceConstraint); @@ -492,6 +639,82 @@ namespace Aurora::IO::Net this->pBuffer = pBuffer; this->bOwnsBuffer = bOwn; } + + this->DoReallocRecvTick(); + } + + void NetDatagramSocketServerChannel::DoReallocRecvTick() + { + if (this->uNextInputLengthRealloc_) + { + bool bFailed {}; + + if (this->bOwnsBuffer && this->pBuffer) + { + if (!ForceResize(*this->pBuffer, this->uNextInputLengthRealloc_)) + { + bFailed = true; + } + } + else + { + AuSPtr pNewBuffer; + + if (this->pBuffer) + { + pNewBuffer = AuMakeShared(*this->pBuffer); + + if (pNewBuffer && pNewBuffer->IsValid() && this->uNextInputLengthRealloc_ > pNewBuffer->size()) + { + pNewBuffer->flagNoRealloc = false; + pNewBuffer->flagWriteError = pNewBuffer->Resize(this->uNextInputLengthRealloc_); + } + else + { + AuResetMember(pNewBuffer); + } + } + else + { + pNewBuffer = AuMakeShared(this->uNextInputLengthRealloc_); + } + + if (!pNewBuffer || !pNewBuffer->IsValid()) + { + bFailed = true; + } + else + { + pNewBuffer->flagCircular = true; + + this->pBuffer = pNewBuffer; + this->bOwnsBuffer = true; + } + } + + try + { + if (this->pCallbackOptional_) + { + if (!bFailed) + { + this->pCallbackOptional_->OnSuccess((void *)nullptr); + } + else + { + this->pCallbackOptional_->OnFailure((void *)nullptr); + } + } + } + catch (...) + { + SysPushErrorCatch(); + } + + AuResetMember(this->uNextInputLengthRealloc_); + AuResetMember(this->uNextOutputLengthRealloc_); + AuResetMember(this->pCallbackOptional_); + } } void NetDatagramSocketServerChannel::DoSendTick() @@ -527,7 +750,7 @@ namespace Aurora::IO::Net if (this->pBuffer) { if ((this->pBuffer->readPtr == this->pBuffer->writePtr) || - ((!this->pBuffer->flagCircular) && + ((!this->pBuffer->flagCircular) && // ok? (this->pBuffer->readPtr == this->pBuffer->base + this->pBuffer->length))) { this->pBuffer.reset();; @@ -542,25 +765,111 @@ namespace Aurora::IO::Net { bool bFail {}; - if (this->uFence_) + if (bOutput && (AuAtomicLoad(&this->uFence_) || this->heads_.size())) { - if (this->pCallbackOptional_) + try { - this->pCallbackOptional_->OnFailure((void *)nullptr); + if (this->pCallbackOptional_) + { + this->pCallbackOptional_->OnFailure((void *)nullptr); + } + } + catch (...) + { + SysPushErrorCatch(); } - this->uNextOutputLengthRealloc_ = uBytes; + if (bOutput) + { + this->uNextOutputLengthRealloc_ = uBytes; + } + if (bInput) + { + this->uNextInputLengthRealloc_ = uBytes; + } this->pCallbackOptional_ = pCallbackOptional; return; } if (bInput) { - bFail = true; + if (this->bOwnsBuffer && + this->pBuffer) + { + // consumer read views should lock this into failing + // internally, we bypass read/writes views. + if (!ForceResize(*this->pBuffer, uBytes)) + { + bFail = true; + } + } + else if (!this->bOwnsBuffer) + { + AuSPtr pNewBuffer; + + if (this->pBuffer) + { + if (this->uNextInputLengthRealloc_ == this->pBuffer->size()) + { + // NOP + } + else + { + bFail = true; + + pNewBuffer = AuMakeShared(*this->pBuffer); + + if (pNewBuffer && pNewBuffer->IsValid()) + { + if (this->uNextInputLengthRealloc_ > pNewBuffer->size()) + { + pNewBuffer->flagNoRealloc = false; + if (pNewBuffer->Resize(this->uNextInputLengthRealloc_)) + { + pNewBuffer->flagCircular = true; + + this->pBuffer = pNewBuffer; + this->bOwnsBuffer = true; + + bFail = false; + } + } + } + } + } + else + { + pNewBuffer = AuMakeShared(this->uNextInputLengthRealloc_); + + if (pNewBuffer && pNewBuffer->IsValid()) + { + pNewBuffer->flagCircular = true; + + this->pBuffer = pNewBuffer; + this->bOwnsBuffer = true; + } + else + { + bFail = true; + } + } + } + else + { + bFail = true; + } } else if (bOutput) { - if (!this->outputBuffer.Resize(uBytes)) + if (this->heads_.size()) + { + bFail = true; + } + else if (AuAtomicLoad(&this->uFence_)) + { + bFail = true; + } + else if (!this->outputBuffer.Resize(uBytes)) { bFail = true; } @@ -568,13 +877,20 @@ namespace Aurora::IO::Net if (pCallbackOptional) { - if (!bFail) + try { - pCallbackOptional->OnSuccess((void *)nullptr); + if (!bFail) + { + pCallbackOptional->OnSuccess((void *)nullptr); + } + else + { + pCallbackOptional->OnFailure((void *)nullptr); + } } - else + catch (...) { - pCallbackOptional->OnFailure((void *)nullptr); + SysPushErrorCatch(); } } } diff --git a/Source/IO/Net/SocketOverDatagram/AuNetDatagramSocketServerChannel.hpp b/Source/IO/Net/SocketOverDatagram/AuNetDatagramSocketServerChannel.hpp index b60e1292..64c9255b 100644 --- a/Source/IO/Net/SocketOverDatagram/AuNetDatagramSocketServerChannel.hpp +++ b/Source/IO/Net/SocketOverDatagram/AuNetDatagramSocketServerChannel.hpp @@ -76,6 +76,9 @@ namespace Aurora::IO::Net AuSPtr GetChannelLimits() override; + void SpecifyPageLength(AuUInt uPageSize) override; + AuUInt SpecifyPageLength() override; + SocketStats &GetSendStatsEx(); SocketStats &GetRecvStatsEx(); @@ -87,6 +90,7 @@ namespace Aurora::IO::Net const AuSPtr> &pCallbackOptional); void ExpandRecv(const AuSPtr &pBuffer, bool bOwn); + void DoReallocRecvTick(); void DoSendTick(); void UpdateReadPointers(); @@ -96,7 +100,7 @@ namespace Aurora::IO::Net void SchedWriteTick(); void WriteTick(); - void OnComplete(); + void OnComplete(AuUInt32 uNBytes); AuSPtr pBuffer; @@ -106,6 +110,7 @@ namespace Aurora::IO::Net AuUInt uBytesToFlip { 0 }; AuUInt uBytesInputBuffer { 0 }; AuUInt uBytesOutputBuffer { 0 }; + AuUInt uSpecifyPageLength { 0 }; AuSPtr pRecvProtocol; AuSPtr pSendProtocol; @@ -118,7 +123,9 @@ namespace Aurora::IO::Net AuList heads_; AuThreadPrimitives::Mutex mutex_; AuUInt32 uFence_ {}; + AuUInt uBytesCompleted_ {}; AuUInt32 uNextOutputLengthRealloc_ {}; + AuUInt32 uNextInputLengthRealloc_ {}; AuSPtr> pCallbackOptional_; bool bRateLimitPerTick { false }; }; diff --git a/Source/IO/Net/SocketOverDatagram/AuNetDatagramSocketServerSession.cpp b/Source/IO/Net/SocketOverDatagram/AuNetDatagramSocketServerSession.cpp index 31ab735f..80964766 100644 --- a/Source/IO/Net/SocketOverDatagram/AuNetDatagramSocketServerSession.cpp +++ b/Source/IO/Net/SocketOverDatagram/AuNetDatagramSocketServerSession.cpp @@ -47,14 +47,6 @@ namespace Aurora::IO::Net this->pDriver = pDriverFactory->NewSocketDriver(); - if (!this->channel.outputBuffer.Resize(this->ToParent()->uDefaultPacketSize)) - { - return; - } - - //this->channel.outputBuffer.flagCircular = true; - this->channel.outputBuffer.flagExpandable = false; - if (this->pDriver) { try @@ -67,6 +59,77 @@ namespace Aurora::IO::Net this->bIsReady = true; + // strictly uBytesInputBuffer bytebuffer ring bytes + if (auto uBytes = this->channel.uBytesInputBuffer) + { + // [+] Realtime + // [-] Slower read + + auto pBytebuffer = AuMakeShared(uBytes); + if (pBytebuffer) + { + // TODO: not even TCP uses circular buffers + #if 1 + // (datagram channel) DoEndReadTick will not break me with this flag + // > keep alive + // > do not fail to segmented buffer space + pBytebuffer->flagCircular = true; + #endif + pBytebuffer->flagNoRealloc = true; + this->channel.outputBuffer.flagExpandable = false; + } + + if (!pBytebuffer || !(*pBytebuffer)) + { + this->Destroy(); + return; + } + + this->channel.pBuffer = pBytebuffer; + this->channel.bOwnsBuffer = true; + } + else + { + // [+] Faster read + // [+] Per frame optimized + // [-] Dynamic memory allocation + + // [old behaviour] dynamically scale if no channel resize call was made + // pass the buck onto the user to prevent abuse + } + + { + if (auto uBytes = this->channel.uBytesOutputBuffer) + { + if (!this->channel.outputBuffer.Resize(uBytes)) + { + this->Destroy(); + return; + } + } + else + { + if (!this->channel.outputBuffer.Resize(this->ToParent()->uDefaultPacketSize)) + { + this->Destroy(); + return; + } + } + + // ring buffers dont scale so well to arbitrary send order acknowledgement + // oh well. + // we would need a third lock head to prevent overwrite into pending submit regions. + // the main issue is for linear timelines we can just += -= pointer read/write heads. + // when those guarantees are chucked away, in this example, we cannot increment the read head until all of the requests have completed. + // if another write tick comes along and we're not done? guess what? we're resending from frame[0] again + // if an async write acknowledgement comes in out of order? guess what? now we've probably seeked to the wrong read head location + // it's just easier to not use ring buffers here, and reset the heads to the base pointer when starved of pending async io. + this->channel.outputBuffer.flagExpandable = false; + this->channel.outputBuffer.flagNoRealloc = true; + + // On tcp implementation parity, we would need to flip these two flagCircular states to be inline with AuNetSocketChannelOutput.cpp and AuNetSocketChannelInput.cpp + } + this->channel.InsertFrameFence(); this->pDriver->OnEstablish(); @@ -230,7 +293,7 @@ namespace Aurora::IO::Net { this->bIsDead = true; - if (pWorker->IsOnThread()) + if (this->pWorker->IsOnThread()) { if (auto pDriver = this->pDriver) { @@ -247,7 +310,7 @@ namespace Aurora::IO::Net return; } - pWorker->TryScheduleInternalTemplate([pThat = this->SharedFromThis()](const AuSPtr> &info) + this->pWorker->TryScheduleInternalTemplate([pThat = this->SharedFromThis()](const AuSPtr> &info) { pThat->Destroy(); }, AuStaticCast>(AuMakeShared>([=](const AuSPtr &dumb) diff --git a/Source/IO/Protocol/AuProtocolStack.cpp b/Source/IO/Protocol/AuProtocolStack.cpp index e09a5a0a..b0591d92 100644 --- a/Source/IO/Protocol/AuProtocolStack.cpp +++ b/Source/IO/Protocol/AuProtocolStack.cpp @@ -132,6 +132,11 @@ namespace Aurora::IO::Protocol // Circular ref pNew->pOuputWriter = AuMakeShared(AuSPtr(pNew, &pNew->outputBuffer)); + if (!pNew->pOuputWriter) + { + SysPushErrorNet("Out of memory"); + return {}; + } struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis { @@ -297,6 +302,11 @@ namespace Aurora::IO::Protocol // Circular ref pNew->pOuputWriter = AuMakeShared(AuSPtr(pNew, &pNew->outputBuffer)); + if (!pNew->pOuputWriter) + { + SysPushErrorNet("Out of memory"); + return {}; + } struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis { @@ -756,22 +766,11 @@ namespace Aurora::IO::Protocol if (target >= pCurrent->outputBuffer.allocSize || ((pCurrent->outputBuffer.length > target) && (bytesRem < pCurrent->uStartingSize))) { - AuByteBuffer replacement(target, true, false); - if (!replacement) + if (pCurrent->outputBuffer.Resize(target)) { - this->Terminate(); - return false; + pCurrent->outputBuffer.flagAlwaysExpandable = + pCurrent->outputBuffer.flagExpandable = 1; } - - if (!replacement.WriteFrom(pCurrent->outputBuffer)) - { - this->Terminate(); - return false; - } - - pCurrent->outputBuffer = AuMove(replacement); - pCurrent->outputBuffer.flagAlwaysExpandable = - pCurrent->outputBuffer.flagExpandable = 1; } } }