SkRWBuffer for thread-safe 'stream' sharing
WIP - Can accumulate (write) data in one thread, and share snapshots of it in other threads ... e.g. network accumulates image data, and periodically we want to decode/draw it - If this sort of thing sticks, should we promote SkData to have the same generality as SkRBuffer? BUG=skia: TBR= Review URL: https://codereview.chromium.org/1106113002
This commit is contained in:
parent
a73239a009
commit
5b6db07fb5
@ -178,6 +178,7 @@
|
||||
'<(skia_src_path)/core/SkRRect.cpp',
|
||||
'<(skia_src_path)/core/SkRTree.h',
|
||||
'<(skia_src_path)/core/SkRTree.cpp',
|
||||
'<(skia_src_path)/core/SkRWBuffer.cpp',
|
||||
'<(skia_src_path)/core/SkScalar.cpp',
|
||||
'<(skia_src_path)/core/SkScalerContext.cpp',
|
||||
'<(skia_src_path)/core/SkScalerContext.h',
|
||||
|
353
src/core/SkRWBuffer.cpp
Normal file
353
src/core/SkRWBuffer.cpp
Normal file
@ -0,0 +1,353 @@
|
||||
/*
|
||||
* Copyright 2015 Google Inc.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license that can be
|
||||
* found in the LICENSE file.
|
||||
*/
|
||||
|
||||
#include "SkRWBuffer.h"
|
||||
#include "SkStream.h"
|
||||
|
||||
// Force small chunks to be a page's worth
|
||||
static const size_t kMinAllocSize = 4096;
|
||||
|
||||
struct SkBufferBlock {
|
||||
SkBufferBlock* fNext;
|
||||
size_t fUsed;
|
||||
size_t fCapacity;
|
||||
|
||||
const void* startData() const { return this + 1; };
|
||||
|
||||
size_t avail() const { return fCapacity - fUsed; }
|
||||
void* availData() { return (char*)this->startData() + fUsed; }
|
||||
|
||||
static SkBufferBlock* Alloc(size_t length) {
|
||||
size_t capacity = LengthToCapacity(length);
|
||||
SkBufferBlock* block = (SkBufferBlock*)sk_malloc_throw(sizeof(SkBufferBlock) + capacity);
|
||||
block->fNext = NULL;
|
||||
block->fUsed = 0;
|
||||
block->fCapacity = capacity;
|
||||
return block;
|
||||
}
|
||||
|
||||
// Return number of bytes actually appended
|
||||
size_t append(const void* src, size_t length) {
|
||||
this->validate();
|
||||
size_t amount = SkTMin(this->avail(), length);
|
||||
memcpy(this->availData(), src, amount);
|
||||
fUsed += amount;
|
||||
this->validate();
|
||||
return amount;
|
||||
}
|
||||
|
||||
void validate() const {
|
||||
#ifdef SK_DEBUG
|
||||
SkASSERT(fCapacity > 0);
|
||||
SkASSERT(fUsed <= fCapacity);
|
||||
#endif
|
||||
}
|
||||
|
||||
private:
|
||||
static size_t LengthToCapacity(size_t length) {
|
||||
const size_t minSize = kMinAllocSize - sizeof(SkBufferBlock);
|
||||
return SkTMax(length, minSize);
|
||||
}
|
||||
};
|
||||
|
||||
struct SkBufferHead {
|
||||
mutable int32_t fRefCnt;
|
||||
SkBufferBlock fBlock;
|
||||
|
||||
static size_t LengthToCapacity(size_t length) {
|
||||
const size_t minSize = kMinAllocSize - sizeof(SkBufferHead);
|
||||
return SkTMax(length, minSize);
|
||||
}
|
||||
|
||||
static SkBufferHead* Alloc(size_t length) {
|
||||
size_t capacity = LengthToCapacity(length);
|
||||
size_t size = sizeof(SkBufferHead) + capacity;
|
||||
SkBufferHead* head = (SkBufferHead*)sk_malloc_throw(size);
|
||||
head->fRefCnt = 1;
|
||||
head->fBlock.fNext = NULL;
|
||||
head->fBlock.fUsed = 0;
|
||||
head->fBlock.fCapacity = capacity;
|
||||
return head;
|
||||
}
|
||||
|
||||
void ref() const {
|
||||
SkASSERT(fRefCnt > 0);
|
||||
sk_atomic_inc(&fRefCnt);
|
||||
}
|
||||
|
||||
void unref() const {
|
||||
SkASSERT(fRefCnt > 0);
|
||||
// A release here acts in place of all releases we "should" have been doing in ref().
|
||||
if (1 == sk_atomic_fetch_add(&fRefCnt, -1, sk_memory_order_acq_rel)) {
|
||||
// Like unique(), the acquire is only needed on success.
|
||||
SkBufferBlock* block = fBlock.fNext;
|
||||
sk_free((void*)this);
|
||||
while (block) {
|
||||
SkBufferBlock* next = block->fNext;
|
||||
sk_free(block);
|
||||
block = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void validate(size_t minUsed, SkBufferBlock* tail = NULL) const {
|
||||
#ifdef SK_DEBUG
|
||||
SkASSERT(fRefCnt > 0);
|
||||
size_t totalUsed = 0;
|
||||
const SkBufferBlock* block = &fBlock;
|
||||
const SkBufferBlock* lastBlock = block;
|
||||
while (block) {
|
||||
block->validate();
|
||||
totalUsed += block->fUsed;
|
||||
lastBlock = block;
|
||||
block = block->fNext;
|
||||
}
|
||||
SkASSERT(minUsed <= totalUsed);
|
||||
if (tail) {
|
||||
SkASSERT(tail == lastBlock);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
};
|
||||
|
||||
SkROBuffer::SkROBuffer(const SkBufferHead* head, size_t used) : fHead(head), fUsed(used) {
|
||||
if (head) {
|
||||
fHead->ref();
|
||||
SkASSERT(used > 0);
|
||||
head->validate(used);
|
||||
} else {
|
||||
SkASSERT(0 == used);
|
||||
}
|
||||
}
|
||||
|
||||
SkROBuffer::~SkROBuffer() {
|
||||
if (fHead) {
|
||||
fHead->validate(fUsed);
|
||||
fHead->unref();
|
||||
}
|
||||
}
|
||||
|
||||
SkROBuffer::Iter::Iter(const SkROBuffer* buffer) {
|
||||
this->reset(buffer);
|
||||
}
|
||||
|
||||
void SkROBuffer::Iter::reset(const SkROBuffer* buffer) {
|
||||
if (buffer) {
|
||||
fBlock = &buffer->fHead->fBlock;
|
||||
fRemaining = buffer->fUsed;
|
||||
} else {
|
||||
fBlock = NULL;
|
||||
fRemaining = 0;
|
||||
}
|
||||
}
|
||||
|
||||
const void* SkROBuffer::Iter::data() const {
|
||||
return fRemaining ? fBlock->startData() : NULL;
|
||||
}
|
||||
|
||||
size_t SkROBuffer::Iter::size() const {
|
||||
return SkTMin(fBlock->fUsed, fRemaining);
|
||||
}
|
||||
|
||||
bool SkROBuffer::Iter::next() {
|
||||
if (fRemaining) {
|
||||
fRemaining -= this->size();
|
||||
fBlock = fBlock->fNext;
|
||||
}
|
||||
return fRemaining != 0;
|
||||
}
|
||||
|
||||
SkRWBuffer::SkRWBuffer(size_t initialCapacity) : fHead(NULL), fTail(NULL), fTotalUsed(0) {}
|
||||
|
||||
SkRWBuffer::~SkRWBuffer() {
|
||||
this->validate();
|
||||
fHead->unref();
|
||||
}
|
||||
|
||||
void SkRWBuffer::append(const void* src, size_t length) {
|
||||
this->validate();
|
||||
if (0 == length) {
|
||||
return;
|
||||
}
|
||||
|
||||
fTotalUsed += length;
|
||||
|
||||
if (NULL == fHead) {
|
||||
fHead = SkBufferHead::Alloc(length);
|
||||
fTail = &fHead->fBlock;
|
||||
}
|
||||
|
||||
size_t written = fTail->append(src, length);
|
||||
SkASSERT(written <= length);
|
||||
src = (const char*)src + written;
|
||||
length -= written;
|
||||
|
||||
if (length) {
|
||||
SkBufferBlock* block = SkBufferBlock::Alloc(length);
|
||||
fTail->fNext = block;
|
||||
fTail = block;
|
||||
written = fTail->append(src, length);
|
||||
SkASSERT(written == length);
|
||||
}
|
||||
this->validate();
|
||||
}
|
||||
|
||||
void* SkRWBuffer::append(size_t length) {
|
||||
this->validate();
|
||||
if (0 == length) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
fTotalUsed += length;
|
||||
|
||||
if (NULL == fHead) {
|
||||
fHead = SkBufferHead::Alloc(length);
|
||||
fTail = &fHead->fBlock;
|
||||
} else if (fTail->avail() < length) {
|
||||
SkBufferBlock* block = SkBufferBlock::Alloc(length);
|
||||
fTail->fNext = block;
|
||||
fTail = block;
|
||||
}
|
||||
|
||||
fTail->fUsed += length;
|
||||
this->validate();
|
||||
return (char*)fTail->availData() - length;
|
||||
}
|
||||
|
||||
#ifdef SK_DEBUG
|
||||
void SkRWBuffer::validate() const {
|
||||
if (fHead) {
|
||||
fHead->validate(fTotalUsed, fTail);
|
||||
} else {
|
||||
SkASSERT(NULL == fTail);
|
||||
SkASSERT(0 == fTotalUsed);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
SkROBuffer* SkRWBuffer::newRBufferSnapshot() const {
|
||||
return SkNEW_ARGS(SkROBuffer, (fHead, fTotalUsed));
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class SkROBufferStreamAsset : public SkStreamAsset {
|
||||
void validate() const {
|
||||
#ifdef SK_DEBUG
|
||||
SkASSERT(fGlobalOffset <= fBuffer->size());
|
||||
SkASSERT(fLocalOffset <= fIter.size());
|
||||
SkASSERT(fLocalOffset <= fGlobalOffset);
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef SK_DEBUG
|
||||
class AutoValidate {
|
||||
SkROBufferStreamAsset* fStream;
|
||||
public:
|
||||
AutoValidate(SkROBufferStreamAsset* stream) : fStream(stream) { stream->validate(); }
|
||||
~AutoValidate() { fStream->validate(); }
|
||||
};
|
||||
#define AUTO_VALIDATE AutoValidate av(this);
|
||||
#else
|
||||
#define AUTO_VALIDATE
|
||||
#endif
|
||||
|
||||
public:
|
||||
SkROBufferStreamAsset(const SkROBuffer* buffer) : fBuffer(SkRef(buffer)), fIter(buffer) {
|
||||
fGlobalOffset = fLocalOffset = 0;
|
||||
}
|
||||
|
||||
virtual ~SkROBufferStreamAsset() { fBuffer->unref(); }
|
||||
|
||||
size_t getLength() const override { return fBuffer->size(); }
|
||||
|
||||
bool rewind() override {
|
||||
AUTO_VALIDATE
|
||||
fIter.reset(fBuffer);
|
||||
fGlobalOffset = fLocalOffset = 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
size_t read(void* dst, size_t request) override {
|
||||
AUTO_VALIDATE
|
||||
size_t bytesRead = 0;
|
||||
for (;;) {
|
||||
size_t size = fIter.size();
|
||||
SkASSERT(fLocalOffset <= size);
|
||||
size_t avail = SkTMin(size - fLocalOffset, request - bytesRead);
|
||||
if (dst) {
|
||||
memcpy(dst, (const char*)fIter.data() + fLocalOffset, avail);
|
||||
dst = (char*)dst + avail;
|
||||
}
|
||||
bytesRead += avail;
|
||||
fLocalOffset += avail;
|
||||
SkASSERT(bytesRead <= request);
|
||||
if (bytesRead == request) {
|
||||
break;
|
||||
}
|
||||
// If we get here, we've exhausted the current iter
|
||||
SkASSERT(fLocalOffset == size);
|
||||
fLocalOffset = 0;
|
||||
if (!fIter.next()) {
|
||||
break; // ran out of data
|
||||
}
|
||||
}
|
||||
fGlobalOffset += bytesRead;
|
||||
SkASSERT(fGlobalOffset <= fBuffer->size());
|
||||
return bytesRead;
|
||||
}
|
||||
|
||||
bool isAtEnd() const override {
|
||||
return fBuffer->size() == fGlobalOffset;
|
||||
}
|
||||
|
||||
SkStreamAsset* duplicate() const override {
|
||||
return SkNEW_ARGS(SkROBufferStreamAsset, (fBuffer));
|
||||
}
|
||||
|
||||
size_t getPosition() const {
|
||||
return fGlobalOffset;
|
||||
}
|
||||
|
||||
bool seek(size_t position) {
|
||||
AUTO_VALIDATE
|
||||
if (position < fGlobalOffset) {
|
||||
this->rewind();
|
||||
}
|
||||
(void)this->skip(position - fGlobalOffset);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool move(long offset) {
|
||||
AUTO_VALIDATE
|
||||
offset += fGlobalOffset;
|
||||
if (offset <= 0) {
|
||||
this->rewind();
|
||||
} else {
|
||||
(void)this->seek(SkToSizeT(offset));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
SkStreamAsset* fork() const override {
|
||||
SkStreamAsset* clone = this->duplicate();
|
||||
clone->seek(this->getPosition());
|
||||
return clone;
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
const SkROBuffer* fBuffer;
|
||||
SkROBuffer::Iter fIter;
|
||||
size_t fLocalOffset;
|
||||
size_t fGlobalOffset;
|
||||
};
|
||||
|
||||
SkStreamAsset* SkRWBuffer::newStreamSnapshot() const {
|
||||
SkAutoTUnref<SkROBuffer> buffer(this->newRBufferSnapshot());
|
||||
return SkNEW_ARGS(SkROBufferStreamAsset, (buffer));
|
||||
}
|
97
src/core/SkRWBuffer.h
Normal file
97
src/core/SkRWBuffer.h
Normal file
@ -0,0 +1,97 @@
|
||||
/*
|
||||
* Copyright 2015 Google Inc.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license that can be
|
||||
* found in the LICENSE file.
|
||||
*/
|
||||
|
||||
#ifndef SkRWBuffer_DEFINED
|
||||
#define SkRWBuffer_DEFINED
|
||||
|
||||
#include "SkRefCnt.h"
|
||||
|
||||
struct SkBufferBlock;
|
||||
struct SkBufferHead;
|
||||
class SkRWBuffer;
|
||||
class SkStreamAsset;
|
||||
|
||||
/**
|
||||
* Contains a read-only, thread-sharable block of memory. To access the memory, the caller must
|
||||
* instantiate a local iterator, as the memory is stored in 1 or more contiguous blocks.
|
||||
*/
|
||||
class SkROBuffer : public SkRefCnt {
|
||||
public:
|
||||
/**
|
||||
* Return the logical length of the data owned/shared by this buffer. It may be stored in
|
||||
* multiple contiguous blocks, accessible via the iterator.
|
||||
*/
|
||||
size_t size() const { return fUsed; }
|
||||
|
||||
class Iter {
|
||||
public:
|
||||
Iter(const SkROBuffer*);
|
||||
|
||||
void reset(const SkROBuffer*);
|
||||
|
||||
/**
|
||||
* Return the current continuous block of memory, or NULL if the iterator is exhausted
|
||||
*/
|
||||
const void* data() const;
|
||||
|
||||
/**
|
||||
* Returns the number of bytes in the current continguous block of memory, or 0 if the
|
||||
* iterator is exhausted.
|
||||
*/
|
||||
size_t size() const;
|
||||
|
||||
/**
|
||||
* Advance to the next contiguous block of memory, returning true if there is another
|
||||
* block, or false if the iterator is exhausted.
|
||||
*/
|
||||
bool next();
|
||||
|
||||
private:
|
||||
const SkBufferBlock* fBlock;
|
||||
size_t fRemaining;
|
||||
};
|
||||
|
||||
private:
|
||||
SkROBuffer(const SkBufferHead* head, size_t used);
|
||||
virtual ~SkROBuffer();
|
||||
|
||||
const SkBufferHead* fHead;
|
||||
const size_t fUsed;
|
||||
|
||||
friend class SkRWBuffer;
|
||||
};
|
||||
|
||||
/**
|
||||
* Accumulates bytes of memory that are "appended" to it, growing internal storage as needed.
|
||||
* The growth is done such that at any time, a RBuffer or StreamAsset can be snapped off, which
|
||||
* can see the previously stored bytes, but which will be unaware of any future writes.
|
||||
*/
|
||||
class SkRWBuffer {
|
||||
public:
|
||||
SkRWBuffer(size_t initialCapacity = 0);
|
||||
~SkRWBuffer();
|
||||
|
||||
size_t size() const { return fTotalUsed; }
|
||||
void append(const void* buffer, size_t length);
|
||||
void* append(size_t length);
|
||||
|
||||
SkROBuffer* newRBufferSnapshot() const;
|
||||
SkStreamAsset* newStreamSnapshot() const;
|
||||
|
||||
#ifdef SK_DEBUG
|
||||
void validate() const;
|
||||
#else
|
||||
void validate() const {}
|
||||
#endif
|
||||
|
||||
private:
|
||||
SkBufferHead* fHead;
|
||||
SkBufferBlock* fTail;
|
||||
size_t fTotalUsed;
|
||||
};
|
||||
|
||||
#endif
|
@ -232,3 +232,84 @@ DEF_TEST(Data, reporter) {
|
||||
test_cstring(reporter);
|
||||
test_files(reporter);
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
#include "SkRWBuffer.h"
|
||||
|
||||
const char gABC[] = "abcdefghijklmnopqrstuvwxyz";
|
||||
|
||||
static void check_abcs(skiatest::Reporter* reporter, const char buffer[], size_t size) {
|
||||
REPORTER_ASSERT(reporter, size % 26 == 0);
|
||||
for (size_t offset = 0; offset < size; offset += 26) {
|
||||
REPORTER_ASSERT(reporter, !memcmp(&buffer[offset], gABC, 26));
|
||||
}
|
||||
}
|
||||
|
||||
// stream should contains an integral number of copies of gABC.
|
||||
static void check_alphabet_stream(skiatest::Reporter* reporter, SkStream* stream) {
|
||||
REPORTER_ASSERT(reporter, stream->hasLength());
|
||||
size_t size = stream->getLength();
|
||||
REPORTER_ASSERT(reporter, size % 26 == 0);
|
||||
|
||||
SkAutoTMalloc<char> storage(size);
|
||||
char* array = storage.get();
|
||||
size_t bytesRead = stream->read(array, size);
|
||||
REPORTER_ASSERT(reporter, bytesRead == size);
|
||||
check_abcs(reporter, array, size);
|
||||
|
||||
// try checking backwards
|
||||
for (size_t offset = size; offset > 0; offset -= 26) {
|
||||
REPORTER_ASSERT(reporter, stream->seek(offset - 26));
|
||||
REPORTER_ASSERT(reporter, stream->getPosition() == offset - 26);
|
||||
REPORTER_ASSERT(reporter, stream->read(array, 26) == 26);
|
||||
check_abcs(reporter, array, 26);
|
||||
REPORTER_ASSERT(reporter, stream->getPosition() == offset);
|
||||
}
|
||||
}
|
||||
|
||||
// reader should contains an integral number of copies of gABC.
|
||||
static void check_alphabet_buffer(skiatest::Reporter* reporter, const SkROBuffer* reader) {
|
||||
size_t size = reader->size();
|
||||
REPORTER_ASSERT(reporter, size % 26 == 0);
|
||||
|
||||
SkAutoTMalloc<char> storage(size);
|
||||
SkROBuffer::Iter iter(reader);
|
||||
size_t offset = 0;
|
||||
do {
|
||||
SkASSERT(offset + iter.size() <= size);
|
||||
memcpy(storage.get() + offset, iter.data(), iter.size());
|
||||
offset += iter.size();
|
||||
} while (iter.next());
|
||||
REPORTER_ASSERT(reporter, offset == size);
|
||||
check_abcs(reporter, storage.get(), size);
|
||||
}
|
||||
|
||||
DEF_TEST(RWBuffer, reporter) {
|
||||
// Knowing that the default capacity is 4096, choose N large enough so we force it to use
|
||||
// multiple buffers internally.
|
||||
const int N = 1000;
|
||||
SkROBuffer* readers[N];
|
||||
SkStream* streams[N];
|
||||
|
||||
{
|
||||
SkRWBuffer buffer;
|
||||
for (int i = 0; i < N; ++i) {
|
||||
if (0 == (i & 1)) {
|
||||
buffer.append(gABC, 26);
|
||||
} else {
|
||||
memcpy(buffer.append(26), gABC, 26);
|
||||
}
|
||||
readers[i] = buffer.newRBufferSnapshot();
|
||||
streams[i] = buffer.newStreamSnapshot();
|
||||
}
|
||||
REPORTER_ASSERT(reporter, N*26 == buffer.size());
|
||||
}
|
||||
|
||||
for (int i = 0; i < N; ++i) {
|
||||
REPORTER_ASSERT(reporter, (i + 1) * 26U == readers[i]->size());
|
||||
check_alphabet_buffer(reporter, readers[i]);
|
||||
check_alphabet_stream(reporter, streams[i]);
|
||||
readers[i]->unref();
|
||||
SkDELETE(streams[i]);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user