Change SkTaskGroup to use std::function. Ripple all the changes.
BUG=skia:4634 Review URL: https://codereview.chromium.org/1519573003
This commit is contained in:
parent
9fba909792
commit
30da5f7a1e
10
dm/DM.cpp
10
dm/DM.cpp
@ -1108,17 +1108,18 @@ int dm_main() {
|
||||
}
|
||||
|
||||
SkTaskGroup tg;
|
||||
tg.batch(run_test, gThreadedTests.begin(), gThreadedTests.count());
|
||||
tg.batch([](int i){ run_test(&gThreadedTests[i]); }, gThreadedTests.count());
|
||||
for (int i = 0; i < kNumEnclaves; i++) {
|
||||
SkTArray<Task>* currentEnclave = &enclaves[i];
|
||||
switch(i) {
|
||||
case kAnyThread_Enclave:
|
||||
tg.batch(Task::Run, enclaves[i].begin(), enclaves[i].count());
|
||||
tg.batch([currentEnclave](int j) { Task::Run(&(*currentEnclave)[j]); }, currentEnclave->count());
|
||||
break;
|
||||
case kGPU_Enclave:
|
||||
tg.add(run_enclave_and_gpu_tests, &enclaves[i]);
|
||||
tg.add([currentEnclave](){ run_enclave_and_gpu_tests(currentEnclave); });
|
||||
break;
|
||||
default:
|
||||
tg.add(run_enclave, &enclaves[i]);
|
||||
tg.add([currentEnclave](){ run_enclave(currentEnclave); });
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -1174,7 +1175,6 @@ void call_test(TestWithGrContextAndGLContext test, skiatest::Reporter* reporter,
|
||||
#endif
|
||||
} // namespace
|
||||
|
||||
|
||||
template<typename T>
|
||||
void RunWithGPUTestContexts(T test, GPUTestContexts testContexts, Reporter* reporter,
|
||||
GrContextFactory* factory) {
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include "SkRunnable.h"
|
||||
#include "SkSemaphore.h"
|
||||
#include "SkSpinlock.h"
|
||||
#include "SkTArray.h"
|
||||
#include "SkTDArray.h"
|
||||
#include "SkTaskGroup.h"
|
||||
#include "SkThreadUtils.h"
|
||||
@ -43,23 +44,22 @@ public:
|
||||
if (!gGlobal) { // If we have no threads, run synchronously.
|
||||
return task->run();
|
||||
}
|
||||
gGlobal->add(&CallRunnable, task, pending);
|
||||
gGlobal->add([task]() { task->run(); }, pending);
|
||||
}
|
||||
|
||||
static void Add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) {
|
||||
static void Add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
|
||||
if (!gGlobal) {
|
||||
return fn(arg);
|
||||
return fn();
|
||||
}
|
||||
gGlobal->add(fn, arg, pending);
|
||||
gGlobal->add(fn, pending);
|
||||
}
|
||||
|
||||
static void Batch(void (*fn)(void*), void* args, int N, size_t stride,
|
||||
SkAtomic<int32_t>* pending) {
|
||||
static void Batch(std::function<void(int)> fn, int N, SkAtomic<int32_t>* pending) {
|
||||
if (!gGlobal) {
|
||||
for (int i = 0; i < N; i++) { fn((char*)args + i*stride); }
|
||||
for (int i = 0; i < N; i++) { fn(i); }
|
||||
return;
|
||||
}
|
||||
gGlobal->batch(fn, args, N, stride, pending);
|
||||
gGlobal->batch(fn, N, pending);
|
||||
}
|
||||
|
||||
static void Wait(SkAtomic<int32_t>* pending) {
|
||||
@ -76,16 +76,17 @@ public:
|
||||
// so we never call fWorkAvailable.wait(), which could sleep us if there's no work.
|
||||
// This means fWorkAvailable is only an upper bound on fWork.count().
|
||||
AutoLock lock(&gGlobal->fWorkLock);
|
||||
if (gGlobal->fWork.isEmpty()) {
|
||||
if (gGlobal->fWork.empty()) {
|
||||
// Someone has picked up all the work (including ours). How nice of them!
|
||||
// (They may still be working on it, so we can't assert *pending == 0 here.)
|
||||
continue;
|
||||
}
|
||||
gGlobal->fWork.pop(&work);
|
||||
work = gGlobal->fWork.back();
|
||||
gGlobal->fWork.pop_back();
|
||||
}
|
||||
// This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
|
||||
// We threads gotta stick together. We're always making forward progress.
|
||||
work.fn(work.arg);
|
||||
work.fn();
|
||||
work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above.
|
||||
}
|
||||
}
|
||||
@ -101,8 +102,7 @@ private:
|
||||
static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }
|
||||
|
||||
struct Work {
|
||||
void (*fn)(void*); // A function to call,
|
||||
void* arg; // its argument,
|
||||
std::function<void(void)> fn; // A function to call
|
||||
SkAtomic<int32_t>* pending; // then decrement pending afterwards.
|
||||
};
|
||||
|
||||
@ -117,39 +117,38 @@ private:
|
||||
}
|
||||
|
||||
~ThreadPool() {
|
||||
SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now.
|
||||
SkASSERT(fWork.empty()); // All SkTaskGroups should be destroyed by now.
|
||||
|
||||
// Send a poison pill to each thread.
|
||||
SkAtomic<int> dummy(0);
|
||||
for (int i = 0; i < fThreads.count(); i++) {
|
||||
this->add(nullptr, nullptr, &dummy);
|
||||
this->add(nullptr, &dummy);
|
||||
}
|
||||
// Wait for them all to swallow the pill and die.
|
||||
for (int i = 0; i < fThreads.count(); i++) {
|
||||
fThreads[i]->join();
|
||||
}
|
||||
SkASSERT(fWork.isEmpty()); // Can't hurt to double check.
|
||||
SkASSERT(fWork.empty()); // Can't hurt to double check.
|
||||
fThreads.deleteAll();
|
||||
}
|
||||
|
||||
void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) {
|
||||
Work work = { fn, arg, pending };
|
||||
void add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
|
||||
Work work = { fn, pending };
|
||||
pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed.
|
||||
{
|
||||
AutoLock lock(&fWorkLock);
|
||||
fWork.push(work);
|
||||
fWork.push_back(work);
|
||||
}
|
||||
fWorkAvailable.signal(1);
|
||||
}
|
||||
|
||||
void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) {
|
||||
void batch(std::function<void(int)> fn, int N, SkAtomic<int32_t>* pending) {
|
||||
pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed.
|
||||
{
|
||||
AutoLock lock(&fWorkLock);
|
||||
Work* batch = fWork.append(N);
|
||||
for (int i = 0; i < N; i++) {
|
||||
Work work = { fn, (char*)arg + i*stride, pending };
|
||||
batch[i] = work;
|
||||
Work work = { [i, fn]() { fn(i); }, pending };
|
||||
fWork.push_back(work);
|
||||
}
|
||||
}
|
||||
fWorkAvailable.signal(N);
|
||||
@ -163,24 +162,25 @@ private:
|
||||
pool->fWorkAvailable.wait();
|
||||
{
|
||||
AutoLock lock(&pool->fWorkLock);
|
||||
if (pool->fWork.isEmpty()) {
|
||||
if (pool->fWork.empty()) {
|
||||
// Someone in Wait() stole our work (fWorkAvailable is an upper bound).
|
||||
// Well, that's fine, back to sleep for us.
|
||||
continue;
|
||||
}
|
||||
pool->fWork.pop(&work);
|
||||
work = pool->fWork.back();
|
||||
pool->fWork.pop_back();
|
||||
}
|
||||
if (!work.fn) {
|
||||
return; // Poison pill. Time... to die.
|
||||
}
|
||||
work.fn(work.arg);
|
||||
work.fn();
|
||||
work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait().
|
||||
}
|
||||
}
|
||||
|
||||
// fWorkLock must be held when reading or modifying fWork.
|
||||
SkSpinlock fWorkLock;
|
||||
SkTDArray<Work> fWork;
|
||||
SkTArray<Work> fWork;
|
||||
|
||||
// A thread-safe upper bound for fWork.count().
|
||||
//
|
||||
@ -215,9 +215,9 @@ SkTaskGroup::SkTaskGroup() : fPending(0) {}
|
||||
|
||||
void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); }
|
||||
void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); }
|
||||
void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); }
|
||||
void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) {
|
||||
ThreadPool::Batch(fn, args, N, stride, &fPending);
|
||||
void SkTaskGroup::add(std::function<void(void)> fn) { ThreadPool::Add(fn, &fPending); }
|
||||
void SkTaskGroup::batch (std::function<void(int)> fn, int N) {
|
||||
ThreadPool::Batch(fn, N, &fPending);
|
||||
}
|
||||
|
||||
int sk_parallel_for_thread_count() {
|
||||
|
@ -8,6 +8,8 @@
|
||||
#ifndef SkTaskGroup_DEFINED
|
||||
#define SkTaskGroup_DEFINED
|
||||
|
||||
#include <functional>
|
||||
|
||||
#include "SkTypes.h"
|
||||
#include "SkAtomics.h"
|
||||
#include "SkTemplates.h"
|
||||
@ -29,24 +31,16 @@ public:
|
||||
// Neither add() method takes owership of any of its parameters.
|
||||
void add(SkRunnable*);
|
||||
|
||||
template <typename T>
|
||||
void add(void (*fn)(T*), T* arg) { this->add((void_fn)fn, (void*)arg); }
|
||||
void add(std::function<void(void)> fn);
|
||||
|
||||
// Add a batch of N tasks, all calling fn with different arguments.
|
||||
// Equivalent to a loop over add(fn, arg), but with perhaps less synchronization overhead.
|
||||
template <typename T>
|
||||
void batch(void (*fn)(T*), T* args, int N) { this->batch((void_fn)fn, args, N, sizeof(T)); }
|
||||
void batch(std::function<void(int)> fn, int N);
|
||||
|
||||
// Block until all Tasks previously add()ed to this SkTaskGroup have run.
|
||||
// You may safely reuse this SkTaskGroup after wait() returns.
|
||||
void wait();
|
||||
|
||||
private:
|
||||
typedef void(*void_fn)(void*);
|
||||
|
||||
void add (void_fn, void* arg);
|
||||
void batch(void_fn, void* args, int N, size_t stride);
|
||||
|
||||
SkAtomic<int32_t> fPending;
|
||||
};
|
||||
|
||||
@ -81,18 +75,20 @@ void sk_parallel_for(int end, const Func& f) {
|
||||
|
||||
for (int i = 0; i < nchunks; i++) {
|
||||
Chunk& c = chunks[i];
|
||||
c.f = &f;
|
||||
c.start = i * stride;
|
||||
c.end = SkTMin(c.start + stride, end);
|
||||
c.f = &f;
|
||||
c.start = i * stride;
|
||||
c.end = SkTMin(c.start + stride, end);
|
||||
SkASSERT(c.start < c.end); // Nothing will break if start >= end, but it's a wasted chunk.
|
||||
}
|
||||
|
||||
void(*run_chunk)(Chunk*) = [](Chunk* c) {
|
||||
for (int i = c->start; i < c->end; i++) {
|
||||
(*c->f)(i);
|
||||
Chunk* chunkBase = chunks.get();
|
||||
auto run_chunk = [chunkBase](int i) {
|
||||
Chunk& c = chunkBase[i];
|
||||
for (int i = c.start; i < c.end; i++) {
|
||||
(*c.f)(i);
|
||||
}
|
||||
};
|
||||
SkTaskGroup().batch(run_chunk, chunks.get(), nchunks);
|
||||
SkTaskGroup().batch(run_chunk, nchunks);
|
||||
}
|
||||
|
||||
#endif//SkTaskGroup_DEFINED
|
||||
|
Loading…
Reference in New Issue
Block a user