AuroraRuntime/Source/IO/UNIX/IOSubmit.Linux.cpp

1038 lines
26 KiB
C++

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOSubmit.Linux.cpp
Date: 2022-4-12
Author: Reece
Note: I want APCs, overlapped, and event based IO on Linux. Gib.
***/
#include <Source/RuntimeInternal.hpp>
#include "UnixIO.hpp"
#include <linux/aio_abi.h>
#include <sys/syscall.h>
#include <sys/epoll.h>
#include <poll.h>
#include <arpa/inet.h>
#include "IOSubmit.Linux.hpp"
#include <Source/Time/Time.hpp>
#include <Source/IO/Loop/Loop.hpp>
#include <Source/IO/Loop/LSEvent.hpp>
namespace Aurora::IO::UNIX
{
// Fills in the iocb owned by `context` for operation `op` on `fd` and queues it on the
// calling thread's pending-submission list. Defined further down in this file.
static bool LinuxOverlappedSubmit(int fd, int op, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent);
// Asks the kernel to cancel the iocb owned by `context`. Defined further down in this file.
static bool LinuxOverlappedCancel(ASubmittable *context);
// Raises the calling thread's bPollHit flag, which the poll loops in this file use as the
// cue to flush the pending-submission list again.
static void IoTlsRedo();
//////////////////////////////////////////////////////////////////
// ASubmittable
//////////////////////////////////////////////////////////////////
// Nothing to set up here; construction does no work of its own.
ASubmittable::ASubmittable() = default;
// Closes the private epoll descriptor, if one was ever created for this object.
ASubmittable::~ASubmittable()
{
    // Both 0 and -1 are treated as "no descriptor held".
    const bool bNoDescriptor = (this->tempEPoll == 0) || (this->tempEPoll == -1);
    if (bNoDescriptor)
    {
        return;
    }

    // Swap in -1 first so the member never refers to a closed descriptor.
    ::close(AuExchange(this->tempEPoll, -1));
}
// Delivers a kernel completion to this submittable.
//
// read    - value forwarded to LIOS_Process as the transferred byte count
// failure - true when the completion reports an error
// err     - error number accompanying a failure
// mark    - forwarded untouched to LIOS_Process
//
// If the completion belongs to the poll half of a blocking read, nothing is reported yet:
// the real IOCB_CMD_PREAD is queued instead and the poll loop is asked to flush again.
void ASubmittable::LIOS_SendProcess(AuUInt32 read, bool failure, int err, bool mark)
{
    // Stack copies of both pins keep this object and its buffer referenced while we
    // dispatch, even though LIOS_Reset() drops the member references first (allowing reuse).
    AuSPtr<void> keepSelf = this->pin_;
    AuSPtr<void> keepMem = this->memPin_;

    const bool bWasPollPhase = AuExchange(this->bIsReadPending, false);
    if (!bWasPollPhase)
    {
        LIOS_Reset();

        try
        {
            LIOS_Process(read, failure, err, mark);
        }
        catch (...)
        {
            SysPushErrorCatch("IO Callback threw an exception");
        }
        return;
    }

    // Only the readiness poll finished; no data has been read yet. Queue the actual read.
    const bool bQueued = LinuxOverlappedSubmit(this->fd2, IOCB_CMD_PREAD, this->offset2 /* always zero */, this, this->optEvent2);
    if (!bQueued)
    {
        try
        {
            LIOS_Process(0, true, 69, false);
        }
        catch (...)
        {
            SysPushErrorCatch("IO Callback threw an exception");
        }
    }

    IoTlsRedo();
}
// Requests cancellation of this object's iocb; returns whatever LinuxOverlappedCancel reports.
bool ASubmittable::LIOS_Cancel()
{
    const bool bResult = LinuxOverlappedCancel(this);
    return bResult;
}
// Placeholder: there is nothing to pop, so this always reports false.
bool LIOS_PopOne()
{
    constexpr bool kPoppedAnything = false;
    return kPoppedAnything;
}
// Stores `pin` as the reference held by this object; HasState() reports true while it is set.
void ASubmittable::LIOS_Init(const AuSPtr<void> &pin)
{
    auto &selfPin = this->pin_;
    selfPin = pin;
}
// Drops both held references (the pin set by LIOS_Init and the memory view pin), in that order.
void ASubmittable::LIOS_Reset()
{
    auto &selfPin = this->pin_;
    auto &bufferPin = this->memPin_;

    selfPin.reset();
    bufferPin.reset();
}
// Records the address and length of a read-only memory view and keeps the view referenced.
void ASubmittable::SetMemory(const AuSPtr<AuMemoryViewRead> &view)
{
    const auto &region = *view;

    this->dataPtr_ = AuReinterpretCast<AuUInt64>(region.ptr);
    this->dataLen_ = region.length;

    // Hold the view itself so it stays referenced until LIOS_Reset().
    this->memPin_ = view;
}
// Records the address and length of a writable memory view and keeps the view referenced.
void ASubmittable::SetMemory(const AuSPtr<AuMemoryViewWrite> &view)
{
    const auto &region = *view;

    this->dataPtr_ = AuReinterpretCast<AuUInt64>(region.ptr);
    this->dataLen_ = region.length;

    // Hold the view itself so it stays referenced until LIOS_Reset().
    this->memPin_ = view;
}
// True while the pin installed by LIOS_Init() has not been released.
bool ASubmittable::HasState()
{
    if (this->pin_)
    {
        return true;
    }
    return false;
}
// Returns the buffer address recorded by SetMemory(), as an integer.
AuUInt64 ASubmittable::GetBuf()
{
    const AuUInt64 uAddress = this->dataPtr_;
    return uAddress;
}
// Returns the buffer length recorded by SetMemory().
AuUInt ASubmittable::GetBufLength()
{
    const AuUInt uLength = this->dataLen_;
    return uLength;
}
// Returns this object's own address as an integer. The submit paths store it in
// iocb::aio_data, and completion handlers cast it back to an ASubmittable pointer.
AuUInt64 ASubmittable::GetData()
{
    const AuUInt64 uCookie = AuReinterpretCast<AuUInt64>(this);
    return uCookie;
}
// Gives mutable access to the control block embedded in this object.
iocb & ASubmittable::GetIOCB()
{
    iocb &controlBlock = this->cb;
    return controlBlock;
}
// Returns the descriptor that the IOCB_CMD_POLL half of a blocking read should watch for `fd`.
// The active build path returns `fd` unchanged. The disabled path below would instead hand
// out a per-object epoll instance watching `fd`, returning -1 on failure.
int ASubmittable::GetOrCreateFdPollForBlockingRead(int fd)
{
#if 1
return fd; // Active path: poll the caller's descriptor directly; no private epoll is created.
// NOTE: per the original author, the indirection of the fd's read state (the disabled
// path below) turned out not to be the cause of the problem being investigated.
#else
// Disabled alternative, kept for reference.
// TODO (Reece): urgent: is this hack overkill? Can we just return fd?
// Lazily create the private epoll instance; 0 and -1 both mean "not created yet".
if ((this->tempEPoll == 0) ||
(this->tempEPoll == -1))
{
epoll_event event;
if ((this->tempEPoll = ::epoll_create1(0)) == -1)
{
SysPushErrorIO();
return -1;
}
event.events = EPOLLIN;
event.data.ptr = nullptr;
if (::epoll_ctl(this->tempEPoll, EPOLL_CTL_ADD, fd, &event) != 0)
{
SysPushErrorIO();
// Registration failed: release the half-built epoll instance again.
::close(this->tempEPoll);
this->tempEPoll = -1;
return -1;
}
}
return this->tempEPoll;
#endif
}
//////////////////////////////////////////////////////////////////
// TLS
//////////////////////////////////////////////////////////////////
// Per-thread kernel AIO state: one AIO context plus the iocbs queued for it.
struct TLSIO
{
    // True once Init() has succeeded for this thread.
    bool bInitialized {};
    // Kernel AIO context handle filled in by io_setup().
    aio_context_t context {};
    // iocbs queued by the submit functions but not yet handed to io_submit().
    AuList<iocb*> submitPendingArray;
    // Count of submittable-owned iocbs accepted by the kernel and not yet completed.
    // Explicitly zeroed like its siblings so no instance ever starts with an indeterminate count.
    AuUInt32 dwIoSubmits {};
    // Set by IoTlsRedo(); tells the poll loops to flush submitPendingArray again.
    bool bPollHit {};

    TLSIO() = default;

    // The destructor releases the kernel context, so a copy would release it twice.
    TLSIO(const TLSIO &) = delete;
    TLSIO &operator=(const TLSIO &) = delete;

    ~TLSIO()
    {
        Deinit();
    }

    bool Init();
    void Deinit();
};
static thread_local TLSIO tlsLocal;

// Returns the calling thread's AIO state, initializing it on first use.
// Yields nullptr when initialization fails; a later call will try again.
static TLSIO *GetTls()
{
    TLSIO &state = tlsLocal;
    if (state.bInitialized)
    {
        return &state;
    }

    state.bInitialized = state.Init();
    return state.bInitialized ? &state : nullptr;
}
// Marks the calling thread's state so the active poll loop flushes the pending list again.
// Does nothing when the thread's AIO state is unavailable.
void IoTlsRedo()
{
    if (auto pTLS = GetTls())
    {
        pTLS->bPollHit = true;
    }
}
//////////////////////////////////////////////////////////////////
// TLS IO IMPL
//////////////////////////////////////////////////////////////////
// Creates the kernel AIO context, sized for 1024 events. Returns true on success.
bool TLSIO::Init()
{
    const auto iStatus = io_setup(1024, &this->context);
    return iStatus == 0;
}
void TLSIO::Deinit()
{
io_destroy(this->context);
}
//////////////////////////////////////////////////////////////////
// GLUE
//////////////////////////////////////////////////////////////////
// Hands the thread's queued iocbs to the kernel with io_submit().
//
// Returns true when the queue was drained or at least some progress was made (entries
// submitted, or a rejected entry reported to its owner). Returns false when a recoverable
// error stopped the flush before anything was consumed. Consumed entries are removed from
// the queue; unconsumed ones stay queued for a later attempt.
//
// An iocb rejected with EBADF is reported to its owner as a failed completion and skipped.
// Previously that case fell through into the "unhandled case" panic.
static bool LinuxOverlappedTrySubmitWorkInternal(TLSIO *io)
{
    // Number of leading queue entries already consumed in the current pass.
    int index {};
    // Whether anything at all has been consumed during this call.
    bool bProgress {};

    // The size is re-read on every pass: a failure callback may queue further work.
    while (index < int(io->submitPendingArray.size()))
    {
        int ret = io_submit(io->context, io->submitPendingArray.size() - index, io->submitPendingArray.data() + index);
        if (ret > 0)
        {
            io->dwIoSubmits += ret;
            index += ret;
            bProgress = true;
            continue;
        }

        switch (ret)
        {
        case (-EAGAIN):
        {
            SysPushErrorIO("Insufficient resources are available to queue any iocbs");
            goto err;
        }
        case (-ENOSYS):
        {
            SysPushErrorIO("io_submit() is not implemented on this architecture");
            goto err;
        }
        case (-EINVAL):
        {
            SysPushErrorIO("io submit context is dead");
            goto err;
        }
        case (-EPERM):
        {
            SysPanic("IOPRIO_CLASS_RT was requested but not granted to non-CAP_SYS_ADMIN (root?) process");
        }
        case (-EFAULT):
        {
            SysPanic("One of the data structures points to invalid data.");
        }
        case (-EBADF):
        {
            // A negative result means the first iocb of this batch was rejected,
            // i.e. the entry currently at `index`.
            auto submittable = AuReinterpretCast<ASubmittable*>(io->submitPendingArray[index]->aio_data);

            // Remove everything consumed so far, including the rejected entry, before
            // running user code, so the queue holds only outstanding work if the callback
            // queues or flushes more.
            io->submitPendingArray.erase(io->submitPendingArray.begin(), io->submitPendingArray.begin() + index + 1);
            index = 0;
            bProgress = true;

            if (submittable)
            {
                submittable->LIOS_SendProcess(0, true, EBADF);
            }
            continue;
        }
        default:
            SysPanic("unhandled case");
        }
    }

    if (io->submitPendingArray.size() == index)
    {
        AuTryClear(io->submitPendingArray);
    }
    else
    {
        io->submitPendingArray.erase(io->submitPendingArray.begin(), io->submitPendingArray.begin() + index);
    }
    return true;

err:
    // Keep the entries the kernel did not accept, drop the ones it did.
    if (index)
    {
        io->submitPendingArray.erase(io->submitPendingArray.begin(), io->submitPendingArray.begin() + index);
    }
    return bProgress;
}
// Flushes the calling thread's pending iocbs; false when the thread has no usable AIO state
// or the flush itself reports failure.
static bool LinuxOverlappedTrySubmitWork()
{
    if (auto io = GetTls())
    {
        return LinuxOverlappedTrySubmitWorkInternal(io);
    }
    return false;
}
//////////////////////////////////////////////////////////////////
// OVERLAPPED API
//////////////////////////////////////////////////////////////////
// Prepares the iocb owned by `context` and queues it for the next flush.
//
// fd       - descriptor to operate on
// op       - iocb opcode (the callers in this file pass IOCB_CMD_PREAD / IOCB_CMD_PWRITE)
// offset   - file offset for the operation
// context  - owner of the iocb and of the buffer described by GetBuf()/GetBufLength()
// optEvent - optional event whose handle the kernel signals on completion
//
// Returns false when the thread has no AIO state or the iocb could not be queued.
static bool LinuxOverlappedSubmit(int fd, int op, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent)
{
    auto io = GetTls();
    if (!io)
    {
        return false;
    }

    // Remember which AIO context the request belongs to; cancellation uses it later.
    context->abortContext = io->context;

    iocb &submit = context->GetIOCB();
    submit.aio_data = context->GetData();
    submit.aio_lio_opcode = op;
    submit.aio_reqprio = 0;
    submit.aio_fildes = fd;
    submit.aio_offset = offset;
    submit.aio_buf = context->GetBuf();
    submit.aio_nbytes = context->GetBufLength();

    if (optEvent)
    {
        submit.aio_resfd = AuStaticCast<AuLoop::LSEvent>(optEvent)->GetHandle();
        submit.aio_flags |= IOCB_FLAG_RESFD;
    }
    else
    {
        // The iocb is reused across submissions. Without this, a request submitted with no
        // event would still signal whichever event handle an earlier submission left behind.
        submit.aio_resfd = 0;
        submit.aio_flags &= ~IOCB_FLAG_RESFD;
    }

    return AuTryInsert(io->submitPendingArray, &submit);
}
// Attempts to cancel the request owned by `context`.
//
// Returns true when there is nothing to cancel, when the request was cancelled (its
// owner is then notified through LIOS_SendProcess), or when io_cancel failed for any
// reason other than EAGAIN (the object is simply reset). Returns false when the thread
// has no AIO state or the kernel reports EAGAIN.
static bool LinuxOverlappedCancel(ASubmittable *context)
{
    // Stack references keep the object and its buffer referenced across the reset/dispatch below.
    AuSPtr<void> keepSelf = context->pin_;
    AuSPtr<void> keepMem = context->memPin_;

    if (!keepSelf)
    {
        // No pin means no request is outstanding.
        return true;
    }

    if (!GetTls())
    {
        return false;
    }

    io_event cancelled;
    iocb &request = context->GetIOCB();

    const bool bCancelFailed = io_cancel(context->abortContext, &request, &cancelled) != 0;
    if (bCancelFailed)
    {
        if (errno == EAGAIN) // The iocb specified was not canceled
        {
            return false;
        }

        // Any other failure: treat the request as gone.
        context->LIOS_Reset();
        return true;
    }

    // Translate the kernel result: negative values are negated error numbers.
    const bool bError = cancelled.res < 0;
    int iErrNo = 0;
    AuUInt bytesTransacted {};
    if (bError)
    {
        iErrNo = 0 - cancelled.res;
    }
    else
    {
        bytesTransacted = cancelled.res;
    }

    context->LIOS_Reset();

    // Cross-thread aborts are tolerated here: the callback is dispatched regardless of
    // whether the request was submitted on this thread's context. Missing a
    // reset/process-death/end-of-stream callback would be worse than allowing the API to
    // perform cross-thread aborts; higher-level code is expected to filter the call.
    context->LIOS_SendProcess(bytesTransacted, bError, iErrNo);
    return true;
}
// Queues the first half of a blocking-stream read: an IOCB_CMD_POLL for POLLIN on `fd`.
// When that poll completes, ASubmittable::LIOS_SendProcess queues the real read using the
// fd/offset/event remembered here.
//
// fd       - stream descriptor to read from
// offset   - must be 0 (asserted)
// context  - owner of the iocb
// optEvent - event handed to the follow-up read
//
// Returns false, leaving `context` untouched apart from its iocb fields, when the thread
// has no AIO state, no pollable descriptor is available, or the iocb cannot be queued.
static bool LinuxOverlappedReadWait(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent)
{
    auto io = GetTls();
    if (!io)
    {
        return false;
    }

    SysAssert(offset == 0, "Read-blocking IO streams only");

    // Resolve the descriptor before touching any state so failure leaves nothing behind.
    const int pollFd = context->GetOrCreateFdPollForBlockingRead(fd);
    if (pollFd == -1)
    {
        SysPushErrorIO();
        return false;
    }

    auto &submit = context->GetIOCB();
    submit.aio_data = context->GetData();
    submit.aio_lio_opcode = IOCB_CMD_POLL;
    submit.aio_reqprio = 0;
    submit.aio_fildes = pollFd;
    submit.aio_offset = 0;
    submit.aio_buf = POLLIN;
    submit.aio_nbytes = 0;

    if (!AuTryInsert(io->submitPendingArray, &submit))
    {
        return false;
    }

    // Arm the two-phase state only once the poll is really queued. Arming it earlier left
    // bIsReadPending set after a failed call, so the next unrelated completion on this
    // object was treated as a finished poll and turned into a read instead of being reported.
    context->bIsReadPending = true;
    context->offset2 = offset; // redundant - always zero
    context->fd2 = fd;
    context->optEvent2 = optEvent;
    return true;
}
// Queues a read on `fd`. With bWaitForRead set, the read is preceded by a readiness poll
// (see LinuxOverlappedReadWait); otherwise an IOCB_CMD_PREAD is queued directly.
bool LinuxOverlappedSubmitRead(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent, bool bWaitForRead)
{
    return bWaitForRead
        ? LinuxOverlappedReadWait(fd, offset, context, optEvent)
        : LinuxOverlappedSubmit(fd, IOCB_CMD_PREAD, offset, context, optEvent);
}
// Queues an IOCB_CMD_PWRITE on `fd` at `offset` using the buffer attached to `context`.
bool LinuxOverlappedSubmitWrite(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent)
{
    const bool bQueued = LinuxOverlappedSubmit(fd, IOCB_CMD_PWRITE, offset, context, optEvent);
    return bQueued;
}
// Flushes pending submissions, then collects completions from the thread's AIO context and
// dispatches each one to its owning ASubmittable.
//
// timeout - milliseconds to keep pumping; 0 performs a single io_getevents() call with no
//           timespec (plus extra passes while callbacks keep requesting a re-flush).
//
// Returns true when at least one completion was dispatched. Returns false when the thread
// has no AIO state or the initial flush fails (errno is then set to EBADF).
bool LinuxOverlappedPoll(AuUInt32 timeout)
{
    timespec targetTime {};
    io_event ioEvents[512];
    int temp;
    int dwApcsSent {};

    auto io = GetTls();
    if (!io)
    {
        return false;
    }

    // Absolute deadline on the steady clock; only meaningful when timeout != 0.
    AuUInt64 uTargetTime {};
    if (timeout)
    {
        uTargetTime = AuTime::SteadyClockNS() + AuMSToNS<AuUInt64>(timeout);
    }

    if (!LinuxOverlappedTrySubmitWork())
    {
        SysPushErrorIO();
        errno = EBADF;
        // This function returns bool: the former `return -1` converted to true and
        // reported the failed flush as success.
        return false;
    }

    bool bAgain {};
    do
    {
        if (timeout)
        {
            AuUInt64 uTimeNow = AuTime::SteadyClockNS();
            if (uTargetTime <= uTimeNow)
            {
                return bool(dwApcsSent);
            }

            // Remaining time is deadline minus now. The operands used to be swapped, which
            // underflowed the unsigned subtraction and produced an enormous wait.
            AuTime::ns2ts(&targetTime, uTargetTime - uTimeNow);
        }

        temp = io_getevents(io->context, 1, 512, ioEvents, timeout ? &targetTime : nullptr);
        if (temp >= 0)
        {
            for (AU_ITERATE_N(i, temp))
            {
                // aio_data carries the owning ASubmittable (see ASubmittable::GetData).
                auto handle = AuReinterpretCast<ASubmittable *>(ioEvents[i].data);
                io->dwIoSubmits--;

                // Negative results are negated error numbers; otherwise a byte count.
                auto errNo = 0;
                bool bError {};
                AuUInt bytesTransacted {};
                if (ioEvents[i].res < 0)
                {
                    errNo = 0 - ioEvents[i].res;
                    bError = true;
                }
                else
                {
                    bytesTransacted = ioEvents[i].res;
                }

                handle->LIOS_SendProcess(bytesTransacted, bError, errNo);
                dwApcsSent++;
            }
        }
        else
        {
            auto err = 0 - temp;
            if (err == EINTR)
            {
                errno = EINTR;
                break;
            }
            if (err == EINVAL)
            {
                SysPushErrorArg();
                break;
            }
            if (err == EFAULT)
            {
                SysPanic("Either events or timeout is an invalid pointer.");
            }
            if (err == ENOSYS)
            {
                SysPanic("io_getevents() is not implemented on this architecture.");
            }
        }

        // A callback may have queued follow-up work (IoTlsRedo); flush it and go again.
        bAgain = AuExchange(io->bPollHit, false);
        if (bAgain)
        {
            (void)LinuxOverlappedTrySubmitWork();
        }
    } while (timeout || AuExchange(bAgain, false));

    return dwApcsSent;
}
// Public entry point for flushing the calling thread's queued iocbs to the kernel.
bool SendIOBuffers()
{
    const bool bFlushed = LinuxOverlappedTrySubmitWork();
    return bFlushed;
}
// epoll_wait() stand-in that also services this thread's AIO completions while waiting.
//
// With no AIO state or no outstanding AIO work, the call is forwarded to epoll_wait().
// Otherwise `epfd` is watched through a temporary IOCB_CMD_POLL request on the AIO context,
// so one io_getevents() wait covers both the epoll set and the overlapped I/O.
//
// epfd/events/maxevents - as for epoll_wait()
// timeout               - milliseconds. A positive value is a deadline. A negative value
//                         waits without a deadline until epfd is readable. Zero performs
//                         a single io_getevents() pass with no timespec.
//
// Returns the result of a non-blocking epoll_wait() when epfd became readable, 0 when the
// wait ended without that, and -1 on failure. A deadline expiry is reported as -1 with
// errno == EINTR.
int LinuxOverlappedEpollShim(int epfd, struct epoll_event *events,
                             int maxevents, int timeout)
{
    timespec targetTime {};

    auto io = GetTls();
    if (!io)
    {
        return epoll_wait(epfd, events, maxevents, timeout);
    }

    // Only a positive timeout defines a deadline. A negative value used to be converted to
    // an unsigned nanosecond count, giving a deadline that had already expired.
    const bool bHasDeadline = timeout > 0;
    AuUInt64 uTargetTime {};
    if (bHasDeadline)
    {
        uTargetTime = AuTime::SteadyClockNS() + AuMSToNS<AuUInt64>(timeout);
    }

    if (!LinuxOverlappedTrySubmitWork())
    {
        SysPushErrorIO();
        errno = EBADF;
        return -1;
    }

    if (!io->dwIoSubmits)
    {
        return epoll_wait(epfd, events, maxevents, timeout);
    }

    // Nested poll: watch the epoll descriptor itself from the AIO context.
    // aio_data stays 0, which is how its completion is told apart from ASubmittable ones.
    iocb pollSubmit {};
    pollSubmit.aio_fildes = epfd;
    pollSubmit.aio_lio_opcode = IOCB_CMD_POLL;
    pollSubmit.aio_buf = POLLIN;

    iocb * ptr = &pollSubmit;
    iocb ** ptrArray = &ptr;
    if (io_submit(io->context, 1, ptrArray) <= 0)
    {
        errno = EFAULT;
        SysPushErrorIO();
        return -1;
    }

    bool bEpollTriggered {};
    bool bTimedOut {};
    io_event ioEvents[512];
    int temp {};
    bool bAgain {};

    do
    {
        if (bHasDeadline)
        {
            AuUInt64 uTimeNow = AuTime::SteadyClockNS();
            if (uTargetTime <= uTimeNow)
            {
                // Leave through the common exit so the temporary poll request is cancelled.
                // Returning here directly left it queued in the context.
                bTimedOut = true;
                break;
            }

            // Remaining time is deadline minus now (the operands used to be swapped and
            // the unsigned subtraction underflowed).
            AuTime::ns2ts(&targetTime, uTargetTime - uTimeNow);
        }

        temp = io_getevents(io->context, 1, 512, ioEvents, bHasDeadline ? &targetTime : nullptr);
        if (temp >= 0)
        {
            for (AU_ITERATE_N(i, temp))
            {
                auto handle = AuReinterpretCast<ASubmittable *>(ioEvents[i].data);
                if (!handle)
                {
                    // loop source has message
                    bEpollTriggered = true;
                }
                else
                {
                    io->dwIoSubmits--;

                    // Negative results are negated error numbers; otherwise a byte count.
                    auto errNo = 0;
                    bool bError {};
                    AuUInt bytesTransacted {};
                    if (ioEvents[i].res < 0)
                    {
                        errNo = 0 - ioEvents[i].res;
                        bError = true;
                    }
                    else
                    {
                        bytesTransacted = ioEvents[i].res;
                    }

                    handle->LIOS_SendProcess(bytesTransacted, bError, errNo);
                }
            }
        }
        else
        {
            auto err = 0 - temp;
            if (err == EINTR)
            {
                errno = EINTR;
                break;
            }
            if (err == EINVAL)
            {
                SysPushErrorArg();
                break;
            }
            if (err == EFAULT)
            {
                SysPanic("Either events or timeout is an invalid pointer.");
            }
            if (err == ENOSYS)
            {
                SysPanic("io_getevents() is not implemented on this architecture.");
            }
        }

        // A callback may have queued follow-up work (IoTlsRedo); flush it and go again.
        bAgain = AuExchange(io->bPollHit, false);
        if (bAgain)
        {
            (void)LinuxOverlappedTrySubmitWork();
        }
    } while ((timeout ? !bEpollTriggered : false) || AuExchange(bAgain, false));

    // Withdraw the temporary poll request unless it already completed.
    io_event finalEpollEvent {};
    if ((!bEpollTriggered) &&
        (io_cancel(io->context, ptr, &finalEpollEvent) != 0))
    {
        // TODO (Reece): if finalEpollEvent.res == 1, bEpollTriggered = true?
        SysPushErrorIO();
    }

    if (bTimedOut)
    {
        // Set errno last so the cancel path above cannot overwrite it.
        errno = EINTR;
        return -1;
    }

    if (bEpollTriggered)
    {
        return epoll_wait(epfd, events, maxevents, 0);
    }
    else
    {
        return 0;
    }
}
// Waits for a descriptor to become readable and/or writable while servicing this thread's
// AIO completions.
//
// timeout         - milliseconds; 0 performs a single io_getevents() pass with no timespec
// read            - descriptor to watch for POLLIN, or -1 for none
// write           - descriptor to watch for POLLOUT, or -1 for none
// bReadTriggered  - out: set when the read descriptor signalled
// bWriteTriggered - out: set when the write descriptor signalled
//
// Returns false when the thread has no AIO state, the flush fails, no AIO work is
// outstanding, the temporary poll requests cannot be submitted, or the deadline passes
// (errno == EINTR). Returns true otherwise; check the out-parameters for what fired.
bool LinuxOverlappedWaitForOne(AuUInt32 timeout, AuUInt read, AuUInt write, bool &bReadTriggered, bool &bWriteTriggered)
{
    timespec targetTime {};
    auto io = GetTls();

    bWriteTriggered = false;
    bReadTriggered = false;

    if (!io)
    {
        return false;
    }

    // Absolute deadline on the steady clock; only meaningful when timeout != 0.
    AuUInt64 uTargetTime {};
    if (timeout)
    {
        uTargetTime = AuTime::SteadyClockNS() + AuMSToNS<AuUInt64>(timeout);
    }

    if (!LinuxOverlappedTrySubmitWork())
    {
        SysPushErrorIO();
        errno = EBADF;
        return false;
    }

    if (!io->dwIoSubmits)
    {
        return false;
    }

    // Temporary poll requests. aio_data 1 and 2 tag the read and write side respectively,
    // so their completions can be told apart from ASubmittable-owned ones.
    iocb submit {};
    iocb readSubmit {};
    iocb writeSubmit {};
    submit.aio_lio_opcode = IOCB_CMD_POLL;

    iocb * ptrArray[2];
    int iocbIdx {};

    if (read != -1)
    {
        readSubmit = submit;
        readSubmit.aio_fildes = read;
        readSubmit.aio_data = 1;
        readSubmit.aio_buf = POLLIN;
        ptrArray[iocbIdx++] = &readSubmit;
    }

    if (write != -1)
    {
        writeSubmit = submit;
        writeSubmit.aio_fildes = write;
        writeSubmit.aio_data = 2;
        writeSubmit.aio_buf = POLLOUT;
        ptrArray[iocbIdx++] = &writeSubmit;
    }

    if (io_submit(io->context, iocbIdx, ptrArray) <= 0)
    {
        errno = EFAULT;
        SysPushErrorIO();
        // This function returns bool: the former `return -1` converted to true.
        return false;
    }

    io_event ioEvents[512];
    int temp;
    bool bTimedOut {};

    do
    {
        if (timeout)
        {
            AuUInt64 uTimeNow = AuTime::SteadyClockNS();
            if (uTargetTime <= uTimeNow)
            {
                // Leave through the common exit so the temporary poll requests are
                // cancelled. Returning here directly left them queued in the context.
                bTimedOut = true;
                break;
            }

            // Remaining time is deadline minus now (the operands used to be swapped and
            // the unsigned subtraction underflowed).
            AuTime::ns2ts(&targetTime, uTargetTime - uTimeNow);
        }

        temp = io_getevents(io->context, 1, 512, ioEvents, timeout ? &targetTime : nullptr);
        if (temp >= 0)
        {
            for (AU_ITERATE_N(i, temp))
            {
                if (ioEvents[i].data == 1)
                {
                    bReadTriggered = true;
                }
                else if (ioEvents[i].data == 2)
                {
                    bWriteTriggered = true;
                }
                else
                {
                    auto handle = AuReinterpretCast<ASubmittable *>(ioEvents[i].data);
                    io->dwIoSubmits--;

                    // Negative results are negated error numbers; otherwise a byte count.
                    auto errNo = 0;
                    bool bError {};
                    AuUInt bytesTransacted {};
                    if (ioEvents[i].res < 0)
                    {
                        errNo = 0 - ioEvents[i].res;
                        bError = true;
                    }
                    else
                    {
                        bytesTransacted = ioEvents[i].res;
                    }

                    handle->LIOS_SendProcess(bytesTransacted, bError, errNo);
                }
            }
        }
        else
        {
            auto err = 0 - temp;
            if (err == EINTR)
            {
                errno = EINTR;
                break;
            }
            if (err == EINVAL)
            {
                SysPushErrorArg();
                break;
            }
            if (err == EFAULT)
            {
                SysPanic("Either events or timeout is an invalid pointer.");
            }
            if (err == ENOSYS)
            {
                SysPanic("io_getevents() is not implemented on this architecture.");
            }
        }
    } while (timeout ? (!bReadTriggered && !bWriteTriggered) : false);

    // Withdraw whichever temporary poll requests have not completed.
    for (int i = 0; i < iocbIdx; i++)
    {
        io_event finalEpollEvent {};

        if (i == 0)
        {
            // Slot 0 is the read request, or the write request when no read was given.
            if ((bReadTriggered) ||
                ((read == -1) && (bWriteTriggered)))
            {
                continue;
            }
        }
        else if ((i == 1) && (bWriteTriggered))
        {
            continue;
        }

        if (io_cancel(io->context, ptrArray[i], &finalEpollEvent) == 0)
        {
            // TODO (Reece): if finalEpollEvent.res == 1, bEpollTriggered = true?
        }
        else
        {
            SysPushErrorIO();
        }
    }

    if (bTimedOut)
    {
        // Set errno last so the cancel path above cannot overwrite it.
        errno = EINTR;
        return false;
    }

    return true;
}
// Not implemented yet: ignores every argument, leaves `one`/`two` untouched and reports false.
bool LinuxOverlappedWaitForAtleastOne(AuUInt32 timeout, const AuList<AuUInt> &handles, const AuList<AuUInt> &handlesWrite, AuUInt &one, AuUInt &two)
{
    // TODO:
    constexpr bool kImplemented = false;
    return kImplemented;
}
bool LinuxOverlappedYield()
{
io_event ioEvents[512];
int temp;
int iTicks {};
timespec targetTime {};
auto io = GetTls();
if (!io)
{
return false;
}
if (!LinuxOverlappedTrySubmitWork())
{
SysPushErrorIO();
errno = EBADF;
return false;
}
if (!io->dwIoSubmits)
{
return false;
}
temp = io_getevents(io->context, 1, 512, ioEvents, &targetTime);
if (temp >= 0)
{
for (AU_ITERATE_N(i, temp))
{
auto handle = AuReinterpretCast<ASubmittable *>(ioEvents[i].data);
io->dwIoSubmits--;
auto errNo = 0;
bool bError {};
AuUInt bytesTransacted {};
if (ioEvents[i].res < 0)
{
errNo = 0 - ioEvents[i].res;
bError = true;
}
else
{
bytesTransacted = ioEvents[i].res;
}
handle->LIOS_SendProcess(bytesTransacted, bError, errNo);
iTicks++;
}
}
else
{
auto err = 0 - temp;
if (err == EINTR)
{
errno = EINTR;
// Assume signal is also a valid yield
}
if (err == EINVAL)
{
SysPushErrorArg();
return false;
}
if (err == EFAULT)
{
SysPanic("Either events or timeout is an invalid pointer.");
}
if (err == ENOSYS)
{
SysPanic("io_getevents() is not implemented on this architecture.");
}
}
return iTicks;
}
}