[+] IPC Logger: Added optional stream frame header (single u32) option

[*] FIX: Missing `this->isStream = isStream;`
[*] FIX: Blocking APIs under async objects under NT were using illegal calls that were for some reason not failing
This commit is contained in:
Reece Wilson 2022-07-05 20:35:40 +01:00
parent c20b802da7
commit 7fb73ccdb4
8 changed files with 108 additions and 43 deletions

View File

@ -54,7 +54,7 @@ namespace Aurora::Logging
/**
* @brief
*/
AUKN_SHARED_API(NewIPCSink, IIPCLogger, const AuString &path);
AUKN_SHARED_API(NewIPCSink, IIPCLogger, const AuString &path, bool lengthPrefixedStream = false);
/**
* @brief Constructs an in-memory ring buffer sink

View File

@ -194,12 +194,26 @@ namespace Aurora::IO::FS
return false;
}
OVERLAPPED a {};
a.hEvent = CreateEventA(NULL, true, 0, NULL);
DWORD read;
if (!::ReadFile(this->handle_->handle, parameters.ptr, parameters.length, &read, nullptr))
if (!::ReadFile(this->handle_->handle, parameters.ptr, parameters.length, NULL, &a) &&
::GetLastError() != ERROR_IO_PENDING)
{
SysPushErrorIO();
::CloseHandle(a.hEvent);
return false;
}
::WaitForSingleObject(a.hEvent, 0);
if (!::GetOverlappedResult(this->handle_->handle, &a, &read, true))
{
::CloseHandle(a.hEvent);
return false;
}
::CloseHandle(a.hEvent);
parameters.outVariable = read;
return true;
}
@ -215,12 +229,26 @@ namespace Aurora::IO::FS
return false;
}
OVERLAPPED a {};
a.hEvent = CreateEventA(NULL, true, 0, NULL);
DWORD read;
if (!::WriteFile(this->handle_->handle, parameters.ptr, parameters.length, &read, nullptr))
if (!::WriteFile(this->handle_->writeHandle, parameters.ptr, parameters.length, NULL, &a) &&
::GetLastError() != ERROR_IO_PENDING)
{
SysPushErrorIO();
::CloseHandle(a.hEvent);
return false;
}
::WaitForSingleObject(a.hEvent, 0);
if (!::GetOverlappedResult(this->handle_->writeHandle, &a, &read, true))
{
::CloseHandle(a.hEvent);
return false;
}
::CloseHandle(a.hEvent);
parameters.outVariable = read;
return true;
}
@ -257,8 +285,6 @@ namespace Aurora::IO::FS
{
auto transaction = reinterpret_cast<NtAsyncFileTransaction *>(reinterpret_cast<AuUInt8*>(lpOverlapped) - offsetof(NtAsyncFileTransaction, overlap_));
auto hold = AuExchange(transaction->pin_, {});
if (dwErrorCode)
{
hold->hasFailed = true;
@ -332,7 +358,7 @@ namespace Aurora::IO::FS
this->lastAbstractStat_ = memoryView->length;
this->lastAbstractOffset_ = offset;
this->lastBytes = 0;
this->overlap_.Offset = offset & 0xFFFFFFFF;
this->overlap_.OffsetHigh = (offset >> 32) & 0xFFFFFFFF;
@ -442,27 +468,19 @@ namespace Aurora::IO::FS
if (!completeRoutine)
{
if (this->GetOSErrorCode() == ERROR_BROKEN_PIPE)
{
auto pipe = this->ntIpcPipeImpl.lock();
DispatchCb(0);
if (pipe)
{
pipe->OnEndOfReadStream();
}
return true;
}
if ((this->hasFailed) ||
(this->lastBytes) ||
::GetOverlappedResult(this->handle_->handle, &this->overlap_, &read, false))
{
if (this->lastBytes || this->hasFailed)
{
return true;
}
this->lastAbstractStat_ = 0;
bool bLatched = this->latch_;
DispatchCb(read);
return this->hasFailed || read;
return read;
}
}
else

View File

@ -174,6 +174,7 @@ namespace Aurora::IO
this->asyncActive = false;
this->reader.parent = this;
this->writer.parent = this;
this->isStream = isStream;
return true;
}

View File

@ -44,18 +44,12 @@ namespace Aurora::IO
if (AuExchange(this->triggered, true))
{
StopWatch();
return true;
return false;
}
}
IOAlert(false);
if (this->singleshot)
{
StopWatch();
}
return false;
return this->singleshot;
}
void IOProcessorItem::OnTimeout(const AuSPtr<AuLoop::ILoopSource>& source)
@ -97,5 +91,11 @@ namespace Aurora::IO
SysPanic("Missing");
}
}
if (this->singleshot)
{
StopWatch();
}
}
}

View File

@ -195,9 +195,27 @@ namespace Aurora::IO::IPC
return true;
}
auto ret = ::ReadFile(h, write.ptr, size, &size, NULL);
OVERLAPPED a {};
a.hEvent = CreateEventA(NULL, true, 0, NULL);
if (!::ReadFile(h, write.ptr, size, NULL, &a) &&
::GetLastError() != ERROR_IO_PENDING)
{
::CloseHandle(a.hEvent);
return false;
}
::WaitForSingleObject(a.hEvent, 0);
if (!::GetOverlappedResult(h, &a, &size, true))
{
::CloseHandle(a.hEvent);
return false;
}
::CloseHandle(a.hEvent);
write.outVariable = size;
return ret;
return true;
}
bool IPCPipeImpl::Write(const Memory::MemoryViewStreamRead &read)
@ -212,12 +230,25 @@ namespace Aurora::IO::IPC
TryConnect();
DWORD temp;
if (!::WriteFile(h, read.ptr, read.length, &temp, nullptr))
OVERLAPPED a {};
a.hEvent = CreateEventA(NULL, true, 0, NULL);
if (!::WriteFile(h, read.ptr, read.length, NULL, &a) &&
::GetLastError() != ERROR_IO_PENDING)
{
SysPushErrorIO();
SysPushErrorIO("{}", GetLastError());
::CloseHandle(a.hEvent);
return false;
}
::WaitForSingleObject(a.hEvent, 0);
if (!::GetOverlappedResult(h, &a, &temp, true))
{
::CloseHandle(a.hEvent);
return false;
}
::CloseHandle(a.hEvent);
read.outVariable = temp;
return true;
}

View File

@ -95,9 +95,9 @@ namespace Aurora::Logging
Sinks::NewFileSinkRelease(sink);
}
AUKN_SYM IIPCLogger *NewIPCSinkNew(const AuString &path)
AUKN_SYM IIPCLogger *NewIPCSinkNew(const AuString &path, bool lengthPrefixed)
{
return Sinks::NewIPCSinkNew(path);
return Sinks::NewIPCSinkNew(path, lengthPrefixed);
}
AUKN_SYM void NewIPCSinkRelease(IIPCLogger *sink)

View File

@ -10,7 +10,7 @@
namespace Aurora::Logging::Sinks
{
IPCSink::IPCSink(const AuSPtr<Aurora::IO::IPC::IPCPipe> &pipe) : pipe_(pipe)
IPCSink::IPCSink(const AuSPtr<Aurora::IO::IPC::IPCPipe> &pipe, bool lengthPrefixed) : pipe_(pipe), lengthPrefixed_(lengthPrefixed)
{
}
@ -33,9 +33,15 @@ namespace Aurora::Logging::Sinks
auto str = msg.ToPersistentString();
auto startOffset = this->logBuffer_.GetWriteOffset();
if (this->lengthPrefixed_)
{
if (this->logBuffer_.base == this->logBuffer_.writePtr)
{
this->logBuffer_.Write<AuUInt32>(0);
}
}
msg.Write(this->logBuffer_);
if (!this->logBuffer_)
{
@ -62,11 +68,19 @@ namespace Aurora::Logging::Sinks
return;
}
this->pipe_->Write(AuMemoryViewStreamRead(this->logBuffer_));
auto len = this->logBuffer_.writePtr - this->logBuffer_.base;
if (this->lengthPrefixed_)
{
this->logBuffer_.writePtr = this->logBuffer_.base;
this->logBuffer_.Write<AuUInt32>(len);
}
this->pipe_->Write(AuMemoryViewStreamRead(this->logBuffer_.base, this->logBuffer_.base + len));
this->logBuffer_.ResetPositions();
}
IIPCLogger *NewIPCSinkNew(const AuString &str)
IIPCLogger *NewIPCSinkNew(const AuString &str, bool lengthPrefixed)
{
auto pipe = AuIPC::ImportPipe(str);
if (!pipe)
@ -74,7 +88,7 @@ namespace Aurora::Logging::Sinks
return nullptr;
}
auto logger = _new IPCSink(pipe);
auto logger = _new IPCSink(pipe, lengthPrefixed);
if (!logger)
{
return nullptr;

View File

@ -11,7 +11,7 @@ namespace Aurora::Logging::Sinks
{
struct IPCSink : IIPCLogger
{
IPCSink(const AuSPtr<Aurora::IO::IPC::IPCPipe> &pipe);
IPCSink(const AuSPtr<Aurora::IO::IPC::IPCPipe> &pipe, bool lengthPrefixed);
AuSPtr<IO::IPC::IPCPipe> ToPipe() override;
@ -25,8 +25,9 @@ namespace Aurora::Logging::Sinks
AuSPtr<Aurora::IO::IPC::IPCPipe> pipe_;
AuByteBuffer logBuffer_;
AuThreadPrimitives::MutexUnique_t logMutex_;
bool lengthPrefixed_;
};
void NewIPCSinkRelease(IIPCLogger *logger);
IIPCLogger *NewIPCSinkNew(const AuString &str);
IIPCLogger *NewIPCSinkNew(const AuString &str, bool lengthPrefixed);
}