Fixes for SkRWBuffer
Do not call SkBufferHead::validate in SkROBuffer's destructor, which may be called in a separate thread from SkRWBuffer::append. validate() reads SkBufferBlock::fUsed, and append() writes to it, resulting in a data race. Update some comments to be more clear about how it is safe to use these classes across threads. Test the readers in separate threads. In addition, make sure it is safe to create a reader even when no data has been appended. Add tests for this case. Mark a parameter to SkBufferHead::validate() as const, reflecting its use. BUG=chromium:601578 BUG=chromium:605479 GOLD_TRYBOT_URL= https://gold.skia.org/search2?unt=true&query=source_type%3Dgm&master=false&issue=1871953002 Review URL: https://codereview.chromium.org/1871953002
This commit is contained in:
parent
488165e689
commit
6351640285
@ -53,22 +53,25 @@ public:
|
||||
private:
|
||||
const SkBufferBlock* fBlock;
|
||||
size_t fRemaining;
|
||||
const SkROBuffer* fBuffer;
|
||||
};
|
||||
|
||||
private:
|
||||
SkROBuffer(const SkBufferHead* head, size_t available);
|
||||
SkROBuffer(const SkBufferHead* head, size_t available, const SkBufferBlock* fTail);
|
||||
virtual ~SkROBuffer();
|
||||
|
||||
const SkBufferHead* fHead;
|
||||
const size_t fAvailable;
|
||||
const SkBufferHead* fHead;
|
||||
const size_t fAvailable;
|
||||
const SkBufferBlock* fTail;
|
||||
|
||||
friend class SkRWBuffer;
|
||||
};
|
||||
|
||||
/**
|
||||
* Accumulates bytes of memory that are "appended" to it, growing internal storage as needed.
|
||||
* The growth is done such that at any time, a RBuffer or StreamAsset can be snapped off, which
|
||||
* can see the previously stored bytes, but which will be unaware of any future writes.
|
||||
* The growth is done such that at any time in the writer's thread, an RBuffer or StreamAsset
|
||||
* can be snapped off (and safely passed to another thread). The RBuffer/StreamAsset snapshot
|
||||
* can see the previously stored bytes, but will be unaware of any future writes.
|
||||
*/
|
||||
class SK_API SkRWBuffer {
|
||||
public:
|
||||
|
@ -42,6 +42,8 @@ struct SkBufferBlock {
|
||||
return amount;
|
||||
}
|
||||
|
||||
// Do not call in the reader thread, since the writer may be updating fUsed.
|
||||
// (The assertion is still true, but TSAN still may complain about its raciness.)
|
||||
void validate() const {
|
||||
#ifdef SK_DEBUG
|
||||
SkASSERT(fCapacity > 0);
|
||||
@ -94,7 +96,7 @@ struct SkBufferHead {
|
||||
}
|
||||
}
|
||||
|
||||
void validate(size_t minUsed, SkBufferBlock* tail = nullptr) const {
|
||||
void validate(size_t minUsed, const SkBufferBlock* tail = nullptr) const {
|
||||
#ifdef SK_DEBUG
|
||||
SkASSERT(fRefCnt > 0);
|
||||
size_t totalUsed = 0;
|
||||
@ -118,21 +120,21 @@ struct SkBufferHead {
|
||||
// The reader can only access block.fCapacity (which never changes), and cannot access
|
||||
// block.fUsed, which may be updated by the writer.
|
||||
//
|
||||
SkROBuffer::SkROBuffer(const SkBufferHead* head, size_t available)
|
||||
: fHead(head), fAvailable(available)
|
||||
SkROBuffer::SkROBuffer(const SkBufferHead* head, size_t available, const SkBufferBlock* tail)
|
||||
: fHead(head), fAvailable(available), fTail(tail)
|
||||
{
|
||||
if (head) {
|
||||
fHead->ref();
|
||||
SkASSERT(available > 0);
|
||||
head->validate(available);
|
||||
head->validate(available, tail);
|
||||
} else {
|
||||
SkASSERT(0 == available);
|
||||
SkASSERT(!tail);
|
||||
}
|
||||
}
|
||||
|
||||
SkROBuffer::~SkROBuffer() {
|
||||
if (fHead) {
|
||||
fHead->validate(fAvailable);
|
||||
fHead->unref();
|
||||
}
|
||||
}
|
||||
@ -142,7 +144,8 @@ SkROBuffer::Iter::Iter(const SkROBuffer* buffer) {
|
||||
}
|
||||
|
||||
void SkROBuffer::Iter::reset(const SkROBuffer* buffer) {
|
||||
if (buffer) {
|
||||
fBuffer = buffer;
|
||||
if (buffer && buffer->fHead) {
|
||||
fBlock = &buffer->fHead->fBlock;
|
||||
fRemaining = buffer->fAvailable;
|
||||
} else {
|
||||
@ -165,7 +168,13 @@ size_t SkROBuffer::Iter::size() const {
|
||||
bool SkROBuffer::Iter::next() {
|
||||
if (fRemaining) {
|
||||
fRemaining -= this->size();
|
||||
fBlock = fBlock->fNext;
|
||||
if (fBuffer->fTail == fBlock) {
|
||||
// There are more blocks, but fBuffer does not know about them.
|
||||
SkASSERT(0 == fRemaining);
|
||||
fBlock = nullptr;
|
||||
} else {
|
||||
fBlock = fBlock->fNext;
|
||||
}
|
||||
}
|
||||
return fRemaining != 0;
|
||||
}
|
||||
@ -224,7 +233,9 @@ void SkRWBuffer::validate() const {
|
||||
}
|
||||
#endif
|
||||
|
||||
SkROBuffer* SkRWBuffer::newRBufferSnapshot() const { return new SkROBuffer(fHead, fTotalUsed); }
|
||||
SkROBuffer* SkRWBuffer::newRBufferSnapshot() const {
|
||||
return new SkROBuffer(fHead, fTotalUsed, fTail);
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
|
@ -245,7 +245,7 @@ static void check_abcs(skiatest::Reporter* reporter, const char buffer[], size_t
|
||||
}
|
||||
}
|
||||
|
||||
// stream should contains an integral number of copies of gABC.
|
||||
// stream should contain an integral number of copies of gABC.
|
||||
static void check_alphabet_stream(skiatest::Reporter* reporter, SkStream* stream) {
|
||||
REPORTER_ASSERT(reporter, stream->hasLength());
|
||||
size_t size = stream->getLength();
|
||||
@ -284,6 +284,8 @@ static void check_alphabet_buffer(skiatest::Reporter* reporter, const SkROBuffer
|
||||
check_abcs(reporter, storage.get(), size);
|
||||
}
|
||||
|
||||
#include "SkTaskGroup.h"
|
||||
|
||||
DEF_TEST(RWBuffer, reporter) {
|
||||
// Knowing that the default capacity is 4096, choose N large enough so we force it to use
|
||||
// multiple buffers internally.
|
||||
@ -301,6 +303,7 @@ DEF_TEST(RWBuffer, reporter) {
|
||||
REPORTER_ASSERT(reporter, N*26 == buffer.size());
|
||||
}
|
||||
|
||||
// Verify that although the SkRWBuffer's destructor has run, the readers are still valid.
|
||||
for (int i = 0; i < N; ++i) {
|
||||
REPORTER_ASSERT(reporter, (i + 1) * 26U == readers[i]->size());
|
||||
check_alphabet_buffer(reporter, readers[i]);
|
||||
@ -310,6 +313,35 @@ DEF_TEST(RWBuffer, reporter) {
|
||||
}
|
||||
}
|
||||
|
||||
DEF_TEST(RWBuffer_threaded, reporter) {
|
||||
// Knowing that the default capacity is 4096, choose N large enough so we force it to use
|
||||
// multiple buffers internally.
|
||||
const int N = 1000;
|
||||
SkTaskGroup tasks;
|
||||
SkRWBuffer buffer;
|
||||
for (int i = 0; i < N; ++i) {
|
||||
buffer.append(gABC, 26);
|
||||
sk_sp<SkROBuffer> reader = sk_sp<SkROBuffer>(buffer.newRBufferSnapshot());
|
||||
SkStream* stream = buffer.newStreamSnapshot();
|
||||
REPORTER_ASSERT(reporter, reader->size() == buffer.size());
|
||||
REPORTER_ASSERT(reporter, stream->getLength() == buffer.size());
|
||||
|
||||
// reader's copy constructor will ref the SkROBuffer, which will be unreffed
|
||||
// when the task ends.
|
||||
// Ownership of stream is passed to the task, which will delete it.
|
||||
tasks.add([reporter, i, reader, stream] {
|
||||
REPORTER_ASSERT(reporter, (i + 1) * 26U == reader->size());
|
||||
REPORTER_ASSERT(reporter, stream->getLength() == reader->size());
|
||||
check_alphabet_buffer(reporter, reader.get());
|
||||
check_alphabet_stream(reporter, stream);
|
||||
REPORTER_ASSERT(reporter, stream->rewind());
|
||||
delete stream;
|
||||
});
|
||||
}
|
||||
REPORTER_ASSERT(reporter, N*26 == buffer.size());
|
||||
tasks.wait();
|
||||
}
|
||||
|
||||
// Tests that it is safe to call SkROBuffer::Iter::size() when exhausted.
|
||||
DEF_TEST(RWBuffer_size, r) {
|
||||
SkRWBuffer buffer;
|
||||
@ -325,9 +357,27 @@ DEF_TEST(RWBuffer_size, r) {
|
||||
REPORTER_ASSERT(r, 0 == iter.size());
|
||||
}
|
||||
|
||||
// Tests that it is safe to destruct an SkRWBuffer without appending
|
||||
// anything to it.
|
||||
// Tests that operations (including the destructor) are safe on an SkRWBuffer
|
||||
// without any data appended.
|
||||
DEF_TEST(RWBuffer_noAppend, r) {
|
||||
SkRWBuffer buffer;
|
||||
REPORTER_ASSERT(r, 0 == buffer.size());
|
||||
|
||||
sk_sp<SkROBuffer> roBuffer = sk_sp<SkROBuffer>(buffer.newRBufferSnapshot());
|
||||
REPORTER_ASSERT(r, roBuffer);
|
||||
if (roBuffer) {
|
||||
REPORTER_ASSERT(r, roBuffer->size() == 0);
|
||||
SkROBuffer::Iter iter(roBuffer.get());
|
||||
REPORTER_ASSERT(r, iter.size() == 0);
|
||||
REPORTER_ASSERT(r, !iter.data());
|
||||
REPORTER_ASSERT(r, !iter.next());
|
||||
}
|
||||
|
||||
SkAutoTDelete<SkStream> stream(buffer.newStreamSnapshot());
|
||||
REPORTER_ASSERT(r, stream);
|
||||
if (stream) {
|
||||
REPORTER_ASSERT(r, stream->hasLength());
|
||||
REPORTER_ASSERT(r, stream->getLength() == 0);
|
||||
REPORTER_ASSERT(r, stream->skip(10) == 0);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user