[+] AuUtility::ThroughputCalculator

[+] AuNet::ISocketStats
[+] AuNet::ISocketChannel::GetRecvStats()
[+] AuNet::ISocketChannel::GetSendStats()
[+] AuIO::IOProcessor::RunTickEx(AuUInt32 dwTimeout)
[*] Refactor clock APIs
[+] Documentation in headers
[+] AuIO::IIOPipeWork::GetStartTickMS()
[+] AuIO::IIOPipeWork::GetLastTickMS()
[+] AuIO::IIOPipeWork::GetPredictedThroughput()
[+] AuIO::IIOPipeWork::GetBytesProcessed()
This commit is contained in:
Reece Wilson 2022-12-06 22:53:37 +00:00
parent fe3db644bd
commit 7be2d3fbdc
30 changed files with 793 additions and 122 deletions

View File

@ -14,18 +14,17 @@ namespace Aurora::Threading::Threads
namespace Aurora::Time
{
AUKN_SYM AuUInt64 CurrentClockMS();
AUKN_SYM AuInt64 CurrentClockMS();
}
namespace Aurora::Console
{
class ConsoleMessage
struct ConsoleMessage
{
public:
EAnsiColor color;
AuString prefix;
AuString line;
AuUInt64 time;
AuInt64 time;
AuThreadId_t tid;
inline ConsoleMessage()
@ -50,7 +49,7 @@ namespace Aurora::Console
AUKN_SYM void Read(Memory::ByteBuffer &deserialize);
AUKN_SYM void Write(Memory::ByteBuffer &serialize) const;
AUKN_SYM AuString StringifyTime(bool simple = false) const;
AUKN_SYM AuString StringifyTime(bool bSimple = false) const;
AUKN_SYM AuString StringifyTimeUTC() const;
AUKN_SYM AuString GetWrappedTag() const;
AUKN_SYM AuString ToConsole() const;

View File

@ -14,8 +14,8 @@ namespace Aurora::IO
AUKN_INTERFACE(IIOBufferedStreamAvailable,
/**
* @brief
* @return false restores the read head, will retrieve further callbacks
* true nothing
* @return false Restores the read head. This will not stop further callbacks until the IOWorkItem is explicitly aborted.
* true Nothing. If you moved the read head forward, you must return true to prevent roll-back.
*/
AUI_METHOD(bool, OnDataAvailable, (Memory::ByteBuffer &, view))
);

View File

@ -12,15 +12,39 @@ namespace Aurora::IO
struct IIOPipeWork
{
/**
* @brief
* @brief begins the operation
* @return
*/
virtual bool Start() = 0;
/**
* @brief
* @brief terminates the operation
* @return
*/
virtual bool End() = 0;
/**
* @brief
* @return
*/
virtual AuInt64 GetStartTickMS() = 0;
/**
* @brief time from the au epoch. see: AuTime::.
* @return
*/
virtual AuInt64 GetLastTickMS() = 0;
/**
* @brief time from the au epoch. see: AuTime::.
* @return
*/
virtual double GetPredictedThroughput() = 0;
/**
* @brief bytes written to the stream (this will be one frame behind in the callback routine, if in on buffered data mode)
* @return
*/
virtual AuUInt64 GetBytesProcessed() = 0;
};
}

View File

@ -17,44 +17,145 @@ namespace Aurora::IO
{
struct IIOProcessor
{
// Local thread:
// note: IOProcessors running over an AuAsync runner or is getting pumped by an external loop queue (NewIOProcessor(...), NewIOProcessorOnThread(...)) need not use the following apis:
virtual AuUInt32 TryTick () = 0;
virtual AuUInt32 RunTick () = 0;
virtual AuUInt32 TickFor (const AuSPtr<IIOProcessorItem> &ioEvent) = 0;
virtual AuUInt32 RunTickEx (AuUInt32 dwTimeout) = 0;
virtual AuUInt32 TickFor (const AuSPtr<IIOProcessorItem> &pIoEvent) = 0;
/**
* @brief force a tick
*/
virtual AuUInt32 ManualTick () = 0;
/**
* @brief to be used with bTickOnly primarily. wakes up the thread at a set frequency.
*/
virtual AuUInt64 SetRefreshRate (AuUInt64 ns) = 0;
/**
* @brief to be used with bTickOnly primarily. wakes up the thread at a set frequency.
*/
virtual bool HasRefreshRate () = 0;
/**
* @brief returns the aurora threadid of the intended caller (see: AuThreads::GetThreadId())
*/
virtual AuUInt64 GetOwnedThreadId () = 0;
// The following apis are thread safe:
/**
* @brief Used to poll if there are any items worth [try/blocking] ticking over.
* @return returns true if there are callbacks, watches, or other registered work available.
* @note you can use me as a while (AuIsThreadRunning() && pIoProcessor->[IsAlive/HasItems]()) { pIoProcessor->[RunTickEx(uTimeout); }
*/
virtual bool HasItems() = 0;
// the following are apis used to register triggers (usually loop sources) on the thread/processor/loop queue/whatever
/**
* @brief low level wait api
*/
virtual AuSPtr<IIOProcessorItem> StartIOWatch (const AuSPtr<IIOWaitableItem> &pSource, const AuSPtr<IIOEventListener> &pEventListener) = 0;
/**
* @brief low level wait api
*/
virtual AuSPtr<IIOProcessorItem> StartIOWatchEx (const AuSPtr<IIOWaitableItem> &pSource, const AuSPtr<IIOEventListener> &pEventListener, bool bSingleshot) = 0;
/**
* @brief low level wait api with simple callback
*/
virtual AuSPtr<IIOProcessorItem> StartSimpleIOWatch (const AuSPtr<IIOWaitableItem> &pSource, const AuSPtr<IIOSimpleEventListener> &pEventListener) = 0;
/**
* @brief low level wait api with simple callback
*/
virtual AuSPtr<IIOProcessorItem> StartSimpleIOWatchEx(const AuSPtr<IIOWaitableItem> &pSource, const AuSPtr<IIOSimpleEventListener> &pEventListener, bool bSingleshot) = 0;
/**
* @brief Registers a callback given a loopsource (multi-trigger)
* @param pSource
* @param pEventListener
* @return
*/
virtual AuSPtr<IIOProcessorItem> StartSimpleLSWatch (const AuSPtr<Loop::ILoopSource> &pSource, const AuSPtr<IIOSimpleEventListener> &pEventListener) = 0;
/**
* @brief Registers an optionally singleshot callback given a loopsource
* @param pSource
* @param pEventListener
* @param dwMsTimeout optional timeout
* @return
*/
virtual AuSPtr<IIOProcessorItem> StartSimpleLSWatchEx(const AuSPtr<Loop::ILoopSource> &pSource, const AuSPtr<IIOSimpleEventListener> &pEventListener, bool bSingleshot, AuUInt32 dwMsTimeout = 0) = 0;
// general purpose work callbacks
/**
* @brief Dispatches an abortable callback to the processor thread
* @param pWork
* @return
*/
virtual bool SubmitIOWorkItem (const AuSPtr<IIOProcessorWorkUnit> &pWork) = 0;
// wake up
/**
* @brief you almost certainly will not need me. so far i've come across one legitmate use case for glib-io interop and thats about it.
*/
virtual void WakeupThread () = 0;
// io sequence callbacks
/**
* @brief Register an inter-frame callback indicating point of execution throughout the [START](optional [yield])[IO][IO TICK][WORK ITEMS][EPILOGUE][END OF FRAME] frame
* @param eventListener
* @return
*/
virtual bool AddEventListener (const AuSPtr<IIOProcessorEventListener> &pEventListener) = 0;
/**
* @brief Deregisters an inter-frame callback listener
* @param pEventListener
*/
virtual void RemoveEventListener (const AuSPtr<IIOProcessorEventListener> &pEventListener) = 0;
// factories backed by this processor to provide additional functionality
/**
* @brief provides an IO pipe processor for creating pipe requests
* @return
*/
virtual AuSPtr<IIOPipeProcessor> ToPipeProcessor () = 0;
// internal and utility apis
/**
* @brief Utility proprety access to the underlying IO wait surface: a loop queue
* @return
* @warning Do not use me unless you know what you're doing.
* There is very little reason to pull the underlying loop queue from here.
* You are probably doing something wrong. IOProcessors are meant to sit on top of
* queues as an abstraction layer for intelligent work dispatch, global stream processing
* and dispatch of io work callbacks. pulling a loopqueue from such a layer of abstraction
* would be like pulling the state of an upcoming WaitMultipleObjects call; it makes no sense.
*
* [note to future deadbeat maintainers: that doesnt mean you should remove it either]
*/
virtual AuSPtr<Loop::ILoopQueue> ToQueue () = 0;
virtual bool MultiplexRefreshRateWithIOEvents() = 0;
virtual AuUInt64 GetOwnedThreadId () = 0;
virtual AuSPtr<IIOProcessorItem> StartIOWatch (const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOEventListener> &listener) = 0;
virtual AuSPtr<IIOProcessorItem> StartIOWatchEx (const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOEventListener> &listener, bool singleshot) = 0;
virtual AuSPtr<IIOProcessorItem> StartSimpleIOWatch (const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOSimpleEventListener> &listener) = 0;
virtual AuSPtr<IIOProcessorItem> StartSimpleIOWatchEx(const AuSPtr<IIOWaitableItem> &object, const AuSPtr<IIOSimpleEventListener> &listener, bool singleshot) = 0;
virtual AuSPtr<IIOProcessorItem> StartSimpleLSWatch (const AuSPtr<Loop::ILoopSource> &source, const AuSPtr<IIOSimpleEventListener> &listener) = 0;
virtual AuSPtr<IIOProcessorItem> StartSimpleLSWatchEx(const AuSPtr<Loop::ILoopSource> &source, const AuSPtr<IIOSimpleEventListener> &listener, bool singleshot, AuUInt32 msTimeout = 0) = 0;
virtual bool SubmitIOWorkItem (const AuSPtr<IIOProcessorWorkUnit> &work) = 0;
virtual void WakeupThread () = 0;
// Inter-frame callbacks indicating point of execution throughout the [START](optional [yield])[IO][IO TICK][WORK ITEMS][EPILOGUE][END OF FRAME] frame
virtual bool AddEventListener (const AuSPtr<IIOProcessorEventListener> &eventListener) = 0;
virtual void RemoveEventListener (const AuSPtr<IIOProcessorEventListener> &eventListener) = 0;
virtual AuSPtr<IIOPipeProcessor> ToPipeProcessor () = 0;
// Reference loop queue
virtual AuSPtr<Loop::ILoopQueue> ToQueue () = 0;
/**
* @brief resets the io processors footprint in ::ToQueue()
*/
virtual void ReleaseAllWatches () = 0;
virtual bool HasItems () = 0;
};
/**

View File

@ -193,5 +193,17 @@ namespace Aurora::IO::Net
* @return
*/
virtual AuUInt GetOutputBufferSize() = 0;
/**
* @brief
* @return
*/
virtual AuSPtr<ISocketStats> GetRecvStats() = 0;
/**
* @brief
* @return
*/
virtual AuSPtr<ISocketStats> GetSendStats() = 0;
};
}

View File

@ -0,0 +1,38 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ISocketStats.hpp
Date: 2022-12-06
Author: Reece
***/
#pragma once
namespace Aurora::IO::Net
{
struct ISocketStats
{
/**
* @brief time from the au epoch. see: AuTime::.
* @return
*/
virtual AuInt64 GetFirstTickTimeMS() = 0;
/**
* @brief time from the au epoch. see: AuTime::.
* @return
*/
virtual AuInt64 GetLastTickTimeMS() = 0;
/**
* @brief
* @return
*/
virtual AuUInt64 GetTotalBytesTransferred() = 0;
/**
* @brief
* @return bytes / second
*/
virtual double GetApproximatedThroughput() = 0;
};
}

View File

@ -21,6 +21,7 @@
#include "ISocketBase.hpp"
#include "ISocket.hpp"
#include "ISocketStats.hpp"
#include "ISocketDriver.hpp"
#include "ISocketChannel.hpp"
#include "ISocketDriverFactory.hpp"

View File

@ -29,27 +29,6 @@ namespace Aurora::Time
*/
AUKN_SYM tm NormalizeCivilTimezone(const tm &in, ETimezoneShift shiftFrom = ETimezoneShift::eUTC);
/**
Retrieves system clock in jiffies
*/
AUKN_SYM AuUInt64 CurrentClock();
/**
Retrieves system clock in milliseconds from the Aurora epoch
*/
AUKN_SYM AuUInt64 CurrentClockMS();
/**
Retrieves system clock in nanoseconds from the Aurora epoch
*/
AUKN_SYM AuUInt64 CurrentClockNS();
AUKN_SYM AuUInt64 CurrentClockSteady();
AUKN_SYM AuUInt64 CurrentClockSteadyMS();
AUKN_SYM AuUInt64 CurrentClockSteadyNS();
/**
Translates the Aurora epoch to the standard unix epoch
*/
@ -70,16 +49,58 @@ namespace Aurora::Time
*/
AUKN_SYM AuInt64 ConvertUnixToAuroraNS(AuInt64 in);
/**
Retrieves wall clock in milliseconds from the Aurora epoch
*/
AUKN_SYM AuInt64 CurrentClockMS();
/**
Retrieves wall clock in nanoseconds from the Aurora epoch
*/
AUKN_SYM AuInt64 CurrentClockNS();
/**
* @brief Steady clock in jiffies
* @return
*/
AUKN_SYM AuUInt64 SteadyClock();
/**
Returns a steady system clock of SteadyClockJiffies() with an undefined epoch.
These values should be used to drive thread primitives, IO time, and tick delta.
On a modern plaform, these should be affected by the users' calendar or NTP.
On stinkier platforms, who cares if we can run mostly bug free with an assumed-sane wall-clock, right?
*/
AUKN_SYM AuUInt64 SteadyClockMS();
/**
* @brief
* @return
*/
AUKN_SYM AuUInt64 SteadyClockNS();
/**
Retrieves the freqency of jiffies per second
*/
AUKN_SYM AuUInt64 SteadyClockJiffies();
/**
Returns a high resolution count of jiffies with an undefined epoch from a
high resolution clock.
These values should be used to drive benchmarks.
These values should not nor can be accurately converted meaningfully
*/
AUKN_SYM AuUInt64 CurrentInternalClock();
AUKN_SYM AuUInt64 CurrentInternalClockMS();
AUKN_SYM AuUInt64 CurrentInternalClockNS();
AUKN_SYM AuUInt64 HighResClock();
AUKN_SYM AuUInt64 HighResClockMS();
AUKN_SYM AuUInt64 HighResClockNS();
/**
Retrieves the freqency of jiffies per second
*/
AUKN_SYM AuUInt64 HighResClockJiffies();
/**
Let's say you're fucked and you need a ball park figure.
@ -90,23 +111,20 @@ namespace Aurora::Time
/**
Converts seconds from the Aurora epoch to time_t
@deprecated
*/
AUKN_SYM time_t SToCTime(AuInt64 time);
/**
Converts nanoseconds from the Aurora epoch to time_t
@deprecated
*/
AUKN_SYM time_t NSToCTime(AuInt64 time);
/**
Converts milliseconds from the Aurora epoch to time_t
@deprecated
*/
AUKN_SYM time_t MSToCTime(AuInt64 time);
AUKN_SYM AuInt64 CTimeToMS(time_t time);
AUKN_SYM AuInt64 CTimeToMS(time_t time);
/**
Retrieves the freqency as a fraction of: jiffies per second / 1 * nanoseconds in a second
@ -118,8 +136,4 @@ namespace Aurora::Time
*/
AUKN_SYM double CPUFrequencyDeltaMS();
/**
Retrieves the freqency of jiffies per second
*/
AUKN_SYM AuUInt64 ClockJiffies();
}

View File

@ -19,12 +19,12 @@ namespace Aurora::Time
void Start()
{
start_ = CurrentInternalClockNS();
start_ = HighResClockNS();
}
AuUInt64 End()
{
auto re = std::clamp<AuInt64>(CurrentInternalClockNS() - start_, AuInt64(0), AuNumericLimits<AuInt64>::max());
auto re = std::clamp<AuInt64>(HighResClockNS() - start_, AuInt64(0), AuNumericLimits<AuInt64>::max());
if (re == AuNumericLimits<AuInt64>::max()) return 0; // ez overflow in subtract. get out of here in 2-3 branches
return re;
}

View File

@ -34,7 +34,7 @@ namespace Aurora::Utility
inline auline void SetNextStep(AuUInt64 nsTimeStep)
{
this->nsTimeStep = nsTimeStep;
this->nextTriggerTime = nsTimeStep + Aurora::Time::CurrentInternalClockNS();
this->nextTriggerTime = nsTimeStep + Aurora::Time::HighResClockNS();
}
inline auline bool CheckExchangePass()
@ -46,7 +46,7 @@ namespace Aurora::Utility
return false;
}
if (cur <= Aurora::Time::CurrentInternalClockNS())
if (cur <= Aurora::Time::HighResClockNS())
{
if (this->nsTimeStep)
{
@ -68,7 +68,7 @@ namespace Aurora::Utility
return false;
}
if (this->nextTriggerTime <= Aurora::Time::CurrentInternalClockNS())
if (this->nextTriggerTime <= Aurora::Time::HighResClockNS())
{
return true;
}
@ -90,7 +90,7 @@ namespace Aurora::Utility
{
if (this->noCatchUp)
{
return this->nsTimeStep + Aurora::Time::CurrentInternalClockNS();
return this->nsTimeStep + Aurora::Time::HighResClockNS();
}
else
{

View File

@ -0,0 +1,143 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ThroughputCalculator.hpp
Date: 2022-12-06
Author: Reece
***/
#pragma once
namespace Aurora::Utility
{
struct ThroughputCalculator
{
// call me arbitrarily
double inline OnUpdate(AuUInt uUnit)
{
this->uTotal += uUnit;
this->uTotalLifetime += uUnit;
OnTick();
return this->dCurFreq;
}
// or:
void inline AddData(AuUInt uUnit)
{
this->uTotal += uUnit;
this->uTotalLifetime += uUnit;
}
void inline AddSampleSampleTick() // at around 1 tick/sec
{
OnTick();
}
// then call me arbitrarily:
double inline GetEstimatedHertz() const
{
auto uNow = Aurora::Time::SteadyClockNS();
auto uDelta = uNow - this->uLast;
// we cannot do anything on frame zero
if (!this->dCurFreq)
{
return 0;
}
static const auto kOneSecond = AuMSToNS<AuUInt64>(AuSToMS<AuUInt64>(1));
auto dDeltaWeight = ((double)uDelta / (double)kOneSecond);
// is overshooting over 1s?
if (uDelta > kOneSecond)
{
if (uDelta > this->uLastDelta)
{
// we're in uncharted territory. delaying til frame +1 and overshooting by +1 should be fine.
return 0;
}
// # otherwise return constant last tick freqency of this->dCurFreq
//return this->dCurFreq;
// ## dNextFactor: we are at-least the amount of bytes in pending frame since last tick, over the time period of at least 1 second
return AuMax<double>(/*dNextFactor: (effectively unit/time)*/ double(this->uTotal) / dDeltaWeight, /* ordinarily a large unit-frame within the tick wouldn't jitter the throughput bc OnTick would normalized the value.
however, once over a second, it makes sense to account for pending bytes/`this->uTotal` / time into frame as a baseline.
if we're under a second since the last tick, we can simply extrapolate from the normalized frequency, to
the current frames `this->uTotal` / uDelta, in a "linear" manner. ok, not so linear, ubytes can change, but it should
give us realistic smoothed out bandwidth statistics. it's more like two fractions of time where one is inverted.
see: the last expression
*/
this->dCurFreq /* use the normalized frequency. a single small frame shouldn't significantly jitter the throughput. */);
}
// otherwise return live frequency...
if (uDelta > this->uLastDelta) // last normalized this->dCurFreq, if not current frame if suddenly peaking
{
return this->dCurFreq;
return AuMax<double>(/*dNextFactor: (effectively unit/time)*/ double(this->uTotal) / dDeltaWeight, this->dCurFreq);
}
else // fractional lerp
{
//return this->dCurFreq;
double dNextFactor = double(this->uTotal) * dDeltaWeight;
// ...by calculating the weight of the current tick in terms of delta between now and the previous frame.
// the bit we don't know will be the last frames throughput freqency added to the current throughput * delta.
// combined, we get a transition to this->uTotal whose starting point is accelerated by this->dCurFreq
return (dNextFactor) +
(this->dCurFreq * (double(1.f) - dDeltaWeight) /*last frame units/s extrapolated forward by the unit of time that hasn't passed*/);
}
}
AuUInt64 inline GetTotalStats()
{
return this->uTotalLifetime;
}
AuUInt64 inline GetLastFrameTimeSteady()
{
return this->uLast;
}
AuInt64 inline GetLastFrameTimeWall()
{
return this->uLastWall;
}
private:
void inline OnTick()
{
auto uNow = Aurora::Time::SteadyClockNS();
auto uDelta = uNow - this->uLast;
this->uLastDelta = uDelta;
this->uLast = uNow;
this->uLastWall = Aurora::Time::CurrentClockMS();
auto dAbsMax = double(this->uTotal) / (double(uDelta) / (double)AuMSToNS<AuUInt64>(AuSToMS<AuUInt64>(1)));
auto dAbsMin = this->dCurFreq * (double)0.5f;
dAbsMax = (dAbsMax * 0.5) + (dAbsMin * 0.5);// without this, our numbers are too noisey and far ahead of ThroughputCalculator.hpp.buffered.
// in either scenario, ThroughputCalculator.hpp.buffered matches system specs on IO tests more closely but it still massively overshoots.
// adding this one line of normalization gets this approximation down to system monitoring utilities somewhat accurately.
// we still overshoot. the buffered variant could beat us at some point, but this original implementation seems to be much more lightweight.
// id question if its worth a switch over. i think this is close enough & justifys its cheapness
this->dCurFreq = AuMax(dAbsMin, dAbsMax);
this->uTotal = 0;
}
AuUInt64 uTotal {};
AuUInt64 uTotalLifetime {};
AuUInt64 uLastDelta {};
AuInt64 uLastWall {};
AuUInt64 uLast {};
double dCurFreq {};
};
}

View File

@ -0,0 +1,137 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: ThroughputCalculator.hpp
Date: 2022-12-06
Author: Reece
***/
#pragma once
namespace Aurora::Utility
{
struct ThroughputCalculator
{
// call me arbitrarily
double inline OnUpdate(AuUInt uUnit)
{
AU_LOCK_GUARD(this->lock);
OnTick(uUnit);
return this->dCurFreq;
}
// then call me arbitrarily:
double inline GetEstimatedHertz()
{
AU_LOCK_GUARD(this->lock);
auto uNow = Aurora::Time::SteadyClockNS();
auto uDelta = uNow - this->uLast;
// we cannot do anything on frame zero
if (!this->dCurFreq)
{
return 0;
}
return this->dCurFreq;
}
AuUInt inline GetTotalStats()
{
return this->uTotalLifetime;
}
AuInt64 inline GetLastFrameTimeWall()
{
return this->uLastWall;
}
private:
void inline OnTick(AuUInt64 uUnit)
{
struct EntryTable
{
AuUInt32 uBits {};
AuUInt32 uTimeDelta {};
};
auto uNow = Aurora::Time::SteadyClockNS();
auto uDelta = uNow - this->uLast;
AuMemmove(uVecTable + 1, uVecTable, sizeof(uVecTable) - sizeof(*uVecTable));
auto pTable = ((EntryTable *)uVecTable);
bool bFirst = !this->uLast;
this->uLast = uNow;
if (bFirst)
{
return;
}
this->uLastWall = Aurora::Time::CurrentClockMS();
static const auto kOneSecond = AuMSToNS<AuUInt64>(AuSToMS<AuUInt64>(1));
pTable[0].uBits = (AuUInt32)uUnit;
pTable[0].uTimeDelta = (AuUInt32)(uDelta / 1000ull);
if (!pTable[0].uTimeDelta)
{
pTable[0].uTimeDelta = 1;
}
if (uVecSize < 10)
{
uVecSize++;
}
AuLogDbg("Added: {} {}", pTable[0].uBits, pTable[0].uTimeDelta * 1000ull);
double dTotal {};
double dTotalBits {};
double dTotalTime {};
double dSamples {};
for (AU_ITR_N(i, uVecSize))
{
if (!pTable[i].uTimeDelta)
{
pTable[i].uTimeDelta = 1;
}
double dDeltaSeconds = ((double)((AuUInt64)pTable[i].uTimeDelta * 1000ull) / (double)kOneSecond);
dTotalBits += pTable[i].uBits;
dTotalTime += dDeltaSeconds;
dTotal += (double)pTable[i].uBits / dDeltaSeconds;
dSamples++;
}
AuLogDbg("{}/{} ({})", dTotal, dSamples, uVecSize);
this->dCurFreq = dTotal / dSamples;
double a = dTotalBits / dTotalTime;
//if (this->dCurFreq > a)
//{
//}
//this->dCurFreq = AuMin(a, this->dCurFreq);
}
AuUInt8 uVecSize {};
AuUInt64 uVecTable[10];
AuThreadPrimitives::SpinLock lock;
AuUInt64 uLast {};
AuUInt uTotalLifetime {};
AuInt64 uLastWall {};
double dCurFreq {};
};
}

View File

@ -163,6 +163,26 @@ namespace Aurora::IO
&this->endCallback));
}
AuInt64 IOPipeWork::GetLastTickMS()
{
return this->throughput_.GetLastFrameTimeWall();
}
AuInt64 IOPipeWork::GetStartTickMS()
{
return this->iStartTickMS_;
}
double IOPipeWork::GetPredictedThroughput()
{
return this->throughput_.GetEstimatedHertz();
}
AuUInt64 IOPipeWork::GetBytesProcessed()
{
return this->uBytesWritten_;
}
void IOPipeWork::PrepareStream()
{
if (!this->buffer_.IsEmpty())
@ -386,16 +406,16 @@ namespace Aurora::IO
this->bShouldReadNext = true;
}
// Prevent fucky end of allocation issues by moving the tail end of a partially buffered
// stream back to the start
// Prevent fucky end of allocation issues by moving the tail end of a partially buffered stream back to the start
// Should help with pacing massive files, where faster hardware can just vruum through a smaller buffer, leaving
// a load of small deserializable packets at the start of a large buffer, for the CPU to immediately start failing OnDataAvailable's
// much later into the stream, where a larger packet may overhang into memory we haven't reserved
// Should help with packing massive files, where faster disks can spin through smaller frames, leaving
// the CPU to catch up towards the end of the buffer, at which point the linearity breaks.
// We must instead force linearity, and with the assumption we can move peekable memory around, we must eventually
// move the tail end of the buffer back to the start, just so we can continue that stream view linearity.
// I really don't know how ReadNextAsync can be expected to wrap around a ring buffer
// We'd need to know if this pass failed, and if the read head is near the end, it'd know
// to wrap back around to zero. An overengineered pain and liability.
// I really don't know how ReadNextAsync can be expected to wrap around a ring buffer.
// We'd need to know if this pass failed, and if the read head is near the end, it'd know to wrap back around to zero.
// An overengineered pain and liability.
// This should work
@ -428,6 +448,8 @@ namespace Aurora::IO
this->uBytesWritten_ += bytesProcessedTotal;
this->throughput_.OnUpdate(bytesProcessedTotal);
if (this->request.pListener)
{
this->request.pListener->OnPipePartialEvent(bytesProcessedTotal);

View File

@ -7,6 +7,8 @@
***/
#pragma once
#include <Aurora/Utility/ThroughputCalculator.hpp>
namespace Aurora::IO
{
struct IOProcessor;
@ -50,6 +52,13 @@ namespace Aurora::IO
virtual bool End() override;
virtual AuInt64 GetLastTickMS() override;
virtual AuInt64 GetStartTickMS() override;
virtual AuUInt64 GetBytesProcessed() override;
virtual double GetPredictedThroughput() override;
void RunOnThread();
void TerminateOnThread(bool bError = false);
@ -90,6 +99,7 @@ namespace Aurora::IO
IOPipeCallback output;
IOWorkStart startCallback;
IOWorkEnd endCallback;
AuUInt64 iStartTickMS_ {};
bool bActive {true};
AuUInt32 uBufferSize_ {};
AuUInt32 uFrameCap_ {};
@ -97,6 +107,7 @@ namespace Aurora::IO
AuUInt uBytesWrittenLimit_ {};
AuUInt uBytesWrittenTarget_ {};
AuByteBuffer buffer_;
Utility::ThroughputCalculator throughput_;
};

View File

@ -158,7 +158,7 @@ namespace Aurora::IO
if (!CheckThread())
{
SysPushErrorGeneric("Wrong Thread");
AuThreading::Sleep(1);
AuIO::IOSleep(1);
return 0;
}
@ -174,6 +174,27 @@ namespace Aurora::IO
return FrameRunEpilogue();
}
AuUInt32 IOProcessor::RunTickEx(AuUInt32 dwTimeout)
{
if (!CheckThread())
{
SysPushErrorGeneric("Wrong Thread");
AuIO::IOSleep(1);
return 0;
}
this->bScheduled_ = false;
this->bFrameStart = false;
AU_LOCK_GUARD(this->items.mutex);
FrameStart();
FrameWaitForAny(dwTimeout);
FramePumpWaitingBlocked();
FrameRunThreadIO();
FrameRunCheckLSes();
return FrameRunEpilogue();
}
AuUInt32 IOProcessor::ManualTick()
{
if (!CheckThread())
@ -447,7 +468,7 @@ namespace Aurora::IO
if (this->IsTickOnly())
{
auto next = this->timers.nbTicker.nextTriggerTime;
auto now = AuTime::CurrentInternalClockNS();
auto now = AuTime::HighResClockNS();
if (now >= next)
{
@ -586,7 +607,7 @@ namespace Aurora::IO
if (!this->timers.nbTicker.nextTriggerTime)
{
this->timers.nbTicker.nextTriggerTime = AuTime::CurrentInternalClockNS() + this->timers.nbTicker.nsTimeStep;
this->timers.nbTicker.nextTriggerTime = AuTime::HighResClockNS() + this->timers.nbTicker.nsTimeStep;
}
}

View File

@ -26,6 +26,7 @@ namespace Aurora::IO
AuUInt32 TryTick() override;
AuUInt32 RunTick() override;
AuUInt32 RunTickEx(AuUInt32 dwTimeout) override;
AuUInt32 TickFor(const AuSPtr<IIOProcessorItem> &ioEvent) override;
bool TickForRegister(const AuSPtr<IIOProcessorItem> &ioEvent);

View File

@ -240,7 +240,7 @@ namespace Aurora::IO::Loop
{
AU_LOCK_GUARD(this->sourceMutex_);
if (!AuTryInsert(this->addedSources_, AuMakeTuple(source, AuTime::CurrentClockSteadyMS() + maxTimeout, SourceCallbacks {})))
if (!AuTryInsert(this->addedSources_, AuMakeTuple(source, AuTime::SteadyClockMS() + maxTimeout, SourceCallbacks {})))
{
return false;
}
@ -586,7 +586,7 @@ namespace Aurora::IO::Loop
count = this->handleArrayAnd_.size();
AuUInt64 startTime = AuTime::CurrentInternalClockMS();
AuUInt64 startTime = AuTime::HighResClockMS();
AuUInt64 endTime = startTime + timeout;
for (const auto &source : this->loopSourceExs_)
@ -598,7 +598,7 @@ namespace Aurora::IO::Loop
{
auto next = AuMin(count - index, AuUInt32(MAXIMUM_WAIT_OBJECTS));
startTime = AuTime::CurrentInternalClockMS();
startTime = AuTime::HighResClockMS();
if (timeout)
{
@ -657,7 +657,7 @@ namespace Aurora::IO::Loop
// Le great iterate
Iterator queueIterator(this);
AuSInt indexOffset {};
auto now = AuTime::CurrentClockSteadyMS();
auto now = AuTime::SteadyClockMS();
for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
{
auto &source = *queueIterator.itr;
@ -724,7 +724,7 @@ namespace Aurora::IO::Loop
AuUInt32 ret {};
bool lastItr {};
AuUInt64 startTime = AuTime::CurrentInternalClockMS();
AuUInt64 startTime = AuTime::HighResClockMS();
AuUInt64 endTime = timeout ? (startTime + timeout) : AuUInt64(-1);
AuUInt32 chuggerIndex {};
@ -746,7 +746,7 @@ namespace Aurora::IO::Loop
{
AuList<AuSPtr<ILoopSource>> trigger;
AuUInt64 startTime = AuTime::CurrentInternalClockMS();
AuUInt64 startTime = AuTime::HighResClockMS();
AuUInt64 endTime = timeout ? (startTime + timeout) : AuUInt64(-1);
AuUInt32 chuggerIndex {};
@ -822,7 +822,7 @@ namespace Aurora::IO::Loop
auto sleepMS = this->slowTickMs_;
if (internalEndTime && internalEndTime != AuUInt64(-1))
{
auto now = AuTime::CurrentInternalClockMS();
auto now = AuTime::HighResClockMS();
auto delta = AuInt64(internalEndTime) - AuInt64(now);
if (delta <= 0)
@ -915,7 +915,7 @@ namespace Aurora::IO::Loop
}
else
{
auto now = AuTime::CurrentInternalClockMS();
auto now = AuTime::HighResClockMS();
if (internalEndTime <= now)
{
return false;
@ -977,7 +977,7 @@ namespace Aurora::IO::Loop
{
Iterator queueIterator(this);
AuSInt indexOffset {};
auto now = AuTime::CurrentInternalClockMS();
auto now = AuTime::HighResClockMS();
for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
{
bool shouldRemove {false};
@ -1249,7 +1249,7 @@ namespace Aurora::IO::Loop
Iterator queueIterator(this);
bool bRebuildFromAnd {};
auto now = AuTime::CurrentInternalClockMS();
auto now = AuTime::HighResClockMS();
for (queueIterator.Start(); queueIterator.End() != queueIterator.itr; )
{

View File

@ -270,6 +270,9 @@ namespace Aurora::IO::Net
void SocketBase::SendOnData()
{
auto pReadableBuffer = this->socketChannel_.AsReadableByteBuffer();
auto pStartOffset = pReadableBuffer->readPtr;
if (this->bHasFinalized_)
{
if (this->socketChannel_.pRecvProtocol)
@ -300,6 +303,9 @@ namespace Aurora::IO::Net
}
this->ToChannel()->ScheduleOutOfFrameWrite();
auto uHeadDelta = pReadableBuffer->readPtr - pStartOffset;
this->socketChannel_.GetRecvStatsEx().AddBytes(uHeadDelta);
}
const NetError &SocketBase::GetError()

View File

@ -159,6 +159,26 @@ namespace Aurora::IO::Net
this->pSendProtocol = pSendProtocol;
}
AuSPtr<ISocketStats> SocketChannel::GetRecvStats()
{
return AuSPtr<ISocketStats>(this->pParent_->SharedFromThis(), &this->recvStats_);
}
AuSPtr<ISocketStats> SocketChannel::GetSendStats()
{
return AuSPtr<ISocketStats>(this->pParent_->SharedFromThis(), &this->sendStats_);
}
SocketStats &SocketChannel::GetSendStatsEx()
{
return this->sendStats_;
}
SocketStats &SocketChannel::GetRecvStatsEx()
{
return this->recvStats_;
}
bool SocketChannel::IsValid()
{
return bool(this->outputChannel.IsValid()) &&

View File

@ -10,6 +10,7 @@
#include "AuNetWriteQueue.hpp"
#include "AuNetSocketChannelInput.hpp"
#include "AuNetSocketChannelOutput.hpp"
#include "AuSocketStats.hpp"
namespace Aurora::IO::Net
{
@ -67,6 +68,12 @@ namespace Aurora::IO::Net
void SpecifySendProtocol(const AuSPtr<Protocol::IProtocolStack> &pSendProtocol) override;
AuSPtr<ISocketStats> GetRecvStats() override;
AuSPtr<ISocketStats> GetSendStats() override;
SocketStats & GetSendStatsEx();
SocketStats & GetRecvStatsEx();
void Establish();
@ -89,7 +96,10 @@ namespace Aurora::IO::Net
AuSPtr<Protocol::IProtocolStack> pRecvProtocol;
AuSPtr<Protocol::IProtocolStack> pSendProtocol;
private:
SocketBase * pParent_;
SocketStats sendStats_;
SocketStats recvStats_;
};
}

View File

@ -214,6 +214,8 @@ namespace Aurora::IO::Net
{
this->pParent_->ToWorkerEx()->DecrementIOEventTaskCounter();
AuStaticCast<SocketChannel>(this->pParent_->ToChannel())->GetSendStatsEx().AddBytes(length);
if (!length)
{
this->pParent_->SendErrorBeginShutdown({});

View File

@ -0,0 +1,45 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuSocketStats.cpp
Date: 2022-12-06
Author: Reece
***/
#include "Networking.hpp"
#include "AuSocketStats.hpp"
namespace Aurora::IO::Net
{
void SocketStats::AddBytes(AuUInt32 uBytes)
{
this->calculator.AddData(uBytes);
auto uNow = AuTime::CurrentClockMS();
if (!this->iFirstTime)
{
this->iFirstTime = uNow;
}
this->iLastTime = uNow;
}
AuInt64 SocketStats::GetFirstTickTimeMS()
{
return this->iFirstTime;
}
AuInt64 SocketStats::GetLastTickTimeMS()
{
return this->iLastTime;
}
AuUInt64 SocketStats::GetTotalBytesTransferred()
{
return this->calculator.GetTotalStats();
}
double SocketStats::GetApproximatedThroughput()
{
return this->calculator.GetEstimatedHertz();
}
}

View File

@ -0,0 +1,32 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: AuSocketStats.hpp
Date: 2022-12-06
Author: Reece
***/
#pragma once
#include <Aurora/Utility/ThroughputCalculator.hpp>
namespace Aurora::IO::Net
{
struct SocketStats : ISocketStats
{
void AddBytes(AuUInt32 uBytes);
virtual AuInt64 GetFirstTickTimeMS() override;
virtual AuInt64 GetLastTickTimeMS() override;
virtual AuUInt64 GetTotalBytesTransferred() override;
virtual double GetApproximatedThroughput() override;
private:
Utility::ThroughputCalculator calculator;
AuInt64 iFirstTime {};
AuInt64 iLastTime {};
AuUInt64 uTotalTransferred {};
};
}

View File

@ -79,7 +79,7 @@ namespace Aurora::Threading
#endif
auto status = YieldPollNs(true, qwTimeout + Time::CurrentInternalClockNS(), [=]()
auto status = YieldPollNs(true, qwTimeout + Time::HighResClockNS(), [=]()
{
return false;
});

View File

@ -51,7 +51,7 @@ namespace Aurora::Threading
static inline void _FastSnooze(long &count, AuUInt64 &startTime, AuUInt64 maxStallNS, int &alpha, int &bravo, bool &forceSpin) //, bool yieldFaster , long maxStallMS = 20)
{
// TODO: rewrite me
AuUInt64 now = Time::CurrentInternalClockNS();
AuUInt64 now = Time::HighResClockNS();
// Begin least likely checks, we're getting on now
// Ironically we need to burn off some CPU cycles
@ -142,7 +142,7 @@ namespace Aurora::Threading
long count = 0;
unsigned long long a = Time::CurrentInternalClockNS();
unsigned long long a = Time::HighResClockNS();
do
{
if (permitMultipleContextSwitches)
@ -158,7 +158,7 @@ namespace Aurora::Threading
{
return true;
}
a = Time::CurrentInternalClockNS();
a = Time::HighResClockNS();
} while ((!timeoutMs) || (timeoutMs > a));
@ -167,7 +167,7 @@ namespace Aurora::Threading
AUKN_SYM bool YieldPollNs(bool permitMultipleContextSwitches, AuUInt64 timeoutNs, PollCallback_cb cb)
{
AuUInt64 time = Time::CurrentInternalClockNS();
AuUInt64 time = Time::HighResClockNS();
if (cb())
{
@ -196,7 +196,7 @@ namespace Aurora::Threading
AUKN_SYM bool YieldPoll(bool permitMultipleContextSwitches, AuUInt64 timeoutMs, PollCallback_cb cb)
{
AuUInt64 time = Time::CurrentInternalClockNS();
AuUInt64 time = Time::HighResClockNS();
AuUInt64 timeoutNs = timeoutMs ? (time + (timeoutMs * 1000000)) : 0;
if (cb())

View File

@ -33,7 +33,7 @@ namespace Aurora::Threading::Primitives
{
AU_LOCK_GUARD(this->mutex_);
AuInt64 startTime = Time::CurrentClockSteadyMS();
AuInt64 startTime = Time::SteadyClockMS();
AuInt64 endTime = startTime + timeout;
while (!AtomicIsEventSet())
@ -42,7 +42,7 @@ namespace Aurora::Threading::Primitives
if (timeout)
{
timeoutMs = endTime - static_cast<AuInt64>(Time::CurrentClockSteadyMS());
timeoutMs = endTime - static_cast<AuInt64>(Time::SteadyClockMS());
if (timeoutMs < 0)
{
return false;

View File

@ -52,7 +52,7 @@ namespace Aurora::Threading::Primitives
AcquireSRWLockShared(&this->atomicHolder_);
AuInt64 startTime = Time::CurrentClockSteadyMS();
AuInt64 startTime = Time::SteadyClockMS();
AuInt64 endTime = startTime + timeout;
BOOL status = false;
@ -62,7 +62,7 @@ namespace Aurora::Threading::Primitives
if (timeout != 0)
{
startTime = Time::CurrentClockSteadyMS();
startTime = Time::SteadyClockMS();
if (startTime >= endTime)
{
goto exitWin32;

View File

@ -42,7 +42,7 @@ namespace Aurora::Threading::Primitives
bool Semaphore::Lock(AuUInt64 timeout)
{
AuUInt64 start = AuTime::CurrentClockSteadyMS();
AuUInt64 start = AuTime::SteadyClockMS();
AuUInt64 end = start + timeout;
AcquireSRWLockShared(&lock_); // we use atomics. using shared is fine, let's not get congested early
@ -52,7 +52,7 @@ namespace Aurora::Threading::Primitives
if (timeout != 0)
{
start = Time::CurrentClockSteadyMS();
start = Time::SteadyClockMS();
if (start >= end)
{
ReleaseSRWLockShared(&this->lock_);

View File

@ -65,7 +65,7 @@ namespace Aurora::Threading::Primitives
}
else
{
AuUInt64 startTime = AuTime::CurrentInternalClockMS();
AuUInt64 startTime = AuTime::HighResClockMS();
AuUInt64 endTime = startTime + timeout;
while (AuAtomicTestAndSet(&this->value_, 0))
@ -73,7 +73,7 @@ namespace Aurora::Threading::Primitives
long count = 0;
while (value_)
{
if (endTime <= AuTime::CurrentInternalClockMS())
if (endTime <= AuTime::HighResClockMS())
{
return false;
}

View File

@ -144,27 +144,27 @@ namespace Aurora::Time
return CalculateTimeT<std::chrono::milliseconds>(time);
}
AUKN_SYM AuUInt64 CurrentClock()
AUKN_SYM AuInt64 CurrentClock()
{
return NormalizeEpoch(sys_clock::now().time_since_epoch()).count();
}
AUKN_SYM AuUInt64 CurrentClockMS()
AUKN_SYM AuInt64 CurrentClockMS()
{
return std::chrono::duration_cast<std::chrono::milliseconds>(NormalizeEpoch(sys_clock::now().time_since_epoch())).count();
}
AUKN_SYM AuUInt64 CurrentClockNS()
AUKN_SYM AuInt64 CurrentClockNS()
{
return std::chrono::duration_cast<std::chrono::nanoseconds>(NormalizeEpoch(sys_clock::now().time_since_epoch())).count();
}
AUKN_SYM AuUInt64 CurrentClockSteady()
AUKN_SYM AuUInt64 SteadyClock()
{
return CurrentClockSteadyNS();
return SteadyClockNS();
}
AUKN_SYM AuUInt64 CurrentClockSteadyMS()
AUKN_SYM AuUInt64 SteadyClockMS()
{
#if defined(AURORA_IS_POSIX_DERIVED)
::timespec spec {};
@ -181,7 +181,7 @@ namespace Aurora::Time
return std::chrono::duration_cast<std::chrono::milliseconds>(steady_clock::now().time_since_epoch()).count();
}
AUKN_SYM AuUInt64 CurrentClockSteadyNS()
AUKN_SYM AuUInt64 SteadyClockNS()
{
#if defined(AURORA_IS_POSIX_DERIVED)
::timespec spec {};
@ -203,18 +203,24 @@ namespace Aurora::Time
return std::chrono::duration_cast<std::chrono::milliseconds>(NormalizeEpoch(sys_clock::from_time_t(time).time_since_epoch())).count();
}
AUKN_SYM AuUInt64 CurrentInternalClock()
AUKN_SYM AuUInt64 HighResClock()
{
return high_res_clock::now().time_since_epoch().count();
}
AUKN_SYM AuUInt64 CurrentInternalClockMS()
AUKN_SYM AuUInt64 HighResClockMS()
{
#if defined(AURORA_IS_POSIX_DERIVED)
return SteadyClockMS();
#endif
return std::chrono::duration_cast<std::chrono::milliseconds>(high_res_clock::now().time_since_epoch()).count();
}
AUKN_SYM AuUInt64 CurrentInternalClockNS()
AUKN_SYM AuUInt64 HighResClockNS()
{
#if defined(AURORA_IS_POSIX_DERIVED)
return SteadyClockNS();
#endif
return std::chrono::duration_cast<std::chrono::nanoseconds>(high_res_clock::now().time_since_epoch()).count();
}
@ -244,7 +250,7 @@ namespace Aurora::Time
if (epochDelta == 0)
{
epochDelta = CurrentClockMS() - CurrentInternalClockMS();
epochDelta = CurrentClockMS() - HighResClockMS();
}
return epochDelta + in;
@ -256,7 +262,7 @@ namespace Aurora::Time
if (epochDelta == 0)
{
epochDelta = CurrentClockNS() - CurrentInternalClockNS();
epochDelta = CurrentClockNS() - HighResClockNS();
}
return epochDelta + in;
@ -282,7 +288,33 @@ namespace Aurora::Time
return frequency = (static_cast<double>(high_res_clock::period::num) / static_cast<double>(high_res_clock::period::den) * 1'000.f);
}
AUKN_SYM AuUInt64 ClockJiffies()
AUKN_SYM AuUInt64 SteadyClockJiffies()
{
static AuUInt64 frequency = 0;
if (frequency != 0)
{
return frequency;
}
#if defined(AURORA_COMPILER_MSVC)
return frequency = _Query_perf_frequency();
#endif
#if defined(AURORA_IS_POSIX_DERIVED)
::timespec spec {};
if (::clock_getres(CLOCK_MONOTONIC, &spec) == 0)
{
if (spec.tv_nsec && !spec.tv_sec)
{
return frequency = 1000000000ull / spec.tv_nsec;
}
}
#endif
return frequency = static_cast<double>(steady_clock::period::den) / static_cast<double>(steady_clock::period::num);
}
AUKN_SYM AuUInt64 HighResClockJiffies()
{
static AuUInt64 frequency = 0;
if (frequency != 0)