AuroraRuntime/Include/Aurora/IO/Net/Net.hpp
Reece cf70f0d45c [*/+/-] MEGA COMMIT. ~2 weeks compressed.
The intention is to quickly improve and add util apis, enhance functionality given current demands, go back to the build pipeline, finish that, publish runtime tests, and then use what we have to go back to to linux support with a more stable api.

[+] AuMakeSharedArray
[+] Technet ArgvQuote
[+] Grug subsystem (UNIX signal thread async safe ipc + telemetry flusher + log flusher.)
[+] auEndianness -> Endian swap utils
[+] AuGet<N>(...)
[*] AUE_DEFINE conversion for
        ECompresionType, EAnsiColor, EHashType, EStreamError, EHexDump
[+] ConsoleMessage ByteBuffer serialization
[+] CmdLine subsystem for parsing command line arguments and simple switch/flag checks
[*] Split logger from console subsystem
[+] StartupParameters -> A part of a clean up effort under Process
[*] Refactor SysErrors header + get caller hack
[+] Atomic APIs
[+] popcnt
[+] Ring Buffer sink
[+] Added more standard errors
        Catch,
        Submission,
        LockError,
        NoAccess,
        ResourceMissing,
        ResourceLocked,
        MalformedData,
        InSandboxContext,
        ParseError

[+] Added ErrorCategorySet, ErrorCategoryClear, GetStackTrace
[+] IExitSubscriber, ETriggerLevel
[*] Write bias the high performance RWLockImpl read-lock operation operation
[+] ExitHandlerAdd/ExitHandlerRemove (exit subsystem)
[*] Updated API style
        Digests
[+] CpuId::CpuBitCount
[+] GetUserProgramsFolder
[+] GetPackagePath
[*] Split IStreamReader with an inl file
[*] BlobWriter/BlobReader/BlobArbitraryReader can now take shared pointers to bytebuffers. default constructor allocates a new scalable bytebuffer
[+] ICharacterProvider
[+] ICharacterProviderEx
[+] IBufferedCharacterConsumer
[+] ProviderFromSharedString
[+] ProviderFromString
[+] BufferConsumerFromProvider
[*] Parse Subsystem uses character io bufferer
[*] Rewritten NT's high perf semaphore to use userland SRW/ConVars [like mutex, based on generic semaphore]
[+] ByteBuffer::ResetReadPointer
[*] Bug fix bytebuffer base not reset on free and some scaling issues
[+] ProcessMap -> Added kSectionNameStack, kSectionNameFile, kSectionNameHeap for Section
[*] ProcessMap -> Refactor Segment to Section. I was stupid for keeping a type conflict hack API facing
[+] Added 64 *byte* fast RNG seeds
[+] File Advisorys/File Lock Awareness
[+] Added extended IAuroraThread from OS identifier caches for debug purposes
[*] Tweaked how memory is reported on Windows. Better consistency of what values mean across functions.
[*] Broke AuroraUtils/Typedefs out into a separate library
[*] Update build script
[+] Put some more effort into adding detail to the readme before rewriting it, plus, added some media
[*] Improved public API documentation
[*] Bug fix `SetConsoleCtrlHandler`
[+] Locale TimeDateToFileNameISO8601
[+] Console config stdOutShortTime
[*] Begin using internal UTF8/16 decoders when platform support isnt available (instead of stl)
[*] Bug fixes in decoders
[*] Major bug fix, AuMax
[+] RateLimiter
[+] Binary file sink
[+] Log directory sink
[*] Data header usability (more operators)
[+] AuRemoveRange
[+] AuRemove
[+] AuTryRemove
[+] AuTryRemoveRange
[+] auCastUtils
[+] Finish NewLSWin32Source
[+] AuTryFindByTupleN, AuTryRemoveByTupleN
[+] Separated AuRead/Write types, now in auTypeUtils
[+] Added GetPosition/SetPosition to FileWriter
[*] Fix stupid AuMin in place of AuMax in SpawnThread.Unix.Cpp
[*] Refactored Arbitrary readers to SeekingReaders (as in, they could be atomic and/or parallelized, and accept an arbitrary position as a work parameter -> not Seekable, as in, you can simply set the position)
[*] Hack back in the sched deinit
[+] File AIO loop source interop
[+] Begin to prototype a LoopQueue object I had in mind for NT, untested btw
[+] Stub code for networking
[+] Compression BaseStream/IngestableStreamBase
[*] Major: read/write locks now support write-entrant read routines.
[*] Compression subsystem now uses the MemoryView concept
[*] Rewrite the base stream compressions, made them less broken
[*] Update hashing api
[*] WriterTryGoForward and ReaderTryGoForward now revert to the previous relative index instead of panicing
[+] Added new AuByteBuffer apis
    Trim, Pad, WriteFrom, WriteString, [TODO: ReadString]
[+] Added ByteBufferPushReadState
[+] Added ByteBufferPushWriteState
[*] Move from USC-16 to full UTF-16. Win32 can handle full UTF-16.
[*] ELogLevel is now an Aurora enum
[+] Raised arbitrary limit in header to 255, the max filter buffer
[+] Explicit GZip support
[+] Explicit Zip support
[+] Added [some] compressors

et al
2022-02-17 00:11:40 +00:00

484 lines
16 KiB
C++

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Net.hpp
Date: 2021-9-21
Author: Reece
***/
#pragma once
namespace Aurora::Async
{
struct IThreadPool;
}
namespace Aurora::IO::Net
{
static const AuUInt16 kMagicPortAny = 0;
struct INetworkStream;
struct IBasicSocket;
struct ISocket;
struct IServer;
AUE_DEFINE(ETransportProtocol, (
eProtocolInvalid,
eProtocolUDP,
eProtocolTCP
));
AUE_DEFINE(EIPProtocol, (
eIPProtocolInvalid,
eIPProtocolV4,
eIPProtocolV6
));
struct AUKN_SYM IPAddress
{
EIPProtocol ip;
union
{
AuUInt8 v4[4];
AuUInt16 v6[8];
};
IPAddress();
IPAddress(const AuString &parse);
AuString ToString() const;
bool IsValid() const;
inline operator bool() const
{
return IsValid();
}
inline bool operator ==(const IPAddress &cmp) const
{
if (cmp.ip != this->ip) return false;
if (cmp.ip == EIPProtocol::eIPProtocolV4)
{
return AuMemcmp(cmp.v4, this->v4, sizeof(this->v4)) == 0;
}
else
{
return AuMemcmp(cmp.v6, this->v6, sizeof(this->v6)) == 0;
}
}
};
AUE_DEFINE(ESocketInfo, (
eByDns,
eByEndpoint
));
struct SocketHostName
{
SocketHostName(const AuString &name) : info(ESocketInfo::eByDns), hostname(name), address({})
{}
SocketHostName(const IPAddress &endpoint) : info(ESocketInfo::eByEndpoint), address(endpoint), hostname()
{}
const ESocketInfo info;
const AuString hostname;
const IPAddress address;
inline bool operator ==(const SocketHostName &cmp) const
{
if (cmp.info != this->info) return false;
if (cmp.info == ESocketInfo::eByEndpoint)
{
return cmp.address == this->address;
}
else
{
return cmp.hostname == this->hostname;
}
}
};
struct IPEndpoint
{
IPAddress ip;
AuUInt16 port;
AuUInt8 hint[32] {0};
};
struct ConnectionEndpoint
{
ETransportProtocol protocol;
IPEndpoint ip;
bool tls {};
bool compressed {};
// 0 - destination is a stateless datagram server
// 1 - destination is a psuedo-stateful server
AuUInt32 UDPTimeoutInMS;
};
struct ITLSHandshakeAuthenticate
{
virtual AuSPtr<Crypto::X509::DecodedCertificate> GetCertificate() = 0;
};
AUE_DEFINE(EHandleErrorClass, (
ePinError,
eUserDeny,
eBrokenPacket,
eInvalidCipher,
eBadCert
));
struct TLSHandshakeError
{
EHandleErrorClass error;
AuSPtr<ITLSHandshakeAuthenticate> session;
AuString message;
};
AUKN_INTERFACE(IClientSubscriber,
//
AUI_METHOD(void, OnServerConnectSuccess, (const AuSPtr<ISocket> &, socket)),
AUI_METHOD(void, OnServerConnectFailed, (const AuSPtr<ISocket> &, socket)),
// DTLS/UDP/TCP/TLS -> TRUE = expects another datagram or read pump
// FALSE = end of socket life
AUI_METHOD(bool, OnSocketData, (const AuSPtr<ISocket> &, socket)),
//
AUI_METHOD(void, OnSocketError, (const AuSPtr<ISocket> &, socket)),
//
AUI_METHOD(void, OnSocketShutdown, (const AuSPtr<ISocket> &, socket))
);
AUKN_INTERFACE(IClientSubscriberTls,
AUI_METHOD(bool, OnVerifySocketCertificate, (const AuSPtr<IBasicSocket> &, socket, const AuSPtr<ITLSHandshakeAuthenticate>, session)),
AUI_METHOD(bool, OnTLSHandleError, (const AuSPtr<IBasicSocket> &, socket, const TLSHandshakeError &, error))
);
AUKN_INTERFACE(IServerSubscriber,
AUI_METHOD(bool, OnClientAccept, (const AuSPtr<IServer> &, server, const AuSPtr<ISocket> &, socket)),
AUI_METHOD(bool, OnClientDoS, (const AuSPtr<IServer> &, server, const AuSPtr<ISocket> &, socket)),
AUI_METHOD(void, OnClientError, (const AuSPtr<IServer> &, server, const AuSPtr<ISocket> &, socket)),
AUI_METHOD(void, OnClientShutdown, (const AuSPtr<IServer> &, server, const AuSPtr<ISocket> &, socket)),
AUI_METHOD(bool, OnReadFrame, (const AuSPtr<IServer> &, server, const AuList<AuSPtr<ISocket>> &, sockets)),
AUI_METHOD(void, OnShutdown, (const AuSPtr<IServer> &, server))
);
AUKN_INTERFACE(IServerSubscriberTls,
AUI_METHOD(bool, OnClientTLSReport, (const AuSPtr<IServer> &, server, const AuSPtr<ISocket> &, socket, const TLSHandshakeError &, error))
);
// TODO: We should introduce another std:: customer overloadable type reproducing hardcoded ascii and an int, basically std::error_code
// Maybe AuErrorCode = [std::, my_fav_stl::]error_code
// AuError = something more flexable
using Error_t = std::error_code;
struct IBasicSocketPrivateContext
{
// force vtbl
virtual ~IBasicSocketPrivateContext()
{}
};
struct SocketStatStream
{
AuUInt64 total;
AuUInt64 averageBytesPerSecond; // interpolated, behind 1 frame
AuUInt64 extrapolatedBytesPerSecond; // this uses an extrapolated time point to predict the network bandwidth of one whole second of data
};
struct SocketStat
{
AuUInt32 timeStartMs;
AuUInt32 uptimeMs;
AuUInt64 bandwidthCost;
SocketStatStream rx;
SocketStatStream rt;
};
enum class EUnderlyingModel
{
eAsync,
eBlocking
};
struct IBasicSocket
{
virtual bool IsActive() = 0;
virtual Error_t GetLastError() = 0;
virtual void Shutdown() = 0;
virtual AuSPtr<IBasicSocketPrivateContext> SetContext(const AuSPtr<IBasicSocketPrivateContext> &newContext) = 0;
virtual AuSPtr<IBasicSocketPrivateContext> GetContext() = 0;
virtual bool GetLocalEndpoint(ConnectionEndpoint &out) = 0;
virtual SocketStat GetStats() = 0;
virtual EUnderlyingModel GetThreadModel() = 0;
};
AUKN_INTERFACE(ISocketSubmissionComplete,
AUI_METHOD(void, OnWriteFinished, (AuUInt, fence))
);
struct StreamConfig
{
bool enableBufferedInput {true};
bool enableBufferedOutput {false};
// 30000 * (4096 * 10) / 1024 / 1024 / 1024
// = 1.1GB per ~1/2 of 2^16
// Plenty of overhead for cpu blts to go brr without becoming
// cpu bound. If you're expecting 30k connections, you can
// lose 1GB to our extended seak interface by default
AuUInt32 bufferedReadSize {4096 * 10}; // see: enableBufferedInput
AuUInt32 bufferedWriteSize {4096 * 10}; // see: enableBufferedOutput
};
struct ISocketChannel
{
// If memory.ptr is a nullptr, this method immediately returns with the expected write length in memory.out
//
// If all is true and the internal buffer is not saturated enough yet, no data is read and
// zero readable bytes are returned.
//
// If all is false, copies memory.length into memory.ptr, up to memory.length
//
// psuedocode:
// If all is false,
// memory.outVariable = readableBytes
// if memory.ptr,
// begin copy from the internal upto max(memory.length, outVariable) into memory.ptr,
// end
// else
// return readExactly(memory)
//
// NOTE: BufferInputStreamAdhoc usage applys to async reads as well
virtual bool ReadAsync(const Memory::MemoryViewStreamWrite &memory, bool all = false) = 0;
// Atomic
// ReadAsync(memory, false)
// SeekAsync(-memory.outVariable)
virtual bool PeekAsync(const Memory::MemoryViewStreamWrite &memory) = 0;
// Attempts to seek backwards or forwards in the UDP or TCP packet
// If you are under the callstack of a HasXXXHasData callback, you are guaranteed (bufferedReadSize - streamPosition - streamRemaining) bytes backwards
//
virtual bool SeekAsync(int signedDistanceFromCur = 0) = 0;
// When BufferInputStreamAdhoc is called with false / in the default condition, read sync will be constrained by the async buffer
// When BufferInputStreamAdhoc is called with true, you will block until you can get the requested data (all of it if, all = true; any of it, all = false)
virtual bool ReadSync(const Memory::MemoryViewStreamWrite &memory, bool all = true) = 0;
// Writes max(cumulative memory.length per frame, (enableBufferedInput ? bufferedWriteSize : os page allowance)) to the sockets asynchronous stream
// Returns false when no data whatsoever was written, generic error condition
// Returns true when some data was collected, regardless of any errors that may have arose (defer to IServerSubscriber::OnClientError, IClientSubscriber::OnSocketError for error handling)
virtual bool WriteAsync(const Memory::MemoryViewStreamRead &memory) = 0;
// Alternative WriteAsync method that calls a submission notification callback on flush on the dispatching thread
virtual bool WriteAsync(const Memory::MemoryViewStreamRead &memory, AuUInt fence, const AuSPtr<ISocketSubmissionComplete> &input) = 0;
// When BufferInputStreamAdhoc is called with false / in the default condition, read sync will be constrained by the async buffer
// When BufferInputStreamAdhoc is called with true, you will block until you can get the requested data (all of it if, all = true; any of it, all = false)
virtual bool WriteSync(const Memory::MemoryViewStreamRead &memory) = 0;
/**
* Sets the internal application buffer size
* Noting that Linux's default network buffer looks like this:
* Minimum, Initial, Maximum: 10240 87380 12582912 | 10KB, 86KB, 12MB
*/
virtual AuUInt GetInternalInputRingBuffer() = 0;
virtual bool SetInternalInputRingBuffer(AuUInt bytes) = 0;
/**
* Defines the maximum amount of bytes each recieve frame can ingest
*/
virtual void SetRecvLength(AuUInt32 length) = 0;
virtual AuUInt32 GetRecvLength() = 0;
/**
* Enable ad-hoc input buffer ingestion into our internal buffer
* Only use when PumpRead will not suffice by design (ie: sync apps)
*
* When set to true, there is nothing preventing you from consuming a
* DoS-worthy amount of bytes from the stream per frame
*
* This does not disable enableBufferedInput, it merely allows you
* to read beyond enableBufferedInput/bufferedReadSize by accessing
* the os stream page or hitting the network device inline
*
* There may be a performance bottleneck and/or gain depending on your
* circumstances
*
* During high bandwidth transfers, you should set this to true
*
* Default: false (0)
*/
virtual void BufferInputStreamAdhoc(bool value) = 0;
virtual void ConfigureHasDataCallback(bool enabled) = 0;
virtual void ReconfigureStreams(const StreamConfig &config) = 0;
};
struct IBasicSocketThreaded : public IBasicSocket
{
virtual bool PumpRead() = 0;
virtual bool PumpWrite() = 0;
virtual bool Pump() = 0;
virtual void Run(int idx, AuUInt32 timeout) = 0;
};
struct ISocket : public IBasicSocketThreaded, public ISocketChannel
{
virtual bool GetRemoteEndpoint(ConnectionEndpoint &out) = 0;
};
struct ILocalClientSocket : public ISocket
{
// Connects to the endpoint defined in the ClientConfig
// Completion will be notified by the following callbacks;
// IClientSubscriber::OnServerConnectSuccess,
// IClientSubscriber::OnServerConnectFailed
// returns true on success
virtual bool Connect() = 0;
// Completion will be notified by the following callbacks;
// IClientSubscriber::OnServerConnectSuccess,
// IClientSubscriber::OnServerConnectFailed
//
// ...on any worker thread under a generic pump or read only pump cycle
virtual void ConnectAsync() = 0;
};
struct TlsConnect
{
Aurora::Crypto::X509::Certificate serverCertificate;
AuSPtr<IBasicSocket> socket;
};
struct SocketConfig
{
StreamConfig stream;
};
struct ServerInfo
{
ConnectionEndpoint listen;
AuUInt32 maxSessions;
SocketConfig clientDefaults;
AuSPtr<IServerSubscriber> serverSubscriber;
};
struct TLSServerInfo : ServerInfo
{
Aurora::Crypto::RSAPair cert;
AuSPtr<IServerSubscriberTls> tlsServerSubscriber;
};
struct ClientConfig : SocketConfig
{
SocketHostName socket;
//AuString service;
AuUInt16 port;
bool enableHasDataCallback {true};
AuSPtr<IClientSubscriber> clientSubscriber;
};
struct TLSClientConfig : ClientConfig
{
AuSPtr<IClientSubscriberTls> clientSubscriber;
};
struct IServer : public IBasicSocketThreaded
{
virtual bool GetLocalEndpoint(ConnectionEndpoint &out) = 0;
virtual void GetClients(AuList<AuSPtr<IBasicSocketThreaded>> &clients) = 0;
virtual bool Listen() = 0;
virtual void ReconfigureDefaultStream(const StreamConfig &config) = 0;
};
struct ISocketFactory
{
virtual bool NewServer(const ServerInfo &listen, AuSPtr<IServer> &out) = 0;
virtual bool NewTlsServer(const TLSServerInfo &keys, AuSPtr<IServer> &out) = 0;
virtual bool NewClient(const ClientConfig &info, AuSPtr<ILocalClientSocket> &out) = 0;
virtual bool NewTlsClient(const TLSClientConfig &info, AuSPtr<ILocalClientSocket> &out) = 0;
};
struct ServiceEndpoint
{
ETransportProtocol protocol;
AuString hostname;
AuString service;
};
struct INetworkInterface
{
virtual AuList<IPEndpoint> ResolveSocketSync(const SocketHostName &hostname, AuUInt16 port) = 0;
virtual AuList<IPEndpoint> ResolveServiceSync(const ServiceEndpoint &service) = 0;
virtual bool SendDatagramAsync(const ConnectionEndpoint &endpoint, const Memory::MemoryViewRead &memory) = 0;
virtual bool SendDatagramAsync(const AuSPtr<IServer> &datagramServer, const ConnectionEndpoint &endpoint, const Memory::MemoryViewRead &memory) = 0;
virtual AuSPtr<ISocketFactory> GetSocketFactory() = 0;
};
struct AUKN_SYM WorkRange
{
WorkRange();
AuUInt8 workerOffsetOrAny;
AuUInt8 workerCountOrAny;
};
struct AUKN_SYM WorkPoolGroup
{
WorkPoolGroup();
AuSPtr<Async::IThreadPool> pool; // optional -> defaults to async apps core loop
AuUInt8 asyncWorkGroup;
AuUInt8 asyncWorkPoolOffsetOrAny;
WorkRange range;
};
struct INetworkingPool
{
// A:
virtual AuUInt32 Pump(AuUInt8 workerId) = 0;
// B:
virtual AuUInt32 PumpRead(AuUInt8 workerId) = 0;
virtual AuUInt32 PumpWrite(AuUInt8 workerId) = 0;
// C:
virtual AuUInt32 PollWorker(AuUInt8 workerId) = 0;
virtual AuUInt32 RunWorker(AuUInt8 workerId, AuUInt32 timeout) = 0;
// D:
virtual bool BeginReadPollingOnWorkQueues(const WorkPoolGroup &workGroup) = 0;
virtual bool BeginSubmissionsOnOnWorkQueues(const WorkPoolGroup &workGroup) = 0;
virtual void StopPollingOnWorkQueues() = 0;
// E:
// ..
virtual AuUInt8 GetWorkers() = 0;
virtual AuSPtr<INetworkInterface> GetNetworkInterface() = 0;
virtual void Shutdown() = 0;
};
struct NetworkPool
{
AuUInt8 workers {1};
};
AUKN_SHARED_API(CreateNetworkPool, INetworkingPool, const NetworkPool & meta);
}