// AuroraRuntime/Source/IO/IPC/IPCMutexFutex.Linux.cpp
// J Reece Wilson 64cb7404ba [+] Near 1:1 Linux IPC Pipe compared to the NT equivalent (~= CreateNamedPipeA(nMaxInstances=1, dwOpenMode=PIPE_ACCESS_DUPLEX, dwPipeMode=PIPE_TYPE_BYTE))
// [+] Ability to bypass blocking limitation of certain io_submit reads, if the blocking subsystem is a pollable stream (i.e. a pipe).
// [*] Fixed major Linux bug where LoopQueue items weren't being submitted if no dequeues were in the same tick
// [*] Fix various Linux pipe related bugs
// [*] Fix futex bug where the callback was nulled on server-release
// 2022-08-09 07:48:29 +01:00
//
// 744 lines
// 18 KiB
// C++
// Executable File

/***
Copyright (C) 2022 J Reece Wilson (a/k/a "Reece"). All rights reserved.
File: IPCMutexFutex.Linux.cpp
Date: 2022-
Author: Reece
Note:
***/
#include <Source/RuntimeInternal.hpp>
#include "IPC.hpp"
#include "IPCHandle.hpp"
#include "IPCMutexFutex.Linux.hpp"
#include "IPCPrimitives.Linux.hpp"
#include "IPCMemory.Unix.hpp"
// LINUX SYSCALL APIS
#include <linux/futex.h>
#include <syscall.h>
// LIBC (errno, clock_gettime)
#include <errno.h>
#include <time.h>
// INTERNAL UTILS
// ...IO / FD SHARING
#include <Source/IO/UNIX/FDIpcServer.hpp>
// ...TIME UTILS
#include <Source/Time/Time.hpp>
////////////////////////////////////////////////////////////////////////////////////
// SYSCALLS
////////////////////////////////////////////////////////////////////////////////////
// Thin wrapper over the raw SYS_futex system call (there is no libc wrapper).
// Arguments are forwarded untouched. The result follows the syscall(2)
// convention: -1 with errno set on failure, otherwise the operation-specific
// non-negative value.
static int futex(uint32_t *uaddr, int futex_op, uint32_t val,
                 const struct timespec *timeout, /* or: uint32_t val2 */
                 uint32_t *uaddr2, uint32_t val3)
{
    const long lResult = ::syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
    return int(lResult);
}
// FUTEX_WAIT without a timeout: sleeps while *addr still equals `expected`.
static int futex_wait(uint32_t *addr, uint32_t expected)
{
    const struct timespec *kNoTimeout = nullptr;
    return futex(addr, FUTEX_WAIT, expected, kNoTimeout, nullptr, 0);
}
// FUTEX_WAIT with a caller-supplied timeout (may be null for "no timeout").
// The timespec is handed to the kernel as-is.
static int futex_wait(uint32_t *addr, uint32_t expected, const struct timespec *timeout)
{
    uint32_t *const kUnusedAddr2 = nullptr;
    return futex(addr, FUTEX_WAIT, expected, timeout, kUnusedAddr2, 0);
}
// FUTEX_WAKE: asks the kernel to wake up to `nthreads` waiters parked on addr.
static int futex_wake(uint32_t *addr, uint32_t nthreads)
{
    const struct timespec *kNoTimeout = nullptr;
    return futex(addr, FUTEX_WAKE, nthreads, kNoTimeout, nullptr, 0);
}
// Raw SYS_set_robust_list: registers `head` (of byte length `len`) as the
// calling thread's robust futex list. Returns the syscall(2) result
// (-1 with errno set on failure).
static long set_robust_list(struct robust_list_head *head, size_t len)
{
    const long lResult = ::syscall(SYS_set_robust_list, head, len);
    return lResult;
}
// Raw SYS_get_robust_list: queries the robust list head and its length for
// the thread identified by `pid`. Returns the syscall(2) result
// (-1 with errno set on failure).
static long get_robust_list(int pid, struct robust_list_head **head_ptr, size_t *len_ptr)
{
    const long lResult = ::syscall(SYS_get_robust_list, pid, head_ptr, len_ptr);
    return lResult;
}
////////////////////////////////////////////////////////////////////////////////////
// CONSTANTS
////////////////////////////////////////////////////////////////////////////////////
// Number of slots scanned/allocated in the shared futex array.
static constexpr AuUInt8  kMaxFutexes         = 128;
// Bytes requested for the shared array: three 64-bit words per slot.
// (FutexObject itself is asserted to be 16 bytes, so this over-reserves.)
static constexpr AuUInt32 kFutexArraySize     = AuUInt32(kMaxFutexes) * (sizeof(AuUInt64) * 3);
// Flag bits of the 32-bit futex word. 0x40000000 has the same value as the
// kernel's FUTEX_OWNER_DIED and 0x3fffffff is the mask stripped by the IO tick.
static constexpr AuUInt32 kFutexIsValid       = 0x80000000; // not referenced in this section
static constexpr AuUInt32 kFutexIsDead        = 0x40000000;
static constexpr AuUInt32 kFutexIsHasOwner    = 0x20000000;
//static const AuUInt32 kFutexValueLocked = kFutexIsHasOwner | kFutexIsValid | 1;
// Value of an allocated, currently unlocked slot.
static constexpr AuUInt32 kFutexValueUnlocked = kFutexIsHasOwner;
// Value of a free (unallocated) slot.
static constexpr AuUInt32 kFutexValueNULL     = 0;
static AuUInt32 gConstNull = 0;
////////////////////////////////////////////////////////////////////////////////////
// VARIABLES
////////////////////////////////////////////////////////////////////////////////////
// One slot of the shared futex array: a 64-bit link word followed by the
// futex word. The layout is relied upon elsewhere in this file:
// FutexContext::Link casts a slot to robust_list*, and SetGrugSelf sets
// futex_offset to 8, i.e. the offset of `futex` from the start of the slot.
struct FutexObject
{
// First word: pointer to the next linked entry (written by Link/Unlink).
union
{
AuUInt64 maxWord;
void *nextPtr;
};
// Second word: the 32-bit futex value, padded to 64 bits.
union
{
AuUInt32 futex;
AuUInt64 futexPadded;
};
};
// Guards the two-word layout described above.
static_assert(sizeof(FutexObject) == 16);
// Serializes InitFutexAPI().
static AuThreadPrimitives::SpinLock gLock;
// Shared memory region backing gFutexArray; created once by InitFutexAPI().
static AuSPtr<AuIPC::IPCSharedMemory> gFutexSharedMemory;
// Start of the slot array inside gFutexSharedMemory (null until initialized).
static FutexObject *gFutexArray;
// Set to true on the first InitFutexAPI() call.
static bool gFutexInit {};
// Per-slot close hook; set by the owning IPCMutexProxy constructor and
// cleared by FreeFutex(). Consulted by TryReleaseFutex().
static AuIOIPC::IMutexClosedHook * gFutexCallbacks[kMaxFutexes];
////////////////////////////////////////////////////////////////////////////////////
// IMPLEMENTATION
////////////////////////////////////////////////////////////////////////////////////
// One-time creation of the shared futex array. Safe to call repeatedly:
// only the first caller (under gLock) performs the allocation.
static void InitFutexAPI()
{
    AU_LOCK_GUARD(gLock);

    const bool bAlreadyInitialized = AuExchange(gFutexInit, true);
    if (!bAlreadyInitialized)
    {
        gFutexSharedMemory = AuIOIPC::NewSharedMemory(kFutexArraySize);
        SysAssert(gFutexSharedMemory);

        // The slot array starts at the beginning of the shared region.
        gFutexArray = gFutexSharedMemory->GetMemory().Begin<FutexObject>();
    }
}
// The kernel-defined robust_list_head, extended with a count of entries
// currently linked by FutexContext::Link (decremented by Unlink).
// Only sizeof(robust_list_head) is passed to set_robust_list(); the extra
// member is bookkeeping for this file.
struct MagicFutexLinkHeader : robust_list_head
{
AuUInt32 linkCount {};
};
// Process-wide state for the robust futex list maintained by this file.
struct FutexContext
{
// Set once Init() has run.
bool bInit {};
// Guards bInit and the linked list hanging off futexArrayHeader.
AuThreadPrimitives::SpinLock lock;
// Thread id captured by SetGrugSelf(); stored into a futex word as the
// "locked" value by the lock/unlock helpers.
AuUInt32 tid {};
// Lazily initializes the shared futex array; always returns true.
bool Init();
// Registers futexArrayHeader as the calling thread's robust list.
bool SetGrugSelf();
// Pushes the slot containing `ptr` onto the front of the list.
bool Link(AuUInt32 *ptr);
// Removes the slot containing `ptr` from the list, if present.
bool Unlink(AuUInt32 *ptr);
// Robust list head handed to the kernel by SetGrugSelf().
MagicFutexLinkHeader futexArrayHeader {};
};
// The single robust-list context used by every helper below.
static FutexContext gFutexContext;
// Defined immediately below.
void LinuxSuperSecretFuckGlibc();
// NOTE(review): no one-argument LinuxLockFutex is defined in this file (the
// definition further down takes a timeout and returns bool) — this
// declaration looks stale; confirm and remove.
static void LinuxLockFutex(AuUInt32 *futex);
// Registers the calling thread (the "grug" thread, per the note below) as the
// owner of this file's robust futex list. Asserts if registration fails.
void LinuxSuperSecretFuckGlibc()
{
// Background: the robust list interface requires an entry-relative offset
// (futex_offset) to be declared in the list header, which effectively hard
// codes an ABI for every user of the list within the task's address space.
// The kernel documentation defers to glibc for that ABI.
//
// Adopting glibc's layout is undesirable here: it would mean keeping an
// array of pthread mutexes (40+ bytes for a single atomic) and defining a
// system-wide ABI that matches one of the many glibc ABIs any given process
// could be arbitrarily linked against.
// ---------------------------------------------------------------------------------------------------------------------
// Unrelated, here's what FreeBSD says about the UMUTEX_ROBUST list set operation:
// Note that if any 32-bit ABI compatibility is being requested, then care
// must be taken with robust lists. A single thread may not mix 32-bit com-
// patible robust lists with native robust lists. The first
// UMTX_OP_ROBUST_LISTS call in a given thread determines which ABI that
// thread will use for robust lists going forward.
//
// Code: https://github.com/freebsd/freebsd-src/blob/27a9392d543933f1aaa4e4ddae2a1585a72db1b2/sys/kern/kern_umtx.c#L4150
//
// (FreeBSD's design appears to have been modelled on glibc's nptl
// implementation so that its CRT could offer shared pthread mutexes.)
// ---------------------------------------------------------------------------------------------------------------------
// Approach taken instead: use a dedicated watcher that tears down a robust
// list of only standard entries limited to the Aurora IPC origin. The grug
// thread is used for this because user code is not expected to lock shared
// pthread mutexes under the few user interfaces grug may call out to.
//
SysAssert(gFutexContext.SetGrugSelf());
}
// Captures the calling thread's id as the lock owner word and installs
// futexArrayHeader as that thread's robust list.
// Returns false (after pushing an IO error) if the kernel rejects the list.
bool FutexContext::SetGrugSelf()
{
    auto &header = this->futexArrayHeader;

    // The owner value later written into locked futex words.
    this->tid = gettid();

    // Empty circular list: the head points back at itself.
    header.list.next       = &header.list;
    // Offset of FutexObject::futex from the start of an entry.
    header.futex_offset    = 8;
    header.list_op_pending = nullptr;

    // Hand the header to the kernel for the current thread.
    const auto lStatus = ::set_robust_list(AuReinterpretCast<robust_list_head *>(&header.list),
                                           sizeof(robust_list_head));
    if (lStatus == 0)
    {
        return true;
    }

    SysPushErrorIO("Set robust list failed");
    return false;
}
// Ensures the shared futex array exists. Idempotent; always returns true.
bool FutexContext::Init()
{
    AU_LOCK_GUARD(this->lock);

    // Only the first caller performs the global initialization.
    if (!AuExchange(this->bInit, true))
    {
        InitFutexAPI();
    }

    return true;
}
// Pushes the FutexObject that contains the futex word `ptr` onto the front
// of the robust list. Returns false for a null pointer or failed Init().
bool FutexContext::Link(AuUInt32 *ptr)
{
    if (!ptr || !Init())
    {
        return false;
    }

    AU_LOCK_GUARD(this->lock);

    auto &header = this->futexArrayHeader;

    // Step back from the futex word to the start of its enclosing slot.
    auto pEntry = AuReinterpretCast<FutexObject *>(AuUInt(ptr) - offsetof(FutexObject, futex));

    // Head insertion: the new entry points at the previous first element.
    pEntry->nextPtr        = header.list.next;
    header.list_op_pending = nullptr;
    header.list.next       = (robust_list *)pEntry;
    header.linkCount++;

    return true;
}
// Removes the FutexObject that contains the futex word `ptr` from the robust
// list. Returns true if the entry was found and removed; false for a null
// pointer, failed Init(), or when the entry is not linked.
bool FutexContext::Unlink(AuUInt32 *ptr)
{
    if (!ptr)
    {
        return false;
    }

    if (!Init())
    {
        return false;
    }

    AU_LOCK_GUARD(this->lock);

    auto &header = this->futexArrayHeader;

    FutexObject *pPrev {};
    auto pCur = (FutexObject *)header.list.next;

    // linkCount bounds the walk so the circular link back to the head is
    // never treated as an entry.
    for (AuUInt32 i = 0; pCur && (i < header.linkCount); i++)
    {
        if (&pCur->futex == ptr)
        {
            if (pPrev)
            {
                pPrev->nextPtr = pCur->nextPtr;
            }
            else
            {
                // Removing the first entry: the head itself must be re-pointed.
                // (Previously this case relinked the entry to itself and left
                // it on the list while still decrementing linkCount.)
                header.list.next = (robust_list *)pCur->nextPtr;
            }

            header.linkCount--;
            return true;
        }

        pPrev = pCur;
        pCur  = (FutexObject *)pCur->nextPtr;
    }

    return false;
}
// Claims the first free slot of the shared futex array by atomically moving
// it from kFutexValueNULL to kFutexValueUnlocked.
// Returns the slot index, or 255 when the array is unavailable or full.
static AuUInt8 AllocateFutex()
{
    const AuUInt8 kInvalidIndex = 255;

    gFutexContext.Init();

    if (!gFutexArray)
    {
        return kInvalidIndex;
    }

    for (AuUInt32 uSlot = 0; uSlot < kMaxFutexes; uSlot++)
    {
        const auto uPrevious = AuAtomicCompareExchange<AuUInt32>(&gFutexArray[uSlot].futex,
                                                                 kFutexValueUnlocked,
                                                                 kFutexValueNULL);
        if (uPrevious != kFutexValueNULL)
        {
            // Slot already in use.
            continue;
        }

        if (gFutexCallbacks[uSlot])
        {
            // A close hook is still registered for this slot; skip it.
            continue;
        }

        return uSlot;
    }

    return kInvalidIndex;
}
// Attempts to return slot `index` to the unlocked state (used by the IO tick
// for slots whose value carries kFutexIsDead).
// The slot's close hook, if any, is consulted first and may veto the release.
// Returns true only when the futex word was atomically reset.
static bool TryReleaseFutex(AuUInt8 index)
{
    if (index >= kMaxFutexes)
    {
        return false;
    }

    auto pHook = gFutexCallbacks[index];

    // Snapshot the 32-bit futex word; this is the same width the CAS below
    // operates on, so comparand and result can be compared without truncation.
    const AuUInt32 uOldState = gFutexArray[index].futex;

    if (pHook && !pHook->OnClosed())
    {
        return false;
    }

    if (AuAtomicCompareExchange<AuUInt32>(&gFutexArray[index].futex, kFutexValueUnlocked, uOldState) != uOldState)
    {
        // The word changed since the snapshot; leave it alone.
        return false;
    }

    // The word is unlocked again: wake a waiter, as LinuxUnlockFutex does.
    ::futex_wake(&gFutexArray[index].futex, 1);
    return true;
}
// Returns slot `index` to the free pool and drops its close hook.
// Out-of-range indices and an uninitialized array are ignored.
static void FreeFutex(AuUInt8 index)
{
    if (!gFutexArray || (index >= kMaxFutexes))
    {
        return;
    }

    // Clear the hook before publishing the slot as free: AllocateFutex skips
    // (and thereby strands) a freshly claimed slot that still has a hook.
    gFutexCallbacks[index] = nullptr;
    gFutexArray[index].futexPadded = 0;
}
// Adds `futex` to the robust list; asserts if initialization or linking fails.
// NOTE(review): both calls sit inside SysAssert — this relies on SysAssert
// always evaluating its argument; confirm for release builds.
static void LinuxAddRobustFutexSlow(AuUInt32 *futex)
{
SysAssert(gFutexContext.Init());
SysAssert(gFutexContext.Link(futex));
}
// Removes `futex` from the robust list. A futex that was not linked is
// reported through the error stack but is otherwise not fatal.
static void LinuxRemoveRobustFutexSlow(AuUInt32 *futex)
{
    SysAssert(gFutexContext.Init());

    const bool bUnlinked = gFutexContext.Unlink(futex);
    if (bUnlinked)
    {
        return;
    }

    SysPushErrorIO("Unlink futex error: {}", fmt::ptr(futex));
}
// Acquires `futex` for the context's owner tid, blocking while it is held.
//
// timeout: maximum time to wait in milliseconds; 0 waits indefinitely.
// Returns true once the futex word has been moved from kFutexValueUnlocked
// to the owner tid and the futex has been added to the robust list.
// Returns false on timeout (no error pushed) or on an unexpected futex error.
//
// NOTE(review): a word carrying kFutexIsDead is not taken over here; waiters
// keep sleeping until it is reset to kFutexValueUnlocked elsewhere.
static bool LinuxLockFutex(AuUInt32 *futex, AuUInt32 timeout)
{
    const AuUInt32 uSelf = gFutexContext.tid;

    // Absolute CLOCK_MONOTONIC deadline. FUTEX_WAIT_BITSET takes an absolute
    // timeout, so spurious wakeups and retries never extend the total wait.
    struct timespec deadline {};
    if (timeout)
    {
        if (::clock_gettime(CLOCK_MONOTONIC, &deadline) != 0)
        {
            SysPushErrorIO("FUTEX ERROR: {}", errno);
            return false;
        }

        deadline.tv_sec  += timeout / 1000;
        deadline.tv_nsec += long(timeout % 1000) * 1000000L;
        if (deadline.tv_nsec >= 1000000000L)
        {
            deadline.tv_sec++;
            deadline.tv_nsec -= 1000000000L;
        }
    }

    for (;;)
    {
        const AuUInt32 uObserved = AuAtomicCompareExchange<AuUInt32>(futex, uSelf, kFutexValueUnlocked);
        if (uObserved == kFutexValueUnlocked)
        {
            // The exchange succeeded: we own the futex.
            break;
        }

        // Sleep only while the word still holds the value we just observed;
        // if it already changed the kernel returns EAGAIN and we retry.
        const int iStatus = ::futex(futex,
                                    FUTEX_WAIT_BITSET,
                                    uObserved,
                                    timeout ? &deadline : nullptr,
                                    nullptr,
                                    FUTEX_BITSET_MATCH_ANY);
        if (iStatus == 0)
        {
            // Woken up: being woken is not ownership, so race for it again.
            continue;
        }

        // syscall() reports failure as -1 with the reason in errno.
        const int iError = errno;
        if ((iError == EAGAIN) || (iError == EINTR))
        {
            continue;
        }

        if (iError != ETIMEDOUT)
        {
            SysPushErrorIO("FUTEX ERROR: {}", iError);
        }

        return false;
    }

    ::LinuxAddRobustFutexSlow(futex);
    return true;
}
// Single non-blocking acquisition attempt: succeeds only if the futex word
// is currently kFutexValueUnlocked. On success the futex is also added to
// the robust list.
static bool LinuxTryLockFutex(AuUInt32 *futex)
{
    const auto uPrevious = AuAtomicCompareExchange<AuUInt32>(futex,
                                                             gFutexContext.tid,
                                                             kFutexValueUnlocked);
    const bool bAcquired = (uPrevious == kFutexValueUnlocked);

    if (bAcquired)
    {
        LinuxAddRobustFutexSlow(futex);
    }

    return bAcquired;
}
// Releases `futex` if it is currently held by the context's owner tid.
// The futex is removed from the robust list first, then the word is moved
// back to kFutexValueUnlocked and one waiter is woken.
// Returns false when the word did not hold the owner tid.
static bool LinuxUnlockFutex(AuUInt32 *futex)
{
    LinuxRemoveRobustFutexSlow(futex);

    const auto uOwner    = gFutexContext.tid;
    const auto uPrevious = AuAtomicCompareExchange<AuUInt32>(futex, kFutexValueUnlocked, uOwner);

    if (uPrevious != uOwner)
    {
        return false;
    }

    ::futex_wake(futex, 1);
    return true;
}
void LinuxSuperSecretIOTick()
{
if (!gFutexInit)
{
return;
}
if (!gFutexArray)
{
return;
}
static Aurora::Utility::RateLimiter gLimiter;
if (!gLimiter.nextTriggerTime)
{
gLimiter.noCatchUp = true;
gLimiter.SetNextStep(AuMSToNS<AuUInt64>(3'000));
}
if (!gLimiter.CheckExchangePass())
{
return;
}
for (AuUInt32 i = 0;
i < kMaxFutexes;
i++)
{
auto val = gFutexArray[i].futex;
val &= ~(0x3fffffff);
if ((val & kFutexIsDead) != 0)
{
::TryReleaseFutex(i);
}
}
}
namespace Aurora::IO::IPC
{
// Guards gSharedViewCache. Note: inside this namespace the name hides the
// file-scope gLock declared earlier.
static AuThreadPrimitives::SpinLock gLock;
// Weak cache of imported shared memory views, keyed by (cookie, pid) of the
// memory token; see GetFutexPagesFromCacheOrImport().
static AuBST<AuPair<AuUInt64 /*cookie*/, AuUInt32 /*pid*/>, AuWPtr<IPCSharedMemory>> gSharedViewCache;
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Mutexes
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Owning constructor: wraps slot `index` of this process's own futex array.
// Serves the underlying mutex handle over the FD server, binds to the local
// shared memory region, and registers this object as the slot's close hook.
IPCMutexProxy::IPCMutexProxy(AuUInt32 index) :
mutex_(),
bOwned(true),
index_(index)
{
if (this->mutex_.HasValidHandle())
{
if (IO::UNIX::FDServe(this->GetHandle(), this->token_))
{
//this->handle_.PushId(EIPCHandleType::eIPCPrimitiveMutex, token);
}
else
{
// NOTE(review): explicit destructor call on a member; the member is
// destroyed again with this object and is still written to below —
// confirm LSMutex tolerates this.
this->mutex_.~LSMutex();
}
}
this->mem_ = gFutexSharedMemory;
this->mutex_.bNoAutoRel = true;
// NOTE(review): `index` is not range-checked against kMaxFutexes here.
gFutexCallbacks[index] = this;
}
// Importing constructor: wraps slot `index` of a futex array living in the
// shared memory view `mem`, using an already accepted mutex handle.
// NOTE(review): bOwned is not set here; presumably it defaults to false in
// the class declaration — confirm, since the destructor frees the slot when
// it is true.
IPCMutexProxy::IPCMutexProxy(int handle, AuSPtr<IPCSharedMemory> mem, AuUInt32 index) :
mutex_(handle),
mem_(mem),
index_(index)
{
if (this->mutex_.HasValidHandle())
{
if (IO::UNIX::FDServe(this->GetHandle(), this->token_))
{
//this->handle_.PushId(EIPCHandleType::eIPCPrimitiveMutex, token);
}
else
{
// NOTE(review): explicit destructor call on a member; the member is
// destroyed again with this object and is still written to below —
// confirm LSMutex tolerates this.
this->mutex_.~LSMutex();
}
}
this->mutex_.bNoAutoRel = true;
}
// Stops serving the handle and, for the owning side only, returns the futex
// slot to the free pool.
IPCMutexProxy::~IPCMutexProxy()
{
    IO::UNIX::FDServeEnd(this->token_);

    if (!this->bOwned)
    {
        return;
    }

    ::FreeFutex(this->index_);
}
// Resolves the futex word of this proxy's slot inside its shared memory view.
// Returns nullptr when the view is missing or the slot index is outside the
// kMaxFutexes-entry array (the index of an imported mutex comes straight from
// the handle string, so it must not be trusted).
// NOTE(review): the view of an imported mutex is assumed to span the whole
// array — confirm the import validates its length.
AuUInt32 *IPCMutexProxy::GetFutex()
{
    if (!this->mem_)
    {
        return nullptr;
    }

    if (this->index_ >= kMaxFutexes)
    {
        return nullptr;
    }

    auto pArray = this->mem_->GetMemory().Begin<FutexObject>();
    if (!pArray)
    {
        return nullptr;
    }

    return &pArray[this->index_].futex;
}
// Releases the mutex: first the shared futex word, then the local mutex
// object, and finally the self-reference taken when the lock was acquired.
// Returns false if the futex is unavailable or was not held by us.
bool IPCMutexProxy::Unlock()
{
    auto pFutex = this->GetFutex();

    const bool bReleased = pFutex && ::LinuxUnlockFutex(pFutex);
    if (!bReleased)
    {
        return false;
    }

    SysAssert(this->mutex_.Unlock());
    this->leakSelf_.reset();
    return true;
}
// Non-blocking acquire. Takes the shared futex word and then checks the local
// mutex object; if the latter is not signaled the futex is given back.
// On success the proxy holds a reference to itself until Unlock().
bool IPCMutexProxy::IsSignaled()
{
    auto pFutex = this->GetFutex();

    if (!pFutex || !::LinuxTryLockFutex(pFutex))
    {
        return false;
    }

    if (this->mutex_.IsSignaled())
    {
        this->leakSelf_ = AuSharedFromThis();
        return true;
    }

    // Roll back the futex acquisition.
    ::LinuxUnlockFutex(pFutex);
    return false;
}
// Blocking acquire with `timeout` forwarded to LinuxLockFutex. After the
// shared futex word is taken the local mutex object is checked; if it is not
// signaled the futex is given back.
// On success the proxy holds a reference to itself until Unlock().
bool IPCMutexProxy::WaitOn(AuUInt32 timeout)
{
    auto pFutex = this->GetFutex();

    if (!pFutex || !::LinuxLockFutex(pFutex, timeout))
    {
        return false;
    }

    if (this->mutex_.IsSignaled())
    {
        this->leakSelf_ = AuSharedFromThis();
        return true;
    }

    // Roll back the futex acquisition.
    ::LinuxUnlockFutex(pFutex);
    return false;
}
// Close hook invoked through gFutexCallbacks by TryReleaseFutex().
// Gives the optional user hook a chance to veto (returns false in that case);
// otherwise, if the futex word is exactly kFutexIsDead, the local mutex
// object is unlocked. Returns true when the release may proceed.
bool IPCMutexProxy::OnClosed()
{
    auto futex = this->GetFutex();

    if (this->pMutexClosedHook)
    {
        if (!this->pMutexClosedHook->OnClosed())
        {
            return false;
        }
    }

    if (futex)
    {
        if (*futex == kFutexIsDead)
        {
            this->mutex_.Unlock();
        }
    }

    return true;
}
// Reports the loop source type of the wrapped mutex object.
Loop::ELoopSource IPCMutexProxy::GetType()
{
return this->mutex_.GetType();
}
// Serializes this mutex into a handle string made of two ids: the mutex
// token (whose `word` field carries the slot index) followed by the token of
// the shared memory region. Asserts that a memory region is attached.
AuString IPCMutexProxy::ExportToString()
{
    IPC::IPCHandle handle;

    handle.PushId(EIPCHandleType::eIPCPrimitiveMutex, this->token_);

    SysAssert(this->mem_);
    const auto pSharedImpl = AuStaticCast<IPCSharedMemoryImpl>(this->mem_);
    handle.PushId(EIPCHandleType::eIPCMemory, pSharedImpl->handle_.values[0].token);

    // The slot index travels inside the first (mutex) token.
    handle.values[0].token.word = this->index_;

    return handle.ToString();
}
// Creates a new IPC mutex backed by a freshly allocated futex slot.
// Returns an empty pointer when no slot is free, the proxy cannot be
// allocated, or the proxy ends up without a valid handle.
AUKN_SYM AuSPtr<IPCMutex> NewMutex()
{
    auto futex = ::AllocateFutex();
    if (futex == 255)
    {
        return {};
    }

    auto object = AuMakeShared<IPCMutexProxy>(futex);
    if (!object)
    {
        SysPushErrorMem();
        // No proxy exists to release the slot in its destructor; free it here
        // so the slot is not lost.
        ::FreeFutex(futex);
        return {};
    }

    if (!object->HasValidHandle())
    {
        // The proxy's destructor releases the slot.
        SysPushErrorIO();
        return {};
    }

    return object;
}
// Returns the shared memory view identified by `mem`, reusing a live cached
// view for the same (cookie, pid) when one exists and importing it otherwise.
// Returns an empty pointer if the import fails. A failure to cache the view
// is reported but not fatal.
static AuSPtr<IPCSharedMemory> GetFutexPagesFromCacheOrImport(const IPCToken &mem)
{
    AU_LOCK_GUARD(gLock);

    auto id  = AuMakePair(mem.cookie, mem.pid);
    auto itr = gSharedViewCache.find(id);

    if (itr != gSharedViewCache.end())
    {
        if (auto cached = itr->second.lock())
        {
            return cached;
        }
    }

    auto shared = ImportSharedMemoryEx(mem);
    if (!shared)
    {
        SysPushErrorNested();
        return {};
    }

    if (itr != gSharedViewCache.end())
    {
        // An entry exists but its view expired: refresh it in place rather
        // than attempting to insert a second entry under the same key.
        itr->second = shared;
    }
    else if (!AuTryInsert(gSharedViewCache, id, shared))
    {
        SysPushErrorMem();
        // We don't need to fail. We can leak next time. It'll be fine.
    }

    return shared;
}
// Imports a mutex exported by another process.
//   handle: token used to accept the mutex file descriptor
//   mem:    token of the shared memory region holding the futex array
//   index:  slot of the futex inside that array
// Returns an empty pointer on any failure; the accepted descriptor is closed
// on every failure path that occurs before a proxy takes it over.
AuSPtr<IPCMutexProxy> ImportMutexEx(const IPCToken &handle, const IPCToken &mem, AuUInt32 index)
{
    int fd {-1};
    if (!IO::UNIX::FDAccept(handle, fd))
    {
        SysPushErrorNested();
        return {};
    }

    auto view = GetFutexPagesFromCacheOrImport(mem);
    if (!view)
    {
        SysPushErrorNested();
        // Nothing owns the descriptor yet; close it to avoid leaking it.
        ::close(fd);
        return {};
    }

    auto object = AuMakeShared<IPCMutexProxy>(fd, view, index);
    if (!object)
    {
        SysPushErrorMem();
        ::close(fd);
        return {};
    }

    if (!object->HasValidHandle())
    {
        SysPushErrorIO();
        return {};
    }

    return object;
}
// Parses a handle string produced by ExportToString() and imports the mutex
// it describes. The string must carry a mutex token at position 0 and a
// memory token at position 1; the mutex token's `word` is the slot index.
// Returns an empty pointer (after pushing a parse error) on malformed input.
AUKN_SYM AuSPtr<IPCMutex> ImportMutex(const AuString &handle)
{
    IPC::IPCHandle decodedHandle;

    if (!decodedHandle.FromString(handle))
    {
        SysPushErrorParseError("Invalid handle: {}", handle);
        return {};
    }

    auto pMutexId = decodedHandle.GetToken(IPC::EIPCHandleType::eIPCPrimitiveMutex, 0);
    if (!pMutexId)
    {
        SysPushErrorParseError("Invalid handle: {}", handle);
        return {};
    }

    auto pMemoryId = decodedHandle.GetToken(IPC::EIPCHandleType::eIPCMemory, 1);
    if (!pMemoryId)
    {
        SysPushErrorParseError("Invalid handle: {}", handle);
        return {};
    }

    const auto &mutexToken = pMutexId->token;
    return ImportMutexEx(mutexToken, pMemoryId->token, mutexToken.word);
}
}