[+] IPC: Mutex + Pipe | (manual) cross-process tests

[+] Test 25: address space
[*] Update IO test to use proper allocation alignment
[*] Refactor compression and system
[*] Update Aurora Runtime
[*] Update ROXTL
[*] Update Googletest
This commit is contained in:
Reece Wilson 2022-08-21 04:51:16 +01:00
parent 7257cf8df1
commit ebd25e85f9
12 changed files with 498 additions and 23 deletions

@ -1 +1 @@
Subproject commit f81f99f45ea1d1ecde580b44fd4b35d21f9a2e41
Subproject commit 85c85d1a1beec3ff0a7d40b3524201b02f1e3d63

@ -1 +1 @@
Subproject commit b8bcab1bdca221c305b420f30700a30045f91aed
Subproject commit 1341f805b0bd14e85d9c74d75c6cfb722d7c42c6

View File

@ -43,20 +43,20 @@ static void PrintCPUSpecs()
static void PrintRamSpecs()
{
auto val = AuHwInfo::GetMemStatProcessBlamed().value_or(AuHwInfo::RamStat {});
AuLogInfo("RamInfo Private Allocation: {}/{}", val.used, val.available);
AuLogInfo("RamInfo Private Allocation: {}/{}", val.qwUsed, val.qwAvailable);
val = AuHwInfo::GetMemStatProcess().value_or(AuHwInfo::RamStat {});
AuLogInfo("RamInfo Address Space: {}/{}", val.used, val.available);
AuLogInfo("RamInfo Address Space: {}/{}", val.qwUsed, val.qwAvailable);
val = AuHwInfo::GetMemStatSystem().value_or(AuHwInfo::RamStat {});
AuLogInfo("RamInfo System Commit Charge: {}/{}", val.used, val.available);
AuLogInfo("RamInfo System Commit Charge: {}/{}", val.qwUsed, val.qwAvailable);
AuLogInfo(" if high compared to phys, expand page file");
val = AuHwInfo::GetMemStatStartup().value_or(AuHwInfo::RamStat {});
AuLogInfo("RamInfo Startup Commit: {}/{}", val.used, val.available);
AuLogInfo("RamInfo Startup Commit: {}/{}", val.qwUsed, val.qwAvailable);
val = AuHwInfo::GetMemStatPhysical().value_or(AuHwInfo::RamStat {});
AuLogInfo("RamInfo Physical RAM: {}/{}", val.used, val.available);
AuLogInfo("RamInfo Physical RAM: {}/{}", val.qwUsed, val.qwAvailable);
AuLogInfo("");
}

View File

@ -9,7 +9,6 @@
#include <AuroraRuntime.hpp>
#include <gtest/gtest.h>
TEST(Echo, HelloReplacedGlobalLogger)
{
auto logger = AuLog::NewLoggerShared(AuList<AuSPtr<AuLog::IBasicSink>>{
@ -92,7 +91,7 @@ TEST(Echo, RingBuffer)
* 1000 / 0.00022505837 = 4'443'291 entries per second
* at 225ns per formatted LogInfo
*/
TEST(Echo, BenchmarckRingbuffer)
TEST(Echo, BenchmarkRingbuffer)
{
AuList<AuSPtr<AuLog::IBasicSink>> sinks;
@ -118,7 +117,7 @@ TEST(Echo, BenchmarckRingbuffer)
// terminal hosts, character devices, and so on to keep up
// Benchmark data is unreliable, though the relevant profiled user-code seems to be limited to a spinlock
#if 0
TEST(Echo, BenchmarckMain)
TEST(Echo, BenchmarkMain)
{
{
SysBenchmark("Push a boat-load of messages to a stdout");
@ -140,7 +139,7 @@ TEST(Echo, BenchmarckMain)
* 1000 / 0.000206834 = 4'834'795 entries per second
* at 206.834ns per formatted AuLogInfo
*/
TEST(Echo, BenchmarckLite)
TEST(Echo, BenchmarkLite)
{
{
SysBenchmark("Push a few messages to a stdout and friends");

View File

@ -132,7 +132,7 @@ TEST(IO, Proceessor_FailAbort)
TEST(IO, ReadFileThroughPipeProcessor2MBReadsEQ)
{
// Write 20MB of RNG asynchronously
AuByteBuffer rngbuffer(512 * 1024 * 40);
AuByteBuffer rngbuffer(512 * 1024 * 40, AuUInt(AuHwInfo::GetPageSize()));
{
auto stream = AuIOFS::OpenAsyncUnique("./AsyncFilePIPE", AuIOFS::EFileOpenMode::eReadWrite, true);
ASSERT_TRUE(bool(stream));
@ -142,9 +142,10 @@ TEST(IO, ReadFileThroughPipeProcessor2MBReadsEQ)
auto transaction = stream->NewTransaction();
ASSERT_TRUE(bool(transaction));
transaction->SetCallback(AuMakeShared<AuIO::IAsyncFinishedSubscriberFunctional>([](AuUInt64 offset, AuUInt32 length)
transaction->SetCallback(AuMakeShared<AuIO::IAsyncFinishedSubscriberFunctional>([=](AuUInt64 offset, AuUInt32 length)
{
AuLogDbg("AIO 1 callback: {} {}", offset, length);
AuLogDbg("AIO 1 callback: {} {} {}", offset, length, transaction->GetOSErrorCode());
SysAssert(length == 512 * 1024 * 40);
}));
AuMemoryViewRead readView(rngbuffer);
@ -244,7 +245,7 @@ TEST(IO, ReadFileThroughPipeProcessor2MBReadsEQ)
TEST(IO, ReadFileThroughPipeProcessorDefaultConfigEQ)
{
// Write 20MB of RNG asynchronously
AuByteBuffer rngbuffer(512 * 1024 * 40);
AuByteBuffer rngbuffer(512 * 1024 * 40, AuUInt(AuHwInfo::GetPageSize()));
{
auto stream = AuIOFS::OpenAsyncUnique("./AsyncFilePIPE", AuIOFS::EFileOpenMode::eReadWrite, true);
ASSERT_TRUE(bool(stream));
@ -254,7 +255,7 @@ TEST(IO, ReadFileThroughPipeProcessorDefaultConfigEQ)
auto transaction = stream->NewTransaction();
ASSERT_TRUE(bool(transaction));
transaction->SetCallback(AuMakeShared<AuIO::IAsyncFinishedSubscriberFunctional>([](AuUInt64 offset, AuUInt32 length)
transaction->SetCallback(AuMakeShared<AuIO::IAsyncFinishedSubscriberFunctional>([=](AuUInt64 offset, AuUInt32 length)
{
AuLogDbg("AIO 1 callback: {} {}", offset, length);
}));
@ -352,15 +353,16 @@ static void ReadFileThroughPipeProcessorCongested(int i)
auto stream = AuIOFS::OpenAsyncUnique("./AsyncFilePIPE", AuIOFS::EFileOpenMode::eReadWrite, true);
ASSERT_TRUE(bool(stream));
AuByteBuffer rngbuffer(512 * 1024 * 40);
AuByteBuffer rngbuffer(512 * 1024 * 40, AuUInt(AuHwInfo::GetPageSize()));
AuRng::RngFillRange(rngbuffer);
auto transaction = stream->NewTransaction();
ASSERT_TRUE(bool(transaction));
transaction->SetCallback(AuMakeShared<AuIO::IAsyncFinishedSubscriberFunctional>([](AuUInt64 offset, AuUInt32 length)
transaction->SetCallback(AuMakeShared<AuIO::IAsyncFinishedSubscriberFunctional>([=](AuUInt64 offset, AuUInt32 length)
{
AuLogDbg("AIO 1 callback: {} {}", offset, length);
SysAssert(length == 512 * 1024 * 40);
}));
AuMemoryViewRead readView(rngbuffer);

View File

@ -0,0 +1,153 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Main.cpp
Date: 2022-2-18
Author: Reece
***/
#include <AuroraRuntime.hpp>
#include <gtest/gtest.h>
TEST(Map, FSNoLock)
{
int size = 4096 * 4;
AuByteBuffer rngbuffer(size);
{
auto fsInit = AuIOFS::OpenShared("./FSMap", AuIOFS::EFileOpenMode::eWrite);
ASSERT_TRUE(bool(fsInit));
AuRng::RngFillRange(rngbuffer);
fsInit->Write(AuMemoryViewStreamRead(rngbuffer));
}
{
auto fsMap = AuIOFS::OpenShared("./FSMap", AuIOFS::EFileOpenMode::eReadWrite, AuFS::EFileAdvisoryLockLevel::eNoSafety);
ASSERT_TRUE(bool(fsMap));
auto shared = AuProcess::GetGlobalProcessSpace();
SysAssert(shared);
auto obj = shared->MapFileByObject(fsMap, 0, size, AuIOFS::EFileOpenMode::eRead, AuFS::EFileAdvisoryLockLevel::eBlockWrite);
ASSERT_TRUE(bool(obj));
ASSERT_TRUE(memcmp(obj->GetBasePointer(), rngbuffer.base, size) == 0);
}
}
TEST(Map, FSNestedReadLock)
{
int size = 4096 * 4;
AuByteBuffer rngbuffer(size);
{
auto fsInit = AuIOFS::OpenShared("./FSMap", AuIOFS::EFileOpenMode::eWrite);
ASSERT_TRUE(bool(fsInit));
AuRng::RngFillRange(rngbuffer);
fsInit->Write(AuMemoryViewStreamRead(rngbuffer));
}
{
auto fsMap = AuIOFS::OpenShared("./FSMap", AuIOFS::EFileOpenMode::eReadWrite, AuFS::EFileAdvisoryLockLevel::eBlockWrite);
ASSERT_TRUE(bool(fsMap));
auto shared = AuProcess::GetGlobalProcessSpace();
SysAssert(shared);
auto obj = shared->MapFileByObject(fsMap, 0, size, AuIOFS::EFileOpenMode::eRead, AuFS::EFileAdvisoryLockLevel::eBlockWrite);
ASSERT_TRUE(bool(obj));
ASSERT_TRUE(memcmp(obj->GetBasePointer(), rngbuffer.base, size) == 0);
}
}
TEST(Map, FSNestedReadLock2)
{
int size = 4096 * 4;
AuByteBuffer rngbuffer(size);
{
auto fsInit = AuIOFS::OpenShared("./FSMap", AuIOFS::EFileOpenMode::eWrite);
ASSERT_TRUE(bool(fsInit));
AuRng::RngFillRange(rngbuffer);
fsInit->Write(AuMemoryViewStreamRead(rngbuffer));
}
{
auto fsMap = AuIOFS::OpenShared("./FSMap", AuIOFS::EFileOpenMode::eRead, AuFS::EFileAdvisoryLockLevel::eBlockWrite);
ASSERT_TRUE(bool(fsMap));
auto shared = AuProcess::GetGlobalProcessSpace();
SysAssert(shared);
auto obj = shared->MapFileByObject(fsMap, 0, size, AuIOFS::EFileOpenMode::eRead, AuFS::EFileAdvisoryLockLevel::eBlockWrite);
ASSERT_TRUE(bool(obj));
ASSERT_TRUE(memcmp(obj->GetBasePointer(), rngbuffer.base, size) == 0);
}
}
TEST(Map, FSNestedReadLockFail1)
{
int size = 4096 * 4;
AuByteBuffer rngbuffer(size);
{
auto fsInit = AuIOFS::OpenShared("./FSMap", AuIOFS::EFileOpenMode::eWrite);
ASSERT_TRUE(bool(fsInit));
AuRng::RngFillRange(rngbuffer);
fsInit->Write(AuMemoryViewStreamRead(rngbuffer));
}
{
auto fsMap = AuIOFS::OpenShared("./FSMap", AuIOFS::EFileOpenMode::eRead, AuFS::EFileAdvisoryLockLevel::eBlockWrite);
ASSERT_TRUE(bool(fsMap));
auto shared = AuProcess::GetGlobalProcessSpace();
SysAssert(shared);
auto obj = shared->MapFileByObject(fsMap, 0, size, AuIOFS::EFileOpenMode::eWrite, AuFS::EFileAdvisoryLockLevel::eBlockWrite);
ASSERT_TRUE(!bool(obj));
}
}
TEST(Map, IPC)
{
static const AuString kHelloWorld = "Hello IPC";
auto memory = AuIPC::NewSharedMemory(4096);
ASSERT_TRUE(bool(memory));
AuMemcpy(memory->GetMemory().ptr, kHelloWorld.c_str(), kHelloWorld.size() + 1);
auto handleString = memory->ExportToString();
ASSERT_TRUE(bool(handleString.size()));
AuLogDbg("Exported shared view handle: {}", handleString);
{
auto shared = AuProcess::GetGlobalProcessSpace();
SysAssert(shared);
auto obj = shared->MapIPCMemory(handleString, 0, 4096, AuIOFS::EFileOpenMode::eWrite);
ASSERT_TRUE(bool(obj));
AuLogDbg("Shared Memory String: {}", (char *)obj->GetBasePointer());
ASSERT_TRUE(memcmp(obj->GetBasePointer(), obj->GetBasePointer(), kHelloWorld.size() - 1) == 0);
}
}
void RunTests()
{
Aurora::RuntimeStartInfo info;
info.console.fio.enableLogging = false;
info.console.asyncVSLog = false;
Aurora::RuntimeStart(info);
}

View File

@ -101,8 +101,8 @@ static void TestBasicCompression(AuCompression::ECompressionType type)
AuCompression::CompressionInfo info;
info.type = type;
pipe.inPipe = AuUnsafeRaiiToShared(&reader);
pipe.writePipe = AuUnsafeRaiiToShared(&compressed);
pipe.pReadPipe = AuUnsafeRaiiToShared(&reader);
pipe.pWritePipe = AuUnsafeRaiiToShared(&compressed);
info.compressionLevel = 4;
ASSERT_TRUE(AuCompression::Compress(pipe, info));
@ -115,8 +115,8 @@ static void TestBasicCompression(AuCompression::ECompressionType type)
AuIO::Buffered::BlobReader reader(compressed.GetBuffer());
AuCompression::CompressionPipe pipe;
pipe.inPipe = AuUnsafeRaiiToShared(&reader);
pipe.writePipe = AuUnsafeRaiiToShared(&decompressed);
pipe.pReadPipe = AuUnsafeRaiiToShared(&reader);
pipe.pWritePipe = AuUnsafeRaiiToShared(&decompressed);
ASSERT_TRUE(AuCompression::Decompress(pipe, {type, 1024 * 64 * 2}));
}

View File

@ -0,0 +1,29 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Main.cpp
Date: 2022-2-18
Author: Reece
***/
#include <AuroraRuntime.hpp>
static void NewHandle()
{
static auto mutex = AuIO::IPC::NewMutex();
AuLogDbg("Got mutex: {}", mutex->ExportToString());
}
void RunTests()
{
Aurora::RuntimeStartInfo info;
info.console.fio.enableLogging = false;
Aurora::RuntimeStart(info);
NewHandle();
while (true)
{
AuThreading::Sleep(20000);
}
}

View File

@ -0,0 +1,145 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Main.cpp
Date: 2022-2-18
Author: Reece
***/
#include <AuroraRuntime.hpp>
static AuByteBuffer outBuffer;
static void doAcceptTick(const AuSPtr<AuIOIPC::IPCPipe> &pipe, const AuSPtr<AuIO::IIOProcessor> &ioDrivenQueue)
{
ioDrivenQueue->StartSimpleLSWatchEx(pipe->AsReadChannelIsOpen(), AuMakeShared<AuIO::IIOSimpleEventListenerFunctional>([&]
{
AuLogDbg("TICK");
auto transaction = pipe->NewAsyncTransaction();
// Create an event listener for the callbacks
auto listener = AuMakeShared<AuIO::IIOPipeEventListenerFunctional>();
SysAssert(bool(listener));
listener->OnPipeSuccessEventFunctional = [&]()
{
AuLogInfo("File IO pipe complete");
doAcceptTick(pipe, ioDrivenQueue);
};
listener->OnPipeFailureEventFunctional = []()
{
AuLogInfo("File IO pipe failure");
};
listener->OnPipePartialEventFunctional = [](AuUInt tranferred)
{
AuLogInfo("on part: {}", tranferred);
};
// Setup the IOPipeRequest
AuIO::IOPipeRequestAIO aio;
aio.asyncTransaction = transaction;
aio.listener = listener;
aio.isStream = true;
aio.output.type = AuIO::EPipeCallbackType::eTryHandleBufferedPart;
auto onData = AuMakeShared<AuIO::IIOBufferedStreamAvailableFunctional>();
onData->OnDataAvailableFunctional = [&](AuByteBuffer &view)
{
AuLogDbg("Pipe processing: {}", view.RemainingBytes());
outBuffer.Write(view.readPtr, view.RemainingBytes());
view.Skip(view.RemainingBytes());
return true;
};
aio.output.handleBufferedStream.onData = onData;
auto filePipe = ioDrivenQueue->ToPipeProcessor()->NewAIOPipe(aio);
SysAssert(bool(filePipe));
SysAssert(filePipe->Start());
AuLogDbg("TICK");
},
[&]
{
AuLogDbg("FAIL");
},
[&]
{
AuLogDbg("OK");
}), true);
}
static void NewHandle()
{
auto pipe = AuIPC::NewPipe();
SysAssert(bool(pipe));
SysAssert(!pipe->AsReadChannelIsOpen()->IsSignaled());
auto handleString = pipe->ExportToString();
SysAssert(bool(handleString.size()));
AuLogDbg("Exported pipe handle: {}", handleString);
static const AuString kHelloWorldClient = "Hello Client";
static const AuString kHelloWorldServer = "Hello Server";
{
// Create the io processor
auto ioDrivenQueue = AuIO::NewIOProcessorNoQueue(false);
SysAssert(bool(ioDrivenQueue));
#if 0
// Finally create the damn pipe
for (int i = 0; i < 5; i++)
{
AuLogInfo("Iteration: {}/5", i + 1);
doAcceptTick();
// Main loop
while (ioDrivenQueue->HasItems())
{
// Since we aren't using an external loop queue, we manually pump the processor in here under this spinloop
// If we were using a loop queue, we wouldn't need to do anything at all. You could still technically
// use ioDrivenQueue->ToQueue()->WaitAny() to trigger the tick.
ioDrivenQueue->RunTick(); // also: TryTick and ManualTick
}
}
#endif
doAcceptTick(pipe, ioDrivenQueue);
while (ioDrivenQueue->HasItems())
{
// Since we aren't using an external loop queue, we manually pump the processor in here under this spinloop
// If we were using a loop queue, we wouldn't need to do anything at all. You could still technically
// use ioDrivenQueue->ToQueue()->WaitAny() to trigger the tick.
ioDrivenQueue->RunTick(); // also: TryTick and ManualTick
}
}
}
void RunTests()
{
Aurora::RuntimeStartInfo info;
info.console.fio.enableLogging = false;
Aurora::RuntimeStart(info);
NewHandle();
while (true)
{
AuThreading::Sleep(20000);
}
}

View File

@ -0,0 +1,56 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Main.cpp
Date: 2022-2-18
Author: Reece
***/
#include <AuroraRuntime.hpp>
static void TestPath(const AuString &path)
{
auto mutex = AuIO::IPC::ImportMutex(path);
AuLogDbg("Got mutex: {}", fmt::ptr(mutex.get()));
mutex->WaitOn(0);
AuThreading::Sleep(2000);
AuLogDbg("Exiting");
}
static void Read()
{
AuLogInfo("Awaiting input (10s)");
auto loop = AuLoop::NewLoopQueue();
auto source = AuConsole::StdInBufferLoopSource();
SysAssert(loop->SourceAdd(source));
SysAssert(loop->Commit());
if (loop->WaitAll(10 * 1000))
{
char binary[512];
int length = AuConsole::ReadStdIn(binary, AuArraySize(binary));
AuString path(binary, binary + length - AuLocale::NewLine().size());
SysAssert(!source->IsSignaled());
SysAssert(!source->WaitOn(100));
TestPath(path);
}
else
{
AuLogInfo("Got nothing");
}
}
void RunTests()
{
Aurora::RuntimeStartInfo info;
info.console.enableStdPassthrough = true; // for ReadStdIn
info.console.enableStdOut = false; // give back stdout to the logger
info.console.fio.enableLogging = false;
Aurora::RuntimeStart(info);
Read();
}

View File

@ -0,0 +1,91 @@
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: Main.cpp
Date: 2022-2-18
Author: Reece
***/
#include <AuroraRuntime.hpp>
static void TestPath(const AuString &handleString)
{
auto pipeImported = AuIPC::ImportPipe(handleString);
SysAssert(bool(pipeImported));
new decltype(pipeImported)(pipeImported); // Leak :S
//SysAssert(pipeImported->AsReadChannelIsOpen()->IsSignaled());
static const AuString kHelloWorldServer = "Hello Server";
auto transactionB = pipeImported->NewAsyncTransaction();
// Set callback
transactionB->SetCallback(AuMakeShared<AuIO::IAsyncFinishedSubscriberFunctional>([](AuUInt64 offset, AuUInt32 length)
{
AuLogDbg("IPC client callback: {} {}", offset, length);
}));
//
AuByteBuffer writeBuffer(512);
AuMemoryViewRead writeView(writeBuffer);
AuRng::RngFillRange(writeBuffer);
SysAssert(transactionB->StartWrite(0, AuUnsafeRaiiToShared(&writeView)));
// Create loop to sync against the two outstanding IO requests
auto loop = AuLoop::NewLoopQueue();
// Add initial loop sources
SysAssert(loop->SourceAdd(transactionB->NewLoopSource()));
SysAssert(loop->Commit());
// Wait for 100 MS
SysAssert(loop->WaitAll(100));
//SysAssert(writeBuffer == readBuffer);
// Reset client pipe
transactionB.reset();
}
static void Read()
{
AuLogInfo("Awaiting input (10s)");
auto loop = AuLoop::NewLoopQueue();
auto source = AuConsole::StdInBufferLoopSource();
SysAssert(loop->SourceAdd(source));
SysAssert(loop->Commit());
if (loop->WaitAll(10 * 1000))
{
char binary[512];
int length = AuConsole::ReadStdIn(binary, AuArraySize(binary));
AuString path(binary, binary + length - AuLocale::NewLine().size());
SysAssert(!source->IsSignaled());
SysAssert(!source->WaitOn(100));
TestPath(path);
}
else
{
AuLogInfo("Got nothing");
}
}
void RunTests()
{
Aurora::RuntimeStartInfo info;
info.console.enableStdPassthrough = true; // for ReadStdIn
info.console.enableStdOut = false; // give back stdout to the logger
info.console.fio.enableLogging = false;
Aurora::RuntimeStart(info);
Read();
}

@ -1 +1 @@
Subproject commit 7735334a46da480a749945c0f645155d90d73855
Subproject commit 7274ec186442c96e4c7dcc99c684a03e5db2ff48