/*** Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: IOSubmit.Linux.cpp Date: 2022-4-12 Author: Reece Note: I want APCs, overlapped, and event based IO on Linux. Gib. ***/ #include #include "UnixIO.hpp" #include #include #include #include #include #include "IOSubmit.Linux.hpp" #include #include #include static int io_setup(unsigned nr, aio_context_t *ctxp) { return syscall(__NR_io_setup, nr, ctxp); } static int io_destroy(aio_context_t ctx) { return syscall(__NR_io_destroy, ctx); } static int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp) { return syscall(__NR_io_submit, ctx, nr, iocbpp); } #if 0 static int io_getevents(aio_context_t ctx, long min_nr, long max_nr, struct io_event *events, struct timespec *timeout) { return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout); } #endif #define read_barrier() __asm__ __volatile__("lfence" ::: "memory") #define AIO_RING_MAGIC 0xa10a10a1 struct aio_ring { unsigned id; /** kernel internal index number */ unsigned nr; /** number of io_events */ unsigned head; unsigned tail; unsigned magic; unsigned compat_features; unsigned incompat_features; unsigned header_length; /** size of aio_ring */ struct io_event events[0]; }; /* Code based on axboe/fio: * https://github.com/axboe/fio/blob/702906e9e3e03e9836421d5e5b5eaae3cd99d398/engines/libaio.c#L149-L172 */ static int io_getevents(aio_context_t ctx, long min_nr, long max_nr, struct io_event *events, struct timespec *timeout) { int i = 0; struct aio_ring *ring = (struct aio_ring *)ctx; if (ring == NULL || ring->magic != AIO_RING_MAGIC) { goto do_syscall; } while (i < max_nr) { unsigned head = ring->head; if (head == ring->tail) { /* There are no more completions */ break; } else { /* There is another completion to reap */ events[i] = ring->events[head]; read_barrier(); ring->head = (head + 1) % ring->nr; i++; } } if (i == 0 && timeout != NULL && timeout->tv_sec == 0 && timeout->tv_nsec == 0) { /* Requested non blocking operation. */ return 0; } if (i && i >= min_nr) { return i; } do_syscall: return syscall(__NR_io_getevents, ctx, min_nr - i, max_nr - i, &events[i], timeout) + i; } static int io_cancel(aio_context_t ctx_id, struct iocb *iocb, struct io_event *result) { return syscall(SYS_io_cancel, ctx_id, iocb, result); } namespace Aurora::IO::UNIX { static bool LinuxOverlappedSubmit(int fd, int op, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent); ////////////////////////////////////////////////////////////////// // ASubmittable ////////////////////////////////////////////////////////////////// ASubmittable::ASubmittable() { } ASubmittable::~ASubmittable() { if ((this->tempEPoll != 0) && (this->tempEPoll != -1)) { ::close(AuExchange(this->tempEPoll, -1)); } } void ASubmittable::LIOS_SendProcess(AuUInt32 read, bool failure, int err, bool mark) { // Allow for reuse by releasing before dispatch // ...holding "this" ownership with a copy of the ref counter on the stack AuSPtr pinSelf, pinMem; pinSelf = this->pin_; pinMem = this->memPin_; LIOS_Reset(); if (AuExchange(this->bIsReadPending, false)) { // Psyche - it was just the poll half of a blocking read // We haven't read anything yet. if (!LinuxOverlappedSubmit(this->fd2, IOCB_CMD_PREAD, this->offset2 /* always zero */, this, this->optEvent2)) { try { LIOS_Process(0, true, 69, false); } catch (...) { SysPushErrorCatch("IO Callback threw an exception"); } } return; } try { LIOS_Process(read, failure, err, mark); } catch (...) { SysPushErrorCatch("IO Callback threw an exception"); } } bool LIOS_PopOne() { return false; } void ASubmittable::LIOS_Init(const AuSPtr &pin) { this->pin_ = pin; } void ASubmittable::LIOS_Reset() { this->pin_.reset(); this->memPin_.reset(); } void ASubmittable::SetMemory(const AuSPtr &view) { this->dataPtr_ = AuReinterpretCast(view->ptr); this->dataLen_ = view->length; this->memPin_ = view; } void ASubmittable::SetMemory(const AuSPtr &view) { this->dataPtr_ = AuReinterpretCast(view->ptr); this->dataLen_ = view->length; this->memPin_ = view; } bool ASubmittable::HasState() { return bool(this->pin_); } AuUInt64 ASubmittable::GetBuf() { return this->dataPtr_; } AuUInt ASubmittable::GetBufLength() { return this->dataLen_; } AuUInt64 ASubmittable::GetData() { return AuReinterpretCast(this); } iocb & ASubmittable::GetIOCB() { return this->cb; } int ASubmittable::GetOrCreateFdPollForBlockingRead(int fd) { #if 1 return fd; // wtf freetards are incapable of writing an os. // the indirection of fds read state wasn't the problem. // linux will never be a real kernel // gnu will never be a real operating system #else // TODO (Reece): urgent: is this hack overkill? can we // just return fd? if ((this->tempEPoll == 0) || (this->tempEPoll == -1)) { epoll_event event; if ((this->tempEPoll = ::epoll_create1(0)) == -1) { SysPushErrorIO(); return -1; } event.events = EPOLLIN; event.data.ptr = nullptr; if (::epoll_ctl(this->tempEPoll, EPOLL_CTL_ADD, fd, &event) != 0) { SysPushErrorIO(); ::close(this->tempEPoll); this->tempEPoll = -1; return -1; } } return this->tempEPoll; #endif } ////////////////////////////////////////////////////////////////// // TLS ////////////////////////////////////////////////////////////////// struct TLSIO { bool bInitialized {}; aio_context_t context {}; AuList submitPendingArray; AuUInt32 dwIoSubmits; ~TLSIO() { Deinit(); } bool Init(); void Deinit(); }; static thread_local TLSIO tlsLocal; static TLSIO *GetTls() { auto handle = &tlsLocal; if (!handle->bInitialized) { handle->bInitialized = handle->Init(); } return handle->bInitialized ? handle : nullptr; } ////////////////////////////////////////////////////////////////// // TLS IO IMPL ////////////////////////////////////////////////////////////////// bool TLSIO::Init() { if (io_setup(1024, &this->context) != 0) { return false; } return true; } void TLSIO::Deinit() { io_destroy(this->context); } ////////////////////////////////////////////////////////////////// // GLUE ////////////////////////////////////////////////////////////////// static bool LinuxOverlappedTrySubmitWorkInternal(TLSIO *io) { int index {}; int startingLength = io->submitPendingArray.size(); while (index != startingLength) { int ret = io_submit(io->context, io->submitPendingArray.size() - index, io->submitPendingArray.data() + index); if (ret <= 0) { switch (ret) { case (-EAGAIN): { SysPushErrorIO("Insufficient resources are available to queue any iocbs"); goto err; } case (-ENOSYS): { SysPushErrorIO("io_submit() is not implemented on this architecture"); goto err; } case (-EINVAL): { SysPushErrorIO("io submit context is dead"); goto err; } case (-EPERM): { SysPanic("IOPRIO_CLASS_RT was requested but not granted to non-CAP_SYS_ADMIN (root?) process"); } case (-EFAULT): { SysPanic("One of the data structures points to invalid data."); } case (-EBADF): { auto submittable = AuReinterpretCast(io->submitPendingArray[index]->aio_data); if (submittable) { submittable->LIOS_SendProcess(0, true, EBADF); } } default: SysPanic("unhandled case"); } } else { io->dwIoSubmits += ret; } index += ret; } if (io->submitPendingArray.size() == index) { AuTryClear(io->submitPendingArray); } else { io->submitPendingArray.erase(io->submitPendingArray.begin(), io->submitPendingArray.begin() + index); } return true; err: if (index) { io->submitPendingArray.erase(io->submitPendingArray.begin(), io->submitPendingArray.begin() + index); return true; } else { return false; } } static bool LinuxOverlappedTrySubmitWork() { auto io = GetTls(); if (!io) { return false; } return LinuxOverlappedTrySubmitWorkInternal(io); } ////////////////////////////////////////////////////////////////// // OVERLAPPED API ////////////////////////////////////////////////////////////////// static bool LinuxOverlappedSubmit(int fd, int op, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent) { auto io = GetTls(); if (!io) { return false; } iocb &submit = context->GetIOCB(); submit.aio_data = context->GetData(); submit.aio_lio_opcode = op; submit.aio_reqprio = 0; submit.aio_fildes = fd; submit.aio_offset = offset; submit.aio_buf = context->GetBuf(); submit.aio_nbytes = context->GetBufLength(); if (optEvent) { submit.aio_resfd = AuStaticCast(optEvent)->GetHandle(); submit.aio_flags |= IOCB_FLAG_RESFD; } return AuTryInsert(io->submitPendingArray, &submit); } static bool LinuxOverlappedReadWait(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent) { auto io = GetTls(); if (!io) { return false; } SysAssert(offset == 0, "Read-blocking IO streams only"); context->bIsReadPending = true; context->offset2 = offset; // redundant - always zero context->fd2 = fd; context->optEvent2 = optEvent; auto &submit = context->GetIOCB(); submit.aio_data = context->GetData(); submit.aio_lio_opcode = IOCB_CMD_POLL; submit.aio_reqprio = 0; submit.aio_fildes = context->GetOrCreateFdPollForBlockingRead(fd); submit.aio_offset = 0; submit.aio_buf = POLLIN; submit.aio_nbytes = 0; if (submit.aio_fildes == -1) { SysPushErrorIO(); return false; } return AuTryInsert(io->submitPendingArray, &submit); } bool LinuxOverlappedSubmitRead(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent, bool bWaitForRead) { if (bWaitForRead) { return LinuxOverlappedReadWait(fd, offset, context, optEvent); } return LinuxOverlappedSubmit(fd, IOCB_CMD_PREAD, offset, context, optEvent); } bool LinuxOverlappedSubmitWrite(int fd, AuUInt offset, ASubmittable *context, AuLoop::ILSEvent *optEvent) { return LinuxOverlappedSubmit(fd, IOCB_CMD_PWRITE, offset, context, optEvent); } bool LinuxOverlappedPoll(AuUInt32 timeout) { timespec targetTime; io_event ioEvents[512]; int temp; int dwApcsSent {}; auto io = GetTls(); if (!io) { return false; } AuInt64 iStartTime {}; AuInt64 iTargetTime {}; if (timeout) { iStartTime = AuTime::CurrentClockSteadyMS(); iTargetTime = iStartTime + timeout; } if (!LinuxOverlappedTrySubmitWork()) { SysPushErrorIO(); errno = EBADF; return -1; } do { if (timeout) { auto delta = iTargetTime - AuTime::CurrentClockSteadyMS(); if (delta <= 0) { return dwApcsSent; } AuTime::ns2ts(&targetTime, delta); } temp = io_getevents(io->context, 1, 512, ioEvents, timeout ? &targetTime : nullptr); if (temp >= 0) { for (AU_ITERATE_N(i, temp)) { auto handle = AuReinterpretCast(ioEvents[i].data); io->dwIoSubmits--; auto errNo = 0; bool bError {}; AuUInt bytesTransacted {}; if (ioEvents[i].res < 0) { errNo = 0 - ioEvents[i].res; bError = true; } else { bytesTransacted = ioEvents[i].res; } handle->LIOS_SendProcess(bytesTransacted, bError, errNo); dwApcsSent++; } } else { auto err = 0 - temp; if (err == EINTR) { errno = EINTR; break; } if (err == EINVAL) { SysPushErrorArg(); break; } if (err == EFAULT) { SysPanic("Either events or timeout is an invalid pointer."); } if (err == ENOSYS) { SysPanic("io_getevents() is not implemented on this architecture."); } } } while (timeout); return dwApcsSent; } bool SendIOBuffers() { return LinuxOverlappedTrySubmitWork(); } int LinuxOverlappedEpollShim(int epfd, struct epoll_event *events, int maxevents, int timeout) { timespec targetTime; auto io = GetTls(); if (!io) { return epoll_wait(epfd, events, maxevents, timeout); } AuInt64 iStartTime {}; AuInt64 iTargetTime {}; if (timeout) { iStartTime = AuTime::CurrentClockSteadyMS(); iTargetTime = iStartTime + timeout; } if (!LinuxOverlappedTrySubmitWork()) { SysPushErrorIO(); errno = EBADF; return -1; } if (!io->dwIoSubmits) { return epoll_wait(epfd, events, maxevents, timeout); } // evil nested epoll garbage // but i'm sure the kevent way would be more inefficient iocb pollSubmit {}; pollSubmit.aio_fildes = epfd; pollSubmit.aio_lio_opcode = IOCB_CMD_POLL; pollSubmit.aio_buf = POLLIN; iocb * ptr = &pollSubmit; iocb ** ptrArray = &ptr; if (io_submit(io->context, 1, ptrArray) <= 0) { errno = EFAULT; SysPushErrorIO(); return -1; } bool bEpollTriggered {}; io_event ioEvents[512]; int temp; do { if (timeout) { auto delta = iTargetTime - AuTime::CurrentClockSteadyMS(); if (delta <= 0) { return dwApcsSent; } AuTime::ns2ts(&targetTime, delta); } temp = io_getevents(io->context, 1, 512, ioEvents, timeout ? &targetTime : nullptr); if (temp >= 0) { for (AU_ITERATE_N(i, temp)) { auto handle = AuReinterpretCast(ioEvents[i].data); if (!handle) { // loop source has message bEpollTriggered = true; } else { io->dwIoSubmits--; auto errNo = 0; bool bError {}; AuUInt bytesTransacted {}; if (ioEvents[i].res < 0) { errNo = 0 - ioEvents[i].res; bError = true; } else { bytesTransacted = ioEvents[i].res; } handle->LIOS_SendProcess(bytesTransacted, bError, errNo); } } } else { auto err = 0 - temp; if (err == EINTR) { errno = EINTR; break; } if (err == EINVAL) { SysPushErrorArg(); break; } if (err == EFAULT) { SysPanic("Either events or timeout is an invalid pointer."); } if (err == ENOSYS) { SysPanic("io_getevents() is not implemented on this architecture."); } } } while (timeout ? !bEpollTriggered : false); io_event finalEpollEvent {}; if ((bEpollTriggered) || (io_cancel(io->context, ptr, &finalEpollEvent) == 0)) { // TODO (Reece): if finalEpollEvent.res == 1, bEpollTriggered = true? } else { // Do I care? SysPushErrorIO(); // I don't // I do again } if (bEpollTriggered) { return epoll_wait(epfd, events, maxevents, 0); } else { return 0; } } bool LinuxOverlappedWaitForOne(AuUInt32 timeout, AuUInt read, AuUInt write, bool &bReadTriggered, bool &bWriteTriggered) { timespec targetTime; auto io = GetTls(); bWriteTriggered = false; bReadTriggered = false; if (!io) { return false; } AuInt64 iStartTime {}; AuInt64 iTargetTime {}; if (timeout) { iStartTime = AuTime::CurrentClockSteadyMS(); iTargetTime = iStartTime + timeout; } if (!LinuxOverlappedTrySubmitWork()) { SysPushErrorIO(); errno = EBADF; return false; } if (!io->dwIoSubmits) { return false; } iocb submit {}; iocb readSubmit {}; iocb writeSubmit {}; submit.aio_lio_opcode = IOCB_CMD_POLL; iocb * ptrArray[2]; int iocbIdx {}; if (read != -1) { readSubmit = submit; readSubmit.aio_fildes = read; readSubmit.aio_data = 1; readSubmit.aio_buf = POLLIN; ptrArray[iocbIdx++] = &readSubmit; } if (write != -1) { writeSubmit = submit; writeSubmit.aio_fildes = write; writeSubmit.aio_data = 2; writeSubmit.aio_buf = POLLOUT; ptrArray[iocbIdx++] = &writeSubmit; } if (io_submit(io->context, iocbIdx, ptrArray) <= 0) { errno = EFAULT; SysPushErrorIO(); return -1; } io_event ioEvents[512]; int temp; do { if (timeout) { auto delta = iTargetTime - AuTime::CurrentClockSteadyMS(); if (delta <= 0) { return dwApcsSent; } AuTime::ns2ts(&targetTime, delta); } temp = io_getevents(io->context, 1, 512, ioEvents, timeout ? &targetTime : nullptr); if (temp >= 0) { for (AU_ITERATE_N(i, temp)) { if (ioEvents[i].data == 1) { bReadTriggered = true; } else if (ioEvents[i].data == 2) { bWriteTriggered = true; } else { auto handle = AuReinterpretCast(ioEvents[i].data); io->dwIoSubmits--; auto errNo = 0; bool bError {}; AuUInt bytesTransacted {}; if (ioEvents[i].res < 0) { errNo = 0 - ioEvents[i].res; bError = true; } else { bytesTransacted = ioEvents[i].res; } handle->LIOS_SendProcess(bytesTransacted, bError, errNo); } } } else { auto err = 0 - temp; if (err == EINTR) { errno = EINTR; break; } if (err == EINVAL) { SysPushErrorArg(); break; } if (err == EFAULT) { SysPanic("Either events or timeout is an invalid pointer."); } if (err == ENOSYS) { SysPanic("io_getevents() is not implemented on this architecture."); } } } while (timeout ? (!bReadTriggered && !bWriteTriggered) : false); for (int i = 0; i < iocbIdx; i++) { io_event finalEpollEvent {}; if (i == 0) { if ((bReadTriggered) || ((read == -1) && (bWriteTriggered))) { continue; } } else if ((i == 1) && (bWriteTriggered)) { continue; } if (io_cancel(io->context, ptrArray[i], &finalEpollEvent) == 0) { // TODO (Reece): if finalEpollEvent.res == 1, bEpollTriggered = true? } else { // Do I care? SysPushErrorIO(); // I don't // I do again } } return true; } bool LinuxOverlappedWaitForAtleastOne(AuUInt32 timeout, const AuList &handles, const AuList &handlesWrite, AuUInt &one, AuUInt &two) { // TODO: return false; } bool LinuxOverlappedYield() { io_event ioEvents[512]; int temp; timespec targetTime {}; auto io = GetTls(); if (!io) { return false; } if (!LinuxOverlappedTrySubmitWork()) { SysPushErrorIO(); errno = EBADF; return false; } if (!io->dwIoSubmits) { return false; } temp = io_getevents(io->context, 1, 512, ioEvents, &targetTime); if (temp >= 0) { for (AU_ITERATE_N(i, temp)) { auto handle = AuReinterpretCast(ioEvents[i].data); io->dwIoSubmits--; auto errNo = 0; bool bError {}; AuUInt bytesTransacted {}; if (ioEvents[i].res < 0) { errNo = 0 - ioEvents[i].res; bError = true; } else { bytesTransacted = ioEvents[i].res; } handle->LIOS_SendProcess(bytesTransacted, bError, errNo); } } else { auto err = 0 - temp; if (err == EINTR) { errno = EINTR; // Assume signal is also a valid yield } if (err == EINVAL) { SysPushErrorArg(); return false; } if (err == EFAULT) { SysPanic("Either events or timeout is an invalid pointer."); } if (err == ENOSYS) { SysPanic("io_getevents() is not implemented on this architecture."); } } return true; } }