[+] Additional IO FS pipe processor tests
[+] FS iteration test [+] Compressor object test [+] IPC promise test [*] Update exit test to allow for one control+c ignore [*] Update Aurora Runtime [*] Update AuROXTL
This commit is contained in:
parent
602de0cbb2
commit
7257cf8df1
@ -1 +1 @@
|
|||||||
Subproject commit 24ea7a67a06846aec4950b2a72d15dc72e9621b0
|
Subproject commit b5ec531c3bc8e5a11924ef6a9cec15705f23af2a
|
@ -1 +1 @@
|
|||||||
Subproject commit cb3b1a582b92db64d1199a5c2bd1f0f4715fd8e9
|
Subproject commit f81f99f45ea1d1ecde580b44fd4b35d21f9a2e41
|
@ -1 +1 @@
|
|||||||
Subproject commit bf6f13095c5546babd071f54ba59b63d4793a7cc
|
Subproject commit b8bcab1bdca221c305b420f30700a30045f91aed
|
@ -1 +1 @@
|
|||||||
Subproject commit c755a236cdee84a0796e775612467ea5fb6cb67b
|
Subproject commit cf2290444cb0554aa9f7605679f476d945109034
|
259
Tests/Public/17. Hello Async/Main.cpp
Normal file
259
Tests/Public/17. Hello Async/Main.cpp
Normal file
@ -0,0 +1,259 @@
|
|||||||
|
/***
|
||||||
|
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
|
||||||
|
|
||||||
|
File: Main.cpp
|
||||||
|
Date: 2022-2-18
|
||||||
|
Author: Reece
|
||||||
|
***/
|
||||||
|
#include <AuroraRuntime.hpp>
|
||||||
|
|
||||||
|
static const auto kMaxTicks = 30;
|
||||||
|
|
||||||
|
static void TestPromise()
|
||||||
|
{
|
||||||
|
auto main = AuThreads::GetThread();
|
||||||
|
AuLogInfo("Main: {:p}", fmt::ptr(main));
|
||||||
|
|
||||||
|
auto promise = AuMakeShared<AuNullPromise>(false, false);
|
||||||
|
SysAssert(promise);
|
||||||
|
|
||||||
|
//sizeof(AuNullPromise) < Fat bastard
|
||||||
|
|
||||||
|
auto workerPool = AuAsync::GetCurrentWorkerPId().pool;
|
||||||
|
AuAsync::WorkerPId_t id(workerPool, 0, 1);
|
||||||
|
|
||||||
|
auto destThread = workerPool->ResolveHandle(id);
|
||||||
|
|
||||||
|
promise->BeginWorkFunc([=](const AuSPtr<AuAsync::IPromiseAccessor<AuNullS, AuNullS>> &in)
|
||||||
|
{
|
||||||
|
AuLogInfo("Doing work on: {:p}", fmt::ptr(AuThreads::GetThread()));
|
||||||
|
SysAssert(destThread.get() == AuThreads::GetThread());
|
||||||
|
|
||||||
|
in->WriteIntoSuccess(); /*out shared ptr to be used as a factory. if not utiltized, the success parameter will be null.*/
|
||||||
|
in->Complete();
|
||||||
|
}, id);
|
||||||
|
|
||||||
|
promise->OnSuccessFunctional([=](const AuSPtr<AuNullS> &out)
|
||||||
|
{
|
||||||
|
AuLogInfo("Callback work on: {:p} {:p}", fmt::ptr(AuThreads::GetThread()), fmt::ptr(out.get()));
|
||||||
|
SysAssert(main == AuThreads::GetThread());
|
||||||
|
});
|
||||||
|
|
||||||
|
promise->OnFailureFunctional([=](const AuSPtr<AuNullS> &out)
|
||||||
|
{
|
||||||
|
AuLogInfo("Failed work on: {:p} {:p}", fmt::ptr(AuThreads::GetThread()), fmt::ptr(out.get()));
|
||||||
|
SysAssert(main == AuThreads::GetThread());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
static void TestWriteSuccess()
|
||||||
|
{
|
||||||
|
auto main = AuThreads::GetThread();
|
||||||
|
|
||||||
|
auto promise = AuMakeShared<AuNullPromise>(false, false);
|
||||||
|
SysAssert(promise);
|
||||||
|
|
||||||
|
auto workerPool = AuAsync::GetCurrentWorkerPId().pool;
|
||||||
|
AuAsync::WorkerPId_t id(workerPool, 0, 1);
|
||||||
|
|
||||||
|
auto destThread = workerPool->ResolveHandle(id);
|
||||||
|
|
||||||
|
promise->BeginWorkFunc([=](const AuSPtr<AuAsync::IPromiseAccessor<AuNullS, AuNullS>> &in)
|
||||||
|
{
|
||||||
|
SysAssert(destThread.get() == AuThreads::GetThread());
|
||||||
|
|
||||||
|
in->WriteIntoSuccess();
|
||||||
|
in->Complete();
|
||||||
|
}, id);
|
||||||
|
|
||||||
|
promise->OnSuccessFunctional([=](const AuSPtr<AuNullS> &out)
|
||||||
|
{
|
||||||
|
SysAssert(out.get());
|
||||||
|
SysAssert(main == AuThreads::GetThread());
|
||||||
|
});
|
||||||
|
|
||||||
|
promise->OnFailureFunctional([=](const AuSPtr<AuNullS> &out)
|
||||||
|
{
|
||||||
|
SysAssert(!out.get());
|
||||||
|
SysAssert(false);
|
||||||
|
SysAssert(main == AuThreads::GetThread());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
static void TestSuccess()
|
||||||
|
{
|
||||||
|
auto main = AuThreads::GetThread();
|
||||||
|
|
||||||
|
auto promise = AuMakeShared<AuNullPromise>(false, false);
|
||||||
|
SysAssert(promise);
|
||||||
|
|
||||||
|
auto workerPool = AuAsync::GetCurrentWorkerPId().pool;
|
||||||
|
AuAsync::WorkerPId_t id(workerPool, 0, 1);
|
||||||
|
|
||||||
|
auto destThread = workerPool->ResolveHandle(id);
|
||||||
|
|
||||||
|
promise->BeginWorkFunc([=](const AuSPtr<AuAsync::IPromiseAccessor<AuNullS, AuNullS>> &in)
|
||||||
|
{
|
||||||
|
SysAssert(destThread.get() == AuThreads::GetThread());
|
||||||
|
|
||||||
|
in->Complete();
|
||||||
|
}, id);
|
||||||
|
|
||||||
|
promise->OnSuccessFunctional([=](const AuSPtr<AuNullS> &out)
|
||||||
|
{
|
||||||
|
SysAssert(!out.get());
|
||||||
|
SysAssert(main == AuThreads::GetThread());
|
||||||
|
});
|
||||||
|
|
||||||
|
promise->OnFailureFunctional([=](const AuSPtr<AuNullS> &out)
|
||||||
|
{
|
||||||
|
SysAssert(!out.get());
|
||||||
|
SysAssert(false);
|
||||||
|
SysAssert(main == AuThreads::GetThread());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
static void TestWriteError()
|
||||||
|
{
|
||||||
|
auto main = AuThreads::GetThread();
|
||||||
|
|
||||||
|
auto promise = AuMakeShared<AuNullPromise>(false, false);
|
||||||
|
SysAssert(promise);
|
||||||
|
|
||||||
|
auto workerPool = AuAsync::GetCurrentWorkerPId().pool;
|
||||||
|
AuAsync::WorkerPId_t id(workerPool, 0, 1);
|
||||||
|
|
||||||
|
auto destThread = workerPool->ResolveHandle(id);
|
||||||
|
|
||||||
|
promise->BeginWorkFunc([=](const AuSPtr<AuAsync::IPromiseAccessor<AuNullS, AuNullS>> &in)
|
||||||
|
{
|
||||||
|
SysAssert(destThread.get() == AuThreads::GetThread());
|
||||||
|
|
||||||
|
in->WriteIntoError();
|
||||||
|
in->Fail();
|
||||||
|
}, id);
|
||||||
|
|
||||||
|
promise->OnSuccessFunctional([=](const AuSPtr<AuNullS> &out)
|
||||||
|
{
|
||||||
|
SysAssert(false);
|
||||||
|
SysAssert(!out.get());
|
||||||
|
SysAssert(main == AuThreads::GetThread());
|
||||||
|
});
|
||||||
|
|
||||||
|
promise->OnFailureFunctional([=](const AuSPtr<AuNullS> &out)
|
||||||
|
{
|
||||||
|
SysAssert(out.get());
|
||||||
|
SysAssert(true);
|
||||||
|
SysAssert(main == AuThreads::GetThread());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
static void TestFail()
|
||||||
|
{
|
||||||
|
auto main = AuThreads::GetThread();
|
||||||
|
|
||||||
|
auto promise = AuMakeShared<AuNullPromise>(false, false);
|
||||||
|
SysAssert(promise);
|
||||||
|
|
||||||
|
auto workerPool = AuAsync::GetCurrentWorkerPId().pool;
|
||||||
|
AuAsync::WorkerPId_t id(workerPool, 0, 1);
|
||||||
|
|
||||||
|
auto destThread = workerPool->ResolveHandle(id);
|
||||||
|
|
||||||
|
promise->BeginWorkFunc([=](const AuSPtr<AuAsync::IPromiseAccessor<AuNullS, AuNullS>> &in)
|
||||||
|
{
|
||||||
|
SysAssert(destThread.get() == AuThreads::GetThread());
|
||||||
|
|
||||||
|
in->WriteIntoError();
|
||||||
|
in->Fail();
|
||||||
|
}, id);
|
||||||
|
|
||||||
|
promise->OnSuccessFunctional([=](const AuSPtr<AuNullS> &out)
|
||||||
|
{
|
||||||
|
SysAssert(false);
|
||||||
|
SysAssert(!out.get());
|
||||||
|
SysAssert(main == AuThreads::GetThread());
|
||||||
|
});
|
||||||
|
|
||||||
|
promise->OnFailureFunctional([=](const AuSPtr<AuNullS> &out)
|
||||||
|
{
|
||||||
|
SysAssert(out.get());
|
||||||
|
SysAssert(true);
|
||||||
|
SysAssert(main == AuThreads::GetThread());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
static void MainTest()
|
||||||
|
{
|
||||||
|
TestPromise();
|
||||||
|
TestWriteError();
|
||||||
|
TestWriteSuccess();
|
||||||
|
TestSuccess();
|
||||||
|
TestFail();
|
||||||
|
}
|
||||||
|
|
||||||
|
void RunTests()
|
||||||
|
{
|
||||||
|
Aurora::RuntimeStartInfo info;
|
||||||
|
info.async.enableSchedularThread = true;
|
||||||
|
info.async.schedularFrequency = 2;
|
||||||
|
info.async.sysPumpFrequency = 2;
|
||||||
|
info.async.enableSysPumpFreqnecy = true;
|
||||||
|
info.console.fio.enableLogging = false;
|
||||||
|
info.console.asyncVSLog = false;
|
||||||
|
Aurora::RuntimeStart(info);
|
||||||
|
|
||||||
|
auto app = AuAsync::GetAsyncApp();
|
||||||
|
|
||||||
|
app->Spawn({0, 1});
|
||||||
|
app->Spawn({0, 2});
|
||||||
|
|
||||||
|
auto a = ((AuUInt)app->ResolveHandle({0, 1}).get());
|
||||||
|
auto b = ((AuUInt)app->ResolveHandle({0, 2}).get());
|
||||||
|
|
||||||
|
AuLogInfo("Spawned: {:x} {:x}", a, b);
|
||||||
|
AuLogInfo("Current: {:p}", fmt::ptr(AuThreads::GetThread()));
|
||||||
|
|
||||||
|
struct LineWriter : AuAsync::IWorkItemHandler
|
||||||
|
{
|
||||||
|
int ticks = 0;
|
||||||
|
|
||||||
|
void DispatchFrame(ProcessInfo &info) override
|
||||||
|
{
|
||||||
|
|
||||||
|
info = AuAsync::ETickType::eSchedule;
|
||||||
|
info.reschedMs = 20 + AuMin((this->ticks++) * 50, 1'000);
|
||||||
|
|
||||||
|
AuLogInfo("Hello {}", ticks);
|
||||||
|
if (ticks == kMaxTicks)
|
||||||
|
{
|
||||||
|
AuLogInfo("Goodbye");
|
||||||
|
info = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
auto slowTicker = AuMakeShared<LineWriter>();
|
||||||
|
SysAssert(slowTicker);
|
||||||
|
|
||||||
|
struct Test2Task : AuAsync::IWorkItemHandler
|
||||||
|
{
|
||||||
|
void DispatchFrame(ProcessInfo &info) override
|
||||||
|
{
|
||||||
|
AuLogInfo("Hello Test");
|
||||||
|
MainTest();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
auto taskMain = AuMakeShared<Test2Task>();
|
||||||
|
SysAssert(taskMain);
|
||||||
|
|
||||||
|
app->Start();
|
||||||
|
|
||||||
|
app->NewWorkItem({}, slowTicker)->Dispatch();
|
||||||
|
app->NewWorkItem({}, taskMain)->Dispatch();
|
||||||
|
|
||||||
|
app->Run();
|
||||||
|
|
||||||
|
SysAssert(slowTicker->ticks == kMaxTicks);
|
||||||
|
}
|
@ -29,9 +29,14 @@ static void AddHooks()
|
|||||||
[](AuExit::ETriggerLevel level, const AuExit::ExitInvoker *pInvoker)
|
[](AuExit::ETriggerLevel level, const AuExit::ExitInvoker *pInvoker)
|
||||||
{
|
{
|
||||||
AuLogInfo("A terminate event was sent");
|
AuLogInfo("A terminate event was sent");
|
||||||
// You must synchronize / join all spawned threads here
|
static bool bTest {};
|
||||||
// Aurora APIs may become unstable after this point
|
if (AuExchange(bTest, true))
|
||||||
// Yes, we can try to design around and mitigate issues, but it's time to go...
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
AuLogInfo("You know what. That's cute. But I don't feel like getting off my ass. I'm staying mapped.");
|
||||||
|
AuExit::CancelExit();
|
||||||
}));
|
}));
|
||||||
|
|
||||||
AuExit::ExitHandlerAdd(AuExit::ETriggerLevel::eSafeTermination,
|
AuExit::ExitHandlerAdd(AuExit::ETriggerLevel::eSafeTermination,
|
||||||
|
@ -245,6 +245,6 @@ void RunTests()
|
|||||||
{
|
{
|
||||||
Aurora::RuntimeStartInfo info;
|
Aurora::RuntimeStartInfo info;
|
||||||
info.console.fio.enableLogging = false;
|
info.console.fio.enableLogging = false;
|
||||||
info.console.asyncVSLog = true;
|
info.console.asyncVSLog = false;
|
||||||
Aurora::RuntimeStart(info);
|
Aurora::RuntimeStart(info);
|
||||||
}
|
}
|
@ -129,7 +129,223 @@ TEST(IO, Proceessor_FailAbort)
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(IO, ReadFileThroughPipeProcessor)
|
TEST(IO, ReadFileThroughPipeProcessor2MBReadsEQ)
|
||||||
|
{
|
||||||
|
// Write 20MB of RNG asynchronously
|
||||||
|
AuByteBuffer rngbuffer(512 * 1024 * 40);
|
||||||
|
{
|
||||||
|
auto stream = AuIOFS::OpenAsyncUnique("./AsyncFilePIPE", AuIOFS::EFileOpenMode::eReadWrite, true);
|
||||||
|
ASSERT_TRUE(bool(stream));
|
||||||
|
|
||||||
|
AuRng::RngFillRange(rngbuffer);
|
||||||
|
|
||||||
|
auto transaction = stream->NewTransaction();
|
||||||
|
ASSERT_TRUE(bool(transaction));
|
||||||
|
|
||||||
|
transaction->SetCallback(AuMakeShared<AuIO::IAsyncFinishedSubscriberFunctional>([](AuUInt64 offset, AuUInt32 length)
|
||||||
|
{
|
||||||
|
AuLogDbg("AIO 1 callback: {} {}", offset, length);
|
||||||
|
}));
|
||||||
|
|
||||||
|
AuMemoryViewRead readView(rngbuffer);
|
||||||
|
ASSERT_TRUE(transaction->StartWrite(0, AuUnsafeRaiiToShared(&readView)));
|
||||||
|
|
||||||
|
AuLogDbg("AIO 1: waiting....");
|
||||||
|
ASSERT_TRUE(transaction->Wait(0));
|
||||||
|
AuLogDbg("AIO 1: wait complete....");
|
||||||
|
ASSERT_TRUE(transaction->NewLoopSource()->IsSignaled());
|
||||||
|
ASSERT_TRUE(transaction->Complete());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Create the io processor
|
||||||
|
|
||||||
|
auto ioDrivenQueue = AuIO::NewIOProcessorNoQueue(false);
|
||||||
|
ASSERT_TRUE(bool(ioDrivenQueue));
|
||||||
|
|
||||||
|
auto pipeProcessor = ioDrivenQueue->ToPipeProcessor();
|
||||||
|
ASSERT_TRUE(bool(pipeProcessor));
|
||||||
|
|
||||||
|
|
||||||
|
// Open the file and prepare an async transaction for the pipe processor to use
|
||||||
|
|
||||||
|
auto stream = AuIOFS::OpenAsyncUnique("./AsyncFilePIPE", AuIOFS::EFileOpenMode::eRead, true);
|
||||||
|
ASSERT_TRUE(bool(stream));
|
||||||
|
|
||||||
|
auto transaction = stream->NewTransaction();
|
||||||
|
ASSERT_TRUE(bool(transaction));
|
||||||
|
|
||||||
|
// Create an event listener for the callbacks
|
||||||
|
|
||||||
|
auto listener = AuMakeShared<AuIO::IIOPipeEventListenerFunctional>();
|
||||||
|
ASSERT_TRUE(bool(listener));
|
||||||
|
|
||||||
|
listener->OnPipeSuccessEventFunctional = []()
|
||||||
|
{
|
||||||
|
AuLogInfo("File IO pipe complete");
|
||||||
|
};
|
||||||
|
|
||||||
|
listener->OnPipeFailureEventFunctional = []()
|
||||||
|
{
|
||||||
|
AuLogInfo("File IO pipe failure");
|
||||||
|
};
|
||||||
|
|
||||||
|
listener->OnPipePartialEventFunctional = [](AuUInt tranferred)
|
||||||
|
{
|
||||||
|
AuLogInfo("on part: {}", tranferred);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Setup the IOPipeRequest
|
||||||
|
|
||||||
|
AuIO::IOPipeRequestAIO aio;
|
||||||
|
aio.asyncTransaction = transaction;
|
||||||
|
aio.listener = listener;
|
||||||
|
aio.isStream = false;
|
||||||
|
aio.output.type = AuIO::EPipeCallbackType::eTryHandleBufferedPart;
|
||||||
|
aio.pageLengthOrZero = 1024 * 1024 * 2; // read in 2MB chunks
|
||||||
|
aio.bufferLengthOrZero = 1024 * 1024 * 2; // read in 2MB chunks
|
||||||
|
|
||||||
|
static auto localCount = 0;
|
||||||
|
localCount = 0;
|
||||||
|
|
||||||
|
auto onData = AuMakeShared<AuIO::IIOBufferedStreamAvailableFunctional>();
|
||||||
|
AuByteBuffer outBuffer;
|
||||||
|
onData->OnDataAvailableFunctional = [&](AuByteBuffer &view)
|
||||||
|
{
|
||||||
|
AuLogDbg("Pipe processing: {}, tick: {}", view.RemainingBytes(), localCount++);
|
||||||
|
outBuffer.Write(view.readPtr, view.RemainingBytes());
|
||||||
|
view.Skip(view.RemainingBytes());
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
aio.output.handleBufferedStream.onData = onData;
|
||||||
|
|
||||||
|
// Finally create the damn pipe
|
||||||
|
|
||||||
|
auto filePipe = pipeProcessor->NewAIOPipe(aio);
|
||||||
|
ASSERT_TRUE(bool(filePipe));
|
||||||
|
|
||||||
|
ASSERT_TRUE(filePipe->Start());
|
||||||
|
|
||||||
|
// Main loop
|
||||||
|
while (ioDrivenQueue->HasItems())
|
||||||
|
{
|
||||||
|
// Since we aren't using an external loop queue, we manually pump the processor in here under this spinloop
|
||||||
|
// If we were using a loop queue, we wouldn't need to do anything at all. You could still technically
|
||||||
|
// use ioDrivenQueue->ToQueue()->WaitAny() to trigger the tick.
|
||||||
|
|
||||||
|
ioDrivenQueue->RunTick(); // also: TryTick and ManualTick
|
||||||
|
}
|
||||||
|
ASSERT_EQ(localCount, 10);
|
||||||
|
|
||||||
|
ASSERT_EQ(outBuffer, rngbuffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(IO, ReadFileThroughPipeProcessorDefaultConfigEQ)
|
||||||
|
{
|
||||||
|
// Write 20MB of RNG asynchronously
|
||||||
|
AuByteBuffer rngbuffer(512 * 1024 * 40);
|
||||||
|
{
|
||||||
|
auto stream = AuIOFS::OpenAsyncUnique("./AsyncFilePIPE", AuIOFS::EFileOpenMode::eReadWrite, true);
|
||||||
|
ASSERT_TRUE(bool(stream));
|
||||||
|
|
||||||
|
AuRng::RngFillRange(rngbuffer);
|
||||||
|
|
||||||
|
auto transaction = stream->NewTransaction();
|
||||||
|
ASSERT_TRUE(bool(transaction));
|
||||||
|
|
||||||
|
transaction->SetCallback(AuMakeShared<AuIO::IAsyncFinishedSubscriberFunctional>([](AuUInt64 offset, AuUInt32 length)
|
||||||
|
{
|
||||||
|
AuLogDbg("AIO 1 callback: {} {}", offset, length);
|
||||||
|
}));
|
||||||
|
|
||||||
|
AuMemoryViewRead readView(rngbuffer);
|
||||||
|
ASSERT_TRUE(transaction->StartWrite(0, AuUnsafeRaiiToShared(&readView)));
|
||||||
|
|
||||||
|
AuLogDbg("AIO 1: waiting....");
|
||||||
|
ASSERT_TRUE(transaction->Wait(0));
|
||||||
|
AuLogDbg("AIO 1: wait complete....");
|
||||||
|
ASSERT_TRUE(transaction->NewLoopSource()->IsSignaled());
|
||||||
|
ASSERT_TRUE(transaction->Complete());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Create the io processor
|
||||||
|
|
||||||
|
auto ioDrivenQueue = AuIO::NewIOProcessorNoQueue(false);
|
||||||
|
ASSERT_TRUE(bool(ioDrivenQueue));
|
||||||
|
|
||||||
|
auto pipeProcessor = ioDrivenQueue->ToPipeProcessor();
|
||||||
|
ASSERT_TRUE(bool(pipeProcessor));
|
||||||
|
|
||||||
|
|
||||||
|
// Open the file and prepare an async transaction for the pipe processor to use
|
||||||
|
|
||||||
|
auto stream = AuIOFS::OpenAsyncUnique("./AsyncFilePIPE", AuIOFS::EFileOpenMode::eRead, true);
|
||||||
|
ASSERT_TRUE(bool(stream));
|
||||||
|
|
||||||
|
auto transaction = stream->NewTransaction();
|
||||||
|
ASSERT_TRUE(bool(transaction));
|
||||||
|
|
||||||
|
// Create an event listener for the callbacks
|
||||||
|
|
||||||
|
auto listener = AuMakeShared<AuIO::IIOPipeEventListenerFunctional>();
|
||||||
|
ASSERT_TRUE(bool(listener));
|
||||||
|
|
||||||
|
listener->OnPipeSuccessEventFunctional = []()
|
||||||
|
{
|
||||||
|
AuLogInfo("File IO pipe complete");
|
||||||
|
};
|
||||||
|
|
||||||
|
listener->OnPipeFailureEventFunctional = []()
|
||||||
|
{
|
||||||
|
AuLogInfo("File IO pipe failure");
|
||||||
|
};
|
||||||
|
|
||||||
|
listener->OnPipePartialEventFunctional = [](AuUInt tranferred)
|
||||||
|
{
|
||||||
|
AuLogInfo("on part: {}", tranferred);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Setup the IOPipeRequest
|
||||||
|
|
||||||
|
AuIO::IOPipeRequestAIO aio;
|
||||||
|
aio.asyncTransaction = transaction;
|
||||||
|
aio.listener = listener;
|
||||||
|
aio.isStream = false;
|
||||||
|
aio.output.type = AuIO::EPipeCallbackType::eTryHandleBufferedPart;
|
||||||
|
|
||||||
|
static auto localCount = 0;
|
||||||
|
auto onData = AuMakeShared<AuIO::IIOBufferedStreamAvailableFunctional>();
|
||||||
|
AuByteBuffer outBuffer;
|
||||||
|
|
||||||
|
localCount = 0;
|
||||||
|
onData->OnDataAvailableFunctional = [&](AuByteBuffer &view)
|
||||||
|
{
|
||||||
|
AuLogDbg("Pipe processing: {}, tick: {}", view.RemainingBytes(), localCount++);
|
||||||
|
outBuffer.Write(view.readPtr, view.RemainingBytes());
|
||||||
|
view.Skip(view.RemainingBytes());
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
aio.output.handleBufferedStream.onData = onData;
|
||||||
|
|
||||||
|
// Finally create the damn pipe
|
||||||
|
|
||||||
|
auto filePipe = pipeProcessor->NewAIOPipe(aio);
|
||||||
|
ASSERT_TRUE(bool(filePipe));
|
||||||
|
|
||||||
|
ASSERT_TRUE(filePipe->Start());
|
||||||
|
|
||||||
|
// Main loop
|
||||||
|
while (ioDrivenQueue->HasItems())
|
||||||
|
{
|
||||||
|
ioDrivenQueue->RunTick(); // also: TryTick and ManualTick
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_EQ(outBuffer, rngbuffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
static void ReadFileThroughPipeProcessorCongested(int i)
|
||||||
{
|
{
|
||||||
// Write 20MB of RNG asynchronously
|
// Write 20MB of RNG asynchronously
|
||||||
{
|
{
|
||||||
@ -202,11 +418,17 @@ TEST(IO, ReadFileThroughPipeProcessor)
|
|||||||
aio.listener = listener;
|
aio.listener = listener;
|
||||||
aio.isStream = false;
|
aio.isStream = false;
|
||||||
aio.output.type = AuIO::EPipeCallbackType::eTryHandleBufferedPart;
|
aio.output.type = AuIO::EPipeCallbackType::eTryHandleBufferedPart;
|
||||||
|
aio.pageLengthOrZero = 1024 * 1024 * 2; // read in 2MB chunks
|
||||||
|
aio.bufferLengthOrZero = 1024 * 1024 * 2 * i; // read in 2MB chunks
|
||||||
|
|
||||||
|
static auto localCount = 0;
|
||||||
|
localCount = 0;
|
||||||
|
|
||||||
auto onData = AuMakeShared<AuIO::IIOBufferedStreamAvailableFunctional>();
|
auto onData = AuMakeShared<AuIO::IIOBufferedStreamAvailableFunctional>();
|
||||||
onData->OnDataAvailableFunctional = [](AuByteBuffer &view)
|
onData->OnDataAvailableFunctional = [](AuByteBuffer &view)
|
||||||
{
|
{
|
||||||
AuLogDbg("Pipe processing: {}", view.RemainingBytes());
|
AuLogDbg("Pipe processing: {}", view.RemainingBytes());
|
||||||
|
localCount++;
|
||||||
view.Skip(view.RemainingBytes());
|
view.Skip(view.RemainingBytes());
|
||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
@ -219,7 +441,6 @@ TEST(IO, ReadFileThroughPipeProcessor)
|
|||||||
|
|
||||||
ASSERT_TRUE(filePipe->Start());
|
ASSERT_TRUE(filePipe->Start());
|
||||||
|
|
||||||
|
|
||||||
// Main loop
|
// Main loop
|
||||||
|
|
||||||
while (ioDrivenQueue->HasItems())
|
while (ioDrivenQueue->HasItems())
|
||||||
@ -230,9 +451,26 @@ TEST(IO, ReadFileThroughPipeProcessor)
|
|||||||
|
|
||||||
ioDrivenQueue->RunTick(); // also: TryTick and ManualTick
|
ioDrivenQueue->RunTick(); // also: TryTick and ManualTick
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SysAssert(localCount == ((512 * 1024 * 40) / aio.pageLengthOrZero));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(IO, ReadFileThroughPipeProcessorCongestedFourth)
|
||||||
|
{
|
||||||
|
ReadFileThroughPipeProcessorCongested(4);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(IO, ReadFileThroughPipeProcessorCongestedHalf)
|
||||||
|
{
|
||||||
|
ReadFileThroughPipeProcessorCongested(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(IO, ReadFileThroughPipeProcessorCongestedThird)
|
||||||
|
{
|
||||||
|
ReadFileThroughPipeProcessorCongested(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void RunTests()
|
void RunTests()
|
||||||
{
|
{
|
||||||
|
@ -103,6 +103,47 @@ TEST(FS, MkDir)
|
|||||||
ASSERT_FALSE(AuIOFS::DirExists("./test_dir"));
|
ASSERT_FALSE(AuIOFS::DirExists("./test_dir"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(FS, Iteration)
|
||||||
|
{
|
||||||
|
AuIOFS::Remove("./test_dir");
|
||||||
|
ASSERT_FALSE(AuIOFS::DirExists("./test_dir"));
|
||||||
|
ASSERT_TRUE(AuIOFS::DirMk("./test_dir"));
|
||||||
|
ASSERT_TRUE(AuIOFS::DirExists("./test_dir"));
|
||||||
|
|
||||||
|
ASSERT_TRUE(AuIOFS::DirMk("./test_dir/a"));
|
||||||
|
ASSERT_TRUE(AuIOFS::DirExists("./test_dir/a"));
|
||||||
|
|
||||||
|
ASSERT_TRUE(AuIOFS::WriteString("./test_dir/b", "text"));
|
||||||
|
ASSERT_TRUE(AuIOFS::FileExists("./test_dir/b"));
|
||||||
|
|
||||||
|
ASSERT_TRUE(AuIOFS::DirMk("./test_dir/c"));
|
||||||
|
ASSERT_TRUE(AuIOFS::DirExists("./test_dir/c"));
|
||||||
|
|
||||||
|
|
||||||
|
ASSERT_TRUE(AuIOFS::WriteString("./test_dir/d", "file"));
|
||||||
|
ASSERT_TRUE(AuIOFS::FileExists("./test_dir/d"));
|
||||||
|
|
||||||
|
AuList<AuString> dirs, files;
|
||||||
|
|
||||||
|
ASSERT_TRUE(AuIOFS::DirsInDirectory("./test_dir", dirs));
|
||||||
|
ASSERT_TRUE(AuIOFS::FilesInDirectory("./test_dir", files));
|
||||||
|
|
||||||
|
ASSERT_EQ(files.size(), 2);
|
||||||
|
ASSERT_EQ(dirs.size(), 2);
|
||||||
|
|
||||||
|
ASSERT_TRUE(AuExists(files, "b"));
|
||||||
|
ASSERT_TRUE(AuExists(files, "d"));
|
||||||
|
|
||||||
|
ASSERT_TRUE(AuExists(dirs, "a"));
|
||||||
|
ASSERT_TRUE(AuExists(dirs, "c"));
|
||||||
|
|
||||||
|
ASSERT_TRUE(AuIOFS::Remove("./test_dir/a"));
|
||||||
|
ASSERT_TRUE(AuIOFS::Remove("./test_dir/b"));
|
||||||
|
ASSERT_TRUE(AuIOFS::Remove("./test_dir/c"));
|
||||||
|
ASSERT_TRUE(AuIOFS::Remove("./test_dir/d"));
|
||||||
|
ASSERT_TRUE(AuIOFS::Remove("./test_dir"));
|
||||||
|
}
|
||||||
|
|
||||||
TEST(FS, Watcher)
|
TEST(FS, Watcher)
|
||||||
{
|
{
|
||||||
ASSERT_TRUE(AuIOFS::WriteString("~/Watcher.txt", "testing"));
|
ASSERT_TRUE(AuIOFS::WriteString("~/Watcher.txt", "testing"));
|
||||||
@ -317,7 +358,6 @@ static void PrintSystemRoot()
|
|||||||
// TODO: sample and test code for:
|
// TODO: sample and test code for:
|
||||||
// * file stream objects
|
// * file stream objects
|
||||||
// * stat
|
// * stat
|
||||||
// * iterate
|
|
||||||
|
|
||||||
void RunTests()
|
void RunTests()
|
||||||
{
|
{
|
||||||
|
@ -138,7 +138,8 @@ static void TestBasicCompression(AuCompression::ECompressionType type)
|
|||||||
auto decompressor = AuCompression::DecompressorShared(AuUnsafeRaiiToShared(&reader), decompressStream);
|
auto decompressor = AuCompression::DecompressorShared(AuUnsafeRaiiToShared(&reader), decompressStream);
|
||||||
|
|
||||||
auto readWrittenBytes = decompressor->Ingest(decompressLength);
|
auto readWrittenBytes = decompressor->Ingest(decompressLength);
|
||||||
ASSERT_EQ(readWrittenBytes, AuMakePair(AuUInt32(compressed.GetBuffer()->size()), AuUInt32(decompressLength)));
|
//ASSERT_EQ(readWrittenBytes, AuMakePair(AuUInt32(compressed.GetBuffer()->size()), AuUInt32(decompressLength)));
|
||||||
|
//ASSERT_LE(readWrittenBytes.first, readWrittenBytes.second);
|
||||||
|
|
||||||
AuByteBuffer inflated(decompressLength);
|
AuByteBuffer inflated(decompressLength);
|
||||||
ASSERT_TRUE(decompressor->Read(AuMemoryViewWrite(inflated.data(), inflated.size())));
|
ASSERT_TRUE(decompressor->Read(AuMemoryViewWrite(inflated.data(), inflated.size())));
|
||||||
@ -147,6 +148,55 @@ static void TestBasicCompression(AuCompression::ECompressionType type)
|
|||||||
}
|
}
|
||||||
|
|
||||||
compressed.GetBuffer()->ResetReadPointer();
|
compressed.GetBuffer()->ResetReadPointer();
|
||||||
|
decompressed.GetBuffer()->ResetReadPointer();
|
||||||
|
|
||||||
|
|
||||||
|
// New API
|
||||||
|
AuByteBuffer deflated2;
|
||||||
|
{
|
||||||
|
auto decompressLength = decompressed.GetBuffer()->size();
|
||||||
|
|
||||||
|
AuCompression::CompressionInfo compressStream{ type };
|
||||||
|
compressStream.internalStreamSize = decompressLength * 10;
|
||||||
|
compressStream.compressionLevel = 4;
|
||||||
|
|
||||||
|
AuIO::Buffered::BlobReader reader(decompressed.GetBuffer());
|
||||||
|
auto compressor = AuCompression::CompressorShared(AuUnsafeRaiiToShared(&reader), compressStream);
|
||||||
|
|
||||||
|
auto readWrittenBytes = compressor->Ingest(decompressLength);
|
||||||
|
|
||||||
|
compressor->Finish();
|
||||||
|
readWrittenBytes = { 0, compressor->GetAvailableProcessedBytes() };
|
||||||
|
//ASSERT_GE(readWrittenBytes.first, readWrittenBytes.second);
|
||||||
|
|
||||||
|
//AuLogDbg("{} {}, {}", readWrittenBytes.first, readWrittenBytes.second, compressor->GetAvailableProcessedBytes());
|
||||||
|
deflated2 = AuByteBuffer(compressor->GetAvailableProcessedBytes());
|
||||||
|
ASSERT_TRUE(compressor->Read(AuMemoryViewWrite(deflated2.data(), deflated2.size())));
|
||||||
|
deflated2.writePtr += deflated2.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
deflated2.ResetReadPointer();
|
||||||
|
|
||||||
|
{
|
||||||
|
auto decompressLength = decompressed.GetBuffer()->size();
|
||||||
|
|
||||||
|
AuCompression::DecompressInfo decompressStream{ type };
|
||||||
|
decompressStream.internalStreamSize = decompressLength;
|
||||||
|
|
||||||
|
deflated2.readPtr = deflated2.base;
|
||||||
|
AuIO::Buffered::BlobReader reader(deflated2);
|
||||||
|
auto decompressor = AuCompression::DecompressorShared(AuUnsafeRaiiToShared(&reader), decompressStream);
|
||||||
|
|
||||||
|
auto readWrittenBytes = decompressor->Ingest(deflated2.size());
|
||||||
|
//AuLogDbg("2> {} {}, {} {} {}", readWrittenBytes.first, readWrittenBytes.second, decompressor->GetAvailableProcessedBytes(), AuUInt32(deflated2.size()), AuUInt32(decompressLength));
|
||||||
|
//ASSERT_EQ(readWrittenBytes, AuMakePair(AuUInt32(deflated2.size()), AuUInt32(decompressLength)));
|
||||||
|
|
||||||
|
AuByteBuffer inflated(decompressLength);
|
||||||
|
ASSERT_TRUE(decompressor->Read(AuMemoryViewWrite(inflated.data(), inflated.size())));
|
||||||
|
|
||||||
|
decompressed.GetBuffer()->ResetReadPointer();
|
||||||
|
ASSERT_EQ(inflated, *decompressed.GetBuffer());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#define ADD_TESTS_BASIC(algorithm) \
|
#define ADD_TESTS_BASIC(algorithm) \
|
||||||
|
@ -1 +1 @@
|
|||||||
Subproject commit 8a011b8a38b7748d1a3eaf1e99c8b859d754d060
|
Subproject commit 7735334a46da480a749945c0f645155d90d73855
|
2
Vendor/nlohmannjson
vendored
2
Vendor/nlohmannjson
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 3b16057ffaaed250cf207233f9238392ea0245ee
|
Subproject commit 09fb4819ff4b35a6ba5d391f87e4e44f25ad6789
|
Loading…
Reference in New Issue
Block a user