Add debug mode to shared mutex.

Review URL: https://codereview.chromium.org/1307863009
This commit is contained in:
herb 2015-09-18 07:00:48 -07:00 committed by Commit bot
parent b1c56ec01e
commit 966e3d30ba
3 changed files with 294 additions and 130 deletions

View File

@ -106,6 +106,7 @@ inline void operator delete(void* p) {
#ifdef SK_DEBUG
#define SkASSERT(cond) SK_ALWAYSBREAK(cond)
#define SkDEBUGFAIL(message) SkASSERT(false && message)
#define SkDEBUGFAILF(fmt, ...) SkASSERTF(false, fmt, ##__VA_ARGS__)
#define SkDEBUGCODE(code) code
#define SkDECLAREPARAM(type, var) , type var
#define SkPARAM(var) , var

View File

@ -66,144 +66,293 @@ void AnnotateRWLockReleased(
#endif
// The fQueueCounts fields holds many counts in an int32_t in order to make managing them atomic.
// These three counts must be the same size, so each gets 10 bits. The 10 bits represent
// the log of the count which is 1024.
//
// The three counts held in fQueueCounts are:
// * Shared - the number of shared lock holders currently running.
// * WaitingExclusive - the number of threads waiting for an exclusive lock.
// * WaitingShared - the number of threads waiting to run while waiting for an exclusive thread
// to finish.
static const int kLogThreadCount = 10;
#ifdef SK_DEBUG
enum {
kSharedOffset = (0 * kLogThreadCount),
kWaitingExlusiveOffset = (1 * kLogThreadCount),
kWaitingSharedOffset = (2 * kLogThreadCount),
kSharedMask = ((1 << kLogThreadCount) - 1) << kSharedOffset,
kWaitingExclusiveMask = ((1 << kLogThreadCount) - 1) << kWaitingExlusiveOffset,
kWaitingSharedMask = ((1 << kLogThreadCount) - 1) << kWaitingSharedOffset,
};
#include "SkTDArray.h"
#ifdef SK_BUILD_FOR_WIN
#include <windows.h>
static int64_t get_thread_id() { return GetCurrentThreadId(); }
#else
#include <pthread.h>
static int64_t get_thread_id() { return (int64_t)pthread_self(); }
#endif
SkSharedMutex::SkSharedMutex() : fQueueCounts(0) { ANNOTATE_RWLOCK_CREATE(this); }
SkSharedMutex::~SkSharedMutex() { ANNOTATE_RWLOCK_DESTROY(this); }
void SkSharedMutex::acquire() {
// Increment the count of exclusive queue waiters.
int32_t oldQueueCounts = fQueueCounts.fetch_add(1 << kWaitingExlusiveOffset,
sk_memory_order_acquire);
typedef int64_t ThreadID;
// If there are no other exclusive waiters and no shared threads are running then run
// else wait.
if ((oldQueueCounts & kWaitingExclusiveMask) > 0 || (oldQueueCounts & kSharedMask) > 0) {
fExclusiveQueue.wait();
class SkSharedMutex::ThreadIDSet {
public:
// Returns true if threadID is in the set.
bool find(ThreadID threadID) const {
for (auto& t : fThreadIDs) {
if (t == threadID) return true;
}
return false;
}
// Returns true if did not already exist.
bool tryAdd(ThreadID threadID) {
for (auto& t : fThreadIDs) {
if (t == threadID) return false;
}
fThreadIDs.append(1, &threadID);
return true;
}
// Returns true if already exists in Set.
bool tryRemove(ThreadID threadID) {
for (int i = 0; i < fThreadIDs.count(); ++i) {
if (fThreadIDs[i] == threadID) {
fThreadIDs.remove(i);
return true;
}
}
return false;
}
void swap(ThreadIDSet& other) {
fThreadIDs.swap(other.fThreadIDs);
}
int count() const {
return fThreadIDs.count();
}
private:
SkTDArray<ThreadID> fThreadIDs;
};
SkSharedMutex::SkSharedMutex()
: fCurrentShared(new ThreadIDSet)
, fWaitingExclusive(new ThreadIDSet)
, fWaitingShared(new ThreadIDSet){
ANNOTATE_RWLOCK_CREATE(this);
}
ANNOTATE_RWLOCK_ACQUIRED(this, 1);
}
void SkSharedMutex::release() {
ANNOTATE_RWLOCK_RELEASED(this, 1);
SkSharedMutex::~SkSharedMutex() { ANNOTATE_RWLOCK_DESTROY(this); }
int32_t oldQueueCounts = fQueueCounts.load(sk_memory_order_relaxed);
int32_t waitingShared;
int32_t newQueueCounts;
do {
newQueueCounts = oldQueueCounts;
void SkSharedMutex::acquire() {
ThreadID threadID(get_thread_id());
int currentSharedCount;
int waitingExclusiveCount;
{
SkAutoMutexAcquire l(&fMu);
// Decrement exclusive waiters.
newQueueCounts -= 1 << kWaitingExlusiveOffset;
if (!fWaitingExclusive->tryAdd(threadID)) {
SkDEBUGFAILF("Thread %lx already has an exclusive lock\n", threadID);
}
// The number of threads waiting to acquire a shared lock.
waitingShared = (oldQueueCounts & kWaitingSharedMask) >> kWaitingSharedOffset;
currentSharedCount = fCurrentShared->count();
waitingExclusiveCount = fWaitingExclusive->count();
}
if (currentSharedCount > 0 || waitingExclusiveCount > 1) {
fExclusiveQueue.wait();
}
ANNOTATE_RWLOCK_ACQUIRED(this, 1);
}
// Implementation Detail:
// The shared threads need two seperate queues to keep the threads that were added after the
// exclusive lock separate from the threads added before.
void SkSharedMutex::release() {
ANNOTATE_RWLOCK_RELEASED(this, 1);
ThreadID threadID(get_thread_id());
int sharedWaitingCount;
int exclusiveWaitingCount;
int sharedQueueSelect;
{
SkAutoMutexAcquire l(&fMu);
SkASSERT(0 == fCurrentShared->count());
if (!fWaitingExclusive->tryRemove(threadID)) {
SkDEBUGFAILF("Thread %lx did not have the lock held.\n", threadID);
}
exclusiveWaitingCount = fWaitingExclusive->count();
sharedWaitingCount = fWaitingShared->count();
fWaitingShared.swap(fCurrentShared);
sharedQueueSelect = fSharedQueueSelect;
if (sharedWaitingCount > 0) {
fSharedQueueSelect = 1 - fSharedQueueSelect;
}
}
if (sharedWaitingCount > 0) {
fSharedQueue[sharedQueueSelect].signal(sharedWaitingCount);
} else if (exclusiveWaitingCount > 0) {
fExclusiveQueue.signal();
}
}
void SkSharedMutex::assertHeld() const {
ThreadID threadID(get_thread_id());
SkAutoMutexAcquire l(&fMu);
SkASSERT(0 == fCurrentShared->count());
SkASSERT(fWaitingExclusive->find(threadID));
}
void SkSharedMutex::acquireShared() {
ThreadID threadID(get_thread_id());
int exclusiveWaitingCount;
int sharedQueueSelect;
{
SkAutoMutexAcquire l(&fMu);
exclusiveWaitingCount = fWaitingExclusive->count();
if (exclusiveWaitingCount > 0) {
if (!fWaitingShared->tryAdd(threadID)) {
SkDEBUGFAILF("Thread %lx was already waiting!\n", threadID);
}
} else {
if (!fCurrentShared->tryAdd(threadID)) {
SkDEBUGFAILF("Thread %lx already holds a shared lock!\n", threadID);
}
}
sharedQueueSelect = fSharedQueueSelect;
}
if (exclusiveWaitingCount > 0) {
fSharedQueue[sharedQueueSelect].wait();
}
ANNOTATE_RWLOCK_ACQUIRED(this, 0);
}
void SkSharedMutex::releaseShared() {
ANNOTATE_RWLOCK_RELEASED(this, 0);
ThreadID threadID(get_thread_id());
int currentSharedCount;
int waitingExclusiveCount;
{
SkAutoMutexAcquire l(&fMu);
if (!fCurrentShared->tryRemove(threadID)) {
SkDEBUGFAILF("Thread %lx does not hold a shared lock.\n", threadID);
}
currentSharedCount = fCurrentShared->count();
waitingExclusiveCount = fWaitingExclusive->count();
}
if (0 == currentSharedCount && waitingExclusiveCount > 0) {
fExclusiveQueue.signal();
}
}
void SkSharedMutex::assertHeldShared() const {
ThreadID threadID(get_thread_id());
SkAutoMutexAcquire l(&fMu);
SkASSERT(fCurrentShared->find(threadID));
}
#else
// The fQueueCounts fields holds many counts in an int32_t in order to make managing them atomic.
// These three counts must be the same size, so each gets 10 bits. The 10 bits represent
// the log of the count which is 1024.
//
// The three counts held in fQueueCounts are:
// * Shared - the number of shared lock holders currently running.
// * WaitingExclusive - the number of threads waiting for an exclusive lock.
// * WaitingShared - the number of threads waiting to run while waiting for an exclusive thread
// to finish.
static const int kLogThreadCount = 10;
enum {
kSharedOffset = (0 * kLogThreadCount),
kWaitingExlusiveOffset = (1 * kLogThreadCount),
kWaitingSharedOffset = (2 * kLogThreadCount),
kSharedMask = ((1 << kLogThreadCount) - 1) << kSharedOffset,
kWaitingExclusiveMask = ((1 << kLogThreadCount) - 1) << kWaitingExlusiveOffset,
kWaitingSharedMask = ((1 << kLogThreadCount) - 1) << kWaitingSharedOffset,
};
SkSharedMutex::SkSharedMutex() : fQueueCounts(0) { ANNOTATE_RWLOCK_CREATE(this); }
SkSharedMutex::~SkSharedMutex() { ANNOTATE_RWLOCK_DESTROY(this); }
void SkSharedMutex::acquire() {
// Increment the count of exclusive queue waiters.
int32_t oldQueueCounts = fQueueCounts.fetch_add(1 << kWaitingExlusiveOffset,
sk_memory_order_acquire);
// If there are no other exclusive waiters and no shared threads are running then run
// else wait.
if ((oldQueueCounts & kWaitingExclusiveMask) > 0 || (oldQueueCounts & kSharedMask) > 0) {
fExclusiveQueue.wait();
}
ANNOTATE_RWLOCK_ACQUIRED(this, 1);
}
void SkSharedMutex::release() {
ANNOTATE_RWLOCK_RELEASED(this, 1);
int32_t oldQueueCounts = fQueueCounts.load(sk_memory_order_relaxed);
int32_t waitingShared;
int32_t newQueueCounts;
do {
newQueueCounts = oldQueueCounts;
// Decrement exclusive waiters.
newQueueCounts -= 1 << kWaitingExlusiveOffset;
// The number of threads waiting to acquire a shared lock.
waitingShared = (oldQueueCounts & kWaitingSharedMask) >> kWaitingSharedOffset;
// If there are any move the counts of all the shared waiters to actual shared. They are
// going to run next.
if (waitingShared > 0) {
// Set waiting shared to zero.
newQueueCounts &= ~kWaitingSharedMask;
// Because this is the exclusive release, then there are zero readers. So, the bits
// for shared locks should be zero. Since those bits are zero, we can just |= in the
// waitingShared count instead of clearing with an &= and then |= the count.
newQueueCounts |= waitingShared << kSharedOffset;
}
} while (!fQueueCounts.compare_exchange(&oldQueueCounts, newQueueCounts,
sk_memory_order_release, sk_memory_order_relaxed));
// If there are any move the counts of all the shared waiters to actual shared. They are
// going to run next.
if (waitingShared > 0) {
// Set waiting shared to zero.
newQueueCounts &= ~kWaitingSharedMask;
// Because this is the exclusive release, then there are zero readers. So, the bits
// for shared locks should be zero. Since those bits are zero, we can just |= in the
// waitingShared count instead of clearing with an &= and then |= the count.
newQueueCounts |= waitingShared << kSharedOffset;
// Run all the shared.
fSharedQueue.signal(waitingShared);
} else if ((newQueueCounts & kWaitingExclusiveMask) > 0) {
// Run a single exclusive waiter.
fExclusiveQueue.signal();
}
} while (!fQueueCounts.compare_exchange(&oldQueueCounts, newQueueCounts,
sk_memory_order_release, sk_memory_order_relaxed));
if (waitingShared > 0) {
// Run all the shared.
fSharedQueue.signal(waitingShared);
} else if ((newQueueCounts & kWaitingExclusiveMask) > 0) {
// Run a single exclusive waiter.
fExclusiveQueue.signal();
}
}
#ifdef SK_DEBUG
void SkSharedMutex::assertHeld() const {
int32_t queueCounts = fQueueCounts.load(sk_memory_order_relaxed);
// These are very loose asserts about the mutex being held exclusively.
SkASSERTF(0 == (queueCounts & kSharedMask),
"running shared: %d, exclusive: %d, waiting shared: %d",
(queueCounts & kSharedMask) >> kSharedOffset,
(queueCounts & kWaitingExclusiveMask) >> kWaitingExlusiveOffset,
(queueCounts & kWaitingSharedMask) >> kWaitingSharedOffset);
SkASSERTF((queueCounts & kWaitingExclusiveMask) > 0,
"running shared: %d, exclusive: %d, waiting shared: %d",
(queueCounts & kSharedMask) >> kSharedOffset,
(queueCounts & kWaitingExclusiveMask) >> kWaitingExlusiveOffset,
(queueCounts & kWaitingSharedMask) >> kWaitingSharedOffset);
}
#endif
void SkSharedMutex::acquireShared() {
int32_t oldQueueCounts = fQueueCounts.load(sk_memory_order_relaxed);
int32_t newQueueCounts;
do {
newQueueCounts = oldQueueCounts;
// If there are waiting exclusives then this shared lock waits else it runs.
if ((newQueueCounts & kWaitingExclusiveMask) > 0) {
newQueueCounts += 1 << kWaitingSharedOffset;
} else {
newQueueCounts += 1 << kSharedOffset;
}
} while (!fQueueCounts.compare_exchange(&oldQueueCounts, newQueueCounts,
sk_memory_order_acquire, sk_memory_order_relaxed));
void SkSharedMutex::acquireShared() {
int32_t oldQueueCounts = fQueueCounts.load(sk_memory_order_relaxed);
int32_t newQueueCounts;
do {
newQueueCounts = oldQueueCounts;
// If there are waiting exclusives then this shared lock waits else it runs.
// If there are waiting exclusives, then this shared waits until after it runs.
if ((newQueueCounts & kWaitingExclusiveMask) > 0) {
newQueueCounts += 1 << kWaitingSharedOffset;
} else {
newQueueCounts += 1 << kSharedOffset;
fSharedQueue.wait();
}
} while (!fQueueCounts.compare_exchange(&oldQueueCounts, newQueueCounts,
sk_memory_order_acquire, sk_memory_order_relaxed));
ANNOTATE_RWLOCK_ACQUIRED(this, 0);
// If there are waiting exclusives, then this shared waits until after it runs.
if ((newQueueCounts & kWaitingExclusiveMask) > 0) {
fSharedQueue.wait();
}
ANNOTATE_RWLOCK_ACQUIRED(this, 0);
}
void SkSharedMutex::releaseShared() {
ANNOTATE_RWLOCK_RELEASED(this, 0);
void SkSharedMutex::releaseShared() {
ANNOTATE_RWLOCK_RELEASED(this, 0);
// Decrement the shared count.
int32_t oldQueueCounts = fQueueCounts.fetch_sub(1 << kSharedOffset,
sk_memory_order_release);
// Decrement the shared count.
int32_t oldQueueCounts = fQueueCounts.fetch_sub(1 << kSharedOffset,
sk_memory_order_release);
// If shared count is going to zero (because the old count == 1) and there are exclusive
// waiters, then run a single exclusive waiter.
if (((oldQueueCounts & kSharedMask) >> kSharedOffset) == 1
&& (oldQueueCounts & kWaitingExclusiveMask) > 0) {
fExclusiveQueue.signal();
// If shared count is going to zero (because the old count == 1) and there are exclusive
// waiters, then run a single exclusive waiter.
if (((oldQueueCounts & kSharedMask) >> kSharedOffset) == 1
&& (oldQueueCounts & kWaitingExclusiveMask) > 0) {
fExclusiveQueue.signal();
}
}
}
#ifdef SK_DEBUG
void SkSharedMutex::assertHeldShared() const {
int32_t queueCounts = fQueueCounts.load(sk_memory_order_relaxed);
// A very loose assert about the mutex being shared.
SkASSERTF((queueCounts & kSharedMask) > 0,
"running shared: %d, exclusive: %d, waiting shared: %d",
(queueCounts & kSharedMask) >> kSharedOffset,
(queueCounts & kWaitingExclusiveMask) >> kWaitingExlusiveOffset,
(queueCounts & kWaitingSharedMask) >> kWaitingSharedOffset);
}
#endif

View File

@ -11,9 +11,16 @@
#include "SkAtomics.h"
#include "SkSemaphore.h"
#include "SkTypes.h"
// This is a shared lock implementation similar to pthreads rwlocks. This implementation is
// cribbed from Preshing's article:
#ifdef SK_DEBUG
#include "SkMutex.h"
#include "../private/SkUniquePtr.h"
#endif // SK_DEBUG
// There are two shared lock implementations one debug the other is high performance. They implement
// an interface similar to pthread's rwlocks.
// This is a shared lock implementation similar to pthreads rwlocks. The high performance
// implementation is cribbed from Preshing's article:
// http://preshing.com/20150316/semaphores-are-surprisingly-versatile/
//
// This lock does not obey strict queue ordering. It will always alternate between readers and
@ -29,11 +36,7 @@ public:
void release();
// Fail if exclusive is not held.
#ifdef SK_DEBUG
void assertHeld() const;
#else
void assertHeld() const {}
#endif
// Acquire lock for shared use.
void acquireShared();
@ -42,17 +45,28 @@ public:
void releaseShared();
// Fail if shared lock not held.
#ifdef SK_DEBUG
void assertHeldShared() const;
#else
void assertHeldShared() const {}
#endif
private:
SkAtomic<int32_t> fQueueCounts;
SkSemaphore fSharedQueue;
#ifdef SK_DEBUG
class ThreadIDSet;
skstd::unique_ptr<ThreadIDSet> fCurrentShared;
skstd::unique_ptr<ThreadIDSet> fWaitingExclusive;
skstd::unique_ptr<ThreadIDSet> fWaitingShared;
int fSharedQueueSelect{0};
mutable SkMutex fMu;
SkSemaphore fSharedQueue[2];
SkSemaphore fExclusiveQueue;
#else
SkAtomic<int32_t> fQueueCounts;
SkSemaphore fSharedQueue;
SkSemaphore fExclusiveQueue;
#endif // SK_DEBUG
};
#ifndef SK_DEBUG
inline void SkSharedMutex::assertHeld() const {};
inline void SkSharedMutex::assertHeldShared() const {};
#endif // SK_DEBUG
#endif // SkSharedLock_DEFINED