AuroraRuntime/Include/Aurora/IO/Net/Net.hpp
Reece 19ebdf3761 Preparing for initial WSA network POC rewrite, not porting gen1 code. Linux support is way behind. Will work on it soon.
[*] Minor refactors
[*] Begin refactoring the Processes subsystem
[*] Added empty files ready to move the gross c++ functional callback in the parse subsystem to a dedicated interface w/ utils
[*] Broke out Win32 Process into an NT base variant (Process.NT.cpp) with Win32 overloaded process exit pokes (Process.Win32.cpp)
2022-01-29 12:36:25 +00:00

476 lines
16 KiB
C++

/***
Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Net.hpp
Date: 2021-9-21
Author: Reece
***/
#pragma once
// Forward declaration only: IThreadPool is referenced by WorkPoolGroup::pool further down in this header.
namespace Aurora::Async
{
struct IThreadPool;
}
namespace Aurora::IO::Net
{
// Sentinel port value: 65535, the largest value a 16-bit port can hold.
// NOTE(review): presumably means "no specific port / any port" - confirm against the implementation.
static constexpr AuUInt16 kMagicPortAny = 0xFFFF;

// Forward declarations of types referenced by this header before (or without) being defined in it.
struct INetworkStream;
struct IBasicSocket;
struct IClientSocket;
struct IServer;
// Transport protocol selector, used by ConnectionEndpoint::protocol and ServiceEndpoint::protocol.
// Declared through the project's AUE_DEFINE enum macro.
AUE_DEFINE(ETransportProtocol, (
eProtocolInvalid,
eProtocolUDP,
eProtocolTCP
));
// IP address family tag stored in IPAddress::ip; IPAddress::operator== uses it to choose between the v4 and v6 storage.
// Declared through the project's AUE_DEFINE enum macro.
AUE_DEFINE(EIPProtocol, (
eIPProtocolInvalid,
eIPProtocolV4,
eIPProtocolV6
));
/**
 * IPv4 / IPv6 address value.
 * `ip` tags which member of the anonymous union holds the address data.
 */
struct AUKN_SYM IPAddress
{
    EIPProtocol ip;

    union
    {
        AuUInt8 v4[4];
        AuUInt16 v6[8];
    };

    IPAddress();
    IPAddress(const AuString &parse);

    AuString ToString() const;
    bool IsValid() const;

    // Truthiness mirrors IsValid()
    operator bool() const
    {
        return IsValid();
    }

    // Equal when both protocol tags match and the address storage is byte-identical.
    // Any tag other than V4 is compared over the full v6 storage.
    bool operator ==(const IPAddress &cmp) const
    {
        if (cmp.ip != this->ip)
        {
            return false;
        }

        const bool isV4 = (cmp.ip == EIPProtocol::eIPProtocolV4);
        return isV4 ? memcmp(cmp.v4, this->v4, sizeof(this->v4)) == 0
                    : memcmp(cmp.v6, this->v6, sizeof(this->v6)) == 0;
    }
};
// Discriminator for SocketHostName: eByDns is set by the hostname constructor, eByEndpoint by the IPAddress constructor.
AUE_DEFINE(ESocketInfo, (
eByDns,
eByEndpoint
));
/**
 * Names a socket target either by DNS hostname or by a concrete IP address.
 * `info` records which of `hostname` / `address` is meaningful.
 * All members are const, so an instance is immutable once constructed.
 */
struct SocketHostName
{
    // Target identified by hostname; `address` is default-constructed.
    // (Was `address({})`, which is ambiguous between IPAddress(const AuString &) and the
    //  copy/move constructors on conforming compilers.)
    SocketHostName(const AuString &name) : info(ESocketInfo::eByDns), hostname(name), address()
    {}

    // Target identified by address; `hostname` is value-initialized.
    // Mem-initializers are listed in declaration order (info, hostname, address), which is the
    // order they always run in; this silences -Wreorder.
    SocketHostName(const IPAddress &endpoint) : info(ESocketInfo::eByEndpoint), hostname(), address(endpoint)
    {}

    const ESocketInfo info;
    const AuString hostname;
    const IPAddress address;

    // Equal when both sides use the same identification mode and the field relevant to that mode matches.
    inline bool operator ==(const SocketHostName &cmp) const
    {
        if (cmp.info != this->info) return false;

        if (cmp.info == ESocketInfo::eByEndpoint)
        {
            return cmp.address == this->address;
        }
        else
        {
            return cmp.hostname == this->hostname;
        }
    }
};
// An IP address paired with a port; returned by INetworkInterface::ResolveSocketSync / ResolveServiceSync.
struct IPEndpoint
{
    IPAddress ip;
    // Zero-initialized so a default-constructed endpoint never carries an indeterminate port
    AuUInt16 port {};
};
// Describes how a connection is carried: transport protocol, TLS / compression flags, and UDP timeout.
// Every member now has a default so a default-constructed value is fully determinate
// (previously only `tls` and `compressed` were initialized).
struct ConnectionEndpoint
{
    ETransportProtocol protocol {ETransportProtocol::eProtocolInvalid};
    bool tls {};
    bool compressed {};
    // 0 - destination is a stateless datagram server
    // 1 - destination is a pseudo-stateful server
    // NOTE(review): "1" presumably stands for any non-zero timeout - confirm
    AuUInt32 UDPTimeoutInMS {};
};
// TLS handshake session handle; passed to IClientSubscriberTls::OnVerifySocketCertificate and carried in TLSHandshakeError::session.
struct ITLSHandshakeAuthenticate
{
// Returns a decoded X.509 certificate for this handshake.
// NOTE(review): presumably the remote peer's certificate - confirm against the implementation
virtual AuSPtr<Crypto::X509::DecodedCertificate> GetCertificate() = 0;
};
// Classification of a TLS handshake failure; stored in TLSHandshakeError::error.
AUE_DEFINE(EHandleErrorClass, (
ePinError,
eUserDeny,
eBrokenPacket,
eInvalidCipher,
eBadCert
));
// Error report handed to IClientSubscriberTls::OnTLSHandleError and IServerSubscriberTls::OnClientTLSReport.
struct TLSHandshakeError
{
// Failure category
EHandleErrorClass error;
// Handshake session the error relates to
AuSPtr<ITLSHandshakeAuthenticate> session;
// NOTE(review): presumably a human-readable description - confirm
AuString message;
};
// Client-side socket event callbacks; supplied through ClientConfig::clientSubscriber.
AUKN_INTERFACE(IClientSubscriber,
// Connection attempt outcome; see ILocalClientSocket::Connect / ConnectAsync
AUI_METHOD(void, OnServerConnectSuccess, (const AuSPtr<IClientSocket> &, socket)),
AUI_METHOD(void, OnServerConnectFailed, (const AuSPtr<IClientSocket> &, socket)),
// DTLS/UDP/TCP/TLS -> TRUE = expects another datagram or read pump
// FALSE = end of socket life
// NOTE(review): the method name is spelled "OnSockeData" (missing 't'); renaming it would break existing implementers
AUI_METHOD(bool, OnSockeData, (const AuSPtr<IClientSocket> &, socket)),
// NOTE(review): presumably raised when the socket hits an error (see IBasicSocket::GetLastError) - confirm
AUI_METHOD(void, OnSocketError, (const AuSPtr<IClientSocket> &, socket)),
// NOTE(review): presumably raised when the socket is shut down - confirm
AUI_METHOD(void, OnSocketShutdown, (const AuSPtr<IClientSocket> &, socket))
);
// TLS-specific client callbacks; supplied through TLSClientConfig::clientSubscriber.
// NOTE(review): `session` in OnVerifySocketCertificate is taken as a const AuSPtr by value, unlike the
// by-reference pointers used elsewhere in this header; changing it would alter the callback signature.
AUKN_INTERFACE(IClientSubscriberTls,
AUI_METHOD(bool, OnVerifySocketCertificate, (const AuSPtr<IBasicSocket> &, socket, const AuSPtr<ITLSHandshakeAuthenticate>, session)),
AUI_METHOD(bool, OnTLSHandleError, (const AuSPtr<IBasicSocket> &, socket, const TLSHandshakeError &, error))
);
// Server-side event callbacks; supplied through ServerInfo::serverSubscriber.
// Every callback receives the originating server; per-client callbacks also receive the client socket.
// NOTE(review): the meaning of the bool return values is not documented in this header - confirm with the implementation.
AUKN_INTERFACE(IServerSubscriber,
AUI_METHOD(bool, OnClientAccept, (const AuSPtr<IServer> &, server, const AuSPtr<IClientSocket> &, socket)),
AUI_METHOD(bool, OnClientDoS, (const AuSPtr<IServer> &, server, const AuSPtr<IClientSocket> &, socket)),
AUI_METHOD(void, OnClientError, (const AuSPtr<IServer> &, server, const AuSPtr<IClientSocket> &, socket)),
AUI_METHOD(void, OnClientShutdown, (const AuSPtr<IServer> &, server, const AuSPtr<IClientSocket> &, socket)),
// Receives a list of client sockets rather than a single one
AUI_METHOD(bool, OnReadFrame, (const AuSPtr<IServer> &, server, const AuList<AuSPtr<IClientSocket>> &, sockets)),
AUI_METHOD(void, OnShutdown, (const AuSPtr<IServer> &, server))
);
// TLS-specific server callback; supplied through TLSServerInfo::tlsServerSubscriber.
// Receives the server, the affected client socket and the handshake error report.
AUKN_INTERFACE(IServerSubscriberTls,
AUI_METHOD(bool, OnClientTLSReport, (const AuSPtr<IServer> &, server, const AuSPtr<IClientSocket> &, socket, const TLSHandshakeError &, error))
);
// TODO: We should introduce another std:: customer-overloadable type reproducing hardcoded ASCII and an int, basically std::error_code
// Maybe AuErrorCode = [std::, my_fav_stl::]error_code
// AuError = something more flexible
// Error type returned by IBasicSocket::GetLastError; currently a plain alias of std::error_code.
using Error_t = std::error_code;
// Opaque base for user data attached to a socket via IBasicSocket::SetContext / GetContext.
struct IBasicSocketPrivateContext
{
    // Virtual destructor forces a vtable so derived contexts are destroyed correctly through this base
    virtual ~IBasicSocketPrivateContext() = default;
};
// Per-direction traffic counters; embedded twice in SocketStat.
// Members are zero-initialized so a default-constructed value never exposes indeterminate counters.
struct SocketStatStream
{
    AuUInt64 total {};
    AuUInt64 averageBytesPerSecond {}; // interpolated, behind 1 frame
    AuUInt64 extrapolatedBytesPerSecond {}; // this uses an extrapolated time point to predict the network bandwidth of one whole second of data
};
// Socket statistics snapshot returned by IBasicSocket::GetStats.
// Members are zero-initialized so a default-constructed value never exposes indeterminate data.
struct SocketStat
{
    AuUInt32 timeStartMs {};
    AuUInt32 uptimeMs {};
    AuUInt64 bandwidthCost {};
    SocketStatStream rx {};
    // NOTE(review): presumably the transmit-side counters ("tx"); the member name is kept for compatibility
    SocketStatStream rt {};
};
// Threading model reported by IBasicSocket::GetThreadModel.
enum class EUnderlyingModel
{
    eAsync    = 0,
    eBlocking = 1
};
// Common base interface of client sockets (IClientSocket) and servers (IServer).
struct IBasicSocket
{
virtual bool IsActive() = 0;
// Last error as an Error_t (std::error_code)
virtual Error_t GetLastError() = 0;
virtual void Shutdown() = 0;
// Attaches a user context object and returns a context pointer.
// NOTE(review): the return value is presumably the previously attached context - confirm
virtual AuSPtr<IBasicSocketPrivateContext> SetContext(const AuSPtr<IBasicSocketPrivateContext> &newContext) = 0;
virtual AuSPtr<IBasicSocketPrivateContext> GetContext() = 0;
// Writes the local endpoint description into `out`; the bool result presumably signals success - confirm
virtual bool GetLocalEndpoint(ConnectionEndpoint &out) = 0;
virtual SocketStat GetStats() = 0;
virtual EUnderlyingModel GetThreadModel() = 0;
};
// Write-completion callback accepted by the fenced IClientSocket::WriteAsync overload.
// NOTE(review): `fence` is presumably the fence value that was passed to WriteAsync - confirm
AUKN_INTERFACE(ISocketSubmissionComplete,
AUI_METHOD(void, OnWriteFinished, (AuUInt, fence))
);
// Buffering options for a socket stream; see SocketConfig::stream, IClientSocket::ReconfigureStreams
// and IServer::ReconfigureDefaultStream.
struct StreamConfig
{
    bool enableBufferedInput = true;
    bool enableBufferedOutput = false;

    // Sizing rationale for the defaults below:
    //   30000 * (4096 * 10) / 1024 / 1024 / 1024
    //   = 1.1GB per ~1/2 of 2^16
    // Plenty of overhead for cpu blts to go brr without becoming
    // cpu bound. If you're expecting 30k connections, you can
    // lose 1GB to our extended seek interface by default
    AuUInt32 bufferedReadSize = 10 * 4096;  // see: enableBufferedInput
    AuUInt32 bufferedWriteSize = 10 * 4096; // see: enableBufferedOutput
};
// Stream socket interface: implemented by outbound clients (ILocalClientSocket) and handed to
// IServerSubscriber callbacks for server-side sessions.
struct IClientSocket : public IBasicSocket
{
virtual bool GetRemoteEndpoint(ConnectionEndpoint &out) = 0;
// NOTE(review): re-declares IBasicSocket::GetLocalEndpoint with an identical signature; it remains pure virtual
virtual bool GetLocalEndpoint(ConnectionEndpoint &out) = 0;
virtual bool PumpRead() = 0;
virtual bool PumpWrite() = 0;
virtual bool Pump() = 0;
virtual void Run(int idx, AuUInt32 timeout) = 0;
// If memory.ptr is a nullptr, this method immediately returns with the expected write length in memory.out
//
// If all is true and the internal buffer is not saturated enough yet, no data is read and
// zero readable bytes are returned.
//
// If all is false, copies the readable bytes into memory.ptr, up to memory.length
//
// pseudocode:
// If all is false,
// memory.outVariable = readableBytes
// if memory.ptr,
// begin copy from the internal buffer up to max(memory.length, outVariable) into memory.ptr,
// end
// else
// return readExactly(memory)
//
// NOTE(review): "max" above presumably should read "min" (a copy cannot exceed memory.length) - confirm
// NOTE: BufferInputStreamAdhoc usage applies to async reads as well
virtual bool ReadAsync(const Memory::MemoryViewStreamWrite &memory, bool all = false) = 0;
// Atomic
// ReadAsync(memory, false)
// SeekAsync(-memory.outVariable)
virtual bool PeekAsync(const Memory::MemoryViewStreamWrite &memory) = 0;
// Attempts to seek backwards or forwards in the UDP or TCP packet
// If you are under the callstack of a HasXXXHasData callback, you are guaranteed (bufferedReadSize - streamPosition - streamRemaining) bytes backwards
//
virtual bool SeekAsync(int signedDistanceFromCur = 0) = 0;
// When BufferInputStreamAdhoc is called with false / in the default condition, read sync will be constrained by the async buffer
// When BufferInputStreamAdhoc is called with true, you will block until you can get the requested data (all of it if, all = true; any of it, all = false)
virtual bool ReadSync(const Memory::MemoryViewStreamWrite &memory, bool all = true) = 0;
// Writes max(cumulative memory.length per frame, (enableBufferedInput ? bufferedWriteSize : os page allowance)) to the sockets asynchronous stream
// NOTE(review): the flag above presumably should be enableBufferedOutput, which is what bufferedWriteSize refers to in StreamConfig - confirm
// Returns false when no data whatsoever was written, generic error condition
// Returns true when some data was collected, regardless of any errors that may have arisen (defer to IServerSubscriber::OnClientError, IClientSubscriber::OnSocketError for error handling)
virtual bool WriteAsync(const Memory::MemoryViewStreamRead &memory) = 0;
// Alternative WriteAsync method that calls a submission notification callback on flush on the dispatching thread
virtual bool WriteAsync(const Memory::MemoryViewStreamRead &memory, AuUInt fence, const AuSPtr<ISocketSubmissionComplete> &input) = 0;
// NOTE(review): the two lines below are identical to the ReadSync comment and describe reads, not writes;
// they look copy-pasted - the actual WriteSync contract needs documenting
// When BufferInputStreamAdhoc is called with false / in the default condition, read sync will be constrained by the async buffer
// When BufferInputStreamAdhoc is called with true, you will block until you can get the requested data (all of it if, all = true; any of it, all = false)
virtual bool WriteSync(const Memory::MemoryViewStreamRead &memory) = 0;
/**
* Gets / sets the internal application buffer size
* Noting that Linux's default network buffer looks like this:
* Minimum, Initial, Maximum: 10240 87380 12582912 | 10KB, 86KB, 12MB
*/
virtual AuUInt GetInternalInputRingBuffer() = 0;
virtual bool SetInternalInputRingBuffer(AuUInt bytes) = 0;
/**
* Defines the maximum amount of bytes each receive frame can ingest
*/
virtual void SetRecvLength(AuUInt32 length) = 0;
virtual AuUInt32 GetRecvLength() = 0;
/**
* Enable ad-hoc input buffer ingestion into our internal buffer
* Only use when PumpRead will not suffice by design (ie: sync apps)
*
* When set to true, there is nothing preventing you from consuming a
* DoS-worthy amount of bytes from the stream per frame
*
* This does not disable enableBufferedInput, it merely allows you
* to read beyond enableBufferedInput/bufferedReadSize by accessing
* the os stream page or hitting the network device inline
*
* There may be a performance bottleneck and/or gain depending on your
* circumstances
*
* During high bandwidth transfers, you should set this to true
*
* Default: false (0)
*/
virtual void BufferInputStreamAdhoc(bool value) = 0;
// NOTE(review): presumably toggles delivery of the data-available callback (cf. ClientConfig::enableHasDataCallback) - confirm
virtual void ConfigureHasDataCallback(bool enabled) = 0;
// Applies a new StreamConfig (buffering flags and sizes) to this socket
virtual void ReconfigureStreams(const StreamConfig &config) = 0;
};
// Client socket created locally through ISocketFactory::NewClient / NewTlsClient; adds connection establishment.
struct ILocalClientSocket : public IClientSocket
{
// Connects to the endpoint defined in the ClientConfig
// Completion will be notified by the following callbacks;
// IClientSubscriber::OnServerConnectSuccess,
// IClientSubscriber::OnServerConnectFailed
// returns true on success
virtual bool Connect() = 0;
// Asynchronous variant of Connect
// Completion will be notified by the following callbacks;
// IClientSubscriber::OnServerConnectSuccess,
// IClientSubscriber::OnServerConnectFailed
//
// ...on any worker thread under a generic pump or read only pump cycle
virtual void ConnectAsync() = 0;
};
// Pairs a server certificate with a socket.
// NOTE(review): not referenced anywhere else in this header; intended use unclear - confirm
struct TlsConnect
{
Aurora::Crypto::X509::Certificate serverCertificate;
AuSPtr<IBasicSocket> socket;
};
// Per-socket settings; base of ClientConfig and used for ServerInfo::clientDefaults.
struct SocketConfig
{
// Stream buffering options
StreamConfig stream;
};
// Server creation parameters consumed by ISocketFactory::NewServer.
struct ServerInfo
{
ConnectionEndpoint listen;
// NOTE(review): has no default initializer; callers must set it explicitly
AuUInt32 maxSessions;
// NOTE(review): presumably the socket settings applied to accepted clients - confirm
SocketConfig clientDefaults;
// Receiver of IServerSubscriber callbacks
AuSPtr<IServerSubscriber> serverSubscriber;
};
// ServerInfo extended with TLS material; consumed by ISocketFactory::NewTlsServer.
struct TLSServerInfo : ServerInfo
{
Aurora::Crypto::RSAPair cert;
// Receiver of IServerSubscriberTls callbacks
AuSPtr<IServerSubscriberTls> tlsServerSubscriber;
};
// Client creation parameters consumed by ISocketFactory::NewClient.
// SocketHostName has no default constructor, so `socket` must be supplied when this struct is initialized.
struct ClientConfig : SocketConfig
{
// Target host, by hostname or by address
SocketHostName socket;
//AuString service;
// NOTE(review): has no default initializer; callers must set it explicitly
AuUInt16 port;
// NOTE(review): presumably the initial state of IClientSocket::ConfigureHasDataCallback - confirm
bool enableHasDataCallback {true};
// Receiver of IClientSubscriber callbacks
AuSPtr<IClientSubscriber> clientSubscriber;
};
// ClientConfig extended for TLS; consumed by ISocketFactory::NewTlsClient.
struct TLSClientConfig : ClientConfig
{
// NOTE(review): this member HIDES ClientConfig::clientSubscriber (same name, different type).
// The base subscriber is only reachable as `cfg.ClientConfig::clientSubscriber`.
// TLSServerInfo uses a distinct name (tlsServerSubscriber) for the equivalent member;
// renaming this one would break existing callers, so it is only flagged here.
AuSPtr<IClientSubscriberTls> clientSubscriber;
};
// Listening server interface; created through ISocketFactory::NewServer / NewTlsServer.
struct IServer : public IBasicSocket
{
// Outputs client sockets into `clients`.
// NOTE(review): whether the list is appended to or replaced is not specified here - confirm
virtual void GetClients(AuList<AuSPtr<IClientSocket>> &clients) = 0;
virtual bool Listen() = 0;
// NOTE(review): presumably changes the StreamConfig applied to subsequently accepted clients - confirm
virtual void ReconfigureDefaultStream(const StreamConfig &config) = 0;
};
// Factory for servers and client sockets; obtained from INetworkInterface::GetSocketFactory.
// Each method writes the created object to `out` and returns a bool (presumably success - confirm).
struct ISocketFactory
{
virtual bool NewServer(const ServerInfo &listen, AuSPtr<IServer> &out) = 0;
virtual bool NewTlsServer(const TLSServerInfo &keys, AuSPtr<IServer> &out) = 0;
virtual bool NewClient(const ClientConfig &info, AuSPtr<ILocalClientSocket> &out) = 0;
virtual bool NewTlsClient(const TLSClientConfig &info, AuSPtr<ILocalClientSocket> &out) = 0;
};
// Service lookup request resolved by INetworkInterface::ResolveServiceSync.
struct ServiceEndpoint
{
ETransportProtocol protocol;
AuString hostname;
// NOTE(review): presumably a service name or port string, as in getaddrinfo - confirm
AuString service;
};
// Name resolution, datagram submission and socket-factory access; obtained from INetworkingPool::GetNetworkInterface.
//
// All members are pure virtual. The first three were previously declared virtual without `= 0`
// and without a definition in this header, unlike every other interface here; with no out-of-line
// definition that leaves the vtable referencing undefined symbols at link time.
struct INetworkInterface
{
    // Resolves a SocketHostName (hostname or address) plus port into an IPEndpoint
    virtual IPEndpoint ResolveSocketSync(const SocketHostName &hostname, AuUInt16 port) = 0;

    // Resolves a ServiceEndpoint (protocol, hostname, service) into an IPEndpoint
    virtual IPEndpoint ResolveServiceSync(const ServiceEndpoint &service) = 0;

    // Submits `memory` as a datagram towards `endpoint`; the bool result presumably signals acceptance - confirm
    virtual bool SendDatagramAsync(const ConnectionEndpoint &endpoint, const Memory::MemoryViewRead &memory) = 0;

    virtual AuSPtr<ISocketFactory> GetSocketFactory() = 0;
};
// Worker selection range embedded in WorkPoolGroup::range.
// The constructor is defined out of line, so the initial member values are not visible in this header.
struct AUKN_SYM WorkRange
{
WorkRange();
// NOTE(review): "OrAny" suggests a sentinel meaning "any worker"; its value is not visible here - confirm
AuUInt8 workerOffsetOrAny;
AuUInt8 workerCountOrAny;
};
// Describes the async work queues the networking pool should poll / submit on;
// see INetworkingPool::BeginReadPollingOnWorkQueues and BeginSubmissionsOnOnWorkQueues.
// The constructor is defined out of line, so the initial member values are not visible in this header.
struct AUKN_SYM WorkPoolGroup
{
WorkPoolGroup();
AuSPtr<Async::IThreadPool> pool; // optional -> defaults to async apps core loop
AuUInt8 asyncWorkGroup;
AuUInt8 asyncWorkPoolOffsetOrAny;
WorkRange range;
};
// Top-level networking pool; created through CreateNetworkPool.
// The A/B/C/D labels group the alternative ways of driving the pool.
struct INetworkingPool
{
// A: combined pump for one worker
virtual AuUInt32 Pump(AuUInt8 workerId) = 0;
// B: separate read / write pumps for one worker
virtual AuUInt32 PumpRead(AuUInt8 workerId) = 0;
virtual AuUInt32 PumpWrite(AuUInt8 workerId) = 0;
// C: poll, or run with a timeout, for one worker
// NOTE(review): the unit of `timeout` and the meaning of the AuUInt32 results are not stated here - confirm
virtual AuUInt32 PollWorker(AuUInt8 workerId) = 0;
virtual AuUInt32 RunWorker(AuUInt8 workerId, AuUInt32 timeout) = 0;
// D: class APIs
virtual bool BeginReadPollingOnWorkQueues(const WorkPoolGroup &workGroup) = 0;
// NOTE(review): the name contains a doubled "On"; renaming it would break implementers and callers
virtual bool BeginSubmissionsOnOnWorkQueues(const WorkPoolGroup &workGroup) = 0;
virtual void StopPollingOnWorkQueues() = 0;
// Number of workers (cf. NetworkPool::workers)
virtual AuUInt8 GetWorkers() = 0;
virtual AuSPtr<INetworkInterface> GetNetworkInterface() = 0;
virtual void Shutdown() = 0;
};
// Creation parameters for CreateNetworkPool.
struct NetworkPool
{
    // Worker count; a single worker unless overridden
    AuUInt8 workers = 1;
};
// Exported factory declared via the project's AUKN_SHARED_API macro: produces an INetworkingPool from NetworkPool settings.
// NOTE(review): the exact generated function names and signatures depend on the macro definition - confirm
AUKN_SHARED_API(CreateNetworkPool, INetworkingPool, const NetworkPool & meta);
}