/*** Copyright (C) 2021 J Reece Wilson (a/k/a "Reece"). All rights reserved. File: ByteBuffer.hpp Date: 2021-8-5 Author: Reece ***/ #pragma once namespace Aurora::Memory { static const auto kBufferPageSize = 512; /*** * A bytebuffer object represents a linear, partially-linear resizable, buffer **or** a ring buffer. * * Use cases for a ring buffer include wrapping specific streams whereby your use cases may include * arbitrarily seeking around wrapped blocks of a more limited block stream, a network stream, or other api * IE; * -> peaking a header in a datagram, or tcp stream; where instead of freeing the datagram or saving * the buffered tcp stream, a ring buffer is used to prevent reallocation on each frame * -> peaking, or seeking back after, a compression read transaction. A compression api could be fed on- * demand or ad hoc, writing to its write head pointer, while never running out of space so long as * the decompressed ring read head continues moving * * Small, linear, write/read-once [de]serialization use cases may elect to allocate a buffer and * follow the linear fast paths; perhaps even enabling flagExpandable for hopefully-smarter-than-stdvec-scaling * * Ring buffers scale from the write head, to the read head, potentially going-around in the process * * Linear flagExpandable buffers scale from [0, length]; reallocating at end of buffer if flagExpandable is enabled * if expanding is enabled, * realloc(max(size + offset, (offset / kBufferPageSize + 1) * kBufferPageSize)) * * Deprecates INetworkStream, fixes allocation issues around compression backends * Superseeds abuse of AuList for binary blobs, alongside Memory::Array */ struct ByteBuffer { /////////////////////////////////////////////////////////////////////// // Stable ByteBuffer ABI Header; length and read/write head pointers // /////////////////////////////////////////////////////////////////////// /// Internal capacity to mitigate trivial reallocs AuUInt allocSize; /// Abstract size AuUInt length; /// Buffer pointer AuUInt8 *base; /// Stream pointer AuUInt8 *readPtr; /// Stream pointer AuUInt8 *writePtr; /////////////////////////////////////////////////////////////////////// // Stable ByteBuffer ABI Header; u32 flags // /////////////////////////////////////////////////////////////////////// /// Is ring buffer? AuUInt32 flagCircular : 1; /// Should resize linear buffer to accommodate additional writes AuUInt32 flagExpandable : 1; AuUInt32 flagReadError : 1; AuUInt32 flagWriteError : 1; /////////////////////////////////////////////////////////////////////// ByteBuffer(const ByteBuffer &buffer, bool preservePointers = true) { this->base = ZAlloc(buffer.length); this->length = buffer.length; this->allocSize = buffer.length; if (preservePointers) { this->writePtr = this->base + (buffer.writePtr - buffer.base); this->readPtr = this->base + (buffer.readPtr - buffer.base); } else { this->writePtr = this->base; this->readPtr = this->base; } std::memcpy(this->base, buffer.base, this->length); this->flagCircular = buffer.flagCircular; this->flagExpandable = buffer.flagExpandable; } ByteBuffer(const void *in, AuUInt length, bool circular = false, bool expandable = false) : flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0) { this->base = ZAlloc(length); this->length = length; this->allocSize = length; this->readPtr = this->base; this->writePtr = this->readPtr + this->length; std::memcpy(this->base, in, this->length); } ByteBuffer(const AuList &vector, bool circular = false, bool expandable = false) : flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0) { this->base = ZAlloc(length); this->length = vector.size(); this->allocSize = vector.size(); this->readPtr = this->base; this->writePtr = this->readPtr + this->length; std::memcpy(this->base, vector.data(), this->length); } ByteBuffer(AuUInt length, bool circular = false, bool expandable = false) : flagCircular(circular), flagExpandable(expandable), flagReadError(0), flagWriteError(0) { this->base = ZAlloc(length); this->length = length; this->allocSize = length; this->readPtr = this->base; this->writePtr = this->base; } ByteBuffer() : flagCircular(0), flagExpandable(0), flagReadError(0), flagWriteError(0) { this->base = {}; this->length = {}; this->allocSize = {}; this->readPtr = {}; this->writePtr = {}; } ~ByteBuffer() { if (this->base) { Free(this->base); } } inline void ResetPositions() { this->flagReadError = 0; this->flagWriteError = 0; this->readPtr = base; this->writePtr = base; } // Iterator AUKN_SYM AuUInt8 *begin(); AUKN_SYM AuUInt8 *end(); // To alternative types AUKN_SYM AuList ToVector(); // Seek AUKN_SYM bool ReaderTryGoForward(AuUInt32 offset); AUKN_SYM bool ReaderTryGoBack(AuUInt32 offset); AUKN_SYM bool WriterTryGoForward(AuUInt32 offset); AUKN_SYM AuOptional WriterTryGetWriteHeadFor(AuUInt32 nBytes); // Template implementation // TODO: remove to .inl template inline bool Read(T &out) { if constexpr (std::is_class_v) { if constexpr (AuIsBaseOfTemplate>::value) { if (Read() != sizeof(typename T::value_type)) { this->flagReadError = true; return false; } auto len = Read(); out.resize(len); for (auto i = 0u; i < len; i++) { Read(out[i]); } return !this->flagReadError; } else if constexpr (std::is_same_v, AuString>) { out.resize(Read()); Read(out.data(), out.size()); return !this->flagReadError; } } auto oldptr = readPtr; auto skipped = Read(&out, sizeof(T)); if (skipped != sizeof(T)) { this->flagReadError = true; return false; } return true; } template inline T Read() { T a{}; Read(a); return a; } template inline bool Write(const T &in) { if constexpr (std::is_class_v) { if constexpr (AuIsBaseOfTemplate>::value) { Write(sizeof(typename T::value_type)); Write(AuUInt32(in.size())); for (const auto &item : in) { Write(item); } return !this->flagWriteError; } else if constexpr (std::is_same_v, AuString>) { Write(AuUInt32(in.size())); Write(in.data(), in.size()); return !this->flagWriteError; } } auto skipped = Write(&in, sizeof(T)); if (skipped != sizeof(T)) { this->flagWriteError = true; return false; } return true; } inline bool Resize(AuUInt length) { AuUInt oldWriteIdx, oldReadIdx, oldLength, newLength; AuUInt8 *nextRead, *nextWrite, *nextPtr; if (this->allocSize > length) { this->length = length; oldLength = this->length; newLength = length; nextPtr = this->base; oldWriteIdx = this->writePtr - this->base; oldReadIdx = this->readPtr - this->base; nextRead = nextPtr + oldReadIdx; nextWrite = nextPtr + oldWriteIdx; } else { oldLength = this->length; newLength = std::max(AuUInt(length), AuUInt(((this->allocSize / kBufferPageSize) + 1) * kBufferPageSize)); nextPtr = ZRealloc(this->base, newLength); if (!nextPtr) { return false; } oldWriteIdx = this->writePtr - this->base; oldReadIdx = this->readPtr - this->base; nextRead = nextPtr + oldReadIdx; nextWrite = nextPtr + oldWriteIdx; this->allocSize = newLength; this->length = length; } this->base = nextPtr; if (!flagCircular) { this->readPtr = nextRead; this->writePtr = nextWrite; } else { if (this->writePtr > this->readPtr) { this->readPtr = nextRead; this->writePtr = nextWrite; } else { auto expansion = newLength - oldLength; auto movableTail = std::min(oldWriteIdx, expansion); std::memcpy(nextPtr + oldLength, nextPtr, movableTail); this->readPtr = nextRead; this->writePtr = nextPtr + oldLength + movableTail; } } return true; } inline AuUInt Write(const void *buffer, AuUInt requestLength) { AuUInt linearOverhead = 0, toReadOverhead = 0, linearWritable = 0, toReadWritable = 0, writable = 0; auto cptr = reinterpret_cast(buffer); if (flagCircular) { // 0 1 2 3 4 5 // W R // 6 - (1) = 5 -> read bound; we have zero overhead not 5 if (writePtr < readPtr) { // Handle read-bound writes linearOverhead = readPtr - writePtr; toReadOverhead = 0; } else { // Handle ordinary stream consume bound IO linearOverhead = length - (writePtr - base); toReadOverhead = readPtr - base; } writable = std::min(linearOverhead + toReadOverhead, requestLength); linearWritable = std::min(linearOverhead, requestLength); toReadWritable = writable - linearWritable; if (cptr) { std::memcpy(writePtr, cptr, linearWritable); } writePtr += linearWritable; if (toReadWritable) { writePtr = base; if (cptr) { std::memcpy(writePtr, cptr + linearOverhead, toReadWritable); } writePtr += toReadWritable; } #if 0 if (writePtr == base + length) { writePtr = base; } #endif return linearWritable + toReadWritable; } else { auto offset = writePtr - base; auto overhead = length - offset; AuUInt len = std::min(overhead, requestLength); if ((len != requestLength) && (flagExpandable)) { if (!Resize(offset + requestLength)) { return 0; } overhead = length - offset; len = std::min(overhead, requestLength); } if (buffer) { std::memcpy(writePtr, buffer, len); } writePtr += len; return len; } } inline AuUInt RemainingBytes(bool endAtWrite = true) { if (flagCircular) { if ((readPtr < writePtr) && (endAtWrite)) { return length - (writePtr - readPtr); } else { auto linearOverhead = length - (readPtr - base); auto toWriteOverhead = writePtr - base; return linearOverhead + toWriteOverhead; } } else { if (endAtWrite) { if (writePtr < readPtr) { return 0; } else { return writePtr - readPtr; } } else { return (length - (readPtr - base)); } } } inline AuUInt RemainingWrite(bool endAtRead = true) { if (flagCircular) { if ((writePtr < readPtr) && (endAtRead)) { return length - (readPtr - writePtr); } else { auto linearOverhead = length - (writePtr - base); auto toWriteOverhead = readPtr - base; return linearOverhead + toWriteOverhead; } } else { return length - (writePtr - base); } } inline AuList RemainingBytesToVector(bool endAtWrite = true) { AuList vec; if (flagCircular) { if ((readPtr < writePtr) && (endAtWrite)) { auto len = length - (writePtr - readPtr); vec.resize(len); std::memcpy(vec.data(), readPtr, len); } else { auto linearOverhead = length - (readPtr - base); auto toWriteOverhead = endAtWrite ? (writePtr - base) : (readPtr - base); vec.resize(linearOverhead + toWriteOverhead); std::memcpy(vec.data(), readPtr, linearOverhead); std::memcpy(vec.data() + linearOverhead, base, linearOverhead); } } else { AuUInt len; if (endAtWrite) { len = writePtr - readPtr; } else { len = length - (readPtr - base); } vec.resize(len); std::memcpy(vec.data(), readPtr, len); } return vec; } inline bool Skip(AuUInt count) { auto oldptr = readPtr; auto skipped = Read(nullptr, count); if (skipped != count) { readPtr = oldptr; return false; } return true; } inline AuUInt GetReadOffset() { if (flagCircular) { return 0; } else { return readPtr - base; } } inline AuUInt GetWriteOffset() { if (flagCircular) { return 0; } else { return writePtr - base; } } inline AuUInt Read(void *out, AuUInt requestedLength, bool peak = false) { AuUInt linearOverhead = 0, toWriteOverhead = 0, linearReadable = 0, toWriteReadable = 0; if (flagCircular) { if (readPtr < writePtr) { linearOverhead = writePtr - readPtr; toWriteOverhead = 0; } else { linearOverhead = length - (readPtr - base); toWriteOverhead = writePtr - base; } auto readable = std::min(linearOverhead + toWriteOverhead, requestedLength); linearReadable = std::min(linearOverhead, requestedLength); toWriteReadable = readable - linearReadable; if (out) { std::memcpy(out, readPtr, linearOverhead); } if (!peak) { readPtr += linearOverhead; } if (toWriteOverhead) { std::memcpy(reinterpret_cast(out) + linearOverhead, base, toWriteReadable); if (!peak) { readPtr = base + toWriteReadable; } } #if 0 if (readPtr == base + length) { readPtr = base; } #endif return linearReadable + toWriteReadable; } else { AuUInt len = std::min(AuUInt(writePtr - readPtr), requestedLength); if (out) { std::memcpy(out, readPtr, len); } if (!peak) { readPtr += len; } return len; } } }; }