[+] Near 1:1 Linux IPC Pipe compared to the NT equivalent (~= CreateNamedPipeA(nMaxInstances=1, dwOpenMode=PIPE_ACCESS_DUPLEX, dwPipeMode=PIPE_TYPE_BYTE))
[+] Ability to bypass blocking limitation of certain io_submit reads, if the blocking subsystem is a pollable stream (ie: a pipe). [*] Fixed major Linux bug where LoopQueue items weren't being submitted, if no dequeues were in the same tick [*] Fix various Linux pipe related bugs [*] Fix futex bug where the callback was nulled on server-release
This commit is contained in:
parent
d60176afb3
commit
64cb7404ba
@ -1145,7 +1145,7 @@ namespace Aurora::Console::ConsoleStd
|
||||
{
|
||||
i++;
|
||||
auto next = gLineEncodedBuffer[i];
|
||||
|
||||
|
||||
if (next == ';')
|
||||
{
|
||||
i++;
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include <fcntl.h>
|
||||
#include "FileStream.Unix.hpp"
|
||||
#include "Async.Linux.hpp"
|
||||
#include <Source/IO/IPC/IPCPipe.Unix.hpp>
|
||||
|
||||
namespace Aurora::IO::FS
|
||||
{
|
||||
@ -26,13 +27,39 @@ namespace Aurora::IO::FS
|
||||
virtual bool IsSignaled() override;
|
||||
virtual bool OnTrigger(AuUInt handle) override;
|
||||
virtual AuLoop::ELoopSource GetType() override;
|
||||
|
||||
virtual const AuList<AuUInt> &GetHandles() override;
|
||||
virtual bool Singular() override;
|
||||
|
||||
private:
|
||||
|
||||
bool bExMode {};
|
||||
AuWPtr<LinuxAsyncFileTransaction> caller_;
|
||||
AuList<AuUInt> handles_;
|
||||
};
|
||||
|
||||
LinuxAsyncFileTransactionLoopSource::LinuxAsyncFileTransactionLoopSource(AuSPtr<LinuxAsyncFileTransaction> that) : caller_(that), Loop::LSEvent(false, false, true)
|
||||
{
|
||||
if (that)
|
||||
{
|
||||
auto possiblePipe = that->GetFileHandle()->pIPCPipe;
|
||||
if (possiblePipe)
|
||||
{
|
||||
this->bExMode = true;
|
||||
|
||||
this->handles_ = {possiblePipe->GetPreemptFd(), Loop::LSEvent::GetHandle()};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const AuList<AuUInt> &LinuxAsyncFileTransactionLoopSource::GetHandles()
|
||||
{
|
||||
return this->handles_;
|
||||
}
|
||||
|
||||
bool LinuxAsyncFileTransactionLoopSource::Singular()
|
||||
{
|
||||
return !this->bExMode;
|
||||
}
|
||||
|
||||
bool LinuxAsyncFileTransactionLoopSource::OnTrigger(AuUInt handle)
|
||||
@ -159,6 +186,15 @@ namespace Aurora::IO::FS
|
||||
|
||||
bool LinuxAsyncFileStream::BlockingRead(AuUInt64 offset, const Memory::MemoryViewStreamWrite ¶meters)
|
||||
{
|
||||
if (this->handle_->pIPCPipe)
|
||||
{
|
||||
if (this->handle_->pIPCPipe->LIOS_PopOne())
|
||||
{
|
||||
parameters.outVariable = 0;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!PosixSetOffset(this->handle_->readHandle, offset))
|
||||
{
|
||||
return false;
|
||||
@ -204,6 +240,7 @@ namespace Aurora::IO::FS
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
auto fd = this->handle_->readHandle;
|
||||
if (fd == -1)
|
||||
{
|
||||
@ -229,10 +266,19 @@ namespace Aurora::IO::FS
|
||||
LIOS_Init(AuSharedFromThis());
|
||||
SetMemory(memoryView);
|
||||
|
||||
if (!UNIX::LinuxOverlappedSubmitRead(fd, offset, this, this->loopSource_.get()))
|
||||
if (this->handle_->pIPCPipe)
|
||||
{
|
||||
if (this->handle_->pIPCPipe->LIOS_PopOne())
|
||||
{
|
||||
LIOS_SendProcess(0, false, errno);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!UNIX::LinuxOverlappedSubmitRead(fd, offset, this, this->loopSource_.get(), bool(this->handle_->pIPCPipe)))
|
||||
{
|
||||
LIOS_SendProcess(0, true, errno);
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -313,13 +359,25 @@ namespace Aurora::IO::FS
|
||||
return;
|
||||
}
|
||||
this->DispatchCb();
|
||||
|
||||
if (read)
|
||||
{
|
||||
if (this->handle_->pIPCPipe)
|
||||
{
|
||||
// Return value intentionally ignored
|
||||
// We just need to poke on read...
|
||||
this->handle_->pIPCPipe->LIOS_PopOne();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void LinuxAsyncFileTransaction::DispatchCb()
|
||||
{
|
||||
if (AuExchange(this->latch_, true))
|
||||
{
|
||||
SysPushErrorGeneric();
|
||||
// TODO (Reece): urgent
|
||||
//SysPushErrorGeneric();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -338,6 +396,7 @@ namespace Aurora::IO::FS
|
||||
LIOS_SendProcess(this->lastFinishedStat_, this->lastFinishedStat_ == 0, 0, false);
|
||||
//DispatchCb();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,11 @@
|
||||
***/
|
||||
#pragma once
|
||||
|
||||
namespace Aurora::IO::IPC
|
||||
{
|
||||
struct IPCPipeImpl;
|
||||
}
|
||||
|
||||
namespace Aurora::IO::FS
|
||||
{
|
||||
struct LinuxAsyncFileTransaction;
|
||||
@ -25,6 +30,7 @@ namespace Aurora::IO::FS
|
||||
AuString path;
|
||||
bool readOnly;
|
||||
bool directIO;
|
||||
IPC::IPCPipeImpl *pIPCPipe {};
|
||||
};
|
||||
|
||||
struct LinuxAsyncFileStream : IAsyncFileStream
|
||||
|
@ -307,19 +307,17 @@ static bool TryReleaseFutex(AuUInt8 index)
|
||||
auto old = gFutexCallbacks[index];
|
||||
auto oldState = gFutexArray[index].futexPadded;
|
||||
|
||||
if ((old) &&
|
||||
(!old->OnClosed()))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (AuAtomicCompareExchange<AuUInt32>(&gFutexArray[index].futex, kFutexValueUnlocked, oldState) != oldState)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if ((old) &&
|
||||
(!old->OnClosed()))
|
||||
{
|
||||
gFutexArray[index].futexPadded = oldState;
|
||||
return false;
|
||||
}
|
||||
|
||||
gFutexCallbacks[index] = nullptr;
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -482,6 +480,8 @@ namespace Aurora::IO::IPC
|
||||
|
||||
this->mem_ = gFutexSharedMemory;
|
||||
this->mutex_.bNoAutoRel = true;
|
||||
|
||||
gFutexCallbacks[index] = this;
|
||||
}
|
||||
|
||||
IPCMutexProxy::IPCMutexProxy(int handle, AuSPtr<IPCSharedMemory> mem, AuUInt32 index) :
|
||||
@ -600,16 +600,11 @@ namespace Aurora::IO::IPC
|
||||
|
||||
if (futex)
|
||||
{
|
||||
auto a = *futex;
|
||||
if (*futex == kFutexIsDead)
|
||||
{
|
||||
this->mutex_.Unlock();
|
||||
}
|
||||
|
||||
// This atomic is dumb and makes no sense
|
||||
if (AuAtomicCompareExchange<AuUInt32>(futex, kFutexValueUnlocked, kFutexIsDead) != kFutexIsDead)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
@ -687,7 +682,7 @@ namespace Aurora::IO::IPC
|
||||
return shared;
|
||||
}
|
||||
|
||||
AuSPtr<IPCMutex> ImportMutexEx(const IPCToken &handle, const IPCToken &mem, AuUInt32 index)
|
||||
AuSPtr<IPCMutexProxy> ImportMutexEx(const IPCToken &handle, const IPCToken &mem, AuUInt32 index)
|
||||
{
|
||||
int fd {-1};
|
||||
if (!IO::UNIX::FDAccept(handle, fd))
|
||||
|
@ -44,5 +44,5 @@ namespace Aurora::IO::IPC
|
||||
friend IPCPipeImpl;
|
||||
};
|
||||
|
||||
AuSPtr<IPCMutex> ImportMutexEx(const IPCToken &handle, const IPCToken &mem, AuUInt32 index);
|
||||
AuSPtr<IPCMutexProxy> ImportMutexEx(const IPCToken &handle, const IPCToken &mem, AuUInt32 index);
|
||||
}
|
@ -9,80 +9,27 @@
|
||||
#include "IPC.hpp"
|
||||
#include "IPCHandle.hpp"
|
||||
#include "IPCPipe.Unix.hpp"
|
||||
|
||||
#include <Source/IO/UNIX/FDIpcServer.hpp>
|
||||
|
||||
#include <Source/IO/Loop/ILoopSourceEx.hpp>
|
||||
#include <Source/IO/Loop/LSHandle.hpp>
|
||||
#include <Source/IO/Loop/LSEvent.hpp>
|
||||
|
||||
#if defined(AURORA_IS_LINUX_DERIVED)
|
||||
#include <Source/IO/UNIX/IOSubmit.Linux.hpp>
|
||||
#include <Source/IO/FS/Async.Linux.hpp>
|
||||
#include "IPCPrimitives.Linux.hpp"
|
||||
#include "IPCMutexFutex.Linux.hpp"
|
||||
#endif
|
||||
|
||||
#include "IPCMemory.Unix.hpp" // required for handle
|
||||
|
||||
#include <fcntl.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <termios.h>
|
||||
|
||||
namespace Aurora::IO::IPC
|
||||
{
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Pipes
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct IPCPipeImpl : IPCPipe, Loop::LSHandle, AuEnableSharedFromThis<IPCPipeImpl>
|
||||
{
|
||||
IPCPipeImpl(int (fds)[2], int (secondary)[2], IPCToken readEnd, IPCToken writeEnd, AuSPtr<IPCEvent> event, AuSPtr<IPCMutex> mutex);
|
||||
~IPCPipeImpl();
|
||||
|
||||
PROXY_INTERNAL_INTERFACE_(LSHandle::)
|
||||
|
||||
virtual AuSPtr<IO::IAsyncTransaction> NewAsyncTransaction() override;
|
||||
|
||||
virtual AuSPtr<Loop::ILoopSource> AsReadChannelIsOpen() override;
|
||||
virtual AuSPtr<Loop::ILoopSource> AsReadChannelHasData() override;
|
||||
|
||||
virtual bool Read(const Memory::MemoryViewStreamWrite &write, bool nonblock) override;
|
||||
virtual bool Write(const Memory::MemoryViewStreamRead &read) override;
|
||||
virtual AuString ExportToString() override;
|
||||
|
||||
bool IsSignaled() override;
|
||||
bool WaitOn(AuUInt32 timeout) override;
|
||||
Loop::ELoopSource GetType() override;
|
||||
|
||||
private:
|
||||
int fds[2] {-1, -1};
|
||||
int secondary[2] {-1, -1};
|
||||
|
||||
AuSPtr<IO::FS::FileHandle> fsHandle_;
|
||||
AuSPtr<IO::FS::LinuxAsyncFileStream> fsStream_;
|
||||
|
||||
IPCToken readEnd_;
|
||||
IPCToken writeEnd_;
|
||||
AuSPtr<IPCEvent> event_;
|
||||
AuSPtr<IPCMutex> mutex_;
|
||||
};
|
||||
|
||||
IPCPipeImpl::IPCPipeImpl(int (fds2)[2], int (fds3)[2], IPCToken readEnd, IPCToken writeEnd, AuSPtr<IPCEvent> event, AuSPtr<IPCMutex> mutex) :
|
||||
fds {fds2[0], fds2[1]}, secondary {fds3[0], fds3[1]},
|
||||
readEnd_(readEnd), writeEnd_(writeEnd), event_(event), mutex_(mutex)
|
||||
{
|
||||
this->handle = fds[0];
|
||||
|
||||
this->fsHandle_ = AuMakeShared<IO::FS::FileHandle>();
|
||||
this->fsStream_ = AuMakeShared<IO::FS::LinuxAsyncFileStream>();
|
||||
|
||||
this->fsHandle_->Init(fds2[0], fds2[1]);
|
||||
this->fsStream_->Init(this->fsHandle_);
|
||||
}
|
||||
|
||||
IPCPipeImpl::~IPCPipeImpl()
|
||||
{
|
||||
int fd {-1};
|
||||
|
||||
if (secondary[0] == -1)
|
||||
{
|
||||
this->mutex_->Unlock();
|
||||
this->event_->Reset();
|
||||
}
|
||||
|
||||
if ((fd = AuExchange(fds[0], -1)) != -1)
|
||||
{
|
||||
IO::UNIX::FDServeEnd(readEnd_);
|
||||
@ -108,10 +55,86 @@ namespace Aurora::IO::IPC
|
||||
fsHandle_.reset();
|
||||
fsStream_.reset();
|
||||
|
||||
if (secondary[0] == -1)
|
||||
}
|
||||
|
||||
void IPCPipeImpl::DrainOtherFd()
|
||||
{
|
||||
// TODO (Reece): urgent
|
||||
}
|
||||
|
||||
AuUInt IPCPipeImpl::GetPreemptFd()
|
||||
{
|
||||
return this->eventPreempt_.GetHandle();
|
||||
}
|
||||
|
||||
bool IPCPipeImpl::LIOS_PopOne()
|
||||
{
|
||||
if (!this->bIsSendingZero)
|
||||
{
|
||||
event_->Reset();
|
||||
return false;
|
||||
}
|
||||
|
||||
#if defined(FIONREAD)
|
||||
int available {};
|
||||
int res = ::ioctl(fds[0], FIONREAD, &available) ;
|
||||
if (res< 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (available)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
|
||||
this->FinishFinalize();
|
||||
return true;
|
||||
}
|
||||
|
||||
void IPCPipeImpl::FinishFinalize()
|
||||
{
|
||||
this->bIsSendingZero = false;
|
||||
this->bHasDied = true;
|
||||
}
|
||||
|
||||
const AuList<AuUInt> &IPCPipeImpl::GetHandles()
|
||||
{
|
||||
return this->handles;
|
||||
}
|
||||
|
||||
bool IPCPipeImpl::Singular()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool IPCPipeImpl::OnTrigger(AuUInt handle)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
IPCPipeImpl::IPCPipeImpl(int (fds2)[2], int (fds3)[2], IPCToken readEnd, IPCToken writeEnd, AuSPtr<IPCEvent> event, AuSPtr<IPCMutex> mutex) :
|
||||
fds {fds2[0], fds2[1]}, secondary {fds3[0], fds3[1]},
|
||||
readEnd_(readEnd), writeEnd_(writeEnd), event_(event), mutex_(mutex),
|
||||
eventPreempt_(false, false, true)
|
||||
{
|
||||
this->handles = {fds[0], eventPreempt_.GetHandle()};
|
||||
|
||||
#if defined(AURORA_IS_LINUX_DERIVED)
|
||||
if (fds3[0] != -1)
|
||||
{
|
||||
AuStaticCast<IPCMutexProxy>(mutex)->pMutexClosedHook = this;
|
||||
}
|
||||
#endif
|
||||
|
||||
this->fsHandle_ = AuMakeShared<IO::FS::FileHandle>();
|
||||
this->fsStream_ = AuMakeShared<IO::FS::LinuxAsyncFileStream>();
|
||||
|
||||
this->fsHandle_->pIPCPipe = this;
|
||||
|
||||
this->fsHandle_->Init(fds2[0], fds2[1]);
|
||||
this->fsStream_->Init(this->fsHandle_);
|
||||
|
||||
}
|
||||
|
||||
AuSPtr<Loop::ILoopSource> IPCPipeImpl::AsReadChannelIsOpen()
|
||||
@ -153,7 +176,7 @@ namespace Aurora::IO::IPC
|
||||
{
|
||||
#if defined(FIONREAD)
|
||||
int available {};
|
||||
if (::ioctl(ref, FIONREAD, &available) < 0)
|
||||
if (::ioctl(fds[0], FIONREAD, &available) < 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
@ -170,7 +193,8 @@ namespace Aurora::IO::IPC
|
||||
do
|
||||
{
|
||||
tmp = ::read(handle, write.ptr, write.length);
|
||||
} while ((tmp == -1 && errno == EINTR));
|
||||
}
|
||||
while ((tmp == -1 && errno == EINTR));
|
||||
|
||||
if (tmp <= 0)
|
||||
{
|
||||
@ -212,7 +236,8 @@ namespace Aurora::IO::IPC
|
||||
do
|
||||
{
|
||||
tmp = ::write(handle, read.ptr, read.length);
|
||||
} while ((tmp == -1 && errno == EINTR));
|
||||
}
|
||||
while ((tmp == -1 && errno == EINTR));
|
||||
|
||||
if (tmp <= 0)
|
||||
{
|
||||
@ -244,6 +269,30 @@ namespace Aurora::IO::IPC
|
||||
return Loop::ELoopSource::eSourceIPCReadPipe;
|
||||
}
|
||||
|
||||
#if defined(AURORA_IS_LINUX_DERIVED)
|
||||
bool IPCPipeImpl::OnClosed()
|
||||
{
|
||||
if (this->bHasDied)
|
||||
{
|
||||
this->DrainOtherFd();
|
||||
this->bHasDied = false;
|
||||
this->eventPreempt_.Reset();
|
||||
this->event_->Reset();
|
||||
return true;
|
||||
}
|
||||
|
||||
this->SendTerminateSignal();
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
|
||||
void IPCPipeImpl::SendTerminateSignal()
|
||||
{
|
||||
this->bIsSendingZero = true;
|
||||
this->eventPreempt_.Set();
|
||||
this->event_->Reset();
|
||||
}
|
||||
|
||||
AuString IPCPipeImpl::ExportToString()
|
||||
{
|
||||
IPC::IPCHandle handle;
|
||||
@ -431,7 +480,7 @@ namespace Aurora::IO::IPC
|
||||
return {};
|
||||
}
|
||||
|
||||
mutex->WaitOn();
|
||||
mutex->WaitOn(0);
|
||||
|
||||
if (event->IsSignaled())
|
||||
{
|
||||
@ -444,7 +493,6 @@ namespace Aurora::IO::IPC
|
||||
|
||||
event->Set();
|
||||
|
||||
mutex->Unlock();
|
||||
|
||||
return object;
|
||||
}
|
||||
|
@ -7,7 +7,101 @@
|
||||
***/
|
||||
#pragma once
|
||||
|
||||
#include <Source/IO/Loop/ILoopSourceEx.hpp>
|
||||
#include <Source/IO/Loop/LSHandle.hpp>
|
||||
#include <Source/IO/Loop/LSEvent.hpp>
|
||||
|
||||
#if defined(AURORA_IS_LINUX_DERIVED)
|
||||
#include <Source/IO/UNIX/IOSubmit.Linux.hpp>
|
||||
#include <Source/IO/FS/Async.Linux.hpp>
|
||||
#include "IPCPrimitives.Linux.hpp"
|
||||
#include "IPCMutexFutex.Linux.hpp"
|
||||
#endif
|
||||
|
||||
#include "IPCMemory.Unix.hpp" // required for handle
|
||||
|
||||
namespace Aurora::IO::IPC
|
||||
{
|
||||
struct IPCPipeImpl : IPCPipe, Loop::LSHandle, AuEnableSharedFromThis<IPCPipeImpl>
|
||||
#if defined(AURORA_IS_LINUX_DERIVED)
|
||||
, IMutexClosedHook
|
||||
#endif
|
||||
{
|
||||
IPCPipeImpl(int (fds)[2], int (secondary)[2], IPCToken readEnd, IPCToken writeEnd, AuSPtr<IPCEvent> event, AuSPtr<IPCMutex> mutex);
|
||||
~IPCPipeImpl();
|
||||
|
||||
#if defined(AURORA_IS_LINUX_DERIVED)
|
||||
inline virtual void OnPresleep() override
|
||||
{
|
||||
Loop::LSHandle::OnPresleep();
|
||||
};
|
||||
inline virtual void OnFinishSleep() override
|
||||
{
|
||||
Loop::LSHandle:: OnFinishSleep();
|
||||
}
|
||||
inline virtual AuUInt GetHandle() override
|
||||
{
|
||||
return Loop::LSHandle:: GetHandle();
|
||||
}
|
||||
inline bool HasValidHandle()
|
||||
{
|
||||
return Loop::LSHandle:: HasValidHandle();
|
||||
}
|
||||
|
||||
#else
|
||||
PROXY_INTERNAL_INTERFACE_(LSHandle::)
|
||||
#endif
|
||||
|
||||
virtual AuSPtr<IO::IAsyncTransaction> NewAsyncTransaction() override;
|
||||
|
||||
virtual AuSPtr<Loop::ILoopSource> AsReadChannelIsOpen() override;
|
||||
virtual AuSPtr<Loop::ILoopSource> AsReadChannelHasData() override;
|
||||
|
||||
virtual bool Read(const Memory::MemoryViewStreamWrite &write, bool nonblock) override;
|
||||
virtual bool Write(const Memory::MemoryViewStreamRead &read) override;
|
||||
virtual AuString ExportToString() override;
|
||||
|
||||
bool IsSignaled() override;
|
||||
bool WaitOn(AuUInt32 timeout) override;
|
||||
Loop::ELoopSource GetType() override;
|
||||
|
||||
#if defined(AURORA_IS_LINUX_DERIVED)
|
||||
bool OnClosed() override;
|
||||
|
||||
virtual const AuList<AuUInt> &GetHandles() override;
|
||||
virtual bool Singular() override;
|
||||
virtual bool OnTrigger(AuUInt handle) override;
|
||||
#endif
|
||||
|
||||
bool LIOS_PopOne();
|
||||
|
||||
void DrainOtherFd();
|
||||
AuUInt GetPreemptFd();
|
||||
|
||||
bool bIsSendingZero {};
|
||||
|
||||
private:
|
||||
int fds[2] {-1, -1};
|
||||
int secondary[2] {-1, -1};
|
||||
|
||||
AuSPtr<IO::FS::FileHandle> fsHandle_;
|
||||
AuSPtr<IO::FS::LinuxAsyncFileStream> fsStream_;
|
||||
|
||||
void SendTerminateSignal();
|
||||
|
||||
AuList<AuUInt> handles;
|
||||
|
||||
|
||||
IPCToken readEnd_;
|
||||
IPCToken writeEnd_;
|
||||
|
||||
AuSPtr<IPCEvent> event_;
|
||||
AuSPtr<IPCMutex> mutex_;
|
||||
|
||||
AuLoop::LSEvent eventPreempt_;
|
||||
|
||||
bool bHasDied {};
|
||||
|
||||
void FinishFinalize();
|
||||
};
|
||||
}
|
@ -310,7 +310,7 @@ namespace Aurora::IO::Loop
|
||||
|
||||
if (!CommitDecommit())
|
||||
{
|
||||
return false;
|
||||
//return false;
|
||||
}
|
||||
|
||||
auto pending = AuExchange(this->commitPending_, {});
|
||||
@ -418,6 +418,7 @@ namespace Aurora::IO::Loop
|
||||
|
||||
bool bTryAgain {};
|
||||
DoTick(timeout, {}, &bTryAgain);
|
||||
PumpHooks();
|
||||
// but this hack should apply to wait any as well, so i'm moving it to the DoTick function
|
||||
|
||||
anythingLeft = epollReference.startingWorkRead.size() || epollReference.startingWorkWrite.size();
|
||||
@ -447,6 +448,7 @@ namespace Aurora::IO::Loop
|
||||
{
|
||||
bTryAgain = false;
|
||||
AuUInt32 ticked = DoTick(timeout, {}, &bTryAgain);
|
||||
PumpHooks();
|
||||
cTicked += ticked;
|
||||
} while (bTryAgain);
|
||||
|
||||
@ -462,6 +464,7 @@ namespace Aurora::IO::Loop
|
||||
{
|
||||
bTryAgain = false;
|
||||
AuUInt32 ticked = DoTick(0, {}, &bTryAgain, true);
|
||||
PumpHooks();
|
||||
cTicked += ticked;
|
||||
} while (bTryAgain);
|
||||
|
||||
@ -477,6 +480,7 @@ namespace Aurora::IO::Loop
|
||||
{
|
||||
bTryAgain = false;
|
||||
AuUInt32 ticked = DoTick(0, &ret, &bTryAgain, true);
|
||||
PumpHooks();
|
||||
} while (bTryAgain);
|
||||
|
||||
return ret;
|
||||
@ -497,6 +501,7 @@ namespace Aurora::IO::Loop
|
||||
{
|
||||
bTryAgain = false;
|
||||
AuUInt32 ticked = DoTick(timeout, &ret, &bTryAgain);
|
||||
PumpHooks();
|
||||
} while (bTryAgain);
|
||||
|
||||
return ret;
|
||||
@ -510,7 +515,6 @@ namespace Aurora::IO::Loop
|
||||
}
|
||||
|
||||
auto ex = source->sourceExtended;
|
||||
|
||||
AU_LOCK_GUARD(this->lock);
|
||||
bool bIsRoot = this == &this->parent->globalEpoll_;
|
||||
if (readData)
|
||||
@ -782,9 +786,7 @@ namespace Aurora::IO::Loop
|
||||
itr ++;
|
||||
}
|
||||
}
|
||||
|
||||
PumpHooks();
|
||||
|
||||
|
||||
return bTicked;
|
||||
}
|
||||
|
||||
|
@ -111,10 +111,25 @@ static int io_cancel(aio_context_t ctx_id, struct iocb *iocb,
|
||||
|
||||
namespace Aurora::IO::UNIX
|
||||
{
|
||||
static bool LinuxOverlappedSubmit(int fd, int op, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent);
|
||||
|
||||
//////////////////////////////////////////////////////////////////
|
||||
// ASubmittable
|
||||
//////////////////////////////////////////////////////////////////
|
||||
|
||||
ASubmittable::ASubmittable()
|
||||
{
|
||||
}
|
||||
|
||||
ASubmittable::~ASubmittable()
|
||||
{
|
||||
if ((this->tempEPoll != 0) &&
|
||||
(this->tempEPoll != -1))
|
||||
{
|
||||
::close(AuExchange(this->tempEPoll, -1));
|
||||
}
|
||||
}
|
||||
|
||||
void ASubmittable::LIOS_SendProcess(AuUInt32 read, bool failure, int err, bool mark)
|
||||
{
|
||||
// Allow for reuse by releasing before dispatch
|
||||
@ -125,7 +140,32 @@ namespace Aurora::IO::UNIX
|
||||
pinMem = this->memPin_;
|
||||
|
||||
LIOS_Reset();
|
||||
LIOS_Process(read, failure, err, mark);
|
||||
|
||||
if (AuExchange(this->bIsReadPending, false))
|
||||
{
|
||||
// Psyche - it was just the poll half of a blocking read
|
||||
// We haven't read anything yet.
|
||||
if (!LinuxOverlappedSubmit(this->fd2, IOCB_CMD_PREAD, this->offset2 /* always zero */, this, this->optEvent2))
|
||||
{
|
||||
LIOS_Process(0, true, 69, false);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
LIOS_Process(read, failure, err, mark);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
SysPushErrorCatch("IO Callback threw an exception");
|
||||
}
|
||||
}
|
||||
|
||||
bool LIOS_PopOne()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
void ASubmittable::LIOS_Init(const AuSPtr<void> &pin)
|
||||
@ -178,6 +218,32 @@ namespace Aurora::IO::UNIX
|
||||
return this->cb;
|
||||
}
|
||||
|
||||
int ASubmittable::GetOrCreateFdPollForBlockingRead(int fd)
|
||||
{
|
||||
if ((this->tempEPoll == 0) ||
|
||||
(this->tempEPoll == -1))
|
||||
{
|
||||
epoll_event event;
|
||||
|
||||
if ((this->tempEPoll = ::epoll_create1(0)) == -1)
|
||||
{
|
||||
SysPushErrorIO();
|
||||
return -1;
|
||||
}
|
||||
|
||||
event.events = EPOLLIN;
|
||||
event.data.ptr = nullptr;
|
||||
if (::epoll_ctl(this->tempEPoll, EPOLL_CTL_ADD, fd, &event) != 0)
|
||||
{
|
||||
SysPushErrorIO();
|
||||
::close(this->tempEPoll);
|
||||
this->tempEPoll = -1;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return this->tempEPoll;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////
|
||||
// TLS
|
||||
@ -239,12 +305,12 @@ namespace Aurora::IO::UNIX
|
||||
|
||||
static bool LinuxOverlappedTrySubmitWorkInternal(TLSIO *io)
|
||||
{
|
||||
int index = {};
|
||||
int index {};
|
||||
int startingLength = io->submitPendingArray.size();
|
||||
|
||||
while (index != startingLength)
|
||||
{
|
||||
int ret = io_submit(io->context, io->submitPendingArray.size(), io->submitPendingArray.data() + index);
|
||||
int ret = io_submit(io->context, io->submitPendingArray.size() - index, io->submitPendingArray.data() + index);
|
||||
|
||||
if (ret <= 0)
|
||||
{
|
||||
@ -284,7 +350,6 @@ namespace Aurora::IO::UNIX
|
||||
if (submittable)
|
||||
{
|
||||
submittable->LIOS_SendProcess(0, true, EBADF);
|
||||
index ++;
|
||||
}
|
||||
}
|
||||
default:
|
||||
@ -294,12 +359,20 @@ namespace Aurora::IO::UNIX
|
||||
}
|
||||
else
|
||||
{
|
||||
index += ret;
|
||||
io->dwIoSubmits += ret;
|
||||
}
|
||||
|
||||
index += ret;
|
||||
}
|
||||
|
||||
AuTryClear(io->submitPendingArray);
|
||||
if (io->submitPendingArray.size() == index)
|
||||
{
|
||||
AuTryClear(io->submitPendingArray);
|
||||
}
|
||||
else
|
||||
{
|
||||
io->submitPendingArray.erase(io->submitPendingArray.begin(), io->submitPendingArray.begin() + index);
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
@ -359,8 +432,47 @@ namespace Aurora::IO::UNIX
|
||||
return AuTryInsert(io->submitPendingArray, &submit);
|
||||
}
|
||||
|
||||
bool LinuxOverlappedSubmitRead(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent)
|
||||
static bool LinuxOverlappedReadWait(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent)
|
||||
{
|
||||
auto io = GetTls();
|
||||
if (!io)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
SysAssert(offset == 0, "Read-blocking IO streams only");
|
||||
|
||||
context->bIsReadPending = true;
|
||||
|
||||
context->offset2 = offset; // redundant - always zero
|
||||
context->fd2 = fd;
|
||||
context->optEvent2 = optEvent;
|
||||
|
||||
auto &submit = context->GetIOCB();
|
||||
submit.aio_data = context->GetData();
|
||||
submit.aio_lio_opcode = IOCB_CMD_POLL;
|
||||
submit.aio_reqprio = 0;
|
||||
submit.aio_fildes = context->GetOrCreateFdPollForBlockingRead(fd);
|
||||
submit.aio_offset = 0;
|
||||
submit.aio_buf = POLLIN;
|
||||
submit.aio_nbytes = 0;
|
||||
|
||||
if (submit.aio_fildes == -1)
|
||||
{
|
||||
SysPushErrorIO();
|
||||
return false;
|
||||
}
|
||||
|
||||
return AuTryInsert(io->submitPendingArray, &submit);
|
||||
}
|
||||
|
||||
bool LinuxOverlappedSubmitRead(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent, bool bWaitForRead)
|
||||
{
|
||||
if (bWaitForRead)
|
||||
{
|
||||
return LinuxOverlappedReadWait(fd, offset, context, optEvent);
|
||||
}
|
||||
|
||||
return LinuxOverlappedSubmit(fd, IOCB_CMD_PREAD, offset, context, optEvent);
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,9 @@ namespace Aurora::IO::UNIX
|
||||
{
|
||||
struct ASubmittable
|
||||
{
|
||||
ASubmittable();
|
||||
~ASubmittable();
|
||||
|
||||
virtual void LIOS_Process(AuUInt32 read, bool failure, int err, bool mark) = 0;
|
||||
|
||||
void LIOS_SendProcess(AuUInt32 read, bool failure, int err, bool mark = false);
|
||||
@ -30,11 +33,20 @@ namespace Aurora::IO::UNIX
|
||||
|
||||
iocb & GetIOCB();
|
||||
|
||||
int GetOrCreateFdPollForBlockingRead(int fd);
|
||||
|
||||
// Hack for blocking reads
|
||||
bool bIsReadPending {};
|
||||
AuUInt64 offset2 {};
|
||||
int fd2 {};
|
||||
AuLoop::ILSEvent *optEvent2 {};
|
||||
|
||||
private:
|
||||
AuUInt64 dataPtr_ {};
|
||||
AuUInt dataLen_ {};
|
||||
AuSPtr<void> pin_;
|
||||
AuSPtr<void> memPin_;
|
||||
int tempEPoll {-1};
|
||||
iocb cb {};
|
||||
};
|
||||
|
||||
@ -52,7 +64,7 @@ namespace Aurora::IO::UNIX
|
||||
|
||||
|
||||
// Work queue
|
||||
bool LinuxOverlappedSubmitRead (int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent);
|
||||
bool LinuxOverlappedSubmitRead (int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent, bool bWaitForRead = false /* use on unsupported blocking interfaces. costs an epoll to work around >current< limitiations in io_submit*/);
|
||||
bool LinuxOverlappedSubmitWrite(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent);
|
||||
// TODO: Stream copy operations
|
||||
|
||||
|
@ -23,8 +23,8 @@ namespace Aurora::Threading::Primitives
|
||||
|
||||
Mutex::~Mutex()
|
||||
{
|
||||
auto status = pthread_mutex_destroy(&value_) == 0;
|
||||
SysAssert(status, "Mutex init failed");
|
||||
auto status = pthread_mutex_destroy(&value_);
|
||||
SysAssert(status == 0, "Mutex destruct failed, {}", status);
|
||||
}
|
||||
|
||||
bool Mutex::HasOSHandle(AuMach &mach)
|
||||
|
Loading…
Reference in New Issue
Block a user