[+] ILoopSource::WaitOn(u32: timeout)

[+] ms2tv
This commit is contained in:
Reece Wilson 2022-04-12 22:26:15 +01:00
parent 74613d61e0
commit 457d263fb0
23 changed files with 173 additions and 27 deletions

View File

@ -9,9 +9,18 @@
namespace Aurora::IO::FS
{
// WARNING: ASYNC FILE IS SINGLE THREAD ONLY.
// THESE APIS ARE NOT YET MT SAFE
// LOOP SOURCES ARE THE ONLY EXCEPTION
AUKN_SHARED_API(OpenAsync, IAsyncFileStream, const AuString &path, EFileOpenMode openMode, bool directIO = false, EFileAdvisoryLockLevel lock = EFileAdvisoryLockLevel::eNoSafety);
/// ~param transactions Array of FIO transactions
/// @param timeout Timeout in milliseconds, zero = infinity
/**
* Waits for at least one file IO event
* @param transactions An array mask of FIO transactions
* @param timeout Timeout in milliseconds, zero = indefinite
*/
AUKN_SYM AuUInt32 WaitMultiple(const AuList<AuSPtr<IAsyncTransaction>> &transactions, AuUInt32 timeout);
}

View File

@ -22,5 +22,12 @@ namespace Aurora::Loop
* @return
*/
virtual ELoopSource GetType() = 0;
/**
* @breif Blocks the current thread for the kernel primitives
* @warning Are you looking for LoopQueues? You can even reduce async threads down to kernel ILoopQueue's
*/
virtual bool WaitOn(AuUInt32 timeout = 0) = 0;
};
}

View File

@ -70,6 +70,11 @@ namespace Aurora::Loop
return IsSignaledFromNonblockingImpl(this, this, &LSEvent::IsSignaledNonblocking);
}
bool LSEvent::WaitOn(AuUInt32 timeout)
{
return LSHandle::WaitOn(timeout);
}
bool LSEvent::IsSignaledNonblocking()
{
if (!this->atomicRelease_)

View File

@ -20,7 +20,8 @@ namespace Aurora::Loop
virtual bool OnTrigger(AuUInt handle) override;
bool IsSignaled() override;
virtual bool IsSignaled() override;
virtual bool WaitOn(AuUInt32 timeout) override;
virtual ELoopSource GetType() override;
private:

View File

@ -36,6 +36,11 @@ namespace Aurora::Loop
return LSHandle::IsSignaled();
}
bool LSEvent::WaitOn(AuUInt32 timeout)
{
return LSHandle::WaitOn(timeout);
}
ELoopSource LSEvent::GetType()
{
return ELoopSource::eSourceEvent;

View File

@ -18,7 +18,8 @@ namespace Aurora::Loop
bool Set() override;
bool Reset() override;
bool ILSEvent::IsSignaled() override;
virtual bool ILSEvent::IsSignaled() override;
virtual bool ILSEvent::WaitOn(AuUInt32 timeout) override;
virtual ELoopSource ILSEvent::GetType() override;
};
}

View File

@ -50,6 +50,11 @@ namespace Aurora::Loop
return IsSignaledFromNonblockingImpl(this, this, &LSMutex::IsSignaledNonblocking);
}
bool LSMutex::WaitOn(AuUInt32 timeout)
{
return LSHandle::WaitOn(timeout);
}
bool LSMutex::IsSignaledNonblocking()
{
AuUInt64 oldSemaphoreValue {};

View File

@ -20,6 +20,7 @@ namespace Aurora::Loop
virtual bool OnTrigger(AuUInt handle) override;
bool IsSignaled() override;
bool WaitOn(AuUInt32 timeout) override;
virtual ELoopSource GetType() override;
private:

View File

@ -29,6 +29,11 @@ namespace Aurora::Loop
return LSHandle::IsSignaled();
}
bool Mutex::WaitOn(AuUInt32 timeout)
{
return LSHandle::WaitOn(timeout);
}
ELoopSource Mutex::GetType()
{
return ELoopSource::eSourceMutex;

View File

@ -19,6 +19,7 @@ namespace Aurora::Loop
bool Unlock() override;
bool ILSMutex::IsSignaled() override;
bool ILSMutex::WaitOn(AuUInt32 timeout) override;
ELoopSource ILSMutex::GetType() override;
};
}

View File

@ -59,6 +59,11 @@ namespace Aurora::Loop
return IsSignaledFromNonblockingImpl(this, this, &LSSemaphore::IsSignaledNonblocking);
}
bool LSSemaphore::WaitOn(AuUInt32 timeout)
{
return LSHandle::WaitOn(timeout);
}
ELoopSource LSSemaphore::GetType()
{
return ELoopSource::eSourceSemaphore;

View File

@ -20,6 +20,7 @@ namespace Aurora::Loop
virtual bool OnTrigger(AuUInt handle) override;
bool IsSignaled() override;
bool WaitOn(AuUInt32 timeout) override;
virtual ELoopSource GetType() override;
private:

View File

@ -30,6 +30,11 @@ namespace Aurora::Loop
return LSHandle::IsSignaled();
}
bool LSSemaphore::WaitOn(AuUInt32 timeout)
{
return LSHandle::WaitOn(timeout);
}
ELoopSource LSSemaphore::GetType()
{
return ELoopSource::eSourceSemaphore;

View File

@ -19,6 +19,7 @@ namespace Aurora::Loop
bool AddOne() override;
bool ILSSemaphore::IsSignaled() override;
bool ILSSemaphore::WaitOn(AuUInt32 timeout) override;
ELoopSource ILSSemaphore::GetType() override;
};
}

View File

@ -16,6 +16,13 @@ namespace Aurora::Loop
return PeekMessageW(&askers, 0, 0, 0, PM_NOREMOVE);
}
bool Win32Dummy::WaitOn(AuUInt32 timeout)
{
// TODO: Close
static HANDLE kAlwaysOffMutex = CreateEventW(nullptr, true, nullptr);
return MsgWaitForMultipleObjects(0, &kAlwaysOffMutex, false, timeout ? timeout : INFINITE, QS_ALLEVENTS) == WAIT_OBJECT_0 + 1;
}
ELoopSource Win32Dummy::GetType()
{
return ELoopSource::eSourceWin32;

View File

@ -17,6 +17,7 @@ namespace Aurora::Loop
const bool bIsPumping_;
bool IsSignaled() override;
bool WaitOn(AuUInt32 timeout) override;
ELoopSource GetType() override;
};
}

View File

@ -10,7 +10,7 @@
namespace Aurora::Loop
{
bool WaitSingleGeneric::WaitForAtleastOne(const AuList<AuUInt> &handles, AuUInt &one)
bool WaitSingleGeneric::WaitForAtleastOne(AuUInt32 timeout, const AuList<AuUInt> &handles, AuUInt &one)
{
if (handles.empty())
{
@ -20,18 +20,22 @@ namespace Aurora::Loop
if (handles.size() == 1)
{
one = 0;
return WaitForSingleObjectEx(reinterpret_cast<HANDLE>(handles.at(0)), 0, true) == WAIT_OBJECT_0;
return WaitForSingleObjectEx(reinterpret_cast<HANDLE>(handles.at(0)), timeout, true) == WAIT_OBJECT_0;
}
else
{
AuList<HANDLE> ntHandles;
ntHandles.reserve(handles.size());
for (const auto &handle : handles)
{
ntHandles.push_back(reinterpret_cast<HANDLE>(handle));
if (!AuTryInsert(ntHandles, reinterpret_cast<HANDLE>(handle)))
{
return false;
}
}
auto idx = WaitForMultipleObjectsEx(ntHandles.size(), ntHandles.data(), false, 0, true);
auto idx = WaitForMultipleObjectsEx(ntHandles.size(), ntHandles.data(), false, timeout, true);
if (idx < WAIT_OBJECT_0)
{
return false;
@ -42,9 +46,9 @@ namespace Aurora::Loop
}
}
bool WaitSingleGeneric::WaitForOne(AuUInt handle)
bool WaitSingleGeneric::WaitForOne(AuUInt32 timeout, AuUInt handle)
{
return WaitForSingleObjectEx(reinterpret_cast<HANDLE>(handle), 0, true) == WAIT_OBJECT_0;
return WaitForSingleObjectEx(reinterpret_cast<HANDLE>(handle), timeout, true) == WAIT_OBJECT_0;
}
void WaitSingleGeneric::OnPresleep()

View File

@ -11,8 +11,8 @@ namespace Aurora::Loop
{
struct WaitSingleGeneric : WaitSingleBase
{
virtual bool WaitForAtleastOne(const AuList<AuUInt> &handles, AuUInt &one) override;
virtual bool WaitForOne(AuUInt handle) override;
virtual bool WaitForAtleastOne(AuUInt32 timeout, const AuList<AuUInt> &handles, AuUInt &one) override;
virtual bool WaitForOne(AuUInt32 timeout, AuUInt handle) override;
virtual void OnPresleep() override;
virtual void OnFinishSleep() override;
virtual ELoopSource GetType() override;

View File

@ -7,10 +7,11 @@
***/
#include <Source/RuntimeInternal.hpp>
#include "WaitSingle.hpp"
#include <Source/Time/Time.hpp>
namespace Aurora::Loop
{
bool WaitSingleGeneric::WaitForAtleastOne(const AuList<AuUInt> &handles, const AuList<AuUInt> &handlesWrite, AuUInt &one, AuUInt &two)
bool WaitSingleGeneric::WaitForAtleastOne(AuUInt32 timeout, const AuList<AuUInt> &handles, const AuList<AuUInt> &handlesWrite, AuUInt &one, AuUInt &two)
{
fd_set readSet, writeSet;
AuUInt maxHandle {};
@ -19,6 +20,11 @@ namespace Aurora::Loop
FD_ZERO(&readSet);
FD_ZERO(&writeSet);
if (timeout)
{
AuTime::ms2tv(&tv, timeout);
}
for (const auto i : handles)
{
if (i == -1) continue;
@ -33,7 +39,11 @@ namespace Aurora::Loop
maxHandle = AuMax(maxHandle, i + 1);
}
auto active = select(maxHandle, handles.size() ? &readSet : NULL, handlesWrite.size() ? &writeSet : NULL, NULL, &tv);
auto active = select(maxHandle,
handles.size() ? &readSet : nullptr,
handlesWrite.size() ? &writeSet : nullptr,
nullptr,
timeout == AuUInt32(-1) ? nullptr : &tv);
if (active == -1)
{
// todo push error
@ -65,7 +75,7 @@ namespace Aurora::Loop
return true;
}
bool WaitSingleGeneric::WaitForOne(AuUInt read, AuUInt write)
bool WaitSingleGeneric::WaitForOne(AuUInt32 timeout, AuUInt read, AuUInt write)
{
fd_set readSet, writeSet;
struct timeval tv {};
@ -74,6 +84,11 @@ namespace Aurora::Loop
FD_ZERO(&readSet);
FD_ZERO(&writeSet);
if (timeout)
{
AuTime::ms2tv(&tv, timeout);
}
if (read != -1)
{
maxFd = read + 1;
@ -86,7 +101,11 @@ namespace Aurora::Loop
maxFd = AuMax(maxFd, int(write) + 1);
}
auto active = select(maxFd, read != -1 ? &readSet : NULL, write != -1 ? &writeSet : NULL, NULL, &tv);
auto active = select(maxFd,
read != -1 ? &readSet : nullptr,
write != -1 ? &writeSet : nullptr,
nullptr,
timeout == AuUInt32(-1) ? nullptr : &tv);
if (active == -1)
{
// todo push error

View File

@ -11,8 +11,8 @@ namespace Aurora::Loop
{
struct WaitSingleGeneric : WaitSingleBase
{
virtual bool WaitForAtleastOne(const AuList<AuUInt> &handles _OPT_WRITE_ARRAY, AuUInt &read _OPT_WRITE_REF) override;
virtual bool WaitForOne(AuUInt handle _OPT_WRITE) override;
virtual bool WaitForAtleastOne(AuUInt32 timeout, const AuList<AuUInt> &handles _OPT_WRITE_ARRAY, AuUInt &read _OPT_WRITE_REF) override;
virtual bool WaitForOne(AuUInt32 timeout, AuUInt handle _OPT_WRITE) override;
virtual void OnPresleep() override;
virtual void OnFinishSleep() override;
virtual ELoopSource GetType() override;

View File

@ -22,10 +22,10 @@ namespace Aurora::Loop
#if defined(AURORA_IS_POSIX_DERIVED)
auto handle = this->GetHandle();
auto handleRw = this->GetWriteHandle();
val = this->WaitForOne(handle, handleRw);
val = this->WaitForOne(0, handle, handleRw);
#else
auto handle = this->GetHandle();
val = this->WaitForOne(handle);
val = this->WaitForOne(0, handle);
#endif
if (val)
{
@ -37,11 +37,55 @@ namespace Aurora::Loop
#if defined(AURORA_IS_POSIX_DERIVED)
auto handles = this->GetHandles();
auto handlesRw = this->GetWriteHandles();
val = this->WaitForAtleastOne(handles, handlesRw, one, two);
val = this->WaitForAtleastOne(0, handles, handlesRw, one, two);
if (one == -1) one = two;
#else
auto handles = this->GetHandles();
val = this->WaitForAtleastOne(handles, one);
val = this->WaitForAtleastOne(0, handles, one);
#endif
if (val)
{
val = this->OnTrigger(one);
}
}
this->OnFinishSleep();
return val;
}
bool WaitSingleBase::WaitOn(AuUInt32 timeout)
{
bool val {};
AuUInt one {};
AuUInt two {};
this->OnPresleep();
if (this->Singular())
{
#if defined(AURORA_IS_POSIX_DERIVED)
auto handle = this->GetHandle();
auto handleRw = this->GetWriteHandle();
val = this->WaitForOne(timeout ? timeout : AuUInt32(-1), handle, handleRw);
#else
auto handle = this->GetHandle();
val = this->WaitForOne(timeout ? timeout : AuUInt32(-1), handle);
#endif
if (val)
{
val = this->OnTrigger(handle);
}
}
else
{
#if defined(AURORA_IS_POSIX_DERIVED)
auto handles = this->GetHandles();
auto handlesRw = this->GetWriteHandles();
val = this->WaitForAtleastOne(timeout ? timeout : AuUInt32(-1), handles, handlesRw, one, two);
if (one == -1) one = two;
#else
auto handles = this->GetHandles();
val = this->WaitForAtleastOne(timeout ? timeout : AuUInt32(-1), handles, one);
#endif
if (val)

View File

@ -29,9 +29,10 @@ namespace Aurora::Loop
struct WaitSingleBase : public ILoopSourceEx
{
bool IsSignaled() override;
virtual bool IsSignaled() override;
virtual bool WaitOn(AuUInt32 timeout) override;
virtual bool WaitForAtleastOne(const AuList<AuUInt> &handles _OPT_WRITE_ARRAY, AuUInt &one _OPT_WRITE_REF) = 0;
virtual bool WaitForOne(AuUInt handle _OPT_WRITE) = 0;
virtual bool WaitForAtleastOne(AuUInt32 timeout, const AuList<AuUInt> &handles _OPT_WRITE_ARRAY, AuUInt &one _OPT_WRITE_REF) = 0;
virtual bool WaitForOne(AuUInt32 timeout, AuUInt handle _OPT_WRITE) = 0;
};
}

View File

@ -48,6 +48,24 @@ namespace Aurora::Time
ts->tv_nsec = remainderNS;
}
static void ms2tvabs(struct timeval *tv, unsigned long ms)
{
timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
auto baseNS = ((AuUInt64)ms * (AuUInt64)1'000'000) + (AuUInt64)ts.tv_nsec;
auto remainderNS = (AuUInt64)baseNS % (AuUInt64)1'000'000'000;
tv->tv_sec += baseNS / 1'000'000'000ull;
tv->tv_usec = remainderNS / 1'000ull;
}
static void ms2tv(struct timeval *tv, unsigned long ms)
{
tv->tv_sec = ms / 1'000ull;
tv->tv_usec = (ms % 1'000ull) * 1'000ull;
}
static void ms2tsabsmono(struct timespec *ts, unsigned long ms)
{
clock_gettime(CLOCK_MONOTONIC, ts);