[*] QOL / Hardening / Optimizations / Bug fixes

(but not really. this is just some patchwork)
This commit is contained in:
Reece Wilson 2024-10-02 00:24:34 +01:00
parent ebec613f66
commit bac7b8d098
27 changed files with 729 additions and 132 deletions

View File

@ -15,8 +15,8 @@ namespace Aurora::IO::Net
struct NetDatagramBind
{
IPAddress ip;
AuUInt16 uPort;
AuUInt32 uDefaultInputStreamSize;
AuUInt16 uPort {};
AuUInt32 uDefaultInputStreamSize {};
AuSPtr<IDatagramDriver> pDriver;
};

View File

@ -208,6 +208,42 @@ namespace Aurora::IO::Net
*/
virtual AuUInt GetOutputBufferSize() = 0;
/**
* Recommended: 0, consume everything within the constraints of GetInputBufferSize/SpecifyInputBufferSize.
* use SingleFrame stack pieces to recursively tick a stack of protocol handlers.
* use DynamicBuffer protocol stack pieces to dynamically scale the output size.
*
* When not zero, this is the packet length per io processor frame.
* The io read is capped to segment of this size (uPageSize out of GetInputBufferSize/SpecifyInputBufferSize).
* In theory it is possible to boost max connection throughput (fair bandwidth between higher socket count) at the cost of lower bandwidth, increased tickrate, and higher cpu usage.
*
* Let say you can afford to buffer 10 frames.
* Should an aggressively written application dequeue of all of that at once, you would need to:
* >allocate x10 the amount of memory in a single io tick,
* >need to worry about how much cpu work time a single request or batch of requests take,
* >peak memory usage requirements for decompression and encryption handlers (you need x10 the peak memory usage).
* uPageSize can retard connections down into keeping their pending to-read memory elsewhere in the network stack.
*
* if you know the MTU is 32k, SpecifyPageLength(32k) for 1:1 op/tick.
* if you know the target is one tcp frame tick, SpecifyPageLength(64k) for 1:1 op/tick.
* udp may need splitting up across 576byte frames.
* you may want to bulk read multiple frames from kernel allocated network packets, over hammering tickrate.
* on the otherhand, you may want to limit how many segments you dequeue in a single io processor tick and ::read/ReadFromEx/ReadFileEx/io_submit([pread])(uNBytes) to a multiplier of your max expected packet size.
*/
virtual void SpecifyPageLength(AuUInt uPageSize) = 0;
/**
* @brief
* @return
*/
virtual AuUInt SpecifyPageLength() = 0;
//
// BREAK! Ending preestablishment bias
// BREAK! Ending preestablishment bias
//
/**
* @brief
* @return

View File

@ -88,6 +88,7 @@ namespace Aurora::Memory
AuUInt8 flagNoRealloc : 1; /// Prevents a subset of free options, specifically realloc, operations
AuUInt8 flagAlwaysExpandable : 1; /// Internal flag. Do not use.
AuUInt8 flagReserveA : 1; /// Placeholder
AuUInt8 flagReservedB;
///////////////////////////////////////////////////////////////////////
// Special flags/values
@ -465,18 +466,7 @@ namespace Aurora::Memory
{
AuSPtr<void> memory;
inline SharedByteBuffer(AuSPtr<MemoryViewWrite> pWriteView) : ByteBuffer()
{
this->allocSize = 0;
this->base = (AuUInt8 *)pWriteView->ptr;
this->length = pWriteView->length;
this->readPtr = this->base;
this->writePtr = this->base + this->length;
this->flagNoFree = true;
this->memory = pWriteView;
}
inline SharedByteBuffer(AuSPtr<void> pRAIIParentOwner, MemoryViewWrite view) :
inline SharedByteBuffer(const MemoryViewWrite &view, AuSPtr<void> pRAIIParentOwner) :
ByteBuffer(),
__view(view)
{
@ -484,11 +474,50 @@ namespace Aurora::Memory
this->base = (AuUInt8 *)view.ptr;
this->length = view.length;
this->readPtr = this->base;
this->writePtr = this->base + this->length;
this->writePtr = this->base;
this->flagNoFree = true;
this->memory = pRAIIParentOwner;
}
inline SharedByteBuffer(const MemoryViewWrite &view) :
ByteBuffer(),
__view(view)
{
this->allocSize = 0;
this->base = (AuUInt8 *)view.ptr;
this->length = view.length;
this->readPtr = this->base;
this->writePtr = this->base;
this->flagNoFree = true;
}
inline SharedByteBuffer(const MemoryViewRead &view, AuSPtr<void> pRAIIParentOwner) :
ByteBuffer(),
__view(*(MemoryViewWrite *)&view)
{
this->allocSize = 0;
this->base = (AuUInt8 *)view.ptr;
this->length = view.length;
this->readPtr = this->base;
this->writePtr = this->base + this->length;
this->flagNoFree = true;
this->flagWriteError = true;
this->memory = pRAIIParentOwner;
}
inline SharedByteBuffer(const MemoryViewRead &view) :
ByteBuffer(),
__view(*(MemoryViewWrite *)&view)
{
this->allocSize = 0;
this->base = (AuUInt8 *)view.ptr;
this->length = view.length;
this->readPtr = this->base;
this->writePtr = this->base + this->length;
this->flagNoFree = true;
this->flagWriteError = true;
}
private:
MemoryViewWrite __view;
};

View File

@ -274,7 +274,7 @@ namespace Aurora::Memory
return true;
}
auto newLength = AuMax(length, AuPageRoundUp(this->allocSize + (this->allocSize / 3), AuUInt(64)));
auto newLength = AuMax(length, AuPageRoundUp(this->allocSize + (this->allocSize / 3), AuUInt(128)));
AuUInt8 *pNext {};
if (this->alignment)

View File

@ -197,7 +197,7 @@ namespace Aurora::Memory
ByteBuffer::ByteBuffer(ByteBuffer &&buffer) :
flagCircular {}, flagExpandable {}, flagReadError {}, flagWriteError {},
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA{}, flagReservedB {}
{
this->base = buffer.base;
this->length = buffer.length;
@ -210,6 +210,7 @@ namespace Aurora::Memory
this->flagReadError = buffer.flagReadError;
this->flagWriteError = buffer.flagWriteError;
this->alignment = buffer.alignment;
this->flagReservedB = buffer.flagReservedB;
buffer.base = {};
buffer.length = {};
buffer.allocSize = {};
@ -223,7 +224,7 @@ namespace Aurora::Memory
}
ByteBuffer::ByteBuffer(const ByteBuffer &buffer, bool preservePointers) :
flagCircular {}, flagExpandable { }, flagReadError {}, flagWriteError {},
flagCircular {}, flagExpandable { }, flagReadError {}, flagWriteError {}, flagReservedB {},
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}
{
if (buffer.length)
@ -262,7 +263,7 @@ namespace Aurora::Memory
}
ByteBuffer::ByteBuffer(const void *in, AuUInt length, bool circular, bool expandable) :
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {},
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {},
flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0)
{
this->scaleSize = kBufferInitialPower;
@ -285,7 +286,7 @@ namespace Aurora::Memory
}
ByteBuffer::ByteBuffer(const MemoryViewRead &readView, bool circular, bool expandable) :
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {},
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {},
flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0)
{
this->scaleSize = kBufferInitialPower;
@ -303,7 +304,7 @@ namespace Aurora::Memory
}
ByteBuffer::ByteBuffer(const AuList<AuUInt8> &vector, bool circular, bool expandable) :
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {},
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {},
flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0)
{
this->scaleSize = kBufferInitialPower;
@ -321,7 +322,7 @@ namespace Aurora::Memory
}
ByteBuffer::ByteBuffer(const MemoryViewRead &readView, AuUInt uAlignment, bool circular, bool expandable) :
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {},
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {},
flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0)
{
this->scaleSize = kBufferInitialPower;
@ -344,7 +345,7 @@ namespace Aurora::Memory
}
ByteBuffer::ByteBuffer(AuUInt length, bool circular, bool expandable) :
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {},
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {},
flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0)
{
this->scaleSize = kBufferInitialPower;
@ -366,7 +367,7 @@ namespace Aurora::Memory
}
ByteBuffer::ByteBuffer(AuUInt length, AuUInt alignment, bool circular, bool expandable) :
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {},
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {},
flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0)
{
if (!length)
@ -390,7 +391,7 @@ namespace Aurora::Memory
template<typename T>
ByteBuffer::ByteBuffer(T *base, T *end, bool circular, bool expandable) :
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {},
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {},
flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0)
{
if (!base)
@ -419,7 +420,7 @@ namespace Aurora::Memory
}
ByteBuffer::ByteBuffer() :
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {},
flagNoFree {}, flagNoRealloc {}, flagAlwaysExpandable {}, flagReserveA {}, flagReservedB {},
flagCircular(0), flagExpandable(true), flagReadError(0), flagWriteError(0)
{
this->base = {};
@ -444,6 +445,7 @@ namespace Aurora::Memory
this->flagNoRealloc = other.flagNoRealloc;
this->flagNoFree = other.flagNoFree;
this->scaleSize = other.scaleSize;
this->flagReservedB = other.flagReservedB;
other.base = {};
other.length = {};
other.allocSize = {};
@ -483,6 +485,7 @@ namespace Aurora::Memory
this->flagCircular = buffer.flagCircular;
this->flagExpandable = buffer.flagExpandable;
this->flagAlwaysExpandable = buffer.flagAlwaysExpandable;
this->flagReservedB = buffer.flagReservedB;
return *this;
}

View File

@ -277,7 +277,12 @@ namespace Aurora
struct IOConfig
{
AuUInt32 uProtocolStackDefaultBufferSize { 64 * 1024 };
#if defined(AURORA_IS_SERVER)
bool bIsVeryLargeIOApplication { true };
#else
bool bIsVeryLargeIOApplication { false };
#endif
// On Win32, NewLSTimer has very bad resolution.
// On linux, timerfd is good enough.

View File

@ -9,7 +9,51 @@
namespace Aurora::Compression
{
static const AuUInt64 kChunkSize = 128 * 1024;
#if defined(AURORA_IS_SERVER) || defined(AURORA_16_KIB_IN_OUT_BUFFERS_32K)
// servers can have a multiple of these per client, if streams are not to be recycled.
// XMPP, for example, does not reset. zlib and others expect genuine compression streams.
// HTTP, libdeflate, and others expect flush framesm and then for the stateless stream to be recreated each IO tick.
// let's assume we are a server;
// most of our IO is done under protocol stacks that'll handle almost all of the buffering,
// changing this value by 2-8 will apply to the law of large numbers on a server,
// changing this value by an order of magnitude is a cost of 10x memory per client (tho you can afford the memory these days),
// the underlying compressor object may not even be accessing these temp[kChunkSize] optimizations - some can directly read from AuByteBuffers,
// some compression libraries are internally alloc heavy, which delgates the usage area to malloc/free callbacks...
// ...these, like zstd, demand almost zero buffered data. a frame fragment is almost nothing.
// therefore, it makes sense to keep these values down as much as possible
static const AuUInt64 kChunkSize = 16 * 1024;
#elif defined(AURORA_IS_CLIENT)
// for builds explicitly earmarked to go out to end users, 2x the usual buffer size
static const AuUInt64 kChunkSize = 256 * 1024;
#else
// and general purpose.
static const AuUInt64 kChunkSize = 128 * 1024;
// also note: zlib docs state up to 256k is efficient, but you can start at around 16k
// https://www.zlib.net/zlib_how.html Last modified 8 February 2023. approx 30 Oct 2004 - 11 Dec 2005.
// general infrastructure and requirements havent changed in 2 decades.
// in 2001 and 2021 you can afford to buy a server with a few tens of gigabytes of ram
// in 2001 and 2021 you can afford to buy a gigabit switch
// in 2001 and 2021 you're still running on deflate algorithms
// in 2004 and 2024 you're still struggling to parallelize compression without N x tar balls on N x cores (N ~= 2-6 would be reasonable)
// in 2012-2014 you could get yourself an ivybridge or a sandbridge, in 2024 you could get yourself an AMD page aligned atomic acker or Oxide Inside 14th gen.
// with ~4th gen being, in theory, multiple single digits slower than 14th gen. and, yes, comparing 4th gen to 9th gen compilation times will result in 4-8x boosts,
// on a very fundamental level, we havent moved that much in terms of single threaded performance.
// that also applies to compression. it's stil the same old slow task calculating bit patterns, looking through big precomputed (or not) datasets, and making in-order & non-parallelizable changes
// With hardware not progressing **that** much, the fundamental algorithms staying the exact same, and the same libraries being used, call me a bit insane to think maybe an article from 2005 isnt that far off from reality.
// 16k*128k (clients) ~= 2gb (x2 for local in/out? [= 4GiB] x4 for additional in/out stream pairing? [= 8GiB]) of ram for a heavyweight server (plus many other user implementation overheads)
// 16k per stream / 256MB on an embedded tablet or games console
// 16k per stream / 256 - 2048MB per app / 1024 - 4096 per home appliance mid 2000s (laptop, pc, etc)
// 16k per stream / 256 - 2048MB per app / 4096 - 32768 per home appliance mid 2020s (laptop, pc, etc)
// 64k (2^16) IPv4 UDP ports, including system region and others
// 2x IPv6 address = 128k
// AAA software can see peaks of hundreds of thousands to millions per clients per platform, so x2, x10, x20, x100 this (more responsive stateful, proprietary, xmpp, etc protocols)
// WebShid "technology" works on stateless packets, so your memory requirements are basically just this in the network stream buffers, and with the rest being temporary allocations (more expensive & wasteful for stream-like connections).
// Seems fair...? seems like the world we should be still living in...? maybe we can 10x or 100x it no matter the era for increased throughput...? would zlib even care...?
#endif
// sometimes on stack
static const AuUInt64 kChunkSize2 = 128 * 1024;
/// * compression type + bits -> internal zlib windowBits
static bool CompressionLevelFromExternalApi(const DecompressInfo &info, AuInt8 &out)

View File

@ -256,8 +256,8 @@ namespace Aurora::Compression
{
int ret, flush;
z_stream strm {};
unsigned char in[kChunkSize];
unsigned char out[kChunkSize];
unsigned char in[kChunkSize2];
unsigned char out[kChunkSize2];
if (!stream.pWritePipe)
{
@ -348,8 +348,8 @@ namespace Aurora::Compression
{
int ret;
z_stream strm {};
unsigned char in[kChunkSize];
unsigned char out[kChunkSize];
unsigned char in[kChunkSize2];
unsigned char out[kChunkSize2];
if (!stream.pWritePipe)
{
@ -437,8 +437,8 @@ namespace Aurora::Compression
#if defined(_AUHAS_BZIP2)
int ret, flush;
bz_stream strm {};
char in[kChunkSize];
char out[kChunkSize];
char in[kChunkSize2];
char out[kChunkSize2];
if (!stream.pWritePipe)
{
@ -462,7 +462,7 @@ namespace Aurora::Compression
do
{
AuIO::EStreamError error;
AuUInt read{ kChunkSize };
AuUInt read{ kChunkSize2 };
if (((error = stream.pReadPipe->Read(AuMemory::MemoryViewStreamWrite(in, read))) != AuIO::EStreamError::eErrorNone) &&
(error != AuIO::EStreamError::eErrorEndOfStream))
{
@ -528,8 +528,8 @@ namespace Aurora::Compression
#if defined(_AUHAS_BZIP2)
int ret;
bz_stream strm {};
char in[kChunkSize];
char out[kChunkSize];
char in[kChunkSize2];
char out[kChunkSize2];
if (!stream.pWritePipe)
{
@ -832,7 +832,7 @@ namespace Aurora::Compression
}
DecompressInfo meta3 = meta2;
meta3.uInternalStreamSize = kChunkSize * 10; // stupid, but required for BaseStream::Write to not blow up. not optimizing old apis.
meta3.uInternalStreamSize = kChunkSize2 * 10; // stupid, but required for BaseStream::Write to not blow up. not optimizing old apis.
auto pCompressor = DecompressorUnique(stream.pReadPipe, meta3);
if (!pCompressor)
{
@ -840,13 +840,13 @@ namespace Aurora::Compression
return false;
}
AuByteBuffer buffer(kChunkSize * 2, true);
AuByteBuffer buffer(kChunkSize2 * 2, true);
AuStreamReadWrittenPair_t pair;
do
{
auto view = buffer.GetNextLinearRead();
view.length = AuMin<AuUInt>(view.length, kChunkSize / 2);
view.length = AuMin<AuUInt>(view.length, kChunkSize2 / 2);
auto pair = pCompressor->ReadEx(AuMemory::MemoryViewWrite(buffer), true);
while (pair.first || pair.second)
@ -914,7 +914,7 @@ namespace Aurora::Compression
}
CompressInfo meta3 = meta2;
meta3.uInternalStreamSize = kChunkSize / 2;
meta3.uInternalStreamSize = kChunkSize2 / 2;
auto pCompressor = CompressorUnique(stream.pReadPipe, meta3);
if (!pCompressor)
{
@ -922,13 +922,13 @@ namespace Aurora::Compression
return false;
}
AuByteBuffer buffer(kChunkSize / 2, true);
AuByteBuffer buffer(kChunkSize2 / 2, true);
AuStreamReadWrittenPair_t pair;
do
{
auto view = buffer.GetNextLinearRead();
view.length = AuMin(view.length, kChunkSize / 2);
view.length = AuMin(view.length, kChunkSize2 / 2);
auto pair = pCompressor->ReadEx(AuMemory::MemoryViewWrite(buffer), true);
while (pair.first || pair.second)
@ -968,7 +968,7 @@ namespace Aurora::Compression
}
auto view = buffer.GetNextLinearRead();
view.length = AuMin<AuUInt>(view.length, kChunkSize / 2);
view.length = AuMin<AuUInt>(view.length, kChunkSize2 / 2);
auto pair = pCompressor->ReadEx(AuMemory::MemoryViewWrite(buffer), false);
while (pair.first || pair.second)

View File

@ -12,14 +12,16 @@
namespace Aurora::Crypto::ECC
{
PrivateCurve25519Impl::PrivateCurve25519Impl(bool isX25519, curve25519_key &&key) : bIsX25519_(isX25519), key_(key)
PrivateCurve25519Impl::PrivateCurve25519Impl(bool isX25519, curve25519_key &&key) :
bIsX25519_(isX25519),
key_(key)
{
}
PrivateCurve25519Impl::~PrivateCurve25519Impl()
{
AuMemset(&this->key_, 'N', AuSizeOf(this->key_));
}
bool PrivateCurve25519Impl::Sign(const Memory::MemoryViewRead &plainText,

View File

@ -18,6 +18,7 @@ namespace Aurora::Crypto::ECC
PublicCurve25519Impl::~PublicCurve25519Impl()
{
AuMemset(&this->key_, 'N', AuSizeOf(this->key_));
}
bool PublicCurve25519Impl::Verify(const Memory::MemoryViewRead &hash,

View File

@ -17,7 +17,9 @@
namespace Aurora::Crypto::ECC
{
PrivateECCImpl::PrivateECCImpl(EECCCurve type, ecc_key &key) : _key(key), _type(type)
PrivateECCImpl::PrivateECCImpl(EECCCurve type, ecc_key &key) :
_key(key),
_type(type)
{
}
@ -25,6 +27,7 @@ namespace Aurora::Crypto::ECC
PrivateECCImpl::~PrivateECCImpl()
{
ecc_free(&this->_key);
AuMemset(&this->_key, 'N', AuSizeOf(this->_key));
}
EECCCurve PrivateECCImpl::GetType()

View File

@ -23,7 +23,8 @@ namespace Aurora::Crypto::ECC
PublicECCImpl::~PublicECCImpl()
{
ecc_free(&_key);
ecc_free(&this->_key);
AuMemset(&this->_key, 'N', AuSizeOf(this->_key));
}
EECCCurve PublicECCImpl::GetType()

View File

@ -13,7 +13,8 @@
namespace Aurora::Crypto::RSA
{
PrivateRSA::PrivateRSA(rsa_key &key) : key_(key)
PrivateRSA::PrivateRSA(const rsa_key &key) :
key_(key)
{
}
@ -21,6 +22,7 @@ namespace Aurora::Crypto::RSA
PrivateRSA::~PrivateRSA()
{
::rsa_free(&this->key_);
AuMemset(&this->key_, 'N', AuSizeOf(this->key_));
}
bool PrivateRSA::Sign(const Memory::MemoryViewRead & payload,

View File

@ -11,7 +11,7 @@ namespace Aurora::Crypto::RSA
{
struct PrivateRSA : IRSAPrivate
{
PrivateRSA(rsa_key &key);
PrivateRSA(const rsa_key &key);
~PrivateRSA();
bool Sign(const Memory::MemoryViewRead & payload,

View File

@ -12,7 +12,8 @@
namespace Aurora::Crypto::RSA
{
PublicRSA::PublicRSA(rsa_key &key) : key_(key)
PublicRSA::PublicRSA(const rsa_key &key) :
key_(key)
{
}
@ -20,6 +21,7 @@ namespace Aurora::Crypto::RSA
PublicRSA::~PublicRSA()
{
::rsa_free(&this->key_);
AuMemset(&this->key_, 'N', AuSizeOf(this->key_));
}
bool PublicRSA::Verify(const AuMemoryViewRead &payload,

View File

@ -12,7 +12,7 @@ namespace Aurora::Crypto::RSA
{
struct PublicRSA : IRSAPublic
{
PublicRSA(rsa_key &key);
PublicRSA(const rsa_key &key);
~PublicRSA();
bool Verify(const Memory::MemoryViewRead & payload,
@ -33,6 +33,5 @@ namespace Aurora::Crypto::RSA
private:
rsa_key key_;
bool bOwned_;
};
}

View File

@ -338,7 +338,7 @@ namespace Aurora::IO::FS
}
}
AUKN_SYM AuUInt64 GetDeviceSizeInBytes(const AuString &physicalDevicePath)
AUKN_SYM AuUInt64 GetDeviceSizeInBytes(const AuROString &physicalDevicePath)
{
DISK_GEOMETRY_EX geo {};
DWORD cbBytesReturned;

View File

@ -271,6 +271,16 @@ namespace Aurora::IO::Net
return pProtocol;
}
void SocketChannel::SpecifyPageLength(AuUInt uPageSize)
{
this->uBytesPerFrame = uPageSize;
}
AuUInt SocketChannel::SpecifyPageLength()
{
return this->uBytesPerFrame;
}
void SocketChannel::SpecifyRecvProtocol(const AuSPtr<Protocol::IProtocolStack> &pRecvProtocol)
{
this->pRecvProtocol = pRecvProtocol;
@ -629,7 +639,14 @@ namespace Aurora::IO::Net
if (pCallbackOptional)
{
pCallbackOptional->OnSuccess((void *)nullptr);
try
{
pCallbackOptional->OnSuccess((void *)nullptr);
}
catch (...)
{
SysPushErrorCatch();
}
}
return;
@ -651,10 +668,14 @@ namespace Aurora::IO::Net
if (bHasCurrentSuccess)
{
if (pCallbackOptional)
try
{
pCallbackOptional->OnSuccess((void *)nullptr);
}
catch (...)
{
SysPushErrorCatch();
}
return;
}
@ -668,7 +689,14 @@ namespace Aurora::IO::Net
{
if (auto pOutput = AuExchange(this->pRetargetOutput, {}))
{
pOutput->OnFailure((void *)nullptr);
try
{
pOutput->OnFailure((void *)nullptr);
}
catch (...)
{
SysPushErrorCatch();
}
if (this->pRetargetInput == pOutput)
{
AuResetMember(this->pRetargetInput);
@ -686,7 +714,14 @@ namespace Aurora::IO::Net
{
if (auto pInput = AuExchange(this->pRetargetInput, {}))
{
pInput->OnFailure((void *)nullptr);
try
{
pInput->OnFailure((void *)nullptr);
}
catch (...)
{
SysPushErrorCatch();
}
if (this->pRetargetOutput == pInput)
{
AuResetMember(this->pRetargetOutput);
@ -715,7 +750,14 @@ namespace Aurora::IO::Net
if (auto pOutput = AuExchange(this->pRetargetOutput, {}))
{
pOutput->OnFailure((void *)nullptr);
try
{
pOutput->OnFailure((void *)nullptr);
}
catch (...)
{
SysPushErrorCatch();
}
if (this->pRetargetInput == pOutput)
{
AuResetMember(this->pRetargetInput);
@ -730,7 +772,14 @@ namespace Aurora::IO::Net
{
if (this->pRetargetInput != pOutput)
{
pOutput->OnSuccess((void *)nullptr);
try
{
pOutput->OnSuccess((void *)nullptr);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
}
@ -750,7 +799,14 @@ namespace Aurora::IO::Net
if (auto pInput = AuExchange(this->pRetargetInput, {}))
{
pInput->OnFailure((void *)nullptr);
try
{
pInput->OnFailure((void *)nullptr);
}
catch (...)
{
SysPushErrorCatch();
}
if (this->pRetargetOutput == pInput)
{
@ -767,7 +823,14 @@ namespace Aurora::IO::Net
{
if (this->pRetargetOutput != pInput)
{
pInput->OnSuccess((void *)nullptr);
try
{
pInput->OnSuccess((void *)nullptr);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
}

View File

@ -69,6 +69,8 @@ namespace Aurora::IO::Net
AuUInt GetNextFrameTargetLength() override;
void SpecifyPageLength(AuUInt uPageSize) override;
AuUInt SpecifyPageLength() override;
void SpecifyRecvProtocol(const AuSPtr<Protocol::IProtocolStack> &pRecvProtocol) override;
@ -106,6 +108,7 @@ namespace Aurora::IO::Net
AuUInt uBytesToFlip { 0 };
AuUInt uBytesInputBuffer { 0 };
AuUInt uBytesOutputBuffer { 0 };
AuUInt uBytesPerFrame { 0 };
AuUInt uBytesInputBufferRetarget { 0 };
AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>> pRetargetInput;

View File

@ -86,6 +86,15 @@ namespace Aurora::IO::Net
req.bIsStream = true;
req.pListener = sharedThis;
req.uBufferLengthOrZero = AuStaticCast<SocketChannel>(this->pParent_->ToChannel())->uBytesInputBuffer;
if (auto uFrame = AuStaticCast<SocketChannel>(this->pParent_->ToChannel())->uBytesPerFrame)
{
req.uPageLengthOrZero = uFrame;
req.bReadEntireAllocation = false;
}
else
{
req.bReadEntireAllocation = true;
}
this->pNetReader = this->pParent_->ToWorker()->ToProcessor()->ToPipeProcessor()->NewAIOPipe(req);
}

View File

@ -126,10 +126,12 @@ namespace Aurora::IO::Net
auto pBase = (AuUInt8 *)this->pOutSendPointer_;
auto uLen = (AuUInt8 *)this->outputBuffer_.base + this->outputBuffer_.length - pBase;
// do not lock!
auto mem = AuMemoryViewRead { AuMemoryViewRead { pBase, (AuUInt)uLen }, this->pParent_->SharedFromThis() };
this->pOutSendPointer_ = (AuUInt8 *)this->pOutSendPointer_ + uLen;
this->outputWriteQueue_.Push(mem);
AuAtomicAdd(&this->outputBuffer_.uInUseCounter, 1u);
}
else
@ -138,10 +140,12 @@ namespace Aurora::IO::Net
auto pWriteHead = (AuUInt8 *)this->outputBuffer_.writePtr;
auto uLen = pWriteHead - pBase;
// do not lock!
auto mem = AuMemoryViewRead { AuMemoryViewRead { pBase, (AuUInt)uLen }, this->pParent_->SharedFromThis() };
this->pOutSendPointer_ = pWriteHead;
this->outputWriteQueue_.Push(mem);
AuAtomicAdd(&this->outputBuffer_.uInUseCounter, 1u);
}
}
}
@ -149,9 +153,10 @@ namespace Aurora::IO::Net
void SocketChannelOutput::WriteTick()
{
bool bShouldShutdown {};
{
AU_LOCK_GUARD(this->lock_);
bShutdownOnComplete = WriteTickLocked();
bShouldShutdown = WriteTickLocked();
}
if (bShouldShutdown)
@ -187,19 +192,16 @@ namespace Aurora::IO::Net
}
}
if (this->outputWriteQueue_.IsEmpty())
if (this->CanResize() &&
this->pParent_)
{
if (this->pParent_)// &&
///this->outputBuffer_.outputChannel.AsWritableByteBuffer()->GetNextLinearRead().length == 0)
{
this->pParent_->socketChannel_.DoReallocWriteTick();
}
this->pParent_->socketChannel_.DoReallocWriteTick();
}
// do not forcefully flush preemptive hello packets until the socket has properly connected
if (!this->pParent_->bHasConnected_)
{
return true;
return false;
}
if (auto pFrameToSend = this->outputWriteQueue_.Dequeue())
@ -222,6 +224,8 @@ namespace Aurora::IO::Net
this->pParent_->SendErrorBeginShutdown({});
return false;
}
AuAtomicAdd(&this->uCompleteCounter_, 1u);
}
}
else
@ -256,11 +260,14 @@ namespace Aurora::IO::Net
bool SocketChannelOutput::CanResize()
{
return this->outputWriteQueue_.IsEmpty();
return this->outputWriteQueue_.IsEmpty() && AuAtomicLoad(&this->uCompleteCounter_) == 0;
}
void SocketChannelOutput::OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length)
{
AuAtomicSub(&this->outputBuffer_.uInUseCounter, 1u);
AuAtomicSub(&this->uCompleteCounter_, 1u);
if (!this->pParent_)
{
return;

View File

@ -45,6 +45,7 @@ namespace Aurora::IO::Net
void * pOutSendPointer_ {};
AuByteBuffer outputBuffer_;
NetWriteQueue outputWriteQueue_;
AuUInt32 uCompleteCounter_ {};
AuThreadPrimitives::SpinLock lock_;
};
}

View File

@ -143,6 +143,7 @@ namespace Aurora::IO::Net
AuSPtr<ISocketServer> NetSrvSockets::NewServer(const NetSocketBind &netBind)
{
auto uMaxSockets = netBind.uMaxConnections ? netBind.uMaxConnections : 512;
auto uMaxDatagramSize = netBind.uDefaultInputStreamSize ? netBind.uDefaultInputStreamSize : kDefaultStreamSize;
auto pWorker = this->pParent_->TryScheduleEx();
if (!pWorker)
@ -162,7 +163,7 @@ namespace Aurora::IO::Net
AuSToMS<AuUInt32>(60),
netBind.bMultiThreaded,
false,
kDefaultStreamSize);
uMaxDatagramSize);
if (!pSocket)
{
SysPushErrorNet("No Memory");
@ -235,7 +236,8 @@ namespace Aurora::IO::Net
AuSPtr<ISocketServer> NetSrvSockets::NewServerEx(const NetSocketBindEx &netBindEx)
{
auto uMaxSockets = netBindEx.uMaxConnections ? netBindEx.uMaxConnections : 512;
auto uMaxSockets = netBindEx.uMaxConnections ? netBindEx.uMaxConnections : 512;
auto uMaxDatagramSize = netBindEx.uDefaultInputStreamSize ? netBindEx.uDefaultInputStreamSize : kDefaultStreamSize;
auto pWorker = this->pParent_->TryScheduleEx();
if (!pWorker)
@ -255,7 +257,7 @@ namespace Aurora::IO::Net
AuSToMS<AuUInt32>(netBindEx.uUDPTimeoutMs),
netBindEx.bMultiThreaded,
false,
kDefaultStreamSize);
uMaxDatagramSize);
if (!pSocket)
{
SysPushErrorNet("No Memory");

View File

@ -21,6 +21,15 @@ static const auto kDefaultBufferSize = 64 * 1024;
namespace Aurora::IO::Net
{
static bool ForceResize(AuByteBuffer &buffer, AuUInt uLength)
{
auto old = buffer.flagNoRealloc;
buffer.flagNoRealloc = false;
bool bRet = buffer.Resize(uLength);
buffer.flagNoRealloc = old;
return bRet;
}
NetDatagramSocketServerChannel::NetDatagramSocketServerChannel(NetDatagramSocketServerSession *pParent) :
pParent_(pParent),
outputBuffer(kDefaultBufferSize)
@ -81,8 +90,10 @@ namespace Aurora::IO::Net
}
}
void NetDatagramSocketServerChannel::OnComplete()
void NetDatagramSocketServerChannel::OnComplete(AuUInt32 uNBytes)
{
AuAtomicSub(&this->outputBuffer.uInUseCounter, 1u);
if (AuAtomicSub(&this->uFence_, 1u))
{
if (this->bRateLimitPerTick)
@ -109,21 +120,36 @@ namespace Aurora::IO::Net
{
if (this->outputBuffer.Resize(this->uNextOutputLengthRealloc_))
{
if (this->pCallbackOptional_)
try
{
this->pCallbackOptional_->OnSuccess((void *)nullptr);
if (this->pCallbackOptional_)
{
this->pCallbackOptional_->OnSuccess((void *)nullptr);
}
}
catch (...)
{
SysPushErrorCatch();
}
}
else
{
if (this->pCallbackOptional_)
try
{
this->pCallbackOptional_->OnFailure((void *)nullptr);
if (this->pCallbackOptional_)
{
this->pCallbackOptional_->OnFailure((void *)nullptr);
}
}
catch (...)
{
SysPushErrorCatch();
}
}
AuResetMember(this->pCallbackOptional_);
AuResetMember(this->uNextOutputLengthRealloc_);
AuResetMember(this->uNextInputLengthRealloc_);
}
if (this->bRateLimitPerTick)
@ -149,13 +175,9 @@ namespace Aurora::IO::Net
auto pWriteTransaction = this->ToWriteTransaction();
struct DatagramMemory : AuMemoryViewRead
{
AuSPtr<void> pHold;
};
if (auto uDelta = uOffset - uStartOffset)
{
// do not lock!
auto mem = AuMemoryViewRead { AuMemoryViewRead { this->outputBuffer.readPtr, uDelta }, this->pParent_->SharedFromThis() };
this->sendStats_.AddBytes(uDelta);
@ -171,12 +193,13 @@ namespace Aurora::IO::Net
pWriteTransaction->SetCallback(AuMakeSharedThrow<AuIO::IAsyncFinishedSubscriberFunctional>([=](AuUInt64 uOffset, AuUInt32 uLength)
{
this->OnComplete();
this->OnComplete(uLength);
}));
if (pWriteTransaction->StartWrite(0, mem))
{
AuAtomicAdd(&this->uFence_, 1u);
AuAtomicAdd(&this->outputBuffer.uInUseCounter, 1u);
}
else
{
@ -229,12 +252,77 @@ namespace Aurora::IO::Net
bool NetDatagramSocketServerChannel::SpecifyBufferSize(AuUInt uBytes,
const AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>> &pCallbackOptional)
{
return {};
if (!this->pParent_->bIsReady)
{
this->uBytesOutputBuffer = uBytes;
this->uBytesInputBuffer = uBytes;
if (pCallbackOptional)
{
try
{
pCallbackOptional->OnSuccess((void *)nullptr);
}
catch (...)
{
SysPushErrorCatch();
}
}
return true;
}
auto pWorker = this->pParent_->pWorker;
if (!pWorker)
{
return false;
}
if (pWorker->IsOnThread())
{
this->DoBufferResizeOnThread(true,
true,
uBytes,
pCallbackOptional);
return true;
}
auto pThat = AuSPtr<NetDatagramSocketServerChannel>(this->pParent_->SharedFromThis(), this);
if (!pWorker->TryScheduleInternalTemplate<AuNullS>([=](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
pThat->DoBufferResizeOnThread(true,
true,
uBytes,
pCallbackOptional);
}, AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>>{}))
{
return false;
}
return true;
}
bool NetDatagramSocketServerChannel::SpecifyOutputBufferSize(AuUInt uBytes,
const AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>> &pCallbackOptional)
{
if (!this->pParent_->bIsReady)
{
this->uBytesOutputBuffer = uBytes;
if (pCallbackOptional)
{
try
{
pCallbackOptional->OnSuccess((void *)nullptr);
}
catch (...)
{
SysPushErrorCatch();
}
}
return true;
}
auto pWorker = this->pParent_->pWorker;
if (!pWorker)
{
@ -255,9 +343,9 @@ namespace Aurora::IO::Net
if (!pWorker->TryScheduleInternalTemplate<AuNullS>([=](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
pThat->DoBufferResizeOnThread(false,
true,
uBytes,
pCallbackOptional);
true,
uBytes,
pCallbackOptional);
}, AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>>{}))
{
return false;
@ -269,7 +357,53 @@ namespace Aurora::IO::Net
bool NetDatagramSocketServerChannel::SpecifyInputBufferSize(AuUInt uBytes,
const AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>> &pCallbackOptional)
{
return {};
if (!this->pParent_->bIsReady)
{
this->uBytesInputBuffer = uBytes;
if (pCallbackOptional)
{
try
{
pCallbackOptional->OnSuccess((void *)nullptr);
}
catch (...)
{
SysPushErrorCatch();
}
}
return true;
}
auto pWorker = this->pParent_->pWorker;
if (!pWorker)
{
return false;
}
if (pWorker->IsOnThread())
{
this->DoBufferResizeOnThread(true,
false,
uBytes,
pCallbackOptional);
return true;
}
auto pThat = AuSPtr<NetDatagramSocketServerChannel>(this->pParent_->SharedFromThis(), this);
if (!pWorker->TryScheduleInternalTemplate<AuNullS>([=](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
pThat->DoBufferResizeOnThread(true,
false,
uBytes,
pCallbackOptional);
}, AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>>{}))
{
return false;
}
return true;
}
bool NetDatagramSocketServerChannel::SpecifyManualWrite(bool bEnableDirectAIOWrite)
@ -292,6 +426,16 @@ namespace Aurora::IO::Net
return true /*technically not nodelay. if users care to ask for nagle, no, you're sending frames. */;
}
void NetDatagramSocketServerChannel::SpecifyPageLength(AuUInt uPageSize)
{
}
AuUInt NetDatagramSocketServerChannel::SpecifyPageLength()
{
return this->pParent_->ToParent()->uDefaultPacketSize;
}
AuUInt NetDatagramSocketServerChannel::GetInputBufferSize()
{
if (this->pBuffer)
@ -470,7 +614,10 @@ namespace Aurora::IO::Net
if (!this->bOwnsBuffer)
{
AuSPtr<AuByteBuffer> pNewBuffer;
if (!((pNewBuffer = AuMakeShared<AuByteBuffer>(*pBuffer)) && (pNewBuffer->IsValid())))
pNewBuffer = AuMakeShared<AuByteBuffer>(*pBuffer);
if (!pNewBuffer || !pNewBuffer->IsValid())
{
SysPushErrorMemory();
this->pParent_->OnError(AuNet::ENetworkError::eResourceConstraint);
@ -492,6 +639,82 @@ namespace Aurora::IO::Net
this->pBuffer = pBuffer;
this->bOwnsBuffer = bOwn;
}
this->DoReallocRecvTick();
}
void NetDatagramSocketServerChannel::DoReallocRecvTick()
{
if (this->uNextInputLengthRealloc_)
{
bool bFailed {};
if (this->bOwnsBuffer && this->pBuffer)
{
if (!ForceResize(*this->pBuffer, this->uNextInputLengthRealloc_))
{
bFailed = true;
}
}
else
{
AuSPtr<AuByteBuffer> pNewBuffer;
if (this->pBuffer)
{
pNewBuffer = AuMakeShared<AuByteBuffer>(*this->pBuffer);
if (pNewBuffer && pNewBuffer->IsValid() && this->uNextInputLengthRealloc_ > pNewBuffer->size())
{
pNewBuffer->flagNoRealloc = false;
pNewBuffer->flagWriteError = pNewBuffer->Resize(this->uNextInputLengthRealloc_);
}
else
{
AuResetMember(pNewBuffer);
}
}
else
{
pNewBuffer = AuMakeShared<AuByteBuffer>(this->uNextInputLengthRealloc_);
}
if (!pNewBuffer || !pNewBuffer->IsValid())
{
bFailed = true;
}
else
{
pNewBuffer->flagCircular = true;
this->pBuffer = pNewBuffer;
this->bOwnsBuffer = true;
}
}
try
{
if (this->pCallbackOptional_)
{
if (!bFailed)
{
this->pCallbackOptional_->OnSuccess((void *)nullptr);
}
else
{
this->pCallbackOptional_->OnFailure((void *)nullptr);
}
}
}
catch (...)
{
SysPushErrorCatch();
}
AuResetMember(this->uNextInputLengthRealloc_);
AuResetMember(this->uNextOutputLengthRealloc_);
AuResetMember(this->pCallbackOptional_);
}
}
void NetDatagramSocketServerChannel::DoSendTick()
@ -527,7 +750,7 @@ namespace Aurora::IO::Net
if (this->pBuffer)
{
if ((this->pBuffer->readPtr == this->pBuffer->writePtr) ||
((!this->pBuffer->flagCircular) &&
((!this->pBuffer->flagCircular) && // ok?
(this->pBuffer->readPtr == this->pBuffer->base + this->pBuffer->length)))
{
this->pBuffer.reset();;
@ -542,25 +765,111 @@ namespace Aurora::IO::Net
{
bool bFail {};
if (this->uFence_)
if (bOutput && (AuAtomicLoad(&this->uFence_) || this->heads_.size()))
{
if (this->pCallbackOptional_)
try
{
this->pCallbackOptional_->OnFailure((void *)nullptr);
if (this->pCallbackOptional_)
{
this->pCallbackOptional_->OnFailure((void *)nullptr);
}
}
catch (...)
{
SysPushErrorCatch();
}
this->uNextOutputLengthRealloc_ = uBytes;
if (bOutput)
{
this->uNextOutputLengthRealloc_ = uBytes;
}
if (bInput)
{
this->uNextInputLengthRealloc_ = uBytes;
}
this->pCallbackOptional_ = pCallbackOptional;
return;
}
if (bInput)
{
bFail = true;
if (this->bOwnsBuffer &&
this->pBuffer)
{
// consumer read views should lock this into failing
// internally, we bypass read/writes views.
if (!ForceResize(*this->pBuffer, uBytes))
{
bFail = true;
}
}
else if (!this->bOwnsBuffer)
{
AuSPtr<AuByteBuffer> pNewBuffer;
if (this->pBuffer)
{
if (this->uNextInputLengthRealloc_ == this->pBuffer->size())
{
// NOP
}
else
{
bFail = true;
pNewBuffer = AuMakeShared<AuByteBuffer>(*this->pBuffer);
if (pNewBuffer && pNewBuffer->IsValid())
{
if (this->uNextInputLengthRealloc_ > pNewBuffer->size())
{
pNewBuffer->flagNoRealloc = false;
if (pNewBuffer->Resize(this->uNextInputLengthRealloc_))
{
pNewBuffer->flagCircular = true;
this->pBuffer = pNewBuffer;
this->bOwnsBuffer = true;
bFail = false;
}
}
}
}
}
else
{
pNewBuffer = AuMakeShared<AuByteBuffer>(this->uNextInputLengthRealloc_);
if (pNewBuffer && pNewBuffer->IsValid())
{
pNewBuffer->flagCircular = true;
this->pBuffer = pNewBuffer;
this->bOwnsBuffer = true;
}
else
{
bFail = true;
}
}
}
else
{
bFail = true;
}
}
else if (bOutput)
{
if (!this->outputBuffer.Resize(uBytes))
if (this->heads_.size())
{
bFail = true;
}
else if (AuAtomicLoad(&this->uFence_))
{
bFail = true;
}
else if (!this->outputBuffer.Resize(uBytes))
{
bFail = true;
}
@ -568,13 +877,20 @@ namespace Aurora::IO::Net
if (pCallbackOptional)
{
if (!bFail)
try
{
pCallbackOptional->OnSuccess((void *)nullptr);
if (!bFail)
{
pCallbackOptional->OnSuccess((void *)nullptr);
}
else
{
pCallbackOptional->OnFailure((void *)nullptr);
}
}
else
catch (...)
{
pCallbackOptional->OnFailure((void *)nullptr);
SysPushErrorCatch();
}
}
}

View File

@ -76,6 +76,9 @@ namespace Aurora::IO::Net
AuSPtr<ISocketChannelLimits> GetChannelLimits() override;
void SpecifyPageLength(AuUInt uPageSize) override;
AuUInt SpecifyPageLength() override;
SocketStats &GetSendStatsEx();
SocketStats &GetRecvStatsEx();
@ -87,6 +90,7 @@ namespace Aurora::IO::Net
const AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>> &pCallbackOptional);
void ExpandRecv(const AuSPtr<AuByteBuffer> &pBuffer, bool bOwn);
void DoReallocRecvTick();
void DoSendTick();
void UpdateReadPointers();
@ -96,7 +100,7 @@ namespace Aurora::IO::Net
void SchedWriteTick();
void WriteTick();
void OnComplete();
void OnComplete(AuUInt32 uNBytes);
AuSPtr<AuByteBuffer> pBuffer;
@ -106,6 +110,7 @@ namespace Aurora::IO::Net
AuUInt uBytesToFlip { 0 };
AuUInt uBytesInputBuffer { 0 };
AuUInt uBytesOutputBuffer { 0 };
AuUInt uSpecifyPageLength { 0 };
AuSPtr<Protocol::IProtocolStack> pRecvProtocol;
AuSPtr<Protocol::IProtocolStack> pSendProtocol;
@ -118,7 +123,9 @@ namespace Aurora::IO::Net
AuList<AuUInt> heads_;
AuThreadPrimitives::Mutex mutex_;
AuUInt32 uFence_ {};
AuUInt uBytesCompleted_ {};
AuUInt32 uNextOutputLengthRealloc_ {};
AuUInt32 uNextInputLengthRealloc_ {};
AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>> pCallbackOptional_;
bool bRateLimitPerTick { false };
};

View File

@ -47,14 +47,6 @@ namespace Aurora::IO::Net
this->pDriver = pDriverFactory->NewSocketDriver();
if (!this->channel.outputBuffer.Resize(this->ToParent()->uDefaultPacketSize))
{
return;
}
//this->channel.outputBuffer.flagCircular = true;
this->channel.outputBuffer.flagExpandable = false;
if (this->pDriver)
{
try
@ -67,6 +59,77 @@ namespace Aurora::IO::Net
this->bIsReady = true;
// strictly uBytesInputBuffer bytebuffer ring bytes
if (auto uBytes = this->channel.uBytesInputBuffer)
{
// [+] Realtime
// [-] Slower read
auto pBytebuffer = AuMakeShared<AuMemory::ByteBuffer>(uBytes);
if (pBytebuffer)
{
// TODO: not even TCP uses circular buffers
#if 1
// (datagram channel) DoEndReadTick will not break me with this flag
// > keep alive
// > do not fail to segmented buffer space
pBytebuffer->flagCircular = true;
#endif
pBytebuffer->flagNoRealloc = true;
this->channel.outputBuffer.flagExpandable = false;
}
if (!pBytebuffer || !(*pBytebuffer))
{
this->Destroy();
return;
}
this->channel.pBuffer = pBytebuffer;
this->channel.bOwnsBuffer = true;
}
else
{
// [+] Faster read
// [+] Per frame optimized
// [-] Dynamic memory allocation
// [old behaviour] dynamically scale if no channel resize call was made
// pass the buck onto the user to prevent abuse
}
{
if (auto uBytes = this->channel.uBytesOutputBuffer)
{
if (!this->channel.outputBuffer.Resize(uBytes))
{
this->Destroy();
return;
}
}
else
{
if (!this->channel.outputBuffer.Resize(this->ToParent()->uDefaultPacketSize))
{
this->Destroy();
return;
}
}
// ring buffers dont scale so well to arbitrary send order acknowledgement
// oh well.
// we would need a third lock head to prevent overwrite into pending submit regions.
// the main issue is for linear timelines we can just += -= pointer read/write heads.
// when those guarantees are chucked away, in this example, we cannot increment the read head until all of the requests have completed.
// if another write tick comes along and we're not done? guess what? we're resending from frame[0] again
// if an async write acknowledgement comes in out of order? guess what? now we've probably seeked to the wrong read head location
// it's just easier to not use ring buffers here, and reset the heads to the base pointer when starved of pending async io.
this->channel.outputBuffer.flagExpandable = false;
this->channel.outputBuffer.flagNoRealloc = true;
// On tcp implementation parity, we would need to flip these two flagCircular states to be inline with AuNetSocketChannelOutput.cpp and AuNetSocketChannelInput.cpp
}
this->channel.InsertFrameFence();
this->pDriver->OnEstablish();
@ -230,7 +293,7 @@ namespace Aurora::IO::Net
{
this->bIsDead = true;
if (pWorker->IsOnThread())
if (this->pWorker->IsOnThread())
{
if (auto pDriver = this->pDriver)
{
@ -247,7 +310,7 @@ namespace Aurora::IO::Net
return;
}
pWorker->TryScheduleInternalTemplate<AuNullS>([pThat = this->SharedFromThis()](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
this->pWorker->TryScheduleInternalTemplate<AuNullS>([pThat = this->SharedFromThis()](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
pThat->Destroy();
}, AuStaticCast<AuAsync::PromiseCallback<AuNullS, AuNullS>>(AuMakeShared<AuAsync::PromiseCallbackFunctional<AuNullS, AuNullS>>([=](const AuSPtr<AuNullS> &dumb)

View File

@ -132,6 +132,11 @@ namespace Aurora::IO::Protocol
// Circular ref
pNew->pOuputWriter = AuMakeShared<AuIO::Buffered::BlobWriter>(AuSPtr<Memory::ByteBuffer>(pNew, &pNew->outputBuffer));
if (!pNew->pOuputWriter)
{
SysPushErrorNet("Out of memory");
return {};
}
struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis<StreamWrapper>
{
@ -297,6 +302,11 @@ namespace Aurora::IO::Protocol
// Circular ref
pNew->pOuputWriter = AuMakeShared<AuIO::Buffered::BlobWriter>(AuSPtr<Memory::ByteBuffer>(pNew, &pNew->outputBuffer));
if (!pNew->pOuputWriter)
{
SysPushErrorNet("Out of memory");
return {};
}
struct StreamWrapper : IStreamWriter, IProtocolNext, AuEnableSharedFromThis<StreamWrapper>
{
@ -756,22 +766,11 @@ namespace Aurora::IO::Protocol
if (target >= pCurrent->outputBuffer.allocSize ||
((pCurrent->outputBuffer.length > target) && (bytesRem < pCurrent->uStartingSize)))
{
AuByteBuffer replacement(target, true, false);
if (!replacement)
if (pCurrent->outputBuffer.Resize(target))
{
this->Terminate();
return false;
pCurrent->outputBuffer.flagAlwaysExpandable =
pCurrent->outputBuffer.flagExpandable = 1;
}
if (!replacement.WriteFrom(pCurrent->outputBuffer))
{
this->Terminate();
return false;
}
pCurrent->outputBuffer = AuMove(replacement);
pCurrent->outputBuffer.flagAlwaysExpandable =
pCurrent->outputBuffer.flagExpandable = 1;
}
}
}