diff --git a/dm/DM.cpp b/dm/DM.cpp index 7903c54a21..2862355086 100644 --- a/dm/DM.cpp +++ b/dm/DM.cpp @@ -1108,17 +1108,18 @@ int dm_main() { } SkTaskGroup tg; - tg.batch(run_test, gThreadedTests.begin(), gThreadedTests.count()); + tg.batch([](int i){ run_test(&gThreadedTests[i]); }, gThreadedTests.count()); for (int i = 0; i < kNumEnclaves; i++) { + SkTArray* currentEnclave = &enclaves[i]; switch(i) { case kAnyThread_Enclave: - tg.batch(Task::Run, enclaves[i].begin(), enclaves[i].count()); + tg.batch([currentEnclave](int j) { Task::Run(&(*currentEnclave)[j]); }, currentEnclave->count()); break; case kGPU_Enclave: - tg.add(run_enclave_and_gpu_tests, &enclaves[i]); + tg.add([currentEnclave](){ run_enclave_and_gpu_tests(currentEnclave); }); break; default: - tg.add(run_enclave, &enclaves[i]); + tg.add([currentEnclave](){ run_enclave(currentEnclave); }); break; } } @@ -1174,7 +1175,6 @@ void call_test(TestWithGrContextAndGLContext test, skiatest::Reporter* reporter, #endif } // namespace - template void RunWithGPUTestContexts(T test, GPUTestContexts testContexts, Reporter* reporter, GrContextFactory* factory) { diff --git a/src/core/SkTaskGroup.cpp b/src/core/SkTaskGroup.cpp index 863195cfd3..e6b8532bb0 100644 --- a/src/core/SkTaskGroup.cpp +++ b/src/core/SkTaskGroup.cpp @@ -9,6 +9,7 @@ #include "SkRunnable.h" #include "SkSemaphore.h" #include "SkSpinlock.h" +#include "SkTArray.h" #include "SkTDArray.h" #include "SkTaskGroup.h" #include "SkThreadUtils.h" @@ -43,23 +44,22 @@ public: if (!gGlobal) { // If we have no threads, run synchronously. return task->run(); } - gGlobal->add(&CallRunnable, task, pending); + gGlobal->add([task]() { task->run(); }, pending); } - static void Add(void (*fn)(void*), void* arg, SkAtomic* pending) { + static void Add(std::function fn, SkAtomic* pending) { if (!gGlobal) { - return fn(arg); + return fn(); } - gGlobal->add(fn, arg, pending); + gGlobal->add(fn, pending); } - static void Batch(void (*fn)(void*), void* args, int N, size_t stride, - SkAtomic* pending) { + static void Batch(std::function fn, int N, SkAtomic* pending) { if (!gGlobal) { - for (int i = 0; i < N; i++) { fn((char*)args + i*stride); } + for (int i = 0; i < N; i++) { fn(i); } return; } - gGlobal->batch(fn, args, N, stride, pending); + gGlobal->batch(fn, N, pending); } static void Wait(SkAtomic* pending) { @@ -76,16 +76,17 @@ public: // so we never call fWorkAvailable.wait(), which could sleep us if there's no work. // This means fWorkAvailable is only an upper bound on fWork.count(). AutoLock lock(&gGlobal->fWorkLock); - if (gGlobal->fWork.isEmpty()) { + if (gGlobal->fWork.empty()) { // Someone has picked up all the work (including ours). How nice of them! // (They may still be working on it, so we can't assert *pending == 0 here.) continue; } - gGlobal->fWork.pop(&work); + work = gGlobal->fWork.back(); + gGlobal->fWork.pop_back(); } // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. // We threads gotta stick together. We're always making forward progress. - work.fn(work.arg); + work.fn(); work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above. } } @@ -101,8 +102,7 @@ private: static void CallRunnable(void* arg) { static_cast(arg)->run(); } struct Work { - void (*fn)(void*); // A function to call, - void* arg; // its argument, + std::function fn; // A function to call SkAtomic* pending; // then decrement pending afterwards. }; @@ -117,39 +117,38 @@ private: } ~ThreadPool() { - SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now. + SkASSERT(fWork.empty()); // All SkTaskGroups should be destroyed by now. // Send a poison pill to each thread. SkAtomic dummy(0); for (int i = 0; i < fThreads.count(); i++) { - this->add(nullptr, nullptr, &dummy); + this->add(nullptr, &dummy); } // Wait for them all to swallow the pill and die. for (int i = 0; i < fThreads.count(); i++) { fThreads[i]->join(); } - SkASSERT(fWork.isEmpty()); // Can't hurt to double check. + SkASSERT(fWork.empty()); // Can't hurt to double check. fThreads.deleteAll(); } - void add(void (*fn)(void*), void* arg, SkAtomic* pending) { - Work work = { fn, arg, pending }; + void add(std::function fn, SkAtomic* pending) { + Work work = { fn, pending }; pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed. { AutoLock lock(&fWorkLock); - fWork.push(work); + fWork.push_back(work); } fWorkAvailable.signal(1); } - void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic* pending) { + void batch(std::function fn, int N, SkAtomic* pending) { pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed. { AutoLock lock(&fWorkLock); - Work* batch = fWork.append(N); for (int i = 0; i < N; i++) { - Work work = { fn, (char*)arg + i*stride, pending }; - batch[i] = work; + Work work = { [i, fn]() { fn(i); }, pending }; + fWork.push_back(work); } } fWorkAvailable.signal(N); @@ -163,24 +162,25 @@ private: pool->fWorkAvailable.wait(); { AutoLock lock(&pool->fWorkLock); - if (pool->fWork.isEmpty()) { + if (pool->fWork.empty()) { // Someone in Wait() stole our work (fWorkAvailable is an upper bound). // Well, that's fine, back to sleep for us. continue; } - pool->fWork.pop(&work); + work = pool->fWork.back(); + pool->fWork.pop_back(); } if (!work.fn) { return; // Poison pill. Time... to die. } - work.fn(work.arg); + work.fn(); work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait(). } } // fWorkLock must be held when reading or modifying fWork. SkSpinlock fWorkLock; - SkTDArray fWork; + SkTArray fWork; // A thread-safe upper bound for fWork.count(). // @@ -215,9 +215,9 @@ SkTaskGroup::SkTaskGroup() : fPending(0) {} void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } -void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); } -void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) { - ThreadPool::Batch(fn, args, N, stride, &fPending); +void SkTaskGroup::add(std::function fn) { ThreadPool::Add(fn, &fPending); } +void SkTaskGroup::batch (std::function fn, int N) { + ThreadPool::Batch(fn, N, &fPending); } int sk_parallel_for_thread_count() { diff --git a/src/core/SkTaskGroup.h b/src/core/SkTaskGroup.h index f68c528a1f..d1daa44494 100644 --- a/src/core/SkTaskGroup.h +++ b/src/core/SkTaskGroup.h @@ -8,6 +8,8 @@ #ifndef SkTaskGroup_DEFINED #define SkTaskGroup_DEFINED +#include + #include "SkTypes.h" #include "SkAtomics.h" #include "SkTemplates.h" @@ -29,24 +31,16 @@ public: // Neither add() method takes owership of any of its parameters. void add(SkRunnable*); - template - void add(void (*fn)(T*), T* arg) { this->add((void_fn)fn, (void*)arg); } + void add(std::function fn); // Add a batch of N tasks, all calling fn with different arguments. - // Equivalent to a loop over add(fn, arg), but with perhaps less synchronization overhead. - template - void batch(void (*fn)(T*), T* args, int N) { this->batch((void_fn)fn, args, N, sizeof(T)); } + void batch(std::function fn, int N); // Block until all Tasks previously add()ed to this SkTaskGroup have run. // You may safely reuse this SkTaskGroup after wait() returns. void wait(); private: - typedef void(*void_fn)(void*); - - void add (void_fn, void* arg); - void batch(void_fn, void* args, int N, size_t stride); - SkAtomic fPending; }; @@ -81,18 +75,20 @@ void sk_parallel_for(int end, const Func& f) { for (int i = 0; i < nchunks; i++) { Chunk& c = chunks[i]; - c.f = &f; - c.start = i * stride; - c.end = SkTMin(c.start + stride, end); + c.f = &f; + c.start = i * stride; + c.end = SkTMin(c.start + stride, end); SkASSERT(c.start < c.end); // Nothing will break if start >= end, but it's a wasted chunk. } - void(*run_chunk)(Chunk*) = [](Chunk* c) { - for (int i = c->start; i < c->end; i++) { - (*c->f)(i); + Chunk* chunkBase = chunks.get(); + auto run_chunk = [chunkBase](int i) { + Chunk& c = chunkBase[i]; + for (int i = c.start; i < c.end; i++) { + (*c.f)(i); } }; - SkTaskGroup().batch(run_chunk, chunks.get(), nchunks); + SkTaskGroup().batch(run_chunk, nchunks); } #endif//SkTaskGroup_DEFINED