Add sk_parallel_for()

This should be a drop-in replacement for most for-loops to make them run in parallel:
   for (int i = 0; i < N; i++) { code... }
   ~~~>
   sk_parallel_for(N, [&](int i) { code... });

This is just syntax sugar over SkTaskGroup to make this use case really easy to write.
There's no more overhead that we weren't already forced to add using an interface like batch(),
and no extra heap allocations.

I've replaced 3 uses of SkTaskGroup with sk_parallel_for:
  1) My unit tests for SkOnce.
  2) Cary's path fuzzer.
  3) SkMultiPictureDraw.
Performance should be the same.  Please compare left and right for readability. :)

BUG=skia:

No public API changes.
TBR=reed@google.com

Review URL: https://codereview.chromium.org/1184373003
This commit is contained in:
mtklein 2015-06-17 15:26:15 -07:00 committed by Commit bot
parent 5a9e2994c9
commit 00b621cfc0
12 changed files with 136 additions and 164 deletions

View File

@ -66,8 +66,6 @@ private:
void draw();
static void Reset(SkTDArray<DrawData>&);
static void Draw(DrawData* d) { d->draw(); }
};
SkTDArray<DrawData> fThreadSafeDrawData;

View File

@ -617,42 +617,31 @@ static bool contains_only_moveTo(const SkPath& path) {
#include "SkTaskGroup.h"
#include "SkTDArray.h"
struct ThreadState {
int fSeed;
const SkBitmap* fBitmap;
};
static void test_fuzz(ThreadState* data) {
FuzzPath fuzzPath;
fuzzPath.setStrokeOnly();
fuzzPath.setSeed(data->fSeed);
fuzzPath.randomize();
const SkPath& path = fuzzPath.getPath();
const SkPaint& paint = fuzzPath.getPaint();
const SkImageInfo& info = data->fBitmap->info();
SkCanvas* canvas(SkCanvas::NewRasterDirect(info, data->fBitmap->getPixels(),
data->fBitmap->rowBytes()));
int w = info.width() / 4;
int h = info.height() / 4;
int x = data->fSeed / 4 % 4;
int y = data->fSeed % 4;
SkRect clipBounds = SkRect::MakeXYWH(SkIntToScalar(x) * w, SkIntToScalar(y) * h,
SkIntToScalar(w), SkIntToScalar(h));
canvas->save();
canvas->clipRect(clipBounds);
canvas->translate(SkIntToScalar(x) * w, SkIntToScalar(y) * h);
canvas->drawPath(path, paint);
canvas->restore();
}
static void path_fuzz_stroker(SkBitmap* bitmap, int seed) {
ThreadState states[100];
for (size_t i = 0; i < SK_ARRAY_COUNT(states); i++) {
states[i].fSeed = seed + (int) i;
states[i].fBitmap = bitmap;
}
SkTaskGroup tg;
tg.batch(test_fuzz, states, SK_ARRAY_COUNT(states));
sk_parallel_for(100, [&](int i) {
int localSeed = seed + i;
FuzzPath fuzzPath;
fuzzPath.setStrokeOnly();
fuzzPath.setSeed(localSeed);
fuzzPath.randomize();
const SkPath& path = fuzzPath.getPath();
const SkPaint& paint = fuzzPath.getPaint();
const SkImageInfo& info = bitmap->info();
SkCanvas* canvas(
SkCanvas::NewRasterDirect(info, bitmap->getPixels(), bitmap->rowBytes()));
int w = info.width() / 4;
int h = info.height() / 4;
int x = localSeed / 4 % 4;
int y = localSeed % 4;
SkRect clipBounds = SkRect::MakeXYWH(SkIntToScalar(x) * w, SkIntToScalar(y) * h,
SkIntToScalar(w), SkIntToScalar(h));
canvas->save();
canvas->clipRect(clipBounds);
canvas->translate(SkIntToScalar(x) * w, SkIntToScalar(y) * h);
canvas->drawPath(path, paint);
canvas->restore();
});
}
class PathFuzzView : public SampleView {
@ -673,7 +662,7 @@ protected:
void onOnceBeforeDraw() override {
fIndex = 0;
SkImageInfo info(SkImageInfo::MakeN32Premul(SkScalarRoundToInt(width()),
SkImageInfo info(SkImageInfo::MakeN32Premul(SkScalarRoundToInt(width()),
SkScalarRoundToInt(height())));
offscreen.allocPixels(info);
path_fuzz_stroker(&offscreen, fIndex);

View File

@ -91,17 +91,16 @@ void SkMultiPictureDraw::draw(bool flush) {
#ifdef FORCE_SINGLE_THREAD_DRAWING_FOR_TESTING
for (int i = 0; i < fThreadSafeDrawData.count(); ++i) {
DrawData* dd = &fThreadSafeDrawData.begin()[i];
dd->fCanvas->drawPicture(dd->fPicture, &dd->fMatrix, dd->fPaint);
fThreadSafeDrawData[i].draw();
}
#else
// we place the taskgroup after the MPDReset, to ensure that we don't delete the DrawData
// objects until after we're finished the tasks (which have pointers to the data).
SkTaskGroup group;
group.batch(DrawData::Draw, fThreadSafeDrawData.begin(), fThreadSafeDrawData.count());
sk_parallel_for(fThreadSafeDrawData.count(), [&](int i) {
fThreadSafeDrawData[i].draw();
});
#endif
// we deliberately don't call wait() here, since the destructor will do that, this allows us
// to continue processing gpu-data without having to wait on the cpu tasks.
// N.B. we could get going on any GPU work from this main thread while the CPU work runs.
// But in practice, we've either got GPU work or CPU work, not both.
const int count = fGPUDrawData.count();
if (0 == count) {

View File

@ -5,6 +5,7 @@
* found in the LICENSE file.
*/
#include "SkOnce.h"
#include "SkRunnable.h"
#include "SkSemaphore.h"
#include "SkSpinlock.h"
@ -13,18 +14,27 @@
#include "SkThreadUtils.h"
#if defined(SK_BUILD_FOR_WIN32)
static inline int num_cores() {
static void query_num_cores(int* num_cores) {
SYSTEM_INFO sysinfo;
GetSystemInfo(&sysinfo);
return sysinfo.dwNumberOfProcessors;
*num_cores = sysinfo.dwNumberOfProcessors;
}
#else
#include <unistd.h>
static inline int num_cores() {
return (int) sysconf(_SC_NPROCESSORS_ONLN);
static void query_num_cores(int* num_cores) {
*num_cores = (int)sysconf(_SC_NPROCESSORS_ONLN);
}
#endif
// We cache sk_num_cores() so we only query the OS once.
SK_DECLARE_STATIC_ONCE(g_query_num_cores_once);
int sk_num_cores() {
static int num_cores = 0;
SkOnce(&g_query_num_cores_once, query_num_cores, &num_cores);
SkASSERT(num_cores > 0);
return num_cores;
}
namespace {
class ThreadPool : SkNoncopyable {
@ -98,7 +108,7 @@ private:
explicit ThreadPool(int threads) {
if (threads == -1) {
threads = num_cores();
threads = sk_num_cores();
}
for (int i = 0; i < threads; i++) {
fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this)));

View File

@ -10,6 +10,7 @@
#include "SkTypes.h"
#include "SkAtomics.h"
#include "SkTemplates.h"
struct SkRunnable;
@ -49,4 +50,42 @@ private:
SkAtomic<int32_t> fPending;
};
// Returns best estimate of number of CPU cores available to use.
int sk_num_cores();
// Call f(i) for i in [0, end).
template <typename Func>
void sk_parallel_for(int end, const Func& f) {
if (end <= 0) { return; }
struct Chunk {
const Func* f;
int start, end;
};
// TODO(mtklein): this chunking strategy could probably use some tuning.
int max_chunks = sk_num_cores() * 2,
stride = (end + max_chunks - 1 ) / max_chunks,
nchunks = (end + stride - 1 ) / stride;
SkASSERT(nchunks <= max_chunks);
// With the chunking strategy above this won't malloc until we have a machine with >512 cores.
SkAutoSTMalloc<1024, Chunk> chunks(nchunks);
for (int i = 0; i < nchunks; i++) {
Chunk& c = chunks[i];
c.f = &f;
c.start = i * stride;
c.end = SkTMin(c.start + stride, end);
SkASSERT(c.start < c.end); // Nothing will break if start >= end, but it's a wasted chunk.
}
void(*run_chunk)(Chunk*) = [](Chunk* c) {
for (int i = c->start; i < c->end; i++) {
(*c->f)(i);
}
};
SkTaskGroup().batch(run_chunk, chunks.get(), nchunks);
}
#endif//SkTaskGroup_DEFINED

View File

@ -1,3 +1,10 @@
/*
* Copyright 2014 Google Inc.
*
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*/
#include "Test.h"
#include "SkLazyPtr.h"
#include "SkRunnable.h"
@ -44,37 +51,20 @@ DEF_TEST(LazyPtr, r) {
SkDELETE(ptr);
}
namespace {
struct Racer : public SkRunnable {
Racer() : fLazy(NULL), fSeen(NULL) {}
void run() override { fSeen = fLazy->get(); }
SkLazyPtr<int>* fLazy;
int* fSeen;
};
} // namespace
DEF_TEST(LazyPtr_Threaded, r) {
static const int kRacers = 321;
// Race to intialize the pointer by calling .get().
SkLazyPtr<int> lazy;
int* seen[kRacers];
Racer racers[kRacers];
for (int i = 0; i < kRacers; i++) {
racers[i].fLazy = &lazy;
}
SkTaskGroup tg;
for (int i = 0; i < kRacers; i++) {
tg.add(racers + i);
}
tg.wait();
sk_parallel_for(kRacers, [&](int i) {
seen[i] = lazy.get();
});
// lazy.get() should return the same pointer to all threads.
for (int i = 1; i < kRacers; i++) {
REPORTER_ASSERT(r, racers[i].fSeen);
REPORTER_ASSERT(r, racers[i].fSeen == racers[0].fSeen);
REPORTER_ASSERT(r, seen[i] != nullptr);
REPORTER_ASSERT(r, seen[i] == seen[0]);
}
}

View File

@ -28,42 +28,14 @@ DEF_TEST(SkOnce_Singlethreaded, r) {
REPORTER_ASSERT(r, 5 == x);
}
static void add_six(int* x) {
*x += 6;
}
namespace {
class Racer : public SkRunnable {
public:
SkOnceFlag* once;
int* ptr;
void run() override {
SkOnce(once, add_six, ptr);
}
};
} // namespace
SK_DECLARE_STATIC_ONCE(mt_once);
DEF_TEST(SkOnce_Multithreaded, r) {
const int kTasks = 16;
// Make a bunch of tasks that will race to be the first to add six to x.
Racer racers[kTasks];
int x = 0;
for (int i = 0; i < kTasks; i++) {
racers[i].once = &mt_once;
racers[i].ptr = &x;
}
// Let them race.
SkTaskGroup tg;
for (int i = 0; i < kTasks; i++) {
tg.add(&racers[i]);
}
tg.wait();
// Run a bunch of tasks to be the first to add six to x.
sk_parallel_for(1021, [&](int) {
void(*add_six)(int*) = [](int* p) { *p += 6; };
SkOnce(&mt_once, add_six, &x);
});
// Only one should have done the +=.
REPORTER_ASSERT(r, 6 == x);

View File

@ -14,7 +14,6 @@
#include "SkPaint.h"
#include "SkRTConf.h"
#include "SkStream.h"
#include "SkTaskGroup.h"
#include "SkThread.h"
#ifdef SK_BUILD_FOR_MAC

View File

@ -303,10 +303,11 @@ TestRunner::~TestRunner() {
}
void TestRunner::render() {
SkTaskGroup tg;
for (int index = 0; index < fRunnables.count(); ++ index) {
tg.add(fRunnables[index]);
}
// TODO: this doesn't really need to use SkRunnables any more.
// We can just write the code to run in the for-loop directly.
sk_parallel_for(fRunnables.count(), [&](int i) {
fRunnables[i]->run();
});
}
////////////////////////////////////////////////

View File

@ -16,8 +16,7 @@ PathOpsThreadedTestRunner::~PathOpsThreadedTestRunner() {
}
void PathOpsThreadedTestRunner::render() {
SkTaskGroup tg;
for (int index = 0; index < fRunnables.count(); ++ index) {
tg.add(fRunnables[index]);
}
sk_parallel_for(fRunnables.count(), [&](int i) {
fRunnables[i]->run();
});
}

View File

@ -1,6 +1,9 @@
#if !SK_SUPPORT_GPU
#error "GPU support required"
#endif
/*
* Copyright 2013 Google Inc.
*
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*/
#include "GrContext.h"
#include "GrContextFactory.h"
@ -27,6 +30,10 @@
#include "SkTime.h"
#include "Test.h"
#if !SK_SUPPORT_GPU
#error "GPU support required"
#endif
#ifdef SK_BUILD_FOR_WIN
#define PATH_SLASH "\\"
#define IN_DIR "D:\\9-30-13\\"
@ -162,10 +169,11 @@ SkpSkGrThreadedTestRunner::~SkpSkGrThreadedTestRunner() {
}
void SkpSkGrThreadedTestRunner::render() {
SkTaskGroup tg;
for (int index = 0; index < fRunnables.count(); ++ index) {
tg.add(fRunnables[index]);
}
// TODO: we don't really need to be using SkRunnables here anymore.
// We can just write the code we'd run right in the for loop.
sk_parallel_for(fRunnables.count(), [&](int i) {
fRunnables[i]->run();
});
}
////////////////////////////////////////////////

View File

@ -209,26 +209,6 @@ void SkDiffContext::addDiff(const char* baselinePath, const char* testPath) {
}
}
class SkThreadedDiff : public SkRunnable {
public:
SkThreadedDiff() : fDiffContext(NULL) { }
void setup(SkDiffContext* diffContext, const SkString& baselinePath, const SkString& testPath) {
fDiffContext = diffContext;
fBaselinePath = baselinePath;
fTestPath = testPath;
}
void run() override {
fDiffContext->addDiff(fBaselinePath.c_str(), fTestPath.c_str());
}
private:
SkDiffContext* fDiffContext;
SkString fBaselinePath;
SkString fTestPath;
};
void SkDiffContext::diffDirectories(const char baselinePath[], const char testPath[]) {
// Get the files in the baseline, we will then look for those inside the test path
SkTArray<SkString> baselineEntries;
@ -237,12 +217,8 @@ void SkDiffContext::diffDirectories(const char baselinePath[], const char testPa
return;
}
SkTaskGroup tg;
SkTArray<SkThreadedDiff> runnableDiffs;
runnableDiffs.reset(baselineEntries.count());
for (int x = 0; x < baselineEntries.count(); x++) {
const char* baseFilename = baselineEntries[x].c_str();
sk_parallel_for(baselineEntries.count(), [&](int i) {
const char* baseFilename = baselineEntries[i].c_str();
// Find the real location of each file to compare
SkString baselineFile = SkOSPath::Join(baselinePath, baseFilename);
@ -250,13 +226,11 @@ void SkDiffContext::diffDirectories(const char baselinePath[], const char testPa
// Check that the test file exists and is a file
if (sk_exists(testFile.c_str()) && !sk_isdir(testFile.c_str())) {
// Queue up the comparison with the differ
runnableDiffs[x].setup(this, baselineFile, testFile);
tg.add(&runnableDiffs[x]);
this->addDiff(baselineFile.c_str(), testFile.c_str());
} else {
SkDebugf("Baseline file \"%s\" has no corresponding test file\n", baselineFile.c_str());
}
}
});
}
@ -281,15 +255,9 @@ void SkDiffContext::diffPatterns(const char baselinePattern[], const char testPa
return;
}
SkTaskGroup tg;
SkTArray<SkThreadedDiff> runnableDiffs;
runnableDiffs.reset(baselineEntries.count());
for (int x = 0; x < baselineEntries.count(); x++) {
runnableDiffs[x].setup(this, baselineEntries[x], testEntries[x]);
tg.add(&runnableDiffs[x]);
}
tg.wait();
sk_parallel_for(baselineEntries.count(), [&](int i) {
this->addDiff(baselineEntries[i].c_str(), testEntries[i].c_str());
});
}
void SkDiffContext::outputRecords(SkWStream& stream, bool useJSONP) {