Modernize atomics in SkTaskGroup's threadpool.
- Use SkAtomic<int32_t> for pending work count so we're statically forced to operate on it with atomic methods. - Replacing old methods like sk_atomic_inc/dec gives us finer control over which barriers we need for each operation. No public API changes. TBR=reed@google.com BUG=skia: Review URL: https://codereview.chromium.org/1193493003
This commit is contained in:
parent
2334fb655f
commit
942e99b9c4
@ -43,6 +43,7 @@ template <typename T>
|
||||
class SkAtomic : SkNoncopyable {
|
||||
public:
|
||||
SkAtomic() {}
|
||||
explicit SkAtomic(const T& val) : fVal(val) {}
|
||||
|
||||
// It is essential we return by value rather than by const&. fVal may change at any time.
|
||||
T load(sk_memory_order mo = sk_memory_order_seq_cst) const {
|
||||
@ -53,6 +54,10 @@ public:
|
||||
sk_atomic_store(&fVal, val, mo);
|
||||
}
|
||||
|
||||
T fetch_add(const T& val, sk_memory_order mo = sk_memory_order_seq_cst) {
|
||||
return sk_atomic_fetch_add(&fVal, val, mo);
|
||||
}
|
||||
|
||||
bool compare_exchange(T* expected, const T& desired,
|
||||
sk_memory_order success = sk_memory_order_seq_cst,
|
||||
sk_memory_order failure = sk_memory_order_seq_cst) {
|
||||
|
@ -1,3 +1,10 @@
|
||||
/*
|
||||
* Copyright 2014 Google Inc.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license that can be
|
||||
* found in the LICENSE file.
|
||||
*/
|
||||
|
||||
#include "SkTaskGroup.h"
|
||||
|
||||
#include "SkCondVar.h"
|
||||
@ -23,21 +30,22 @@ namespace {
|
||||
|
||||
class ThreadPool : SkNoncopyable {
|
||||
public:
|
||||
static void Add(SkRunnable* task, int32_t* pending) {
|
||||
static void Add(SkRunnable* task, SkAtomic<int32_t>* pending) {
|
||||
if (!gGlobal) { // If we have no threads, run synchronously.
|
||||
return task->run();
|
||||
}
|
||||
gGlobal->add(&CallRunnable, task, pending);
|
||||
}
|
||||
|
||||
static void Add(void (*fn)(void*), void* arg, int32_t* pending) {
|
||||
static void Add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) {
|
||||
if (!gGlobal) {
|
||||
return fn(arg);
|
||||
}
|
||||
gGlobal->add(fn, arg, pending);
|
||||
}
|
||||
|
||||
static void Batch(void (*fn)(void*), void* args, int N, size_t stride, int32_t* pending) {
|
||||
static void Batch(void (*fn)(void*), void* args, int N, size_t stride,
|
||||
SkAtomic<int32_t>* pending) {
|
||||
if (!gGlobal) {
|
||||
for (int i = 0; i < N; i++) { fn((char*)args + i*stride); }
|
||||
return;
|
||||
@ -45,12 +53,13 @@ public:
|
||||
gGlobal->batch(fn, args, N, stride, pending);
|
||||
}
|
||||
|
||||
static void Wait(int32_t* pending) {
|
||||
static void Wait(SkAtomic<int32_t>* pending) {
|
||||
if (!gGlobal) { // If we have no threads, the work must already be done.
|
||||
SkASSERT(*pending == 0);
|
||||
SkASSERT(pending->load(sk_memory_order_relaxed) == 0);
|
||||
return;
|
||||
}
|
||||
while (sk_acquire_load(pending) > 0) { // Pairs with sk_atomic_dec here or in Loop.
|
||||
// Acquire pairs with decrement release here or in Loop.
|
||||
while (pending->load(sk_memory_order_acquire) > 0) {
|
||||
// Lend a hand until our SkTaskGroup of interest is done.
|
||||
Work work;
|
||||
{
|
||||
@ -65,7 +74,7 @@ public:
|
||||
// This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
|
||||
// We threads gotta stick together. We're always making forward progress.
|
||||
work.fn(work.arg);
|
||||
sk_atomic_dec(work.pending); // Release pairs with the sk_acquire_load() just above.
|
||||
work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above.
|
||||
}
|
||||
}
|
||||
|
||||
@ -80,9 +89,9 @@ private:
|
||||
static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }
|
||||
|
||||
struct Work {
|
||||
void (*fn)(void*); // A function to call,
|
||||
void* arg; // its argument,
|
||||
int32_t* pending; // then sk_atomic_dec(pending) afterwards.
|
||||
void (*fn)(void*); // A function to call,
|
||||
void* arg; // its argument,
|
||||
SkAtomic<int32_t>* pending; // then decrement pending afterwards.
|
||||
};
|
||||
|
||||
explicit ThreadPool(int threads) : fDraining(false) {
|
||||
@ -109,9 +118,9 @@ private:
|
||||
fThreads.deleteAll();
|
||||
}
|
||||
|
||||
void add(void (*fn)(void*), void* arg, int32_t* pending) {
|
||||
void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) {
|
||||
Work work = { fn, arg, pending };
|
||||
sk_atomic_inc(pending); // No barrier needed.
|
||||
pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed.
|
||||
{
|
||||
AutoLock lock(&fReady);
|
||||
fWork.push(work);
|
||||
@ -119,8 +128,8 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
void batch(void (*fn)(void*), void* arg, int N, size_t stride, int32_t* pending) {
|
||||
sk_atomic_add(pending, N); // No barrier needed.
|
||||
void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) {
|
||||
pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed.
|
||||
{
|
||||
AutoLock lock(&fReady);
|
||||
Work* batch = fWork.append(N);
|
||||
@ -147,7 +156,7 @@ private:
|
||||
pool->fWork.pop(&work);
|
||||
}
|
||||
work.fn(work.arg);
|
||||
sk_atomic_dec(work.pending); // Release pairs with sk_acquire_load() in Wait().
|
||||
work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait().
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,7 @@
|
||||
#define SkTaskGroup_DEFINED
|
||||
|
||||
#include "SkTypes.h"
|
||||
#include "SkAtomics.h"
|
||||
|
||||
struct SkRunnable;
|
||||
|
||||
@ -45,7 +46,7 @@ private:
|
||||
void add (void_fn, void* arg);
|
||||
void batch(void_fn, void* args, int N, size_t stride);
|
||||
|
||||
/*atomic*/ int32_t fPending;
|
||||
SkAtomic<int32_t> fPending;
|
||||
};
|
||||
|
||||
#endif//SkTaskGroup_DEFINED
|
||||
|
Loading…
Reference in New Issue
Block a user