AuroraRuntime/Source/IO/UNIX/IOSubmit.Linux.cpp

755 lines
19 KiB
C++
Raw Normal View History

2022-04-13 11:00:35 +00:00
/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IOSubmit.Linux.cpp
Date: 2022-4-12
Author: Reece
Note: I want APCs, overlapped, and event based IO on Linux. Gib.
***/
#include <Source/RuntimeInternal.hpp>
#include "UnixIO.hpp"
#include <linux/aio_abi.h>
#include <sys/syscall.h>
#include <sys/epoll.h>
#include <poll.h>
#include <arpa/inet.h>
#include "IOSubmit.Linux.hpp"
#include <Source/Time/Time.hpp>
#include <Source/Loop/Loop.hpp>
#include <Source/Loop/LSEvent.hpp>
static int io_setup(unsigned nr, aio_context_t *ctxp)
{
return syscall(__NR_io_setup, nr, ctxp);
}
static int io_destroy(aio_context_t ctx)
{
return syscall(__NR_io_destroy, ctx);
}
static int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp)
{
return syscall(__NR_io_submit, ctx, nr, iocbpp);
}
#if 0
static int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
struct io_event *events,
struct timespec *timeout)
{
return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
}
#endif
#define read_barrier() __asm__ __volatile__("lfence" ::: "memory")
#define AIO_RING_MAGIC 0xa10a10a1
struct aio_ring {
unsigned id; /** kernel internal index number */
unsigned nr; /** number of io_events */
unsigned head;
unsigned tail;
unsigned magic;
unsigned compat_features;
unsigned incompat_features;
unsigned header_length; /** size of aio_ring */
struct io_event events[0];
};
/* Code based on axboe/fio:
* https://github.com/axboe/fio/blob/702906e9e3e03e9836421d5e5b5eaae3cd99d398/engines/libaio.c#L149-L172
*/
static int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
struct io_event *events,
struct timespec *timeout)
{
int i = 0;
struct aio_ring *ring = (struct aio_ring *)ctx;
if (ring == NULL || ring->magic != AIO_RING_MAGIC) {
goto do_syscall;
}
while (i < max_nr) {
unsigned head = ring->head;
if (head == ring->tail) {
/* There are no more completions */
break;
} else {
/* There is another completion to reap */
events[i] = ring->events[head];
read_barrier();
ring->head = (head + 1) % ring->nr;
i++;
}
}
if (i == 0 && timeout != NULL && timeout->tv_sec == 0 &&
timeout->tv_nsec == 0) {
/* Requested non blocking operation. */
return 0;
}
if (i && i >= min_nr) {
return i;
}
do_syscall:
return syscall(__NR_io_getevents, ctx, min_nr - i, max_nr - i,
&events[i], timeout) + i;
}
static int io_cancel(aio_context_t ctx_id, struct iocb *iocb,
struct io_event *result)
{
return syscall(SYS_io_cancel, ctx_id, iocb, result);
}
namespace Aurora::IO::UNIX
{
//////////////////////////////////////////////////////////////////
// ASubmittable
//////////////////////////////////////////////////////////////////
void ASubmittable::LIOS_SendProcess(AuUInt32 read, bool failure, int err, bool mark)
{
// Allow for reuse by releasing before dispatch
// ...holding "this" ownership with a copy of the ref counter on the stack
AuSPtr<void> pinSelf, pinMem;
pinSelf = this->pin_;
pinMem = this->memPin_;
LIOS_Reset();
LIOS_Process(read, failure, err, mark);
}
void ASubmittable::LIOS_Init(const AuSPtr<void> &pin)
{
this->pin_ = pin;
}
void ASubmittable::LIOS_Reset()
{
this->pin_.reset();
this->memPin_.reset();
}
void ASubmittable::SetMemory(const AuSPtr<AuMemoryViewRead> &view)
{
this->dataPtr_ = AuReinterpretCast<AuUInt64>(view->ptr);
this->dataLen_ = view->length;
this->memPin_ = view;
}
void ASubmittable::SetMemory(const AuSPtr<AuMemoryViewWrite> &view)
{
this->dataPtr_ = AuReinterpretCast<AuUInt64>(view->ptr);
this->dataLen_ = view->length;
this->memPin_ = view;
}
bool ASubmittable::HasState()
{
return bool(this->pin_);
}
AuUInt64 ASubmittable::GetBuf()
{
return this->dataPtr_;
}
AuUInt ASubmittable::GetBufLength()
{
return this->dataLen_;
}
AuUInt64 ASubmittable::GetData()
{
return AuReinterpretCast<AuUInt64>(this);
}
iocb & ASubmittable::GetIOCB()
{
return this->cb;
}
//////////////////////////////////////////////////////////////////
// TLS
//////////////////////////////////////////////////////////////////
struct TLSIO
{
bool bInitialized {};
aio_context_t context {};
AuList<iocb*> submitPendingArray;
AuUInt32 dwIoSubmits;
~TLSIO()
{
Deinit();
}
bool Init();
void Deinit();
};
static thread_local TLSIO tlsLocal;
static TLSIO *GetTls()
{
auto handle = &tlsLocal;
if (!handle->bInitialized)
{
handle->bInitialized = handle->Init();
}
return handle->bInitialized ? handle : nullptr;
}
//////////////////////////////////////////////////////////////////
// TLS IO IMPL
//////////////////////////////////////////////////////////////////
bool TLSIO::Init()
{
if (io_setup(1024, &this->context) != 0)
{
return false;
}
return true;
}
void TLSIO::Deinit()
{
io_destroy(this->context);
}
//////////////////////////////////////////////////////////////////
// GLUE
//////////////////////////////////////////////////////////////////
static bool LinuxOverlappedTrySubmitWorkInternal(TLSIO *io)
{
int index = {};
int startingLength = io->submitPendingArray.size();
while (index != startingLength)
{
int ret = io_submit(io->context, io->submitPendingArray.size(), io->submitPendingArray.data() + index);
if (ret <= 0)
{
switch (ret)
{
case (-EAGAIN):
{
SysPushErrorIO("Insufficient resources are available to queue any iocbs");
goto err;
}
case (-ENOSYS):
{
SysPushErrorIO("io_submit() is not implemented on this architecture");
goto err;
}
case (-EINVAL):
{
SysPushErrorIO("io submit context is dead");
goto err;
}
case (-EPERM):
{
SysPanic("IOPRIO_CLASS_RT was requested but not granted to non-CAP_SYS_ADMIN (root?) process");
}
case (-EFAULT):
{
SysPanic("One of the data structures points to invalid data.");
}
case (-EBADF):
{
auto submittable = AuReinterpretCast<ASubmittable*>(io->submitPendingArray[index]->aio_data);
if (submittable)
{
submittable->LIOS_SendProcess(0, true, EBADF);
index ++;
}
}
default:
SysPanic("unhandled case");
}
}
else
{
index += ret;
io->dwIoSubmits += ret;
}
}
AuTryClear(io->submitPendingArray);
return true;
err:
if (index)
{
io->submitPendingArray.erase(io->submitPendingArray.begin(), io->submitPendingArray.begin() + index);
return true;
}
else
{
return false;
}
}
static bool LinuxOverlappedTrySubmitWork()
{
auto io = GetTls();
if (!io)
{
return false;
}
return LinuxOverlappedTrySubmitWorkInternal(io);
}
//////////////////////////////////////////////////////////////////
// OVERLAPPED API
//////////////////////////////////////////////////////////////////
static bool LinuxOverlappedSubmit(int fd, int op, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent)
{
auto io = GetTls();
if (!io)
{
return false;
}
iocb &submit = context->GetIOCB();
submit.aio_data = context->GetData();
submit.aio_lio_opcode = op;
submit.aio_reqprio = 0;
submit.aio_fildes = fd;
submit.aio_offset = offset;
submit.aio_buf = context->GetBuf();
submit.aio_nbytes = context->GetBufLength();
if (optEvent)
{
submit.aio_resfd = AuStaticCast<AuLoop::LSEvent>(optEvent)->GetHandle();
submit.aio_flags |= IOCB_FLAG_RESFD;
}
return AuTryInsert(io->submitPendingArray, &submit);
}
bool LinuxOverlappedSubmitRead(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent)
{
return LinuxOverlappedSubmit(fd, IOCB_CMD_PREAD, offset, context, optEvent);
}
bool LinuxOverlappedSubmitWrite(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent)
{
return LinuxOverlappedSubmit(fd, IOCB_CMD_PWRITE, offset, context, optEvent);
}
bool LinuxOverlappedPoll(AuUInt32 timeout)
{
timespec targetTime;
io_event ioEvents[512];
int temp;
int dwApcsSent {};
auto io = GetTls();
if (!io)
{
return false;
}
if (timeout)
{
AuTime::ms2tsabs(&targetTime, timeout);
}
if (!LinuxOverlappedTrySubmitWork())
{
SysPushErrorIO();
errno = EBADF;
return -1;
}
do
{
temp = io_getevents(io->context, 1, 512, ioEvents, timeout ? &targetTime : nullptr);
if (temp >= 0)
{
for (AU_ITERATE_N(i, temp))
{
auto handle = AuReinterpretCast<ASubmittable *>(ioEvents[i].data);
io->dwIoSubmits--;
auto errNo = 0;
bool bError {};
AuUInt bytesTransacted {};
if (ioEvents[i].res <= 0)
{
errNo = 0 - ioEvents[i].res;
bError = true;
}
else
{
bytesTransacted = ioEvents[i].res;
}
handle->LIOS_SendProcess(bytesTransacted, bError, errNo);
dwApcsSent++;
}
}
else
{
auto err = 0 - temp;
if (err == EINTR)
{
errno = EINTR;
break;
}
if (err == EINVAL)
{
SysPushErrorArg();
break;
}
if (err == EFAULT)
{
SysPanic("Either events or timeout is an invalid pointer.");
}
if (err == ENOSYS)
{
SysPanic("io_getevents() is not implemented on this architecture.");
}
}
} while (timeout);
return dwApcsSent;
}
bool SendIOBuffers()
{
return LinuxOverlappedTrySubmitWork();
}
int LinuxOverlappedEpollShim(int epfd, struct epoll_event *events,
int maxevents, int timeout)
{
timespec targetTime;
auto io = GetTls();
if (!io)
{
return epoll_wait(epfd, events, maxevents, timeout);
}
if (timeout)
{
AuTime::ms2tsabs(&targetTime, timeout);
}
if (!LinuxOverlappedTrySubmitWork())
{
SysPushErrorIO();
errno = EBADF;
return -1;
}
if (!io->dwIoSubmits)
{
return epoll_wait(epfd, events, maxevents, timeout);
}
// evil nested epoll garbage
// but i'm sure the kevent way would be more inefficient
iocb pollSubmit {};
pollSubmit.aio_fildes = epfd;
pollSubmit.aio_lio_opcode = IOCB_CMD_POLL;
pollSubmit.aio_buf = POLLIN;
iocb * ptr = &pollSubmit;
iocb ** ptrArray = &ptr;
if (io_submit(io->context, 1, ptrArray) <= 0)
{
errno = EFAULT;
SysPushErrorIO();
return -1;
}
bool bEpollTriggered {};
io_event ioEvents[512];
int temp;
do
{
temp = io_getevents(io->context, 1, 512, ioEvents, timeout ? &targetTime : nullptr);
if (temp >= 0)
{
for (AU_ITERATE_N(i, temp))
{
auto handle = AuReinterpretCast<ASubmittable *>(ioEvents[i].data);
if (!handle)
{
// loop source has message
bEpollTriggered = true;
}
else
{
io->dwIoSubmits--;
auto errNo = 0;
bool bError {};
AuUInt bytesTransacted {};
if (ioEvents[i].res <= 0)
{
errNo = 0 - ioEvents[i].res;
bError = true;
}
else
{
bytesTransacted = ioEvents[i].res;
}
handle->LIOS_SendProcess(bytesTransacted, bError, errNo);
}
}
}
else
{
auto err = 0 - temp;
if (err == EINTR)
{
errno = EINTR;
break;
}
if (err == EINVAL)
{
SysPushErrorArg();
break;
}
if (err == EFAULT)
{
SysPanic("Either events or timeout is an invalid pointer.");
}
if (err == ENOSYS)
{
SysPanic("io_getevents() is not implemented on this architecture.");
}
}
} while (timeout ? !bEpollTriggered : false);
io_event finalEpollEvent;
if (io_cancel(io->context, ptr, &finalEpollEvent) == 0)
{
// TODO (Reece): if finalEpollEvent.res == 1, bEpollTriggered = true?
}
else
{
// Do I care?
//SysPushErrorIO();
// I don't
}
if (bEpollTriggered)
{
return epoll_wait(epfd, events, maxevents, 0);
}
else
{
return 0;
}
}
bool LinuxOverlappedWaitForOne(AuUInt32 timeout, AuUInt read, AuUInt write, bool &bReadTriggered, bool &bWriteTriggered)
{
timespec targetTime;
auto io = GetTls();
bWriteTriggered = false;
bReadTriggered = false;
if (!io)
{
return false;
}
if (timeout)
{
AuTime::ms2tsabs(&targetTime, timeout);
}
if (!LinuxOverlappedTrySubmitWork())
{
SysPushErrorIO();
errno = EBADF;
return false;
}
if (!io->dwIoSubmits)
{
return false;
}
iocb submit {};
iocb readSubmit {};
iocb writeSubmit {};
submit.aio_lio_opcode = IOCB_CMD_POLL;
iocb * ptrArray[2];
int iocbIdx {};
if (read != -1)
{
readSubmit = submit;
readSubmit.aio_fildes = read;
readSubmit.aio_data = 1;
readSubmit.aio_buf = POLLIN;
ptrArray[iocbIdx++] = &readSubmit;
}
if (write != -1)
{
writeSubmit = submit;
writeSubmit.aio_fildes = write;
writeSubmit.aio_data = 2;
writeSubmit.aio_buf = POLLOUT;
ptrArray[iocbIdx++] = &writeSubmit;
}
if (io_submit(io->context, iocbIdx, ptrArray) <= 0)
{
errno = EFAULT;
SysPushErrorIO();
return -1;
}
io_event ioEvents[512];
int temp;
do
{
temp = io_getevents(io->context, 1, 512, ioEvents, timeout ? &targetTime : nullptr);
if (temp >= 0)
{
for (AU_ITERATE_N(i, temp))
{
if (ioEvents[i].data == 1)
{
bReadTriggered = true;
}
else if (ioEvents[i].data == 2)
{
bWriteTriggered = true;
}
else
{
auto handle = AuReinterpretCast<ASubmittable *>(ioEvents[i].data);
io->dwIoSubmits--;
auto errNo = 0;
bool bError {};
AuUInt bytesTransacted {};
if (ioEvents[i].res <= 0)
{
errNo = 0 - ioEvents[i].res;
bError = true;
}
else
{
bytesTransacted = ioEvents[i].res;
}
handle->LIOS_SendProcess(bytesTransacted, bError, errNo);
}
}
}
else
{
auto err = 0 - temp;
if (err == EINTR)
{
errno = EINTR;
break;
}
if (err == EINVAL)
{
SysPushErrorArg();
break;
}
if (err == EFAULT)
{
SysPanic("Either events or timeout is an invalid pointer.");
}
if (err == ENOSYS)
{
SysPanic("io_getevents() is not implemented on this architecture.");
}
}
} while (timeout ? (!bReadTriggered && !bWriteTriggered) : false);
for (int i = 0; i < iocbIdx; i++)
{
io_event finalEpollEvent;
if (io_cancel(io->context, ptrArray[i], &finalEpollEvent) == 0)
{
// TODO (Reece): if finalEpollEvent.res == 1, bEpollTriggered = true?
}
else
{
// Do I care?
//SysPushErrorIO();
// I don't
}
}
return true;
}
bool LinuxOverlappedWaitForAtleastOne(AuUInt32 timeout, const AuList<AuUInt> &handles, const AuList<AuUInt> &handlesWrite, AuUInt &one, AuUInt &two)
{
// TODO:
return false;
}
}