[*] Move AuIO::WaitMultiple

[+] AuIO::WaitMultiple2
This commit is contained in:
Reece Wilson 2022-05-13 19:24:27 +01:00
parent 021959bdfc
commit 86ac108f11
10 changed files with 290 additions and 120 deletions

View File

@ -0,0 +1,35 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Async.hpp
Date: 2022-5-13
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
/**
* Waits for at least one overlapped IO event
* @param transactions An array mask of FIO transactions
* @param timeout Timeout in milliseconds, zero = indefinite
*/
AUKN_SYM AuUInt32 WaitMultiple(const AuList<AuSPtr<IAsyncTransaction>> &transactions, AuUInt32 timeoutMs);
AUKN_SYM AuList<AuSPtr<IAsyncTransaction>> WaitMultiple2(const AuList<AuSPtr<IAsyncTransaction>> &transactions, AuUInt32 timeoutMs);
/**
* A hint to dispatch any outstanding IO requests.
*
* The Aurora Runtime optimizes file FileRead/Writes by batching on some platforms.
* It may be required to call SendBatched for lower latency immediately after a Read/Write.
*
* Consider any call to WaitMultiple, internal loop source signal state, or loop queue wait,
* an internal hint to request an async IO work-queue flush.
*
* @warning This does nothing on NT
* @warning Linux StartRead/StartWrites will not start immediately.
* They instead wait for a yield or call to SendBatched, unless reconfigured under the RuntimeConfig struct.
*/
AUKN_SYM void SendBatched();
}

View File

@ -9,31 +9,9 @@
namespace Aurora::IO::FS
{
// WARNING: AURORA::IO::FS ASYNC IS NOT THREAD SAFE
// You can use the synchronization objects for the purpose of cross-thread synchronization
// However; IAsyncTransaction should not be accessed from differing threads.
// However, IAsyncTransaction should not be accessed from differing threads.
AUKN_SHARED_API(OpenAsync, IAsyncFileStream, const AuString &path, EFileOpenMode openMode, bool directIO = false, EFileAdvisoryLockLevel lock = EFileAdvisoryLockLevel::eNoSafety);
/**
* Waits for at least one file IO event
* @param transactions An array mask of FIO transactions
* @param timeout Timeout in milliseconds, zero = indefinite
*/
AUKN_SYM AuUInt32 WaitMultiple(const AuList<AuSPtr<IAsyncTransaction>> &transactions, AuUInt32 timeout);
/**
* A hint to dispatch any outstanding IO requests.
*
* The Aurora Runtime optimizes file FileRead/Writes by batching on some platforms.
* It may be required to call SendBatched for lower latency immediately after a Read/Write.
*
* Consider any call to WaitMultiple, internal loop source signal state, or loop queue wait,
* an internal hint to request an async IO work-queue flush.
*
* @warning This does nothing on NT
* @warning Linux StartRead/StartWrites will not start immediately. They instead wait for a yield or call to SendBatched.
*/
AUKN_SYM void SendBatched();
}

View File

@ -17,6 +17,7 @@
#include "IAsyncFinishedSubscriber.hpp"
#include "IAsyncTransaction.hpp"
#include "Async.hpp"
#include "FS/FS.hpp"
#include "Net/Net.hpp"

View File

@ -396,64 +396,4 @@ namespace Aurora::IO::FS
{
AuSafeDelete<LinuxAsyncFileStream *>(handle);
}
AUKN_SYM AuUInt32 WaitMultiple(const AuList<AuSPtr<IAsyncTransaction>> &files, AuUInt32 timeout)
{
AuCtorCode_t code;
AuUInt32 count {};
AuUInt64 endTime = AuTime::CurrentInternalClockMS() + AuUInt64(timeout);
auto waitQueue = AuTryConstruct<AuList<AuSPtr<IAsyncTransaction>>>(code, files);
if (!code)
{
return 0;
}
while (true)
{
if (waitQueue.empty())
{
break;
}
AuUInt32 timeoutMS {};
if (timeout)
{
AuInt64 delta = (AuInt64)endTime - AuTime::CurrentInternalClockMS();
if (delta <= 0)
{
break;
}
timeoutMS = delta;
}
if (!UNIX::LinuxOverlappedPoll(timeoutMS))
{
continue;
}
for (auto itr = waitQueue.begin();
itr != waitQueue.end();)
{
if (AuStaticPointerCast<LinuxAsyncFileTransaction>(*itr)->Complete())
{
count ++;
itr = waitQueue.erase(itr);
}
else
{
itr++;
}
}
if (count)
{
break;
}
}
return count;
}
}

View File

@ -438,34 +438,4 @@ namespace Aurora::IO::FS
{
AuSafeDelete<NtAsyncFileStream *>(handle);
}
AUKN_SYM AuUInt32 WaitMultiple(const AuList<AuSPtr<IAsyncTransaction>> &files, AuUInt32 timeout)
{
AuList<HANDLE> handles;
AuUInt32 count {};
if (files.empty())
{
return true;
}
for (const auto & file : files)
{
handles.push_back(AuStaticPointerCast<NtAsyncFileTransaction>(file)->GetHandle());
}
auto ret = WaitForMultipleObjectsEx(handles.size(), handles.data(), false, timeout ? timeout : INFINITE, TRUE);
if (ret == WAIT_TIMEOUT || ret == WAIT_FAILED)
{
return false;
}
for (auto &file : files)
{
count += AuStaticPointerCast<NtAsyncFileTransaction>(file)->Complete();
}
return count;
}
}

View File

@ -31,11 +31,4 @@ namespace Aurora::IO
UNIX::DeinitUnixIO();
#endif
}
AUKN_SYM void SendBatched()
{
#if defined(AURORA_IS_LINUX_DERIVED)
UNIX::SendIOBuffers();
#endif
}
}

View File

@ -0,0 +1,142 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: WaitMultiple.Linux.cpp
Date: 2022-5-13
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "IO.hpp"
#include "WaitMultiple.Linux.hpp"
#include "FS/Async.Linux.hpp"
#include "UNIX/IOSubmit.Linux.hpp"
namespace Aurora::IO
{
AUKN_SYM AuUInt32 WaitMultiple(const AuList<AuSPtr<IAsyncTransaction>> &files, AuUInt32 timeout)
{
AuCtorCode_t code;
AuUInt32 count {};
AuUInt64 endTime = AuTime::CurrentInternalClockMS() + AuUInt64(timeout);
auto waitQueue = AuTryConstruct<AuList<AuSPtr<IAsyncTransaction>>>(code, files);
if (!code)
{
return 0;
}
while (true)
{
if (waitQueue.empty())
{
break;
}
AuUInt32 timeoutMS {};
if (timeout)
{
AuInt64 delta = (AuInt64)endTime - AuTime::CurrentInternalClockMS();
if (delta <= 0)
{
break;
}
timeoutMS = delta;
}
if (!UNIX::LinuxOverlappedPoll(timeoutMS))
{
continue;
}
for (auto itr = waitQueue.begin();
itr != waitQueue.end();)
{
if (AuStaticPointerCast<FS::LinuxAsyncFileTransaction>(*itr)->Complete())
{
count ++;
itr = waitQueue.erase(itr);
}
else
{
itr++;
}
}
if (count)
{
break;
}
}
return count;
}
AUKN_SYM AuList<AuSPtr<IAsyncTransaction>> WaitMultiple2(const AuList<AuSPtr<IAsyncTransaction>> &transactions, AuUInt32 requestedTimeoutMs)
{
AuCtorCode_t code;
AuList<AuSPtr<IAsyncTransaction>> retTransactions;
AuUInt64 endTime = AuTime::CurrentInternalClockMS() + AuUInt64(requestedTimeoutMs);
auto waitQueue = AuTryConstruct<AuList<AuSPtr<IAsyncTransaction>>>(code, transactions);
if (!code)
{
return 0;
}
while (true)
{
if (waitQueue.empty())
{
break;
}
AuUInt32 timeoutMS {};
if (requestedTimeoutMs)
{
AuInt64 delta = (AuInt64)endTime - AuTime::CurrentInternalClockMS();
if (delta <= 0)
{
break;
}
timeoutMS = delta;
}
if (!UNIX::LinuxOverlappedPoll(timeoutMS))
{
continue;
}
for (auto itr = waitQueue.begin();
itr != waitQueue.end();)
{
if (AuStaticPointerCast<FS::LinuxAsyncFileTransaction>(*itr)->Complete())
{
AuTryInsert(retTransactions, *itr);
itr = waitQueue.erase(itr);
}
else
{
itr++;
}
}
if (retTransactions.size())
{
break;
}
}
return retTransactions;
}
AUKN_SYM void SendBatched()
{
#if defined(AURORA_IS_LINUX_DERIVED)
UNIX::SendIOBuffers();
#endif
}
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: WaitMultiple.Linux.hpp
Date: 2022-5-13
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
}

View File

@ -0,0 +1,85 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: WaitMultiple.NT.cpp
Date: 2022-5-13
Author: Reece
***/
#include <Source/RuntimeInternal.hpp>
#include "IO.hpp"
#include "WaitMultiple.NT.hpp"
#include "FS/Async.NT.hpp"
namespace Aurora::IO
{
AUKN_SYM AuUInt32 WaitMultiple(const AuList<AuSPtr<IAsyncTransaction>> &files, AuUInt32 timeout)
{
AuList<HANDLE> handles;
AuUInt32 count {};
if (files.empty())
{
return 0;
}
for (const auto &file : files)
{
AuTryInsert(handles, AuStaticPointerCast<FS::NtAsyncFileTransaction>(file)->GetHandle());
}
auto ret = WaitForMultipleObjectsEx(handles.size(), handles.data(), false, timeout ? timeout : INFINITE, TRUE);
if (ret == WAIT_TIMEOUT || ret == WAIT_FAILED)
{
return 0;
}
for (auto &file : files)
{
count += AuStaticPointerCast<FS::NtAsyncFileTransaction>(file)->Complete();
}
return count;
}
AUKN_SYM AuList<AuSPtr<IAsyncTransaction>> WaitMultiple2(const AuList<AuSPtr<IAsyncTransaction>> &transactions, AuUInt32 timeoutMs)
{
AuList<AuSPtr<IAsyncTransaction>> retTransasctions;
AuList<HANDLE> handles;
AuUInt32 count {};
if (transactions.empty())
{
return {};
}
retTransasctions.reserve(transactions.size());
for (const auto &file : transactions)
{
AuTryInsert(handles, AuStaticPointerCast<FS::NtAsyncFileTransaction>(file)->GetHandle());
}
auto ret = WaitForMultipleObjectsEx(handles.size(), handles.data(), false, timeoutMs ? timeoutMs : INFINITE, TRUE);
if (ret == WAIT_TIMEOUT || ret == WAIT_FAILED)
{
return {};
}
for (auto &file : transactions)
{
if (AuStaticPointerCast<FS::NtAsyncFileTransaction>(file)->Complete())
{
AuTryInsert(retTransasctions, file);
}
}
return retTransasctions;
}
AUKN_SYM void SendBatched()
{
}
}

View File

@ -0,0 +1,13 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: WaitMultiple.NT.hpp
Date: 2022-5-13
Author: Reece
***/
#pragma once
namespace Aurora::IO
{
}