[+] IO test: ReadFileThroughPipeProcessor

[*] IO refactor
[*] Update tests and runtime. Linux was good ~4 ish commits ago. Almost back again.
[*] Update build scripts
This commit is contained in:
Reece Wilson 2022-06-23 22:07:57 +01:00
parent 333fe12f9a
commit 602de0cbb2
6 changed files with 124 additions and 20 deletions

@ -1 +1 @@
Subproject commit 4fb09a68e921f3cf3ecd759787ff4fdc69964cec Subproject commit cb3b1a582b92db64d1199a5c2bd1f0f4715fd8e9

@ -1 +1 @@
Subproject commit 8bd7f698e31fd2c84cedf3512edaa28c9db428de Subproject commit bf6f13095c5546babd071f54ba59b63d4793a7cc

@ -1 +1 @@
Subproject commit d551cda998c04c72a0d63bc585c0ad6cac633507 Subproject commit c755a236cdee84a0796e775612467ea5fb6cb67b

View File

@ -19,9 +19,9 @@ TEST(Process, RunSystem)
AuProcesses::StartupParmaters params; AuProcesses::StartupParmaters params;
params.type = AuProcesses::ESpawnType::eSpawnChildProcessWorker; params.type = AuProcesses::ESpawnType::eSpawnChildProcessWorker;
params.process = proc; params.process = proc;
params.fwdOut = true; params.fwdOut = AuProcesses::EStreamForward::eAsyncPipe;
params.fwdIn = true; params.fwdIn = AuProcesses::EStreamForward::eAsyncPipe;
params.noShowConsole = true; params.bNoShowConsole = true;
auto process = AuProcesses::SpawnUnique(params); auto process = AuProcesses::SpawnUnique(params);
ASSERT_TRUE(static_cast<bool>(process)); ASSERT_TRUE(static_cast<bool>(process));
@ -49,9 +49,9 @@ TEST(Process, RunSystemLoopSourceStdOut)
AuProcesses::StartupParmaters params; AuProcesses::StartupParmaters params;
params.type = AuProcesses::ESpawnType::eSpawnChildProcessWorker; params.type = AuProcesses::ESpawnType::eSpawnChildProcessWorker;
params.process = proc; params.process = proc;
params.fwdOut = true; params.fwdOut = AuProcesses::EStreamForward::eAsyncPipe;
params.fwdIn = true; params.fwdIn = AuProcesses::EStreamForward::eAsyncPipe;
params.noShowConsole = true; params.bNoShowConsole = true;
auto process = AuProcesses::SpawnUnique(params); auto process = AuProcesses::SpawnUnique(params);
ASSERT_TRUE(static_cast<bool>(process)); ASSERT_TRUE(static_cast<bool>(process));

View File

@ -42,14 +42,14 @@ TEST(IPC, ShareUnixFd)
TEST(IPC, Event) TEST(IPC, Event)
{ {
auto event = Aurora::IPC::NewEvent(false, true); auto event = AuIPC::NewEvent(false, true);
ASSERT_TRUE(bool(event)); ASSERT_TRUE(bool(event));
auto handleString = event->ExportToString(); auto handleString = event->ExportToString();
ASSERT_TRUE(bool(handleString.size())); ASSERT_TRUE(bool(handleString.size()));
AuLogDbg("Exported event handle: {}", handleString); AuLogDbg("Exported event handle: {}", handleString);
auto eventImported = Aurora::IPC::ImportEvent(handleString); auto eventImported = AuIPC::ImportEvent(handleString);
ASSERT_TRUE(bool(eventImported)); ASSERT_TRUE(bool(eventImported));
ASSERT_FALSE(eventImported->IsSignaled()); ASSERT_FALSE(eventImported->IsSignaled());
@ -64,14 +64,14 @@ TEST(IPC, Event)
TEST(IPC, Semaphore) TEST(IPC, Semaphore)
{ {
auto semaphore = Aurora::IPC::NewSemaphore(2); auto semaphore = AuIPC::NewSemaphore(2);
ASSERT_TRUE(bool(semaphore)); ASSERT_TRUE(bool(semaphore));
auto handleString = semaphore->ExportToString(); auto handleString = semaphore->ExportToString();
ASSERT_TRUE(bool(handleString.size())); ASSERT_TRUE(bool(handleString.size()));
AuLogDbg("Exported semaphore handle: {}", handleString); AuLogDbg("Exported semaphore handle: {}", handleString);
auto semaphoreImported = Aurora::IPC::ImportSemaphore(handleString); auto semaphoreImported = AuIPC::ImportSemaphore(handleString);
ASSERT_TRUE(bool(semaphoreImported)); ASSERT_TRUE(bool(semaphoreImported));
ASSERT_TRUE(semaphoreImported->IsSignaled()); ASSERT_TRUE(semaphoreImported->IsSignaled());
@ -90,14 +90,14 @@ TEST(IPC, Semaphore)
TEST(IPC, Memory) TEST(IPC, Memory)
{ {
auto memory = Aurora::IPC::NewSharedMemory(4096); auto memory = AuIPC::NewSharedMemory(4096);
ASSERT_TRUE(bool(memory)); ASSERT_TRUE(bool(memory));
auto handleString = memory->ExportToString(); auto handleString = memory->ExportToString();
ASSERT_TRUE(bool(handleString.size())); ASSERT_TRUE(bool(handleString.size()));
AuLogDbg("Exported shared view handle: {}", handleString); AuLogDbg("Exported shared view handle: {}", handleString);
auto memoryImported = Aurora::IPC::ImportSharedMemory(handleString); auto memoryImported = AuIPC::ImportSharedMemory(handleString);
ASSERT_TRUE(bool(memoryImported)); ASSERT_TRUE(bool(memoryImported));
static const AuString kHelloWorld = "Hello IPC"; static const AuString kHelloWorld = "Hello IPC";
@ -110,7 +110,7 @@ TEST(IPC, Memory)
TEST(IPC, Pipe) TEST(IPC, Pipe)
{ {
auto pipe = Aurora::IPC::NewPipe(); auto pipe = AuIPC::NewPipe();
ASSERT_TRUE(bool(pipe)); ASSERT_TRUE(bool(pipe));
ASSERT_FALSE(pipe->AsReadChannelIsOpen()->IsSignaled()); ASSERT_FALSE(pipe->AsReadChannelIsOpen()->IsSignaled());
@ -119,7 +119,7 @@ TEST(IPC, Pipe)
ASSERT_TRUE(bool(handleString.size())); ASSERT_TRUE(bool(handleString.size()));
AuLogDbg("Exported pipe handle: {}", handleString); AuLogDbg("Exported pipe handle: {}", handleString);
auto pipeImported = Aurora::IPC::ImportPipe(handleString); auto pipeImported = AuIPC::ImportPipe(handleString);
ASSERT_TRUE(bool(pipeImported)); ASSERT_TRUE(bool(pipeImported));
ASSERT_TRUE(pipe->AsReadChannelIsOpen()->IsSignaled()); ASSERT_TRUE(pipe->AsReadChannelIsOpen()->IsSignaled());
@ -174,7 +174,7 @@ TEST(IPC, Pipe)
TEST(IPC, AsyncPipe) TEST(IPC, AsyncPipe)
{ {
auto pipe = Aurora::IPC::NewPipe(); auto pipe = AuIPC::NewPipe();
ASSERT_TRUE(bool(pipe)); ASSERT_TRUE(bool(pipe));
ASSERT_FALSE(pipe->AsReadChannelIsOpen()->IsSignaled()); ASSERT_FALSE(pipe->AsReadChannelIsOpen()->IsSignaled());
@ -183,7 +183,7 @@ TEST(IPC, AsyncPipe)
ASSERT_TRUE(bool(handleString.size())); ASSERT_TRUE(bool(handleString.size()));
AuLogDbg("Exported pipe handle: {}", handleString); AuLogDbg("Exported pipe handle: {}", handleString);
auto pipeImported = Aurora::IPC::ImportPipe(handleString); auto pipeImported = AuIPC::ImportPipe(handleString);
ASSERT_TRUE(bool(pipeImported)); ASSERT_TRUE(bool(pipeImported));
ASSERT_TRUE(pipe->AsReadChannelIsOpen()->IsSignaled()); ASSERT_TRUE(pipe->AsReadChannelIsOpen()->IsSignaled());

View File

@ -9,7 +9,6 @@
#include <Aurora/IO/IOExperimental.hpp> #include <Aurora/IO/IOExperimental.hpp>
#include <gtest/gtest.h> #include <gtest/gtest.h>
TEST(IO, Proceessor_SoftAbort) TEST(IO, Proceessor_SoftAbort)
{ {
static int counter {}; static int counter {};
@ -130,6 +129,111 @@ TEST(IO, Proceessor_FailAbort)
} }
TEST(IO, ReadFileThroughPipeProcessor)
{
// Write 20MB of RNG asynchronously
{
auto stream = AuIOFS::OpenAsyncUnique("./AsyncFilePIPE", AuIOFS::EFileOpenMode::eReadWrite, true);
ASSERT_TRUE(bool(stream));
AuByteBuffer rngbuffer(512 * 1024 * 40);
AuRng::RngFillRange(rngbuffer);
auto transaction = stream->NewTransaction();
ASSERT_TRUE(bool(transaction));
transaction->SetCallback(AuMakeShared<AuIO::IAsyncFinishedSubscriberFunctional>([](AuUInt64 offset, AuUInt32 length)
{
AuLogDbg("AIO 1 callback: {} {}", offset, length);
}));
AuMemoryViewRead readView(rngbuffer);
ASSERT_TRUE(transaction->StartWrite(0, AuUnsafeRaiiToShared(&readView)));
AuLogDbg("AIO 1: waiting....");
ASSERT_TRUE(transaction->Wait(0));
AuLogDbg("AIO 1: wait complete....");
ASSERT_TRUE(transaction->NewLoopSource()->IsSignaled());
ASSERT_TRUE(transaction->Complete());
}
{
// Create the io processor
auto ioDrivenQueue = AuIO::NewIOProcessorNoQueue(false);
ASSERT_TRUE(bool(ioDrivenQueue));
auto pipeProcessor = ioDrivenQueue->ToPipeProcessor();
ASSERT_TRUE(bool(pipeProcessor));
// Open the file and prepare an async transaction for the pipe processor to use
auto stream = AuIOFS::OpenAsyncUnique("./AsyncFilePIPE", AuIOFS::EFileOpenMode::eRead, true);
ASSERT_TRUE(bool(stream));
auto transaction = stream->NewTransaction();
ASSERT_TRUE(bool(transaction));
// Create an event listener for the callbacks
auto listener = AuMakeShared<AuIO::IIOPipeEventListenerFunctional>();
ASSERT_TRUE(bool(listener));
listener->OnPipeSuccessEventFunctional = []()
{
AuLogInfo("File IO pipe complete");
};
listener->OnPipeFailureEventFunctional = []()
{
AuLogInfo("File IO pipe failure");
};
listener->OnPipePartialEventFunctional = [](AuUInt tranferred)
{
AuLogInfo("on part: {}", tranferred);
};
// Setup the IOPipeRequest
AuIO::IOPipeRequestAIO aio;
aio.asyncTransaction = transaction;
aio.listener = listener;
aio.isStream = false;
aio.output.type = AuIO::EPipeCallbackType::eTryHandleBufferedPart;
auto onData = AuMakeShared<AuIO::IIOBufferedStreamAvailableFunctional>();
onData->OnDataAvailableFunctional = [](AuByteBuffer &view)
{
AuLogDbg("Pipe processing: {}", view.RemainingBytes());
view.Skip(view.RemainingBytes());
return true;
};
aio.output.handleBufferedStream.onData = onData;
// Finally create the damn pipe
auto filePipe = pipeProcessor->NewAIOPipe(aio);
ASSERT_TRUE(bool(filePipe));
ASSERT_TRUE(filePipe->Start());
// Main loop
while (ioDrivenQueue->HasItems())
{
// Since we aren't using an external loop queue, we manually pump the processor in here under this spinloop
// If we were using a loop queue, we wouldn't need to do anything at all. You could still technically
// use ioDrivenQueue->ToQueue()->WaitAny() to trigger the tick.
ioDrivenQueue->RunTick(); // also: TryTick and ManualTick
}
}
}
void RunTests() void RunTests()
{ {
Aurora::RuntimeStartInfo info; Aurora::RuntimeStartInfo info;