[+] UDP over socket API via existing INetSrvDatagram layer

(...missing send)
[+] AuIO::Buffer::ViewReader
[+] AuIO::Buffer::ViewSeekableReadable
[+] AuIO::Buffer::ViewWriter
[*] Clean up AuCompression
[*[ AuLog messages must always crunch for memory
[*] Various bug fixes
[*] Refactor+clean up
This commit is contained in:
Reece Wilson 2022-12-12 23:50:05 +00:00
parent 6dfa806e3a
commit 267c2216b0
66 changed files with 2441 additions and 356 deletions

View File

@ -12,7 +12,7 @@ namespace Aurora::Compression
/**
Compresses an in memory blob with zstandard
*/
AUKN_SYM bool Compress(const Memory::MemoryViewRead &source, Memory::ByteBuffer &out, int uCompressionLevel = 3);
AUKN_SYM bool Compress(const Memory::MemoryViewRead &source, Memory::ByteBuffer &out, int iCompressionLevel /* -7 to 22 */ = 7);
/**
Decompresses an in memory blob with zstandard

View File

@ -11,4 +11,8 @@
// For sake of being able to mix network, file, serial, and other code, its nice to use the same reader interface for buffered and streams where read/write until EOS is required
#include "BlobSeekableReader.hpp"
#include "BlobReader.hpp"
#include "BlobWriter.hpp"
#include "BlobWriter.hpp"
#include "ViewReader.hpp"
#include "ViewSeekableReadable.hpp"
#include "ViewWriter.hpp"

View File

@ -0,0 +1,80 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ViewReader.hpp
Date: 2022-12-11
Author: Reece
***/
#pragma once
namespace Aurora::IO::Buffered
{
struct ViewReader : IStreamReader
{
AU_NO_COPY(ViewReader)
inline ViewReader(const AuSPtr<Memory::MemoryViewRead> &view) : pView_(view)
{ }
inline ViewReader()
{ }
inline ~ViewReader()
{ }
inline virtual EStreamError IsOpen() override
{
if (!this->pView_)
{
return EStreamError::eErrorStreamNotOpen;
}
if (!(*this->pView_.get()))
{
return EStreamError::eErrorStreamNotOpen;
}
return EStreamError::eErrorNone;
}
inline virtual EStreamError Read(const Memory::MemoryViewStreamWrite &parameters) override
{
parameters.outVariable = 0;
if (!this->pView_)
{
return EStreamError::eErrorStreamNotOpen;
}
if (!(*this->pView_.get()))
{
this->pView_.reset();
return this->uOffset_ ? EStreamError::eErrorStreamNotOpen : EStreamError::eErrorNone;
}
auto uToRead = AuMin(this->pView_->length - this->uOffset_, parameters.length);
parameters.outVariable = uToRead;
AuMemcpy(parameters.ptr, this->pView_->Begin<AuUInt8>() + this->uOffset_, uToRead);
this->uOffset_ += uToRead;
return parameters.outVariable == 0 ? EStreamError::eErrorEndOfStream : EStreamError::eErrorNone;
}
inline virtual void Close() override
{
this->pView_.reset();
}
inline AuUInt &GetOffset()
{
return this->uOffset_;
}
inline const AuUInt &GetOffset() const
{
return this->uOffset_;
}
private:
AuSPtr<Memory::MemoryViewRead> pView_;
AuUInt uOffset_ {};
};
}

View File

@ -0,0 +1,71 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ViewSeekableReadable.hpp
Date: 2022-12-11
Author: Reece
***/
#pragma once
namespace Aurora::IO::Buffered
{
struct ViewSeekableReadable : ISeekingReader
{
AU_NO_COPY(ViewSeekableReadable)
inline ViewSeekableReadable(const AuSPtr<Memory::MemoryViewRead> &view) : view_(view)
{ }
inline ViewSeekableReadable()
{ }
inline ~ViewSeekableReadable()
{ }
inline virtual EStreamError IsOpen() override
{
if (!this->view_)
{
return EStreamError::eErrorStreamNotOpen;
}
if (!(*this->view_.get()))
{
return EStreamError::eErrorStreamNotOpen;
}
return EStreamError::eErrorNone;
}
inline virtual EStreamError ArbitraryRead(AuUInt uOffset, const Memory::MemoryViewStreamWrite &parameters) override
{
if (!this->view_)
{
return EStreamError::eErrorStreamNotOpen;
}
if (!(*this->view_.get()))
{
return EStreamError::eErrorStreamNotOpen;
}
if (this->view_->length < uOffset)
{
return EStreamError::eErrorEndOfStream;
}
auto uToRead = AuMin(this->view_->length - uOffset, parameters.length);
parameters.outVariable = uToRead;
AuMemcpy(parameters.ptr, this->view_->Begin<AuUInt8>() + uOffset, uToRead);
return parameters.outVariable == 0 ? EStreamError::eErrorEndOfStream : EStreamError::eErrorNone;
}
inline virtual void Close() override
{
this->view_.reset();
}
private:
AuSPtr<Memory::MemoryViewRead> view_;
};
}

View File

@ -0,0 +1,77 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ViewWriter.hpp
Date: 2022-12-11
Author: Reece
***/
#pragma once
namespace Aurora::IO::Buffered
{
struct ViewWriter : IStreamWriter
{
AU_NO_COPY(ViewWriter)
inline ViewWriter(const AuSPtr<Memory::MemoryViewWrite> &view) : pView_(view)
{ }
inline ViewWriter()
{ }
inline ~ViewWriter()
{ }
inline virtual EStreamError IsOpen() override
{
if (!this->pView_)
{
return EStreamError::eErrorStreamNotOpen;
}
if (!(*this->pView_.get()))
{
return EStreamError::eErrorStreamNotOpen;
}
return EStreamError::eErrorNone;
}
inline virtual EStreamError Write(const Memory::MemoryViewStreamRead &parameters) override
{
if (!this->pView_)
{
return EStreamError::eErrorStreamNotOpen;
}
if (!(*this->pView_.get()))
{
return EStreamError::eErrorStreamNotOpen;
}
auto uToRead = AuMin(this->pView_->length - this->uOffset_, parameters.length);
parameters.outVariable = uToRead;
AuMemcpy(this->pView_->Begin<AuUInt8>() + this->uOffset_, parameters.ptr, uToRead);
this->uOffset_ += uToRead;
return parameters.outVariable == 0 ? EStreamError::eErrorEndOfStream : EStreamError::eErrorNone;
}
inline virtual void Close() override
{
this->pView_.reset();
}
inline AuUInt &GetOffset()
{
return this->uOffset_;
}
inline const AuUInt &GetOffset() const
{
return this->uOffset_;
}
private:
AuSPtr<Memory::MemoryViewWrite> pView_;
AuUInt uOffset_ {};
};
}

View File

@ -36,7 +36,7 @@ namespace Aurora::IO::Net
AuSPtr<ISocketServerDriver> pDriver;
AuUInt uMaxConnections {};
AuUInt uMaxAcceptBacklog {};
bool bMultiThreadTCP {};
bool bMultiThreaded {};
};
struct NetSocketBindEx : NetSocketBind

View File

@ -13,6 +13,12 @@
#include "IIPCLogger.hpp"
#include "Sinks.hpp"
namespace Aurora::Debug
{
AUKN_SYM void AddMemoryCrunch();
AUKN_SYM void DecMemoryCrunch();
}
namespace Aurora::Logging
{
/// Writes a log message to the console subscribers and telemetry sinks
@ -31,13 +37,35 @@ namespace Aurora::Logging
template<typename Line_t, typename ... T>
inline void WriteLinef(AuUInt8 level, const AuString &tag, const Line_t &msg, T&& ... args)
{
WriteLine(level, ConsoleMessage(EAnsiColor::eReset, tag, fmt::format(msg, AuForward<T>(args)...)));
Aurora::Debug::AddMemoryCrunch();
try
{
WriteLine(level, ConsoleMessage(EAnsiColor::eReset, tag, fmt::format(msg, AuForward<T>(args)...)));
}
catch (...)
{
Aurora::Debug::DecMemoryCrunch();
throw;
return;
}
Aurora::Debug::DecMemoryCrunch();
}
template<typename Line_t, typename ... T>
inline void WriteLinef(AuUInt8 level, EAnsiColor color, const AuString &tag, const Line_t &msg, T&& ... args)
{
WriteLine(level, ConsoleMessage(color, tag, fmt::format(msg, AuForward<T>(args)...)));
Aurora::Debug::AddMemoryCrunch();
try
{
WriteLine(level, ConsoleMessage(color, tag, fmt::format(msg, AuForward<T>(args)...)));
}
catch (...)
{
Aurora::Debug::DecMemoryCrunch();
throw;
return;
}
Aurora::Debug::DecMemoryCrunch();
}
template<typename Line_t, typename ... T>

View File

@ -20,15 +20,23 @@ namespace Aurora::Memory
auto readView = buffer.GetNextLinearRead();
auto writeView = this->GetNextLinearWrite();
auto uMax = AuMin(readView.length, writeView.length);
uMax = AuMin(uTotal + uMax, uLength);
AuMemcpy(writeView.ptr, readView.ptr, uMax);
auto uReadNext = AuMin(readView.length, writeView.length);
this->writePtr += uMax;
buffer.readPtr += uMax;
uTotal += uTotal;
if (uLength <= uTotal + uReadNext)
{
uReadNext = AuMin(uReadNext, uLength - uTotal);
}
bSuccess = uMax;
if (uReadNext)
{
AuMemcpy(writeView.ptr, readView.ptr, uReadNext);
}
this->writePtr += uReadNext;
buffer.readPtr += uReadNext;
uTotal += uReadNext;
bSuccess = uReadNext;
}
while (bSuccess);

View File

@ -1,29 +1,30 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: BaseStream.cpp
File: AuBaseStream.cpp
Date: 2022-2-14
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Compression.hpp"
#include "IngestableReadBase.hpp"
#include "BaseStream.hpp"
#include "AuCompression.hpp"
#include "AuIngestableReadBase.hpp"
#include "AuBaseStream.hpp"
namespace Aurora::Compression
{
AuStreamReadWrittenPair_t BaseStream::ReadEx(const Memory::MemoryViewWrite & /*opt*/ destination, bool ingestUntilEOS)
AuStreamReadWrittenPair_t BaseStream::ReadEx(const AuMemoryViewWrite &/*optional/nullable*/destination,
bool bIngestUntilEOS)
{
AU_LOCK_GUARD(this->_spinlock);
AuUInt32 read {}, len {};
AuUInt32 dwRead {}, dwBytesWritten {};
if (!destination.length && !destination.ptr)
{
return {0, this->pOutputBuffer_->RemainingBytes()};
}
if (ingestUntilEOS)
if (bIngestUntilEOS)
{
while (this->pOutputBuffer_->RemainingBytes() < destination.length)
{
@ -39,12 +40,12 @@ namespace Aurora::Compression
break;
}
read += toRead;
dwRead += toRead;
}
}
len = this->pOutputBuffer_->Read(destination.ptr, destination.length, destination.ptr == nullptr);
return {read, len};
dwBytesWritten = this->pOutputBuffer_->Read(destination.ptr, destination.length, destination.ptr == nullptr);
return {dwRead, dwBytesWritten};
}
AuUInt32 BaseStream::GetAvailableProcessedBytes()
@ -53,7 +54,7 @@ namespace Aurora::Compression
return this->pOutputBuffer_->RemainingBytes(true);
}
AuUInt32 BaseStream::Read(const Memory::MemoryViewWrite & /*opt*/ destination)
AuUInt32 BaseStream::Read(const AuMemoryViewWrite & /*opt*/ destination)
{
AU_LOCK_GUARD(this->_spinlock);
if (!destination.length && !destination.ptr)
@ -63,32 +64,32 @@ namespace Aurora::Compression
return this->pOutputBuffer_->Read(destination.ptr, destination.length, destination.ptr == nullptr);
}
bool BaseStream::GoBackByProcessedN(AuUInt32 offset)
bool BaseStream::GoBackByProcessedN(AuUInt32 dwOffset)
{
AU_LOCK_GUARD(this->_spinlock);
return this->pOutputBuffer_->ReaderTryGoBack(offset);
return this->pOutputBuffer_->ReaderTryGoBack(dwOffset);
}
bool BaseStream::GoForwardByProcessedN(AuUInt32 offset)
bool BaseStream::GoForwardByProcessedN(AuUInt32 dwOffset)
{
AU_LOCK_GUARD(this->_spinlock);
if (!offset)
if (!dwOffset)
{
return true;
}
return this->pOutputBuffer_->ReaderTryGoForward(offset);
return this->pOutputBuffer_->ReaderTryGoForward(dwOffset);
}
AuStreamReadWrittenPair_t BaseStream::Ingest(AuUInt32 bytesFromUnprocessedInputSource)
AuStreamReadWrittenPair_t BaseStream::Ingest(AuUInt32 dwBytesFromUnprocessedInputSource)
{
AU_LOCK_GUARD(this->_spinlock);
if (!bytesFromUnprocessedInputSource)
if (!dwBytesFromUnprocessedInputSource)
{
return {};
}
return Ingest_s(bytesFromUnprocessedInputSource);
return Ingest_s(dwBytesFromUnprocessedInputSource);
}
bool BaseStream::IsValid()
@ -113,13 +114,23 @@ namespace Aurora::Compression
}
}
bool BaseStream::Write(const void *a, AuUInt32 length)
bool BaseStream::Write(const void *pDest, AuUInt32 dwLength)
{
return this->pOutputBuffer_->Write(reinterpret_cast<const AuUInt8 *>(a), length) == length;
return this->pOutputBuffer_->Write(AuReinterpretCast<const AuUInt8 *>(pDest), dwLength) == dwLength;
}
AuUInt32 BaseStream::GetInternalBufferSize()
{
return this->pOutputBuffer_->allocSize;
}
bool BaseStream::Flush()
{
return false;
}
bool BaseStream::Finish()
{
return false;
}
}

View File

@ -1,13 +1,13 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: BaseStream.hpp
File: AuBaseStream.hpp
Date: 2022-2-14
Author: Reece
***/
#pragma once
#include "IngestableReadBase.hpp"
#include "AuIngestableReadBase.hpp"
namespace Aurora::Compression
{
@ -24,27 +24,20 @@ namespace Aurora::Compression
virtual bool Init(const AuSPtr<IO::IStreamReader> &reader) = 0;
virtual AuUInt32 GetAvailableProcessedBytes() override;
virtual AuStreamReadWrittenPair_t ReadEx(const Memory::MemoryViewWrite & /*opt*/ destination, bool ingestUntilEOS) override;
virtual AuUInt32 Read(const Memory::MemoryViewWrite & /*opt*/ destination) override;
virtual AuStreamReadWrittenPair_t ReadEx(const AuMemoryViewWrite & /*opt*/ destination, bool bIngestUntilEOS) override;
virtual AuUInt32 Read(const AuMemoryViewWrite & /*opt*/ destination) override;
virtual bool GoBackByProcessedN(AuUInt32 offset) override;
virtual bool GoForwardByProcessedN(AuUInt32 offset) override;
virtual bool GoBackByProcessedN(AuUInt32 dwOffset) override;
virtual bool GoForwardByProcessedN(AuUInt32 dwOffset) override;
virtual AuUInt32 GetInternalBufferSize() override;
bool Write(const void *a, AuUInt32 length);
bool Write(const void *pDest, AuUInt32 dwLength);
virtual AuStreamReadWrittenPair_t Ingest(AuUInt32 bytesFromUnprocessedInputSource) override;
inline virtual bool Flush() override
{
return false;
}
inline virtual bool Finish() override
{
return false;
}
virtual bool Flush() override;
virtual bool Finish() override;
bool IsValid();
@ -52,7 +45,7 @@ namespace Aurora::Compression
void SetBuffer(const AuSPtr<Memory::ByteBuffer> &pBuffer);
protected:
virtual AuStreamReadWrittenPair_t Ingest_s(AuUInt32 bytesFromUnprocessedInputSource) = 0;
virtual AuStreamReadWrittenPair_t Ingest_s(AuUInt32 dwBytesFromUnprocessedInputSource) = 0;
AuSPtr<Memory::ByteBuffer> pOutputBuffer_;
Memory::ByteBuffer _outbufferOwned;

View File

@ -1,14 +1,14 @@
/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: BlockCompressor.cpp
File: AuBlockCompressor.cpp
Date: 2021-6-17
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Compression.hpp"
#include "BlockCompressor.hpp"
#include "BaseStream.hpp"
#include "AuCompression.hpp"
#include "AuBlockCompressor.hpp"
#include "AuBaseStream.hpp"
#if defined(_AUHAS_ZSTD)
#include "Compressors/ZSTDCompressor.hpp"

View File

@ -1,7 +1,7 @@
/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: BlockDecompressor.hpp
File: AuBlockCompressor.hpp
Date: 2021-6-17
Author: Reece
***/

View File

@ -1,14 +1,14 @@
/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: BlockDecompressor.cpp
File: AuBlockDecompressor.cpp
Date: 2021-6-17
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Compression.hpp"
#include "BlockDecompressor.hpp"
#include "BaseStream.hpp"
#include "AuCompression.hpp"
#include "AuBlockDecompressor.hpp"
#include "AuBaseStream.hpp"
#if defined(_AUHAS_ZSTD)
#include "Compressors/ZSTDDecompressor.hpp"

View File

@ -1,7 +1,7 @@
/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: BlockCompressor.hpp
File: AuBlockDecompressor.hpp
Date: 2021-6-17
Author: Reece
***/

View File

@ -0,0 +1,88 @@
/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuCompression.cpp
Date: 2021-6-17
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "AuCompression.hpp"
#include "zstd.h"
namespace Aurora::Compression
{
AUKN_SYM bool Compress(const AuMemoryViewRead &source, AuByteBuffer &out, int iCompressionLevel /* -7 to 22 */)
{
if (!out.GetOrAllocateLinearWriteable(source.length))
{
SysPushErrorMemory();
return {};
}
auto uRet = ::ZSTD_compress(out.writePtr, source.length, source.ptr, source.length, iCompressionLevel);
if (::ZSTD_isError(uRet))
{
return false;
}
out.writePtr += uRet;
if (!out.flagCircular &&
out.flagExpandable)
{
if (!AuTryDownsize(out, out.writePtr - out.base))
{
SysPushErrorMemory();
return false;
}
}
return true;
}
AUKN_SYM bool Decompress(const AuMemoryViewRead &source, AuByteBuffer &out)
{
AuUInt32 dwRead = 0;
while (dwRead != source.length)
{
auto pBlockPointer = reinterpret_cast<const AuUInt8 *>(source.ptr) + dwRead;
auto uDeflatedLength = ZSTD_findFrameCompressedSize(pBlockPointer, source.length - dwRead);
auto uInflatedLength = ZSTD_getFrameContentSize(pBlockPointer, source.length - dwRead);
if (uDeflatedLength == ZSTD_CONTENTSIZE_ERROR)
{
return false;
}
if (uInflatedLength == ZSTD_CONTENTSIZE_UNKNOWN)
{
return false;
}
if (::ZSTD_isError(uInflatedLength))
{
return false;
}
if (!out.GetOrAllocateLinearWriteable(uInflatedLength))
{
SysPushErrorMemory();
return false;
}
auto uRet = ::ZSTD_decompress(out.writePtr, uInflatedLength, source.ptr, source.length);
if (::ZSTD_isError(uRet))
{
return false;
}
out.writePtr += uRet;
dwRead += AuUInt32(uDeflatedLength);
}
return true;
}
}

View File

@ -1,7 +1,7 @@
/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Compression.hpp
File: AuCompression.hpp
Date: 2021-6-17
Author: Reece
***/

View File

@ -1,14 +1,14 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: CompressionInterceptor.cpp
File: AuCompressionInterceptor.cpp
Date: 2022-9-14
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Compression.hpp"
#include "BaseStream.hpp"
#include "CompressionInterceptor.hpp"
#include "AuCompression.hpp"
#include "AuBaseStream.hpp"
#include "AuCompressionInterceptor.hpp"
namespace Aurora::Compression
{

View File

@ -1,7 +1,7 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: CompressionInterceptor.hpp
File: AuCompressionInterceptor.hpp
Date: 2022-9-14
Author: Reece
***/

View File

@ -1,7 +1,7 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IngestableReadBase.hpp
File: AuIngestableReadBase.hpp
Date: 2022-2-14
Author: Reece
***/
@ -15,10 +15,10 @@ namespace Aurora::Compression
template<typename T, AuUInt Z>
inline void SetArray(T(&array)[Z]);
inline void SetPointer(void *pointer, AuUInt32 length);
inline void SetPointer(void *pointer, AuUInt32 dwLength);
template<typename T, typename Z>
inline AuUInt32 IngestForInPointer(const AuSPtr<IO::IStreamReader> &reader, T *&in, Z &inAlreadyAvailable, AuUInt32 amount);
inline AuUInt32 IngestForInPointer(const AuSPtr<IO::IStreamReader> &reader, T *&in, Z &inAlreadyAvailable, AuUInt32 dwAmount);
private:
AuUInt8 *internalInBuffer_ {};
@ -26,4 +26,4 @@ namespace Aurora::Compression
};
}
#include "IngestableReadBase.inl"
#include "AuIngestableReadBase.inl"

View File

@ -0,0 +1,68 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuIngestableReadBase.inl
Date: 2022-2-14
Author: Reece
***/
#pragma once
namespace Aurora::Compression
{
template<typename T, AuUInt Z>
void IngestableReadBase::SetArray(T(&array)[Z])
{
this->internalInBuffer_ = AuReinterpretCast<AuUInt8 *>(array);
this->internalInLength_ = Z;
}
void IngestableReadBase::SetPointer(void *pHead, AuUInt32 dwLength)
{
this->internalInBuffer_ = AuReinterpretCast<AuUInt8 *>(pHead);
this->internalInLength_ = dwLength;
}
// Given a zlib-like interface input paremeters, a stream source, and a buffer...
//
// ...reads
template<typename T, typename Z>
AuUInt32 IngestableReadBase::IngestForInPointer(const AuSPtr<IO::IStreamReader> &reader,
T *&in,
Z &inAlreadyAvailable,
AuUInt32 dwAmount)
{
if (inAlreadyAvailable > this->internalInLength_)
{
SysPanic("Invalid Buffer Position");
}
AuUInt32 dwCurrentOffset {};
if (in && inAlreadyAvailable)
{
dwCurrentOffset = AuReinterpretCast<const AuUInt8 *>(in) - this->internalInBuffer_;
}
dwCurrentOffset += inAlreadyAvailable;
AuUInt32 dwCurrentLength = this->internalInLength_ - dwCurrentOffset;
if (dwCurrentLength > this->internalInLength_)
{
SysPanic("Invalid Buffer Position");
}
auto nextStreamRead = this->internalInBuffer_ + dwCurrentOffset;
auto nextStreamSegment = this->internalInBuffer_ + inAlreadyAvailable;
AuUInt uRead = AuMin(dwAmount, dwCurrentLength);
if (reader->Read(AuMemoryViewStreamWrite(this->internalInBuffer_ + dwCurrentOffset,
uRead)) != IO::EStreamError::eErrorNone)
{
return 0;
}
in = (T *)(this->internalInBuffer_ + dwCurrentOffset);
inAlreadyAvailable += uRead;
return uRead;
}
}

View File

@ -1,89 +0,0 @@
/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Compression.cpp
Date: 2021-6-17
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "Compression.hpp"
#include "zstd.h"
namespace Aurora::Compression
{
AUKN_SYM bool Compress(const Memory::MemoryViewRead &source, Memory::ByteBuffer &out, int uCompressionLevel)
{
if (!out.GetOrAllocateLinearWriteable(source.length))
{
SysPushErrorMemory();
return {};
}
auto ret = ZSTD_compress(out.writePtr, source.length, source.ptr, source.length, uCompressionLevel);
if (ZSTD_isError(ret))
{
return false;
}
out.writePtr += ret;
if (!out.flagCircular &&
out.flagExpandable)
{
if (!AuTryDownsize(out, out.writePtr - out.base))
{
SysPushErrorMemory();
return false;
}
}
return true;
}
AUKN_SYM bool Decompress(const Memory::MemoryViewRead &source, AuByteBuffer &out)
{
AuUInt32 read = 0;
while (read != source.length)
{
auto startPtr = reinterpret_cast<const AuUInt8 *>(source.ptr) + read;
auto deflatedLength = ZSTD_findFrameCompressedSize(startPtr, source.length - read);
auto inflatedLength = ZSTD_getFrameContentSize(startPtr, source.length - read);
if (inflatedLength == ZSTD_CONTENTSIZE_ERROR)
{
return false;
}
if (inflatedLength == ZSTD_CONTENTSIZE_UNKNOWN)
{
return false;
}
if (ZSTD_isError(inflatedLength))
{
return false;
}
auto view = out.GetOrAllocateLinearWriteable(inflatedLength);
if (!view)
{
SysPushErrorMemory();
return false;
}
auto ret = ZSTD_decompress(view.ptr, inflatedLength, source.ptr, source.length);
if (ZSTD_isError(ret))
{
return false;
}
out.writePtr = (AuUInt8 *)view.ptr + ret;
read += AuUInt32(deflatedLength);
}
return true;
}
}

View File

@ -1,64 +0,0 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IngestableReadBase.inl
Date: 2022-2-14
Author: Reece
***/
#pragma once
namespace Aurora::Compression
{
template<typename T, AuUInt Z>
void IngestableReadBase::SetArray(T(&array)[Z])
{
this->internalInBuffer_ = reinterpret_cast<AuUInt8 *>(array);
this->internalInLength_ = Z;
}
void IngestableReadBase::SetPointer(void *pointer, AuUInt32 length)
{
this->internalInBuffer_ = reinterpret_cast<AuUInt8 *>(pointer);
this->internalInLength_ = length;
}
// Given a zlib-like interface input paremeters, a stream source, and a buffer...
//
// ...reads
template<typename T, typename Z>
AuUInt32 IngestableReadBase::IngestForInPointer(const AuSPtr<IO::IStreamReader> &reader, T *&in, Z &inAlreadyAvailable, AuUInt32 amount)
{
if (inAlreadyAvailable > this->internalInLength_)
{
SysPanic("Invalid Buffer Position");
}
AuUInt32 currentOffset {};
if (in && inAlreadyAvailable)
{
currentOffset = reinterpret_cast<const AuUInt8 *>(in) - this->internalInBuffer_;
}
currentOffset += inAlreadyAvailable;
AuUInt32 currentLength = this->internalInLength_ - currentOffset;
if (currentLength > this->internalInLength_)
{
SysPanic("Invalid Buffer Position");
}
auto nextStreamRead = this->internalInBuffer_ + currentOffset;
auto nextStreamSegment = this->internalInBuffer_ + inAlreadyAvailable;
AuUInt read = AuMin(amount, currentLength);
if (reader->Read(AuMemoryViewStreamWrite(this->internalInBuffer_ + currentOffset, this->internalInBuffer_ + read, read)) != IO::EStreamError::eErrorNone)
{
return 0;
}
in = (T *)(this->internalInBuffer_ + currentOffset);
inAlreadyAvailable += read;
return read;
}
}

View File

@ -4,9 +4,11 @@
File: StreamCompression.cpp
Date: 2021-6-17
Author: Reece
@warning **supported** shit api implementation. this is [somewhat] useless for large real world stuff outside of the utility it provides
***/
#include <Source/RuntimeInternal.hpp>
#include "Compression.hpp"
#include "AuCompression.hpp"
#include "StreamCompression.hpp"
#if defined(_AUHAS_BZIP2)

View File

@ -127,6 +127,7 @@ namespace Aurora::Console
{
}
ConsoleStd::Flush();
gDefaultSinks.clear();
gDefaultLogger.reset();
Logging::DeinitLoggers();

View File

@ -204,7 +204,7 @@ namespace Aurora::Grug
void GrugFlushFlushs()
{
Logging::ForceFlushFlush();
Aurora::Console::PumpOffMain();
//Console::ForceFlush();
Console::PumpOffMain();
Console::ForceFlush();
}
}

View File

@ -12,43 +12,43 @@
namespace Aurora::IO
{
AUKN_SYM bool WaitFor(AuUInt32 milliseconds, bool waitEntireFrame)
AUKN_SYM bool WaitFor(AuUInt32 dwMilliseconds, bool bWaitEntireFrame)
{
bool bHit {};
AuUInt32 targetTime = milliseconds ? AuTime::CurrentClockNS() + AuNSToMS<AuUInt64>(milliseconds) : 0;
AuUInt32 targetTime = dwMilliseconds ? AuTime::SteadyClockNS() + AuNSToMS<AuUInt64>(dwMilliseconds) : 0;
if (!milliseconds)
if (!dwMilliseconds)
{
waitEntireFrame = false;
bWaitEntireFrame = false;
}
while (true)
{
AuUInt32 sleep;
AuUInt32 dwSleep;
if (targetTime)
{
auto now = AuTime::CurrentClockNS();
auto now = AuTime::SteadyClockNS();
if (now >= targetTime)
{
break;
}
sleep = AuNSToMS<AuUInt32>(now - targetTime);
dwSleep = AuNSToMS<AuUInt32>(now - targetTime);
}
else
{
sleep = 0;
dwSleep = 0;
}
if (!UNIX::LinuxOverlappedPoll(sleep))
if (!UNIX::LinuxOverlappedPoll(dwSleep))
{
continue;
}
bHit = true;
if (!waitEntireFrame)
if (!bWaitEntireFrame)
{
break;
}

View File

@ -12,36 +12,36 @@
namespace Aurora::IO
{
AUKN_SYM bool WaitFor(AuUInt32 milliseconds, bool waitEntireFrame)
AUKN_SYM bool WaitFor(AuUInt32 dwMilliseconds, bool bWaitEntireFrame)
{
bool bHit {};
AuUInt32 targetTime = milliseconds ? AuTime::CurrentClockNS() + AuNSToMS<AuUInt64>(milliseconds) : 0;
AuUInt32 targetTime = dwMilliseconds ? AuTime::SteadyClockNS() + AuNSToMS<AuUInt64>(dwMilliseconds) : 0;
if (!milliseconds)
if (!dwMilliseconds)
{
waitEntireFrame = false;
bWaitEntireFrame = false;
}
while (true)
{
AuUInt32 sleep;
AuUInt32 dwSleep;
if (targetTime)
{
auto now = AuTime::CurrentClockNS();
auto now = AuTime::SteadyClockNS();
if (now >= targetTime)
{
break;
}
sleep = AuNSToMS<AuUInt32>(now - targetTime);
dwSleep = AuNSToMS<AuUInt32>(now - targetTime);
}
else
{
sleep = INFINITE;
dwSleep = INFINITE;
}
if (SleepEx(sleep, true) != WAIT_IO_COMPLETION)
if (SleepEx(dwSleep, true) != WAIT_IO_COMPLETION)
{
continue;
}
@ -50,7 +50,7 @@ namespace Aurora::IO
bHit = true;
if (!waitEntireFrame)
if (!bWaitEntireFrame)
{
break;
}

View File

@ -16,11 +16,10 @@
#include <Source/IO/IOPipeProcessor.hpp>
#include "AuNetWorker.hpp"
#include <Source/IO/Protocol/Protocol.hpp>
#include <Source/IO/Protocol/ProtocolStack.hpp>
#include <Source/IO/Protocol/AuProtocolStack.hpp>
namespace Aurora::IO::Net
{
static const auto kDefaultSteamSize = 32 * 1024; // ~960 clients for 30MB of 2x 32KiB streams. Seems... reasonable.
static const bool kDefaultFuckNagle = true;
SocketChannel::SocketChannel(SocketBase *pParent) :
@ -35,8 +34,8 @@ namespace Aurora::IO::Net
inputChannel(pParent, AuStaticCast<IAsyncTransaction>(AuMakeShared<LinuxAsyncNetworkTransaction>(pParent))),
#endif
uBytesInputBuffer(kDefaultSteamSize),
uBytesOutputBuffer(kDefaultSteamSize),
uBytesInputBuffer(kDefaultStreamSize),
uBytesOutputBuffer(kDefaultStreamSize),
bTcpNoDelay(kDefaultFuckNagle)
{
@ -101,7 +100,7 @@ namespace Aurora::IO::Net
}
else
{
return AuMakeShared<IO::Buffered::BlobWriter>(this->AsWritableByteBuffer());
return AuMakeSharedThrow<IO::Buffered::BlobWriter>(this->AsWritableByteBuffer());
}
}
@ -385,7 +384,50 @@ namespace Aurora::IO::Net
return;
}
// TODO (Reece): reallocate
if (bOutput)
{
if (!this->outputChannel.CanResize())
{
if (pCallbackOptional)
{
pCallbackOptional->OnFailure((void *)nullptr);
}
return;
}
AuByteBuffer newBuffer(uBytes, true, false);
if (!(newBuffer.IsValid()))
{
SysPushErrorMemory();
if (pCallbackOptional)
{
pCallbackOptional->OnFailure((void *)nullptr);
}
return;
}
auto &byteBufferRef = this->outputChannel.GetByteBuffer();
auto oldReadHead = byteBufferRef.readPtr;
if (!newBuffer.WriteFrom(byteBufferRef))
{
SysPushErrorMemory();
this->outputChannel.GetByteBuffer().readPtr = oldReadHead;
if (pCallbackOptional)
{
pCallbackOptional->OnFailure((void *)nullptr);
}
return;
}
byteBufferRef = AuMove(newBuffer);
}
if (bInput)
{
SysPushErrorUnimplemented("Cannot resize a TCP clients input stream now");
}
if (pCallbackOptional)
{

View File

@ -210,6 +210,16 @@ namespace Aurora::IO::Net
WriteTick();
}
AuByteBuffer &SocketChannelOutput::GetByteBuffer()
{
return this->outputBuffer_;
}
bool SocketChannelOutput::CanResize()
{
return this->outputWriteQueue_.IsEmpty();
}
void SocketChannelOutput::OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length)
{
this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();

View File

@ -34,6 +34,10 @@ namespace Aurora::IO::Net
void OnAsyncFileOpFinished(AuUInt64 offset, AuUInt32 length) override;
bool bShutdownOnComplete {};
AuByteBuffer &GetByteBuffer();
bool CanResize();
private:
SocketBase * pParent_;
AuSPtr<IAsyncTransaction> pNetWriteTransaction_;

View File

@ -17,13 +17,13 @@ namespace Aurora::IO::Net
const AuSPtr<ISocketServerDriver> &pDriver,
const AuSPtr<ISocketDriverFactory> &pSocketDriverFactory,
AuUInt32 maxConnections,
bool bMultiThreadTCP) :
bool bMultiThreaded) :
SocketServer(pInterface,
pWorker,
pDriver,
pSocketDriverFactory,
maxConnections,
bMultiThreadTCP),
bMultiThreaded),
Socket(pInterface,
pWorker,
AuSPtr<ISocketDriver>{},

View File

@ -18,7 +18,7 @@ namespace Aurora::IO::Net
const AuSPtr<ISocketServerDriver> &pDriver,
const AuSPtr<ISocketDriverFactory> &pSocketDriverFactory,
AuUInt32 maxConnections,
bool bMultiThreadTCP);
bool bMultiThreaded);
virtual void DoNonblockingReadTick() override;

View File

@ -18,7 +18,7 @@ namespace Aurora::IO::Net
const AuSPtr<ISocketServerDriver> &pDriver,
const AuSPtr<ISocketDriverFactory> &pFactory,
AuUInt32 maxConnections,
bool bMultiThreadTCP)
bool bMultiThreaded)
: Socket(pInterface,
pWorker,
AuSPtr<ISocketDriver>{},
@ -26,7 +26,7 @@ namespace Aurora::IO::Net
pDriver_(pDriver),
pFactory_(pFactory),
uMaxConnections_(maxConnections),
bMultiThreadTCP(bMultiThreadTCP)
bMultiThreaded(bMultiThreaded)
{
}

View File

@ -24,7 +24,7 @@ namespace Aurora::IO::Net
const AuSPtr<ISocketServerDriver> &pDriver,
const AuSPtr<ISocketDriverFactory> &pFactory,
AuUInt32 maxConnections,
bool bMultiThreadTCP);
bool bMultiThreaded);
void Init(const NetEndpoint &localAddress);
void Listen(const NetEndpoint &localAddress, bool bBind = true, bool bListen = true);
@ -48,7 +48,7 @@ namespace Aurora::IO::Net
virtual void DetroyServer() = 0;
const bool bMultiThreadTCP;
const bool bMultiThreaded;
protected:
// INTERFACE: base os socket

View File

@ -58,7 +58,7 @@ namespace Aurora::IO::Net
UpdateNextSocketAddresses();
if (this->pParent_->bMultiThreadTCP)
if (this->pParent_->bMultiThreaded)
{
auto pCallback = AuMakeShared<IIOProcessorWorkUnitFunctional>([socket = this->nextSocketPtr]()
{
@ -208,7 +208,7 @@ namespace Aurora::IO::Net
}
NetWorker *pWorker;
if (this->pParent_->bMultiThreadTCP)
if (this->pParent_->bMultiThreaded)
{
pWorker = this->pInterface_->TryScheduleEx().get();
}
@ -228,7 +228,7 @@ namespace Aurora::IO::Net
return false;
}
if (this->pParent_->bMultiThreadTCP)
if (this->pParent_->bMultiThreaded)
{
// Defer SendPreestablish until we're done to minimize RPCs
}

View File

@ -13,6 +13,8 @@
#include "AuNetSocket.hpp"
#include "AuNetSocketServer.hpp"
#include "SocketOverDatagram/SocketOverDatagram.hpp"
namespace Aurora::IO::Net
{
NetSrvSockets::NetSrvSockets(NetInterface *pParent) :
@ -117,10 +119,7 @@ namespace Aurora::IO::Net
AuSPtr<ISocketServer> NetSrvSockets::NewServer(const NetSocketBind &netBind)
{
if (netBind.protocol != ETransportProtocol::eProtocolTCP)
{
return {};
}
auto uMaxSockets = netBind.uMaxConnections ? netBind.uMaxConnections : 512;
auto pWorker = this->pParent_->TryScheduleEx();
if (!pWorker)
@ -129,14 +128,57 @@ namespace Aurora::IO::Net
return {};
}
auto uMaxSockets = netBind.uMaxConnections ? netBind.uMaxConnections : 512;
if (netBind.protocol == ETransportProtocol::eProtocolUDP)
{
auto pSocket = AuMakeShared<NetDatagramSocketServer>(this->pParent_,
pWorker.get(),
netBind.pDriver,
netBind.pFactory,
uMaxSockets,
0,
AuSToMS<AuUInt32>(60),
netBind.bMultiThreaded,
false,
kDefaultStreamSize);
if (!pSocket)
{
SysPushErrorNet("No Memory");
return {};
}
if (!pSocket->Init())
{
SysPushErrorNested("no socket");
return {};
}
if (!pWorker->TryScheduleInternalTemplate<AuNullS>([=](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
NetEndpoint endpoint;
endpoint.ip = netBind.ip;
endpoint.uPort = netBind.uPort;
endpoint.transportProtocol = netBind.protocol;
pSocket->Start(endpoint);
}, AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>>{}))
{
return {};
}
return pSocket->ToSocketServer();
}
if (netBind.protocol != ETransportProtocol::eProtocolTCP)
{
SysPushErrorNet("Unknown transport protocol");
return {};
}
auto pSocket = AuMakeShared<SocketServerImpl>(this->pParent_,
pWorker.get(),
netBind.pDriver,
netBind.pFactory,
uMaxSockets,
netBind.bMultiThreadTCP);
netBind.bMultiThreaded);
if (!pSocket)
{
SysPushErrorNet("No Memory");
@ -164,6 +206,54 @@ namespace Aurora::IO::Net
AuSPtr<ISocketServer> NetSrvSockets::NewServerEx(const NetSocketBindEx &netBindEx)
{
auto uMaxSockets = netBindEx.uMaxConnections ? netBindEx.uMaxConnections : 512;
auto pWorker = this->pParent_->TryScheduleEx();
if (!pWorker)
{
SysPushErrorNet("No Worker");
return {};
}
if (netBindEx.protocol == ETransportProtocol::eProtocolUDP)
{
auto pSocket = AuMakeShared<NetDatagramSocketServer>(this->pParent_,
pWorker.get(),
netBindEx.pDriver,
netBindEx.pFactory,
uMaxSockets,
0,
AuSToMS<AuUInt32>(netBindEx.uUDPTimeoutMs),
netBindEx.bMultiThreaded,
false,
kDefaultStreamSize);
if (!pSocket)
{
SysPushErrorNet("No Memory");
return {};
}
if (!pSocket->Init())
{
SysPushErrorNested("no socket");
return {};
}
if (!pWorker->TryScheduleInternalTemplate<AuNullS>([=](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
NetEndpoint endpoint;
endpoint.ip = netBindEx.ip;
endpoint.uPort = netBindEx.uPort;
endpoint.transportProtocol = netBindEx.protocol;
pSocket->Start(endpoint);
}, AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>>{}))
{
return {};
}
return pSocket->ToSocketServer();
}
if (netBindEx.protocol == ETransportProtocol::eProtocolTCP)
{
return this->NewServer(netBindEx);

View File

@ -51,4 +51,9 @@ namespace Aurora::IO::Net
return AuUnsafeRaiiToShared(&this->current_);
}
bool NetWriteQueue::IsEmpty()
{
return this->views_.empty();
}
}

View File

@ -14,7 +14,8 @@ namespace Aurora::IO::Net
bool Push(const AuSPtr<AuMemoryViewRead> &read);
void NotifyBytesWritten(AuUInt written);
AuSPtr<AuMemoryViewRead> Dequeue();
bool IsEmpty();
private:
AuList<AuTuple<AuUInt, AuSPtr<AuMemoryViewRead>>> views_;
AuMemoryViewRead current_;

View File

@ -21,6 +21,7 @@ namespace Aurora::IO::Net
}
this->iLastTime = uNow;
this->uLastTimeSteadyMS = AuTime::SteadyClockMS();
}
AuInt64 SocketStats::GetFirstTickTimeMS()

View File

@ -23,6 +23,7 @@ namespace Aurora::IO::Net
virtual double GetApproximatedThroughput() override;
AuUInt64 uLastTimeSteadyMS {};
private:
Utility::ThroughputCalculator calculator;
AuInt64 iFirstTime {};

View File

@ -41,5 +41,7 @@
namespace Aurora::IO::Net
{
static const auto kDefaultStreamSize = 32 * 1024; // ~960 clients for 30MB of 2x 32KiB streams. Seems... reasonable.
bool IsNetReady();
}

View File

@ -0,0 +1,54 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetDatagramEmulatorISocket.cpp
Date: 2022-19-09
Author: Reece
***/
#include "SocketOverDatagram.hpp"
#include "AuNetDatagramEmulatorISocket.hpp"
namespace Aurora::IO::Net
{
const NetError &NetDatagramEmulatorISocket::GetError()
{
return this->pParent->lastError ?
this->pParent->lastError.value() :
this->pParent->ToParent()->GetError();
}
const NetEndpoint &NetDatagramEmulatorISocket::GetLocalEndpoint()
{
return this->pParent->ToParent()->GetLocalEndpoint();
}
void NetDatagramEmulatorISocket::Shutdown(bool bNow)
{
this->pParent->Shutdown(bNow);
}
void NetDatagramEmulatorISocket::Destroy()
{
this->pParent->Destroy();
}
AuUInt NetDatagramEmulatorISocket::ToPlatformHandle()
{
return this->pParent->ToParent()->ToPlatformHandle();
}
AuSPtr<ISocketChannel> NetDatagramEmulatorISocket::ToChannel()
{
return this->pParent->ToChannel();
}
const NetEndpoint &NetDatagramEmulatorISocket::GetRemoteEndpoint()
{
return this->pParent->endpoint;
}
AuSPtr<ISocketDriver> NetDatagramEmulatorISocket::GetUserDriver()
{
return this->pParent->GetUserDriver();
}
}

View File

@ -0,0 +1,36 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetDatagramEmulatorISocket.hpp
Date: 2022-19-10
Author: Reece
***/
#pragma once
namespace Aurora::IO::Net
{
struct NetDatagramSocketServerSession;
struct NetDatagramEmulatorISocket : ISocket
{
NetDatagramEmulatorISocket(NetDatagramSocketServerSession *pParent) : pParent(pParent)
{
}
NetDatagramSocketServerSession * const pParent;
const NetError &GetError() override;
const NetEndpoint &GetLocalEndpoint() override;
void Shutdown(bool bNow) override;
void Destroy() override;
AuUInt ToPlatformHandle() override;
AuSPtr<ISocketChannel> ToChannel() override;
const NetEndpoint &GetRemoteEndpoint() override;
AuSPtr<ISocketDriver> GetUserDriver() override;
};
}

View File

@ -0,0 +1,42 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetDatagramEmulatorISocketServer.cpp
Date: 2022-19-09
Author: Reece
***/
#include "SocketOverDatagram.hpp"
#include "AuNetDatagramEmulatorISocketServer.hpp"
namespace Aurora::IO::Net
{
const NetError &NetDatagramEmulatorISocketServer::GetError()
{
return this->pParent->GetError();
}
const NetEndpoint &NetDatagramEmulatorISocketServer::GetLocalEndpoint()
{
return this->pParent->GetLocalEndpoint();
}
void NetDatagramEmulatorISocketServer::Shutdown(bool bNow)
{
this->pParent->Shutdown(bNow);
}
void NetDatagramEmulatorISocketServer::Destroy()
{
this->pParent->Destroy();
}
AuUInt NetDatagramEmulatorISocketServer::ToPlatformHandle()
{
return this->pParent->ToPlatformHandle();
}
AuSPtr<ISocketServerDriver> NetDatagramEmulatorISocketServer::GetServerDriver()
{
return this->pParent->GetUserServerDriver();
}
}

View File

@ -0,0 +1,32 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetDatagramEmulatorISocketServer.hpp
Date: 2022-19-10
Author: Reece
***/
#pragma once
namespace Aurora::IO::Net
{
struct NetDatagramSocketServerSession;
struct NetDatagramEmulatorISocketServer : ISocketServer
{
NetDatagramEmulatorISocketServer(NetDatagramSocketServer *pParent) : pParent(pParent)
{
}
NetDatagramSocketServer * const pParent;
const NetError &GetError() override;
const NetEndpoint &GetLocalEndpoint() override;
void Shutdown(bool bNow) override;
void Destroy() override;
AuUInt ToPlatformHandle() override;
AuSPtr<ISocketServerDriver> GetServerDriver() override;
};
}

View File

@ -0,0 +1,112 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetDatagramSocketEviction.cpp
Date: 2022-19-09
Author: Reece
***/
#include "SocketOverDatagram.hpp"
#include "AuNetDatagramSocketEviction.hpp"
namespace Aurora::IO::Net
{
NetDatagramSocketEviction::NetDatagramSocketEviction(NetDatagramSocketServer *pParent,
INetWorker *pWorker) :
pParent_(pParent),
pWorker_(pWorker)
{
}
void NetDatagramSocketEviction::OnIOTick()
{
auto sessions = this->pParent_->GetAllSessions();
auto uSocketTimeoutRecvMS = this->pParent_->uSocketTimeoutRecvMS;
auto uSocketTimeoutAnyMS = this->pParent_->uSocketTimeoutAnyMS;
auto uNow = AuTime::SteadyClockMS();
AuBST<AuSPtr<NetWorker>, AuList<AuSPtr<NetDatagramSocketServerSession>>> abortions;
for (const auto &pSession : sessions)
{
auto uLastRecv = pSession->channel.GetRecvStatsEx().uLastTimeSteadyMS;
auto uLastSend = pSession->channel.GetSendStatsEx().uLastTimeSteadyMS;
if ((uSocketTimeoutRecvMS && (uLastRecv + uSocketTimeoutRecvMS < uNow)) ||
(uSocketTimeoutAnyMS && ((uLastRecv + uSocketTimeoutAnyMS < uNow) || (uLastSend + uSocketTimeoutAnyMS < uNow))))
{
abortions[pSession->pWorker].push_back(pSession);
}
}
for (const auto &[pWorker, list] : abortions)
{
pWorker->TryScheduleInternalTemplate<AuNullS>([=](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
for (const auto pDriver : list)
{
pDriver->Shutdown(false);
}
}, {});
}
}
void NetDatagramSocketEviction::EvictAll()
{
AuBST<AuSPtr<NetWorker>, AuList<AuSPtr<NetDatagramSocketServerSession>>> abortions;
auto sessions = this->pParent_->GetAllSessions();
for (const auto &pSession : sessions)
{
abortions[pSession->pWorker].push_back(pSession);
}
for (const auto &[pWorker, list] : abortions)
{
pWorker->TryScheduleInternalTemplate<AuNullS>([=](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
for (const auto pDriver : list)
{
pDriver->Shutdown(false);
}
}, {});
}
}
void NetDatagramSocketEviction::OnIOComplete()
{
}
void NetDatagramSocketEviction::OnIOFailure()
{
}
bool NetDatagramSocketEviction::Init()
{
auto pTimer = AuLoop::NewLSTimer(AuTime::CurrentClockMS(), 5'000, 0);
if (!pTimer)
{
SysPushErrorNet("no timer");
return {};
}
this->pWorkItem_ = this->pWorker_->ToProcessor()->StartSimpleLSWatch(pTimer, AuSPtr<IIOSimpleEventListener>(this->pParent_->SharedFromThis(), this));
if (!this->pWorkItem_)
{
SysPushErrorNet("no timer watch");
return {};
}
return true;
}
void NetDatagramSocketEviction::Stop()
{
if (this->pWorkItem_)
{
this->pWorkItem_->StopWatch();
}
}
}

View File

@ -0,0 +1,32 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetDatagramSocketServerDriver.hpp
Date: 2022-19-09
Author: Reece
***/
#pragma once
namespace Aurora::IO::Net
{
struct NetDatagramSocketServer;
struct NetDatagramSocketEviction : IIOSimpleEventListener
{
NetDatagramSocketEviction(NetDatagramSocketServer *pParent,
INetWorker *pWorker);
void OnIOTick() override;
void OnIOFailure() override;
void OnIOComplete() override;
bool Init();
void Stop();
void EvictAll();
private:
NetDatagramSocketServer *pParent_;
INetWorker *pWorker_;
AuSPtr<IIOProcessorItem> pWorkItem_;
};
}

View File

@ -0,0 +1,358 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetDatagramSocketServer.cpp
Date: 2022-19-09
Author: Reece
***/
#include "SocketOverDatagram.hpp"
#include "AuNetDatagramSocketServer.hpp"
#include "../AuNetEndpoint.hpp"
#include "../AuNetWorker.hpp"
namespace Aurora::IO::Net
{
NetDatagramSocketServer::NetDatagramSocketServer(struct NetInterface *pInterface,
struct NetWorker *pWorker,
const AuSPtr<ISocketServerDriver> &pDriver,
const AuSPtr<ISocketDriverFactory> &pSocketDriverFactory,
AuUInt32 uMaxConnections,
AuUInt32 uSocketTimeoutAnyMS,
AuUInt32 uSocketTimeoutRecvMS,
bool bMultiThread,
bool bProtectMainDispatcher,
AuUInt32 uDefaultPacketSize) :
driver(this),
serverImplementor(this),
pWorker_(pWorker),
pInterface_(pInterface),
pUserServerDriver_(pDriver),
bMultiThread_(bMultiThread),
bProtectMainDispatcher_(bProtectMainDispatcher),
pSocketDriverFactory_(pSocketDriverFactory),
eviction(this, pWorker),
uSocketTimeoutAnyMS(uSocketTimeoutAnyMS),
uSocketTimeoutRecvMS(uSocketTimeoutRecvMS),
uDefaultPacketSize(uDefaultPacketSize)
{
}
NetDatagramSocketServer::~NetDatagramSocketServer()
{
}
bool NetDatagramSocketServer::Init()
{
this->mutex_ = AuThreadPrimitives::MutexUnique();
return bool(this->mutex_);
}
void NetDatagramSocketServer::Start(const NetEndpoint &local)
{
this->localEndpoint = local;
AuNet::NetDatagramBind nsDatagramBind;
nsDatagramBind.ip = this->localEndpoint.ip;
nsDatagramBind.uPort = this->localEndpoint.uPort;
nsDatagramBind.pDriver = this->ToDriver();
this->pDatagramServer_ = this->pInterface_->GetDatagramService()->NewDatagramServer(nsDatagramBind);
if (!this->pDatagramServer_)
{
this->OnFatalErrorReported(AuNet::NetError(AuNet::ENetworkError::eResourceConstraint));
}
if (!this->eviction.Init())
{
this->OnFatalErrorReported(AuNet::NetError(AuNet::ENetworkError::eResourceConstraint));
}
}
void NetDatagramSocketServer::OnListen()
{
if (auto pDriver = this->GetUserServerDriver())
{
try
{
pDriver->OnBind();
}
catch (...)
{
SysPushErrorCatch();
}
}
}
void NetDatagramSocketServer::DispatchNew(AuSPtr<NetDatagramSocketServerSession> pSession)
{
// TODO: dispatch new across thread
pSession->OnInit(this->pInterface_->TryScheduleEx());
}
void NetDatagramSocketServer::Cleanup(const AuSPtr<NetDatagramSocketServerSession> &pSession)
{
auto &endpoint = pSession->endpoint;
auto uHashCode = endpoint.ip.HashCode() ^
AuHashCode(endpoint.uPort) ^
AuHashCode(endpoint.uIPv6Scope);
{
AU_LOCK_GUARD(this->mutex_);
auto itr = this->sessions.find(uHashCode);
if (itr == this->sessions.end())
{
SysPushErrorNet("Removal of untracked session");
AuTelemetry::Mayday();
}
else
{
AuRemoveIf(itr->second, [=](const AuSPtr<NetDatagramSocketServerSession> &pPointer) -> bool
{
return (pPointer->endpoint.ip == endpoint.ip &&
pPointer->endpoint.uPort == endpoint.uPort);
});
if (!itr->second.size())
{
this->sessions.erase(itr);
}
}
}
}
AuSPtr<NetDatagramSocketServerSession> NetDatagramSocketServer::GetSession(const NetEndpoint &endpoint)
{
auto uHashCode = endpoint.ip.HashCode() ^
AuHashCode(endpoint.uPort) ^
AuHashCode(endpoint.uIPv6Scope);
if (!this->mutex_)
{
return {};
}
{
AU_LOCK_GUARD(this->mutex_);
auto itr = this->sessions.find(uHashCode);
if (itr == this->sessions.end())
{
auto pNew = AuMakeShared<NetDatagramSocketServerSession>(this->SharedFromThis());
pNew->endpoint = endpoint;
OptimizeEndpoint(pNew->endpoint);
if (!pNew)
{
SysPushErrorMemory();
return {};
}
if (!AuTryInsert(this->sessions, uHashCode, AuList<AuSPtr<NetDatagramSocketServerSession>> { pNew }))
{
SysPushErrorMemory();
return {};
}
this->DispatchNew(pNew);
return pNew;
}
else
{
for (const auto &pPointer: itr->second)
{
if (pPointer->endpoint.ip == endpoint.ip &&
pPointer->endpoint.uPort == endpoint.uPort)
{
return pPointer;
}
}
auto pNew = AuMakeShared<NetDatagramSocketServerSession>(this->SharedFromThis());
pNew->endpoint = endpoint;
OptimizeEndpoint(pNew->endpoint);
if (!pNew)
{
SysPushErrorMemory();
return {};
}
this->DispatchNew(pNew);
return pNew;
}
}
return {};
}
AuList<AuSPtr<NetDatagramSocketServerSession>> NetDatagramSocketServer::GetAllSessions()
{
AuList<AuSPtr<NetDatagramSocketServerSession>> ret;
{
AU_LOCK_GUARD(this->mutex_);
for (const auto &[a, list] : this->sessions)
{
ret.insert(ret.end(), list.begin(), list.end());
}
}
return ret;
}
void NetDatagramSocketServer::Shutdown(bool bNow)
{
this->eviction.EvictAll();
if (bNow)
{
if (this->pDatagramServer_)
{
this->pDatagramServer_->Destroy();
this->pDatagramServer_.reset();
}
this->Destroy();
}
else
{
// TODO:
}
}
void NetDatagramSocketServer::Destroy()
{
this->eviction.Stop();
if (auto pDriver = this->pUserServerDriver_)
{
if (this->pWorker_ && !this->pWorker_->IsOnThread())
{
if (!this->pWorker_->TryScheduleInternalTemplate<AuNullS>([pThat = SharedFromThis()](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
if (pThat->pUserServerDriver_)
{
pThat->pUserServerDriver_->OnFinalize();
}
pThat->pUserServerDriver_.reset();
}, AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>>{}))
{
}
}
else
{
try
{
pDriver->OnFinalize();
}
catch (...)
{
SysPushErrorCatch();
}
this->pUserServerDriver_.reset();
}
}
if (this->pDatagramServer_)
{
this->pDatagramServer_->Destroy();
this->pDatagramServer_.reset();
}
if (this->pSocketDriverFactory_)
{
this->pSocketDriverFactory_.reset();
}
if (this->pDatagramServer_)
{
this->pDatagramServer_->Destroy();
this->pDatagramServer_.reset();
}
}
const NetError &NetDatagramSocketServer::GetError()
{
return this->lastError;
}
const NetEndpoint &NetDatagramSocketServer::GetLocalEndpoint()
{
return this->localEndpoint;
}
void NetDatagramSocketServer::OnError(const NetError &error)
{
this->lastError = error;
if (auto pDriver = this->GetUserServerDriver())
{
try
{
pDriver->OnError({}, error);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
void NetDatagramSocketServer::OnFatalErrorReported(const NetError &error)
{
if (AuExchange(this->bFatalReported_, true))
{
return;
}
this->lastError = error;
if (auto pDriver = this->GetUserServerDriver())
{
try
{
pDriver->OnFatalErrorReported(error);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
AuUInt NetDatagramSocketServer::ToPlatformHandle()
{
return this->uSocketHandle;
}
AuSPtr<ISocketServer> NetDatagramSocketServer::ToSocketServer()
{
return AuSPtr<ISocketServer>(AuSharedFromThis(), &this->serverImplementor);
}
AuSPtr<NetDatagramSocketServerDriver> NetDatagramSocketServer::ToDriver()
{
return AuSPtr<NetDatagramSocketServerDriver>(AuSharedFromThis(), &this->driver);
}
AuSPtr<IDatagramServer> NetDatagramSocketServer::ToServer()
{
return this->pDatagramServer_;
}
AuSPtr<ISocketServerDriver> NetDatagramSocketServer::GetUserServerDriver()
{
return this->pUserServerDriver_;
}
AuSPtr<ISocketDriverFactory> NetDatagramSocketServer::GetFactoryDriver()
{
return this->pSocketDriverFactory_;
}
}

View File

@ -0,0 +1,86 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetDatagramSocketServer.hpp
Date: 2022-19-09
Author: Reece
***/
#pragma once
#include "AuNetDatagramSocketServerDriver.hpp"
#include "AuNetDatagramEmulatorISocketServer.hpp"
#include "AuNetDatagramSocketEviction.hpp"
namespace Aurora::IO::Net
{
struct NetDatagramSocketServerSession;
struct NetDatagramSocketServer : AuEnableSharedFromThis<NetDatagramSocketServer>, ISocketBase
{
NetDatagramSocketServer(struct NetInterface *pInterface,
struct NetWorker *pWorker,
const AuSPtr<ISocketServerDriver> &pDriver,
const AuSPtr<ISocketDriverFactory> &pSocketDriverFactory,
AuUInt32 uMaxConnections,
AuUInt32 uSocketTimeoutAnyMS,
AuUInt32 uSocketTimeoutRecvMS,
bool bMultiThread,
bool bProtectMainDispatcher,
AuUInt32 uDefaultPacketSize);
~NetDatagramSocketServer();
bool Init();
void Start(const NetEndpoint &local);
void OnListen();
AuSPtr<NetDatagramSocketServerSession> GetSession(const NetEndpoint &endpoint);
void Cleanup(const AuSPtr<NetDatagramSocketServerSession> &pSession);
AuList<AuSPtr<NetDatagramSocketServerSession>> GetAllSessions();
AuSPtr<NetDatagramSocketServerDriver> ToDriver();
AuSPtr<ISocketServer> ToSocketServer();
const NetError &GetError() override;
const NetEndpoint &GetLocalEndpoint() override;
void OnError(const NetError &error);
void OnFatalErrorReported(const NetError &error);
void Shutdown(bool bNow) override;
void Destroy() override;
AuUInt ToPlatformHandle() override;
AuSPtr<ISocketServerDriver> GetUserServerDriver();
AuSPtr<ISocketDriverFactory> GetFactoryDriver();
AuSPtr<IDatagramServer> ToServer();
void DispatchNew(AuSPtr<NetDatagramSocketServerSession> pSession);
const AuUInt32 uSocketTimeoutAnyMS;
const AuUInt32 uSocketTimeoutRecvMS;
const AuUInt32 uDefaultPacketSize;
private:
NetError lastError;
NetDatagramSocketServerDriver driver;
NetEndpoint localEndpoint {};
NetDatagramEmulatorISocketServer serverImplementor;
AuHashMap<AuUInt, AuList<AuSPtr<NetDatagramSocketServerSession>>> sessions;
AuList<AuSPtr<INetWorker>> workers;
NetInterface *const pInterface_;
NetWorker *const pWorker_;
AuSPtr<ISocketServerDriver> pUserServerDriver_;
AuUInt uSocketHandle {};
bool bMultiThread_ {};
bool bProtectMainDispatcher_ {};
AuUInt64 uProtectedWorkerId_ {};
void *pProtectedWorkerPtr_ {};
bool bFatalReported_ {};
AuSPtr<IDatagramServer> pDatagramServer_;
AuSPtr<ISocketDriverFactory> pSocketDriverFactory_;
NetDatagramSocketEviction eviction;
AuThreadPrimitives::MutexUnique_t mutex_;
};
}

View File

@ -0,0 +1,376 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetDatagramSocketServerChannel.cpp
Date: 2022-19-09
Author: Reece
***/
#include "SocketOverDatagram.hpp"
#include <Source/IO/Protocol/AuProtocolStack.hpp>
#if defined(AURORA_IS_MODERNNT_DERIVED)
#include "../AuNetStream.NT.hpp"
#else
#include "../AuNetStream.Linux.hpp"
#endif
#include "../AuNetDatagramServer.hpp"
namespace Aurora::IO::Net
{
NetDatagramSocketServerChannel::NetDatagramSocketServerChannel(NetDatagramSocketServerSession *pParent) :
pParent_(pParent)
{
}
AuSPtr<ISocket> NetDatagramSocketServerChannel::ToParent()
{
return this->pParent_->ToSocket();
}
void NetDatagramSocketServerChannel::ScheduleOutOfFrameWrite()
{
this->DoSendTick();
}
AuSPtr<IAsyncTransaction> NetDatagramSocketServerChannel::ToReadTransaction()
{
return {};
}
AuSPtr<IAsyncTransaction> NetDatagramSocketServerChannel::ToWriteTransaction()
{
auto pTransaction = AuMakeShared<NtAsyncNetworkTransaction>(AuDynamicCast<AuNet::DatagramServerImpl>(this->pParent_->ToParent()->ToServer()).get());
if (!pTransaction)
{
SysPushErrorMemory();
return {};
}
pTransaction->bDatagramMode = true;
AuMemcpy(&pTransaction->netEndpoint, &this->pParent_->endpoint, sizeof(NetEndpoint));
return pTransaction;
}
bool NetDatagramSocketServerChannel::SpecifyTCPNoDelay(bool bFuckNagle)
{
return {};
}
bool NetDatagramSocketServerChannel::SpecifyTransactionsHaveIOFence(bool bAllocateFence)
{
return {};
}
bool NetDatagramSocketServerChannel::SpecifyBufferSize(AuUInt uBytes,
const AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>> &pCallbackOptional)
{
return {};
}
bool NetDatagramSocketServerChannel::SpecifyOutputBufferSize(AuUInt uBytes,
const AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>> &pCallbackOptional)
{
auto pWorker = this->pParent_->pWorker;
if (!pWorker)
{
return false;
}
if (pWorker->IsOnThread())
{
this->DoBufferResizeOnThread(false,
true,
uBytes,
pCallbackOptional);
return true;
}
auto pThat = AuSPtr<NetDatagramSocketServerChannel>(this->pParent_->SharedFromThis(), this);
if (!pWorker->TryScheduleInternalTemplate<AuNullS>([=](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
pThat->DoBufferResizeOnThread(false,
true,
uBytes,
pCallbackOptional);
}, AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>>{}))
{
return false;
}
return true;
}
bool NetDatagramSocketServerChannel::SpecifyInputBufferSize(AuUInt uBytes,
const AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>> &pCallbackOptional)
{
return {};
}
bool NetDatagramSocketServerChannel::SpecifyManualWrite(bool bEnableDirectAIOWrite)
{
return bEnableDirectAIOWrite;
}
bool NetDatagramSocketServerChannel::SpecifyManualRead(bool bEnableDirectAIORead)
{
return false;
}
bool NetDatagramSocketServerChannel::SpecifyPerTickAsyncReadLimit(AuUInt uBytes)
{
return {};
}
bool NetDatagramSocketServerChannel::GetCurrentTCPNoDelay()
{
return true /*technically not nodelay. if users care to ask for nagle, no, you're sending frames. */;
}
AuUInt NetDatagramSocketServerChannel::GetInputBufferSize()
{
if (this->pBuffer)
{
return this->pBuffer->allocSize;
}
return this->pParent_->ToParent()->uDefaultPacketSize;
}
AuUInt NetDatagramSocketServerChannel::GetOutputBufferSize()
{
return this->outputBuffer.allocSize;
}
AuSPtr<IStreamReader> NetDatagramSocketServerChannel::AsStreamReader()
{
if (this->pRecvProtocol)
{
return this->pRecvProtocol->AsStreamReader();
}
else
{
return AuMakeSharedThrow<AuIO::Buffered::BlobReader>(this->pBuffer);
}
}
AuSPtr<Memory::ByteBuffer> NetDatagramSocketServerChannel::AsReadableByteBuffer()
{
if (this->pRecvProtocol)
{
return this->pRecvProtocol->AsReadableByteBuffer();
}
else
{
return this->pBuffer;
}
}
AuSPtr<IStreamWriter> NetDatagramSocketServerChannel::AsStreamWriter()
{
if (this->pSendProtocol)
{
return this->pSendProtocol->AsStreamWriter();
}
else
{
return AuMakeSharedThrow<IO::Buffered::BlobWriter>(this->AsWritableByteBuffer());
}
}
AuSPtr<Memory::ByteBuffer> NetDatagramSocketServerChannel::AsWritableByteBuffer()
{
if (this->pSendProtocol)
{
return this->pSendProtocol->AsWritableByteBuffer();
}
else
{
return AuSPtr<Memory::ByteBuffer>(this->pParent_->SharedFromThis(), &this->outputBuffer);
}
}
AuSPtr<Protocol::IProtocolStack> NetDatagramSocketServerChannel::NewProtocolRecvStack()
{
auto pStack = AuMakeShared<Protocol::ProtocolStack>();
if (!pStack)
{
SysPushErrorMem();
return {};
}
return pStack;
}
AuSPtr<Protocol::IProtocolStack> NetDatagramSocketServerChannel::NewProtocolSendStack()
{
auto pBaseProtocol = Protocol::NewBufferedProtocolStack(this->uBytesOutputBuffer);
if (!pBaseProtocol)
{
return {};
}
auto pProtocol = AuStaticCast<Protocol::ProtocolStack>(pBaseProtocol);
if (!pProtocol)
{
return {};
}
pProtocol->pDrainBuffer = AuSPtr<Memory::ByteBuffer>(this->pParent_->SharedFromThis(), &this->outputBuffer);
return pProtocol;
}
void NetDatagramSocketServerChannel::SpecifyRecvProtocol(const AuSPtr<Protocol::IProtocolStack> &pRecvProtocol)
{
if (this->pRecvProtocol)
{
AuStaticCast<AuIO::Protocol::ProtocolStack>(this->pRecvProtocol)->bOwnsSource = false;
AuStaticCast<AuIO::Protocol::ProtocolStack>(this->pRecvProtocol)->pSourceBufer = {};
}
this->pRecvProtocol = pRecvProtocol;
}
void NetDatagramSocketServerChannel::SpecifySendProtocol(const AuSPtr<Protocol::IProtocolStack> &pSendProtocol)
{
this->pSendProtocol = pSendProtocol;
}
AuSPtr<ISocketStats> NetDatagramSocketServerChannel::GetRecvStats()
{
return AuSPtr<ISocketStats>(this->pParent_->SharedFromThis(), &this->recvStats_);
}
AuSPtr<ISocketStats> NetDatagramSocketServerChannel::GetSendStats()
{
return AuSPtr<ISocketStats>(this->pParent_->SharedFromThis(), &this->sendStats_);
}
SocketStats &NetDatagramSocketServerChannel::GetSendStatsEx()
{
return this->sendStats_;
}
SocketStats &NetDatagramSocketServerChannel::GetRecvStatsEx()
{
return this->recvStats_;
}
void NetDatagramSocketServerChannel::Establish()
{
}
void NetDatagramSocketServerChannel::ExpandRecv(const AuSPtr<AuByteBuffer> &pBuffer,
bool bOwn)
{
this->recvStats_.AddBytes(pBuffer->RemainingBytes());
if (this->pBuffer)
{
if (!this->bOwnsBuffer)
{
AuSPtr<AuByteBuffer> pNewBuffer;
if (!((pNewBuffer = AuMakeShared<AuByteBuffer>(*pBuffer)) && (pNewBuffer->IsValid())))
{
SysPushErrorMemory();
this->pParent_->OnError(AuNet::ENetworkError::eResourceConstraint);
return;
}
this->pBuffer = pNewBuffer;
this->bOwnsBuffer = true;
}
if (!this->pBuffer->WriteFrom(*pBuffer))
{
this->pParent_->OnError(AuNet::ENetworkError::eResourceConstraint);
return;
}
}
else
{
this->pBuffer = pBuffer;
this->bOwnsBuffer = bOwn;
}
}
void NetDatagramSocketServerChannel::DoSendTick()
{
if (this->pSendProtocol)
{
this->pSendProtocol->DoTick();
}
// TODO: ...
}
void NetDatagramSocketServerChannel::UpdateReadPointers()
{
if (this->pRecvProtocol)
{
AuStaticCast<AuIO::Protocol::ProtocolStack>(this->pRecvProtocol)->bOwnsSource = false;
AuStaticCast<AuIO::Protocol::ProtocolStack>(this->pRecvProtocol)->pSourceBufer = this->pBuffer;
}
}
void NetDatagramSocketServerChannel::DoReadTick()
{
if (this->pRecvProtocol)
{
this->pRecvProtocol->DoTick();
}
}
void NetDatagramSocketServerChannel::DoEndReadTick()
{
if (this->pBuffer)
{
if ((this->pBuffer->readPtr == this->pBuffer->writePtr) ||
((!this->pBuffer->flagCircular) &&
(this->pBuffer->readPtr == this->pBuffer->base + this->pBuffer->length)))
{
this->pBuffer.reset();;
}
}
}
void NetDatagramSocketServerChannel::DoBufferResizeOnThread(bool bInput,
bool bOutput,
AuUInt uBytes,
const AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>> &pCallbackOptional)
{
if (bOutput)
{
AuByteBuffer newBuffer(uBytes, true, false);
if (!(newBuffer.IsValid()))
{
SysPushErrorMemory();
if (pCallbackOptional)
{
pCallbackOptional->OnFailure((void *)nullptr);
}
return;
}
if (!newBuffer.WriteFrom(this->outputBuffer))
{
SysPushErrorMemory();
if (pCallbackOptional)
{
pCallbackOptional->OnFailure((void *)nullptr);
}
return;
}
this->outputBuffer = AuMove(newBuffer);
}
if (pCallbackOptional)
{
pCallbackOptional->OnSuccess((void *)nullptr);
}
}
}

View File

@ -0,0 +1,104 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetDatagramSocketServerChannel.hpp
Date: 2022-19-09
Author: Reece
***/
#pragma once
#include <Source/IO/Net/AuSocketStats.hpp>
namespace Aurora::IO::Net
{
struct NetDatagramSocketServerSession;
struct NetDatagramSocketServerChannel : ISocketChannel
{
NetDatagramSocketServerChannel(NetDatagramSocketServerSession *pParent);
AuSPtr<ISocket> ToParent() override;
void ScheduleOutOfFrameWrite() override;
AuSPtr<IAsyncTransaction> ToReadTransaction() override;
AuSPtr<IAsyncTransaction> ToWriteTransaction() override;
bool SpecifyTCPNoDelay(bool bFuckNagle) override;
bool SpecifyTransactionsHaveIOFence(bool bAllocateFence) override;
bool SpecifyBufferSize(AuUInt uBytes,
const AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>> &pCallbackOptional) override;
bool SpecifyOutputBufferSize(AuUInt uBytes,
const AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>> &pCallbackOptional) override;
bool SpecifyInputBufferSize(AuUInt uBytes,
const AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>> &pCallbackOptional) override;
bool SpecifyManualWrite(bool bEnableDirectAIOWrite) override;
bool SpecifyManualRead(bool bEnableDirectAIORead) override;
bool SpecifyPerTickAsyncReadLimit(AuUInt uBytes) override;
bool GetCurrentTCPNoDelay() override;
AuUInt GetInputBufferSize() override;
AuUInt GetOutputBufferSize() override;
AuSPtr<IStreamReader> AsStreamReader() override;
AuSPtr<Memory::ByteBuffer> AsReadableByteBuffer() override;
AuSPtr<IStreamWriter> AsStreamWriter() override;
AuSPtr<Memory::ByteBuffer> AsWritableByteBuffer() override;
AuSPtr<Protocol::IProtocolStack> NewProtocolRecvStack() override;
AuSPtr<Protocol::IProtocolStack> NewProtocolSendStack() override;
void SpecifyRecvProtocol(const AuSPtr<Protocol::IProtocolStack> &pRecvProtocol) override;
void SpecifySendProtocol(const AuSPtr<Protocol::IProtocolStack> &pSendProtocol) override;
AuSPtr<ISocketStats> GetRecvStats() override;
AuSPtr<ISocketStats> GetSendStats() override;
SocketStats &GetSendStatsEx();
SocketStats &GetRecvStatsEx();
void Establish();
void DoBufferResizeOnThread(bool bInput,
bool bOutput,
AuUInt uBytes,
const AuSPtr<AuAsync::PromiseCallback<AuNullS, AuNullS>> &pCallbackOptional);
void ExpandRecv(const AuSPtr<AuByteBuffer> &pBuffer, bool bOwn);
void DoSendTick();
void UpdateReadPointers();
void DoReadTick();
void DoEndReadTick();
AuSPtr<AuByteBuffer> pBuffer;
AuByteBuffer outputBuffer;
bool bOwnsBuffer {};
AuUInt uBytesToFlip { 0 };
AuUInt uBytesInputBuffer { 0 };
AuUInt uBytesOutputBuffer { 0 };
AuSPtr<Protocol::IProtocolStack> pRecvProtocol;
AuSPtr<Protocol::IProtocolStack> pSendProtocol;
private:
NetDatagramSocketServerSession *pParent_;
SocketStats sendStats_;
SocketStats recvStats_;
};
}

View File

@ -0,0 +1,50 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetDatagramSocketServerDriver.cpp
Date: 2022-19-09
Author: Reece
***/
#include "SocketOverDatagram.hpp"
#include "AuNetDatagramSocketServerDriver.hpp"
#include "AuNetDatagramSocketServer.hpp"
namespace Aurora::IO::Net
{
NetDatagramSocketServerDriver::NetDatagramSocketServerDriver(NetDatagramSocketServer *pParent) : pParent(pParent)
{
}
void NetDatagramSocketServerDriver::OnBind()
{
this->pParent->OnListen();
}
void NetDatagramSocketServerDriver::OnPacket(const NetEndpoint &sender, const AuSPtr<AuByteBuffer> &pBuffer)
{
auto pSender = this->pParent->GetSession(sender);
if (!pSender)
{
this->OnFatalErrorReported({});
return;
}
pSender->OnPacket(pBuffer);
}
void NetDatagramSocketServerDriver::OnError(const NetError &error)
{
this->pParent->OnError(error);
}
void NetDatagramSocketServerDriver::OnFatalErrorReported(const NetError &error)
{
this->pParent->OnFatalErrorReported(error);
}
void NetDatagramSocketServerDriver::OnFinalize()
{
this->pParent->Destroy();
}
}

View File

@ -0,0 +1,26 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetDatagramSocketServerDriver.hpp
Date: 2022-19-09
Author: Reece
***/
#pragma once
namespace Aurora::IO::Net
{
struct NetDatagramSocketServer;
struct NetDatagramSocketServerDriver : IDatagramDriver
{
NetDatagramSocketServerDriver(NetDatagramSocketServer *pParent);
NetDatagramSocketServer *const pParent;
void OnBind() override;
void OnPacket(const NetEndpoint &sender, const AuSPtr<AuByteBuffer> &pBuffer) override;
void OnError(const NetError &error) override;
void OnFatalErrorReported(const NetError &error) override;
void OnFinalize() override;
};
}

View File

@ -0,0 +1,254 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetDatagramSocketServerSession.cpp
Date: 2022-19-09
Author: Reece
***/
#include "SocketOverDatagram.hpp"
namespace Aurora::IO::Net
{
NetDatagramSocketServerSession::NetDatagramSocketServerSession(AuSPtr<NetDatagramSocketServer> pParentServer) :
pParentServer(pParentServer),
emulator(this),
channel(this),
pDriver(pDriver)
{
}
NetDatagramSocketServerSession::~NetDatagramSocketServerSession()
{
}
void NetDatagramSocketServerSession::OnInit(AuSPtr<NetWorker> pWorker)
{
this->pWorker = pWorker;
if (!pWorker->IsOnThread())
{
pWorker->TryScheduleInternalTemplate<AuNullS>([pThat = this->SharedFromThis(), pWorker](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
pThat->OnInit(pWorker);
}, AuStaticCast<AuAsync::PromiseCallback<AuNullS, AuNullS>>(AuMakeShared<AuAsync::PromiseCallbackFunctional<AuNullS, AuNullS>>([=](const AuSPtr<AuNullS> &dumb)
{
SysPushErrorNet("Rejecting pending UDP connection");
})));
return;
}
auto pDriverFactory = this->ToParent()->GetFactoryDriver();
if (!pDriverFactory)
{
return;
}
this->pDriver = pDriverFactory->NewSocketDriver();
if (!this->channel.outputBuffer.Resize(this->ToParent()->uDefaultPacketSize))
{
return;
}
this->channel.outputBuffer.flagCircular = true;
this->channel.outputBuffer.flagExpandable = false;
if (this->pDriver)
{
try
{
if (!this->pDriver->OnPreestablish(this->ToSocket()))
{
this->Destroy();
return;
}
this->bIsReady = true;
this->pDriver->OnEstablish();
}
catch (...)
{
SysPushErrorCatch();
this->Destroy();
return;
}
}
}
void NetDatagramSocketServerSession::OnPacketOnThread(const AuSPtr<AuByteBuffer> &pBuffer, bool bOwn)
{
this->channel.ExpandRecv(pBuffer, bOwn);
this->DoTick();
}
void NetDatagramSocketServerSession::DoTick()
{
if (!this->bIsReady)
{
return;
}
this->channel.UpdateReadPointers();
this->channel.DoReadTick();
try
{
this->pDriver->OnStreamUpdated();
}
catch (...)
{
SysPushErrorCatch();
return;
}
this->channel.DoEndReadTick();
this->channel.DoSendTick();
}
void NetDatagramSocketServerSession::Shutdown(bool bNow)
{
if (bNow)
{
this->Destroy();
return;
}
if (!pWorker->IsOnThread())
{
pWorker->TryScheduleInternalTemplate<AuNullS>([pThat = this->SharedFromThis()](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
pThat->Shutdown(false);
}, {});
}
else
{
if (this->sendQueue.empty())
{
this->Destroy();
return;
}
this->bDieAfterSendAck = true;
this->DoTick();
}
}
void NetDatagramSocketServerSession::OnPacket(const AuSPtr<AuByteBuffer> &pBuffer)
{
if (this->bIsDead)
{
this->Destroy();
return;
}
if (!pWorker->IsOnThread())
{
AuSPtr<AuByteBuffer> pIPCBuffer;
if (!((pIPCBuffer = AuMakeShared<AuByteBuffer>(*pBuffer)) && (pIPCBuffer->IsValid())))
{
// meh, datagrams can be dropped at any point in the pipeline, i guess.
// there's no point in trying to send a failure message to another thread, in that we know it will probably fail
SysPushErrorMemory();
return;
}
pWorker->TryScheduleInternalTemplate<AuNullS>([pThat = this->SharedFromThis(), pIPCBuffer](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
pThat->OnPacketOnThread(pIPCBuffer, true);
}, AuStaticCast<AuAsync::PromiseCallback<AuNullS, AuNullS>>(AuMakeShared<AuAsync::PromiseCallbackFunctional<AuNullS, AuNullS>>([=](const AuSPtr<AuNullS> &dumb)
{
SysPushErrorNet("Rejecting pending UDP connection");
})));
return;
}
else
{
this->OnPacketOnThread(pBuffer, false);
}
}
void NetDatagramSocketServerSession::OnError(const NetError &error)
{
this->bIsDead = true;
this->lastError = error;
if (auto pDriver = this->GetUserDriver())
{
try
{
pDriver->OnFatalErrorReported(error);
}
catch (...)
{
SysPushErrorCatch();
}
}
}
AuSPtr<NetDatagramSocketServer> NetDatagramSocketServerSession::ToParent()
{
return !this->pParentServer.expired() ? this->pParentServer.lock() : AuSPtr<NetDatagramSocketServer> {};
}
AuSPtr<IO::Net::ISocketDriver> NetDatagramSocketServerSession::ToDriver()
{
return this->pDriver;
}
AuSPtr<ISocketChannel> NetDatagramSocketServerSession::ToChannel()
{
return AuSPtr<ISocketChannel>(AuSharedFromThis(), &this->channel);
}
AuSPtr<ISocket> NetDatagramSocketServerSession::ToSocket()
{
return AuSPtr<ISocket>(AuSharedFromThis(), &this->emulator);
}
const NetEndpoint &NetDatagramSocketServerSession::GetRemoteEndpoint()
{
return this->endpoint;
}
AuSPtr<ISocketDriver> NetDatagramSocketServerSession::GetUserDriver()
{
return this->pDriver;
}
void NetDatagramSocketServerSession::Destroy()
{
this->bIsDead = true;
if (pWorker->IsOnThread())
{
if (auto pDriver = this->pDriver)
{
pDriver->OnFinalize();
this->pDriver.reset();
}
if (auto pServer = this->ToParent())
{
pServer->Cleanup(SharedFromThis());
}
this->pParentServer.reset();
return;
}
pWorker->TryScheduleInternalTemplate<AuNullS>([pThat = this->SharedFromThis()](const AuSPtr<AuAsync::PromiseCallback<AuNullS>> &info)
{
pThat->Destroy();
}, AuStaticCast<AuAsync::PromiseCallback<AuNullS, AuNullS>>(AuMakeShared<AuAsync::PromiseCallbackFunctional<AuNullS, AuNullS>>([=](const AuSPtr<AuNullS> &dumb)
{
})));
}
}

View File

@ -0,0 +1,68 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuNetDatagramSocketServerSession.hpp
Date: 2022-19-09
Author: Reece
***/
#pragma once
#include "AuNetDatagramEmulatorISocket.hpp"
#include "AuNetDatagramSocketServerChannel.hpp"
namespace Aurora::IO::Net
{
struct NetWorker;
struct NetDatagramSocketServerSession : AuEnableSharedFromThis<NetDatagramSocketServerSession>
{
NetDatagramSocketServerSession(AuSPtr<NetDatagramSocketServer> pParentServer);
~NetDatagramSocketServerSession();
const NetEndpoint &GetRemoteEndpoint();
AuSPtr<ISocketDriver> GetUserDriver();
AuSPtr<NetDatagramSocketServer> ToParent();
AuSPtr<ISocketDriver> ToDriver();
AuSPtr<ISocketChannel> ToChannel();
AuSPtr<ISocket> ToSocket();
void OnInit(AuSPtr<NetWorker> pWorker);
void OnPacket(const AuSPtr<AuByteBuffer> &pBuffer);
void OnPacketOnThread(const AuSPtr<AuByteBuffer> &pBuffer, bool bOwn);
void OnError(const NetError &error);
void Shutdown(bool bNow);
void Destroy();
void DoTick();
AuWPtr<NetDatagramSocketServer> pParentServer;
NetDatagramEmulatorISocket emulator;
NetDatagramSocketServerChannel channel;
NetEndpoint endpoint;
AuUInt uHash {};
AuUInt64 uLastPacketRecvMS {};
AuUInt64 uLastPacketSendMS {};
AuUInt64 uLastPacketRecvWallEpochMS {};
AuUInt64 uLastPacketSendWallEpochMS {};
AuOptional<NetError> lastError;
AuSPtr<ISocketDriver> pDriver;
bool bIsDead {};
AuSPtr<NetWorker> pWorker;
AuList<AuUInt> sendQueue;
bool bDieAfterSendAck {};
bool bIsReady {};
};
}

View File

@ -0,0 +1,3 @@
This headache reimplements the socket (including server) apis over the net datagram interface so that some TCP and UDP code can be made 1:1.
On the plus side for servers, this counts as half of your heartbeat code, all of the eviction logic, and multithread delegation implemented for you in one maintainable place.

View File

@ -0,0 +1,7 @@
#include <Source/IO/Net/Networking.hpp>
#include <Source/IO/Net/AuNetInterface.hpp>
#include <Source/IO/Net/AuNetWorker.hpp>
#include "AuNetDatagramSocketServer.hpp"
#include "AuNetDatagramSocketServerChannel.hpp"
#include "AuNetDatagramSocketServerSession.hpp"

View File

@ -1,13 +1,13 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ProtocolPiece.cpp
File: AuProtocolPiece.cpp
Date: 2022-8-24
Author: Reece
***/
#include "Protocol.hpp"
#include "ProtocolPiece.hpp"
#include "ProtocolStack.hpp"
#include "AuProtocolPiece.hpp"
#include "AuProtocolStack.hpp"
#include "IProtocolNext.hpp"
namespace Aurora::IO::Protocol

View File

@ -1,13 +1,13 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ProtocolStack.cpp
File: AuProtocolStack.cpp
Date: 2022-8-24
Author: Reece
***/
#include "Protocol.hpp"
#include "ProtocolStack.hpp"
#include "ProtocolPiece.hpp"
#include "AuProtocolStack.hpp"
#include "AuProtocolPiece.hpp"
#include "IProtocolNext.hpp"
#include <Source/IO/IOPipeProcessor.hpp>
@ -52,7 +52,7 @@ namespace Aurora::IO::Protocol
return this->AddInterceptorWhereEx(true, pInterceptorEx, uOutputBufferSize, false);
}
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptorWhere(bool prepend,
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptorWhere(bool bPrepend,
const AuSPtr<IProtocolInterceptor> &pInterceptor,
AuUInt uOutputBufferSize)
{
@ -137,7 +137,7 @@ namespace Aurora::IO::Protocol
pNew->pParent = this;
pNew->pInterceptor = pInterceptor;
if (prepend)
if (bPrepend)
{
if (this->pBottomPiece)
{
@ -159,7 +159,7 @@ namespace Aurora::IO::Protocol
return pNew;
}
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptorWhereEx(bool prepend,
AuSPtr<IProtocolPiece> ProtocolStack::AddInterceptorWhereEx(bool bPrepend,
const AuSPtr<IProtocolInterceptorEx> &pInterceptor,
AuUInt uOutputBufferSize,
bool bMultipleTick)
@ -251,7 +251,7 @@ namespace Aurora::IO::Protocol
pNew->pParent = this;
pNew->bMultipleTick = bMultipleTick;
if (prepend)
if (bPrepend)
{
if (this->pBottomPiece)
{

View File

@ -1,7 +1,7 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ProtocolStack.hpp
File: AuProtocolStack.hpp
Date: 2022-8-24
Author: Reece
***/
@ -28,8 +28,8 @@ namespace Aurora::IO::Protocol
void Destroy() override;
AuSPtr<IProtocolPiece> AddInterceptorWhere(bool prepend, const AuSPtr<IProtocolInterceptor> &pInterceptor, AuUInt uOutputBufferSize);
AuSPtr<IProtocolPiece> AddInterceptorWhereEx(bool prepend, const AuSPtr<IProtocolInterceptorEx> &pInterceptor, AuUInt uOutputBufferSize, bool bMultipleTick);
AuSPtr<IProtocolPiece> AddInterceptorWhere(bool bPrepend, const AuSPtr<IProtocolInterceptor> &pInterceptor, AuUInt uOutputBufferSize);
AuSPtr<IProtocolPiece> AddInterceptorWhereEx(bool bPrepend, const AuSPtr<IProtocolInterceptorEx> &pInterceptor, AuUInt uOutputBufferSize, bool bMultipleTick);
AuSPtr<IStreamWriter> AsStreamWriter() override;
AuSPtr<Memory::ByteBuffer> AsWritableByteBuffer() override;

View File

@ -7,7 +7,7 @@
***/
#include "TLS.hpp"
#include "TLSContext.hpp"
#include <Source/IO/Protocol/ProtocolStack.hpp>
#include <Source/IO/Protocol/AuProtocolStack.hpp>
#include <Aurora/IO/Net/NetExperimental.hpp>
#include <Source/IO/Net/AuNetSocket.hpp>
#include <Source/Crypto/X509/x509.hpp>

View File

@ -22,7 +22,7 @@ namespace Aurora::IO
AuCtorCode_t code;
AuUInt32 count {};
AuUInt64 endTime = AuTime::CurrentInternalClockMS() + AuUInt64(timeout);
AuUInt64 endTime = AuTime::SteadyClockMS() + AuUInt64(timeout);
auto waitQueue = AuTryConstruct<AuList<AuSPtr<IAsyncTransaction>>>(code, files);
if (!code)
@ -41,7 +41,7 @@ namespace Aurora::IO
if (timeout)
{
AuInt64 delta = (AuInt64)endTime - AuTime::CurrentInternalClockMS();
AuInt64 delta = (AuInt64)endTime - AuTime::SteadyClockMS();
if (delta <= 0)
{
break;
@ -82,7 +82,7 @@ namespace Aurora::IO
AuCtorCode_t code;
AuList<AuSPtr<IAsyncTransaction>> retTransactions;
AuUInt64 endTime = AuTime::CurrentInternalClockMS() + AuUInt64(requestedTimeoutMs);
AuUInt64 endTime = AuTime::SteadyClockMS() + AuUInt64(requestedTimeoutMs);
auto waitQueue = AuTryConstruct<AuList<AuSPtr<IAsyncTransaction>>>(code, transactions);
if (!code)
@ -102,7 +102,7 @@ namespace Aurora::IO
if (requestedTimeoutMs)
{
AuInt64 delta = (AuInt64)endTime - AuTime::CurrentInternalClockMS();
AuInt64 delta = (AuInt64)endTime - AuTime::SteadyClockMS();
if (delta <= 0)
{
break;

View File

@ -93,98 +93,115 @@ namespace Aurora::Memory
AUKN_SYM void *_ZRealloc(void *buffer, Types::size_t length, Types::size_t align)
{
void *pRet;
if (AuDebug::IsPointerReserveRange(buffer))
{
return AuDebug::gReserveHeap->ZRealloc(buffer, length, align);
pRet = AuDebug::gReserveHeap->ZRealloc(buffer, length, align);
}
else
{
RemoveBytesFromCounter(::mi_malloc_size(buffer));
auto pRet = ::mi_rezalloc_aligned(buffer, length, align);
pRet = ::mi_rezalloc_aligned(buffer, length, align);
AddBytesToCounter(::mi_malloc_size(pRet));
if (!pRet)
{
if (gRuntimeConfig.debug.bIsMemoryErrorFatal)
{
SysPanic("ZAllocEx out of memory");
}
}
return pRet;
}
if (!pRet)
{
if (gRuntimeConfig.debug.bIsMemoryErrorFatal)
{
SysPanic("ZAllocEx out of memory");
}
}
return pRet;
}
AUKN_SYM void *_ZRealloc(void *buffer, Types::size_t length)
{
void *pRet;
if (AuDebug::IsPointerReserveRange(buffer))
{
return AuDebug::gReserveHeap->ZRealloc(buffer, length);
pRet = AuDebug::gReserveHeap->ZRealloc(buffer, length);
}
else
{
RemoveBytesFromCounter(::mi_malloc_size(buffer));
auto pRet = ::mi_rezalloc(buffer, length);
pRet = ::mi_rezalloc(buffer, length);
AddBytesToCounter(::mi_malloc_size(pRet));
if (!pRet)
{
if (gRuntimeConfig.debug.bIsMemoryErrorFatal)
{
SysPanic("ZAlloc out of memory");
}
}
return pRet;
}
if (!pRet)
{
if (gRuntimeConfig.debug.bIsMemoryErrorFatal)
{
SysPanic("ZAlloc out of memory");
}
}
return pRet;
}
AUKN_SYM void *_FRealloc(void *buffer, Types::size_t length, Types::size_t align)
{
void *pRet;
if (AuDebug::IsPointerReserveRange(buffer))
{
return AuDebug::gReserveHeap->FRealloc(buffer, length, align);
pRet = AuDebug::gReserveHeap->FRealloc(buffer, length, align);
}
else
{
RemoveBytesFromCounter(::mi_malloc_size(buffer));
auto pRet = ::mi_realloc_aligned(buffer, length, align);
pRet = ::mi_realloc_aligned(buffer, length, align);
AddBytesToCounter(::mi_malloc_size(pRet));
if (!pRet)
{
if (gRuntimeConfig.debug.bIsMemoryErrorFatal)
{
SysPanic("FReallocEx out of memory");
}
}
return pRet;
}
if (!pRet)
{
if (gRuntimeConfig.debug.bIsMemoryErrorFatal)
{
SysPanic("FReallocEx out of memory");
}
}
return pRet;
}
AUKN_SYM void *_FRealloc(void *buffer, Types::size_t length)
{
void *pRet;
if (AuDebug::IsPointerReserveRange(buffer))
{
return AuDebug::gReserveHeap->FRealloc(buffer, length);
pRet = AuDebug::gReserveHeap->FRealloc(buffer, length);
}
else
{
RemoveBytesFromCounter(::mi_malloc_size(buffer));
auto pRet = ::mi_realloc(buffer, length);
pRet = ::mi_realloc(buffer, length);
AddBytesToCounter(::mi_malloc_size(pRet));
if (!pRet)
{
if (gRuntimeConfig.debug.bIsMemoryErrorFatal)
{
SysPanic("FRealloc out of memory");
}
}
return pRet;
}
if (!pRet)
{
if (gRuntimeConfig.debug.bIsMemoryErrorFatal)
{
SysPanic("FRealloc out of memory");
}
}
return pRet;
}
AUKN_SYM void _Free(void *pHead)
{
if (!pHead)
{
return;
}
if (AuDebug::IsPointerReserveRange(pHead))
{
AuDebug::gReserveHeap->Free(pHead);

View File

@ -186,15 +186,9 @@ namespace Aurora::Parse
i += 2;
}
if (i < in.size())
{
if ((in[i] == 'h' /*some hex dump code and people prefer FFh over 0xFF*/))
{
i++;
}
}
HEX_GRAMMAR_CONSUME_ONE('h'); // some hex dump code and people prefer FFh over 0xFF
HEX_GRAMMAR_CONSUME_EITHER(':' /*wireshark and other networking tools*/, ',' /*latin splitter array*/);
HEX_GRAMMAR_CONSUME_EITHER(':' /*wireshark and other networking tools*/, ',' /*latin array splitter*/);
HEX_GRAMMAR_CONSUME_END_OF_LINE; // all whitespace [+newline[+all whitespace]]
// Consume end literal hint

View File

@ -15,8 +15,8 @@ namespace Aurora::Threading::Primitives
Semaphore::Semaphore(long iIntialValue)
{
this->value_ = iIntialValue;
InitializeSRWLock(&this->lock_);
InitializeConditionVariable(&this->winCond_);
::InitializeSRWLock(&this->lock_);
::InitializeConditionVariable(&this->winCond_);
}
Semaphore::~Semaphore()
@ -40,35 +40,35 @@ namespace Aurora::Threading::Primitives
return (old != 0 && AuAtomicCompareExchange(&this->value_, old - 1, old) == old);
}
bool Semaphore::Lock(AuUInt64 timeout)
bool Semaphore::Lock(AuUInt64 uTimeout)
{
AuUInt64 start = AuTime::SteadyClockMS();
AuUInt64 end = start + timeout;
AuUInt64 uStart = AuTime::SteadyClockMS();
AuUInt64 uEnd = uStart + uTimeout;
AcquireSRWLockShared(&lock_); // we use atomics. using shared is fine, let's not get congested early
::AcquireSRWLockShared(&this->lock_); // we use atomics. using shared is fine, let's not get congested early
while (!TryLock())
{
AuUInt32 timeoutMs = INFINITE;
AuUInt32 dwTimeoutMs = INFINITE;
if (timeout != 0)
if (uTimeout != 0)
{
start = Time::SteadyClockMS();
if (start >= end)
uStart = Time::SteadyClockMS();
if (uStart >= uEnd)
{
ReleaseSRWLockShared(&this->lock_);
::ReleaseSRWLockShared(&this->lock_);
return false;
}
timeoutMs = end - start;
dwTimeoutMs = uEnd - uStart;
}
if (!::SleepConditionVariableSRW(&this->winCond_, &this->lock_, AuUInt32(timeoutMs), CONDITION_VARIABLE_LOCKMODE_SHARED))
if (!::SleepConditionVariableSRW(&this->winCond_, &this->lock_, AuUInt32(dwTimeoutMs), CONDITION_VARIABLE_LOCKMODE_SHARED))
{
ReleaseSRWLockShared(&this->lock_);
::ReleaseSRWLockShared(&this->lock_);
return false;
}
}
ReleaseSRWLockShared(&this->lock_);
::ReleaseSRWLockShared(&this->lock_);
return true;
}
@ -81,10 +81,10 @@ namespace Aurora::Threading::Primitives
void Semaphore::Unlock(long count)
{
AcquireSRWLockShared(&this->lock_);
::AcquireSRWLockShared(&this->lock_);
AuAtomicAdd<AuInt32>(&this->value_, count);
::WakeAllConditionVariable(&this->winCond_);
ReleaseSRWLockShared(&this->lock_);
::ReleaseSRWLockShared(&this->lock_);
}
void Semaphore::Unlock()