Renaming and refactoring to prepare for init-once threaded backend

Bug: skia:
Change-Id: I39b1d73b612a9c133847dd7361d634a0351659f1
Reviewed-on: https://skia-review.googlesource.com/70221
Reviewed-by: Herb Derby <herb@google.com>
Commit-Queue: Yuqian Li <liyuqian@google.com>
This commit is contained in:
Yuqian Li 2017-11-30 12:17:22 -05:00 committed by Skia Commit-Bot
parent 537473317c
commit 07a42411f8
5 changed files with 244 additions and 241 deletions

View File

@ -300,6 +300,8 @@ skia_core_sources = [
"$_src/core/SkSRGB.cpp",
"$_src/core/SkTaskGroup.cpp",
"$_src/core/SkTaskGroup.h",
"$_src/core/SkTaskGroup2D.cpp",
"$_src/core/SkTaskGroup2D.h",
"$_src/core/SkTDPQueue.h",
"$_src/core/SkTDynamicHash.h",
"$_src/core/SkTInternalLList.h",

View File

@ -0,0 +1,83 @@
/*
* Copyright 2017 Google Inc.
*
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*/
#include "SkTaskGroup2D.h"
// Launch the worker threads: fThreadCnt invocations of work(), one per
// threadId, batched onto the executor-backed task group.
void SkTaskGroup2D::start() {
    auto runWorker = [this](int threadId) { this->work(threadId); };
    fThreadsGroup->batch(fThreadCnt, runWorker);
}
// Publish one more column of tasks to the (possibly already running) workers.
void SkTaskGroup2D::addColumn() {
    // Adding work after finish() has been called is a misuse of the API.
    SkASSERT(!fIsFinishing);
    fWidth.fetch_add(1); // seq_cst, identical to fWidth++
}
// Signal termination and block until every worker thread has returned.
// No further addColumn() calls are allowed after this.
void SkTaskGroup2D::finish() {
    // Relaxed store pairs with the relaxed load in isFinishing(); workers
    // treat it as a hint and still re-check fWidth before exiting.
    fIsFinishing.store(true, std::memory_order_relaxed);
    fThreadsGroup->wait(); // join: all rows drained once this returns
}
// Worker loop for the spinning group: thread i owns row i exclusively
// (the constructor asserts height == threadCnt), so fNextColumn needs no
// locking. The thread busy-spins while waiting for new columns.
void SkSpinningTaskGroup2D::work(int threadId) {
    int& nextColumn = fRowData[threadId].fNextColumn;
    while (true) {
        SkASSERT(nextColumn <= fWidth);
        // Exit only when no more columns will ever arrive (isFinishing) and
        // this row has already consumed every column published so far.
        if (this->isFinishing() && nextColumn >= fWidth) {
            return;
        }
        if (nextColumn < fWidth) {
            fWork(threadId, nextColumn);
            nextColumn++;
        }
        // Otherwise: spin until addColumn() or finish() changes the state.
    }
}
// Construct a flexible 2D task group: h rows of work executed by t threads.
// Unlike SkSpinningTaskGroup2D, t need not equal h; threads migrate rows.
SkFlexibleTaskGroup2D::SkFlexibleTaskGroup2D(Work2D&& w, int h, SkExecutor* x, int t)
        : SkTaskGroup2D(std::move(w), h, x, t), fRowData(h), fThreadData(t) {
    SkASSERT(h > 0);
    for (int i = 0; i < t; ++i) {
        // Stagger each thread's starting row so they don't all contend on row
        // 0. Wrap with % h: with more threads than rows, a bare i would index
        // past the end of fRowData in work() before the first rotation.
        fThreadData[i].fRowIndex = i % h;
    }
}
// Worker loop for the flexible group. A thread try-locks a row; if the row has
// pending columns it processes exactly one, otherwise it rotates to the next
// row. Per-row mutexes keep each row's tasks strictly ordered even though
// several threads may visit the same row over time.
void SkFlexibleTaskGroup2D::work(int threadId) {
    int failCnt = 0;
    int& rowIndex = fThreadData[threadId].fRowIndex;

    // This loop looks for work to do as long as
    // either 1. isFinishing is false
    // or 2. isFinishing is true but some rows still have unfinished tasks
    while (true) {
        RowData& rowData = fRowData[rowIndex];
        bool locked = rowData.fMutex.try_lock(); // non-blocking: skip busy rows
        bool processed = false;
        if (locked) {
            if (rowData.fNextColumn < fWidth) {
                fWork(rowIndex, rowData.fNextColumn);
                rowData.fNextColumn++;
                processed = true;
            } else {
                // isFinishing can never go from true to false. Once it's true, we count how many
                // times that a row is out of work. If that count reaches fHeight, then we're out of
                // work for the whole group.
                failCnt += this->isFinishing();
            }
            rowData.fMutex.unlock();
        }
        if (!processed) {
            if (failCnt >= fHeight) {
                return;
            }
            // Move on to the next row (either it was locked or it had no work).
            rowIndex = (rowIndex + 1) % fHeight;
        }
    }
}

108
src/core/SkTaskGroup2D.h Normal file
View File

@ -0,0 +1,108 @@
/*
* Copyright 2017 Google Inc.
*
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*/
#ifndef SkTaskGroup2D_DEFINED
#define SkTaskGroup2D_DEFINED
#include "SkTaskGroup.h"
#include <mutex>
#include <vector>
// A 2D grid (height rows x width columns) of tasks.
//
// The task on row i and column j is abstracted as Work2D(i, j). We guarantee that the task on the
// same row will be executed in order (i.e., Work2D(1, 1) is guaranteed to finish before calling
// Work2D(1, 2)). Tasks in different rows can happen in any order.
//
// The height (number of rows) is fixed. The width (number of columns) may be dynamically expanded.
//
// The tasks will eventually be executed on the executor with threadCnt number of hardware threads.
class SkTaskGroup2D {
public:
    using Work2D = std::function<void(int, int)>;

    // Takes ownership of the work function (moved, not copied — the parameter
    // is an rvalue reference). The executor must outlive this group; start()
    // batches threadCnt workers onto it.
    SkTaskGroup2D(Work2D&& work, int height, SkExecutor* executor, int threadCnt)
            : fWork(std::move(work))
            , fHeight(height)
            , fThreadCnt(threadCnt)
            , fIsFinishing(false)
            , fWidth(0)
            , fThreadsGroup(new SkTaskGroup(*executor)) {}

    virtual ~SkTaskGroup2D() = default;

    virtual void addColumn(); // Add a new column of tasks.

    void start();  // start threads to execute tasks
    void finish(); // wait and finish all tasks (no more tasks can be added after calling this)

    // Whether finish() has been called; relaxed load, pairs with the relaxed
    // store in finish().
    SK_ALWAYS_INLINE bool isFinishing() const {
        return fIsFinishing.load(std::memory_order_relaxed);
    }

protected:
    static constexpr int MAX_CACHE_LINE = 64;

    // Finish all tasks on the threadId and then return.
    virtual void work(int threadId) = 0;

    Work2D fWork; // fWork(i, j) is the task to be done on row i and column j
    const int fHeight;
    const int fThreadCnt;
    std::atomic<bool> fIsFinishing;
    std::atomic<int> fWidth; // number of columns added so far
    std::unique_ptr<SkTaskGroup> fThreadsGroup;
};
// A simple spinning task group that assumes height equals threadCnt.
class SkSpinningTaskGroup2D final : public SkTaskGroup2D {
public:
    SkSpinningTaskGroup2D(Work2D&& w, int h, SkExecutor* x, int t)
            : SkTaskGroup2D(std::move(w), h, x, t), fRowData(h) {
        SkASSERT(h == t); // height must be equal to threadCnt
    }

protected:
    // Each thread busy-spins on exactly one row (row index == threadId), so
    // per-row state needs no locking.
    void work(int threadId) override;

private:
    // alignas(MAX_CACHE_LINE) to avoid false sharing by cache lines
    struct alignas(MAX_CACHE_LINE) RowData {
        RowData() : fNextColumn(0) {}

        int fNextColumn; // next column index to be executed
    };

    std::vector<RowData> fRowData; // one entry per row/thread
};
// A task group where threadCnt does not have to equal height: threads migrate
// between rows, and each row's in-order guarantee is kept by a per-row mutex.
class SkFlexibleTaskGroup2D final : public SkTaskGroup2D {
public:
    SkFlexibleTaskGroup2D(Work2D&&, int, SkExecutor*, int);

protected:
    void work(int threadId) override;

private:
    // alignas(MAX_CACHE_LINE) to avoid false sharing by cache lines
    struct alignas(MAX_CACHE_LINE) RowData {
        RowData() : fNextColumn(0) {}

        int fNextColumn; // next column index to be executed
        std::mutex fMutex; // the mutex for the thread to acquire
    };

    struct alignas(MAX_CACHE_LINE) ThreadData {
        ThreadData() : fRowIndex(0) {}

        int fRowIndex; // the row that the current thread is working on
    };

    std::vector<RowData> fRowData;       // one entry per row
    std::vector<ThreadData> fThreadData; // one entry per thread
};
#endif  // SkTaskGroup2D_DEFINED

View File

@ -11,202 +11,25 @@
#include "SkTaskGroup.h"
#include "SkVertices.h"
#include <mutex>
#include <vector>
constexpr int MAX_CACHE_LINE = 64;
// Some basic logics and data structures that are shared across the current experimental schedulers.
class TiledDrawSchedulerBase : public TiledDrawScheduler {
public:
TiledDrawSchedulerBase(int tiles, WorkFunc work)
: fTileCnt(tiles), fIsFinishing(false), fDrawCnt(0), fWork(std::move(work)) {}
void signal() override {
fDrawCnt++;
}
void finish() override {
fIsFinishing.store(true, std::memory_order_relaxed);
void SkThreadedBMPDevice::DrawQueue::reset() {
if (fTasks) {
fTasks->finish();
}
protected:
const int fTileCnt;
std::atomic<bool> fIsFinishing;
std::atomic<int> fDrawCnt;
WorkFunc fWork;
};
fSize = 0;
class TiledDrawSchedulerBySpinning : public TiledDrawSchedulerBase {
public:
TiledDrawSchedulerBySpinning(int tiles, WorkFunc work)
: TiledDrawSchedulerBase(tiles, std::move(work)), fScheduleData(tiles) {}
void signal() final { this->TiledDrawSchedulerBase::signal(); }
void finish() final { this->TiledDrawSchedulerBase::finish(); }
bool next(int& tileIndex) final {
int& drawIndex = fScheduleData[tileIndex].fDrawIndex;
SkASSERT(drawIndex <= fDrawCnt);
while (true) {
bool isFinishing = fIsFinishing.load(std::memory_order_relaxed);
if (isFinishing && drawIndex >= fDrawCnt) {
return false;
} else if (drawIndex < fDrawCnt) {
fWork(tileIndex, drawIndex++);
return true;
}
// using TaskGroup2D = SkSpinningTaskGroup2D;
using TaskGroup2D = SkFlexibleTaskGroup2D;
auto draw2D = [this](int row, int column){
SkThreadedBMPDevice::DrawElement& drawElement = fElements[column];
if (!SkIRect::Intersects(fDevice->fTileBounds[row], drawElement.fDrawBounds)) {
return;
}
}
private:
// alignas(MAX_CACHE_LINE) to avoid false sharing by cache lines
struct alignas(MAX_CACHE_LINE) TileScheduleData {
TileScheduleData() : fDrawIndex(0) {}
int fDrawIndex; // next draw index for this tile
drawElement.fDrawFn(fDevice->fTileBounds[row]);
};
std::vector<TileScheduleData> fScheduleData;
};
// Scheduler where threads rotate among tiles; a tile's draws are serialized by
// a per-tile mutex (same approach as SkFlexibleTaskGroup2D in SkTaskGroup2D.h).
class TiledDrawSchedulerFlexible : public TiledDrawSchedulerBase {
public:
    TiledDrawSchedulerFlexible(int tiles, WorkFunc work)
            : TiledDrawSchedulerBase(tiles, std::move(work)), fScheduleData(tiles) {}

    void signal() final { this->TiledDrawSchedulerBase::signal(); }
    void finish() final { this->TiledDrawSchedulerBase::finish(); }

    // Execute one draw on some tile, starting the search at tileIndex (which
    // is updated to the tile actually drawn). Returns false only once finish()
    // has been called and a full rotation found every tile drained.
    bool next(int& tileIndex) final {
        int failCnt = 0;
        while (true) {
            TileScheduleData& scheduleData = fScheduleData[tileIndex];
            bool locked = scheduleData.fMutex.try_lock(); // skip tiles another thread holds
            bool processed = false;
            if (locked) {
                if (scheduleData.fDrawIndex < fDrawCnt) {
                    fWork(tileIndex, scheduleData.fDrawIndex++);
                    processed = true;
                } else {
                    // Only count an empty tile as a "failure" once finishing
                    // has begun; before that, more draws may still arrive.
                    failCnt += fIsFinishing.load(std::memory_order_relaxed);
                }
                scheduleData.fMutex.unlock();
            }
            if (processed) {
                return true;
            } else {
                if (failCnt >= fTileCnt) {
                    return false;
                }
                tileIndex = (tileIndex + 1) % fTileCnt; // rotate to the next tile
            }
        }
    }

private:
    // alignas(MAX_CACHE_LINE) to avoid false sharing by cache lines
    struct alignas(MAX_CACHE_LINE) TileScheduleData {
        TileScheduleData() : fDrawIndex(0) {}

        int fDrawIndex; // next draw index for this tile
        std::mutex fMutex; // the mutex for the thread to acquire
    };

    std::vector<TileScheduleData> fScheduleData;
};
// Scheduler that parks idle tiles on semaphores instead of spinning. Wake-ups
// fan out through an implicit binary tree rooted at tile 0: signal()/finish()
// wake the root, and each woken tile wakes its children (2i+1, 2i+2).
class TiledDrawSchedulerBySemaphores : public TiledDrawSchedulerBase {
public:
    TiledDrawSchedulerBySemaphores(int tiles, WorkFunc work)
            : TiledDrawSchedulerBase(tiles, std::move(work)), fScheduleData(tiles) {}

    void signal() final {
        this->TiledDrawSchedulerBase::signal();
        signalRoot();
    }

    void finish() final {
        this->TiledDrawSchedulerBase::finish();
        signalRoot();
    }

    // Blocks on this tile's semaphore until a draw (or finish) has been
    // signaled, relays the wake-up to the child tiles, then runs one draw.
    // Returns false once finish() was called and this tile is drained.
    // Note: both branches below return, so the loop body runs at most once.
    bool next(int& tileIndex) final {
        SkASSERT(tileIndex >= 0 && tileIndex < fTileCnt);
        TileScheduleData& scheduleData = fScheduleData[tileIndex];
        while (true) {
            scheduleData.fSemaphore.wait();
            // Propagate the wake-up down the binary tree.
            int leftChild = (tileIndex + 1) * 2 - 1;
            int rightChild = leftChild + 1;
            if (leftChild < fTileCnt) {
                fScheduleData[leftChild].fSemaphore.signal();
            }
            if (rightChild < fTileCnt) {
                fScheduleData[rightChild].fSemaphore.signal();
            }
            bool isFinishing = fIsFinishing.load(std::memory_order_relaxed);
            if (isFinishing && scheduleData.fDrawIndex >= fDrawCnt) {
                return false;
            } else {
                // A wake-up with no pending finish implies a pending draw.
                SkASSERT(scheduleData.fDrawIndex < fDrawCnt);
                fWork(tileIndex, scheduleData.fDrawIndex++);
                return true;
            }
        }
    }

private:
    // alignas(MAX_CACHE_LINE) to avoid false sharing by cache lines
    struct alignas(MAX_CACHE_LINE) TileScheduleData {
        TileScheduleData() : fDrawIndex(0) {}

        int fDrawIndex; // next draw index for this tile
        SkSemaphore fSemaphore; // counts signals not yet consumed by this tile
    };

    // Kick the root of the wake-up tree; next() fans the signal out.
    void signalRoot() {
        SkASSERT(fTileCnt > 0);
        fScheduleData[0].fSemaphore.signal();
    }

    std::vector<TileScheduleData> fScheduleData;
};
void SkThreadedBMPDevice::startThreads() {
SkASSERT(fQueueSize == 0);
TiledDrawScheduler::WorkFunc work = [this](int tileIndex, int drawIndex){
auto& element = fQueue[drawIndex];
if (SkIRect::Intersects(fTileBounds[tileIndex], element.fDrawBounds)) {
element.fDrawFn(fTileBounds[tileIndex]);
}
};
// using Scheduler = TiledDrawSchedulerBySemaphores;
// using Scheduler = TiledDrawSchedulerBySpinning;
using Scheduler = TiledDrawSchedulerFlexible;
fScheduler.reset(new Scheduler(fTileCnt, work));
// We intentionally call the int parameter tileIndex although it ranges from 0 to fThreadCnt-1.
// For some schedulers (e.g., TiledDrawSchedulerBySemaphores and TiledDrawSchedulerBySpinning),
// fThreadCnt should be equal to fTileCnt so it doesn't make a difference.
//
// For TiledDrawSchedulerFlexible, the input tileIndex provides only a hint about which tile
// the current thread should draw; the scheduler may later modify that tileIndex to draw on
// another tile.
fTaskGroup->batch(fThreadCnt, [this](int tileIndex){
while (fScheduler->next(tileIndex)) {}
});
}
void SkThreadedBMPDevice::finishThreads() {
fScheduler->finish();
fTaskGroup->wait();
fQueueSize = 0;
fScheduler.reset(nullptr);
fTasks.reset(new TaskGroup2D(draw2D, fDevice->fTileCnt, fDevice->fExecutor,
fDevice->fThreadCnt));
fTasks->start();
}
SkThreadedBMPDevice::SkThreadedBMPDevice(const SkBitmap& bitmap,
@ -216,6 +39,7 @@ SkThreadedBMPDevice::SkThreadedBMPDevice(const SkBitmap& bitmap,
: INHERITED(bitmap)
, fTileCnt(tiles)
, fThreadCnt(threads <= 0 ? tiles : threads)
, fQueue(this)
{
if (executor == nullptr) {
fInternalExecutor = SkExecutor::MakeFIFOThreadPool(fThreadCnt);
@ -230,14 +54,11 @@ SkThreadedBMPDevice::SkThreadedBMPDevice(const SkBitmap& bitmap,
for(int tid = 0; tid < fTileCnt; ++tid, top += h) {
fTileBounds.push_back(SkIRect::MakeLTRB(0, top, w, top + h));
}
fQueueSize = 0;
fTaskGroup.reset(new SkTaskGroup(*fExecutor));
startThreads();
fQueue.reset();
}
void SkThreadedBMPDevice::flush() {
finishThreads();
startThreads();
fQueue.reset();
}
// Having this captured in lambda seems to be faster than saving this in DrawElement
@ -279,20 +100,15 @@ SkIRect SkThreadedBMPDevice::transformDrawBounds(const SkRect& drawBounds) const
// The do {...} while (false) is to enforce trailing semicolon as suggested by mtklein@
#define THREADED_DRAW(drawBounds, actualDrawCall) \
do { \
if (fQueueSize == MAX_QUEUE_SIZE) { \
this->flush(); \
} \
DrawState ds(this); \
SkASSERT(fQueueSize < MAX_QUEUE_SIZE); \
fQueue[fQueueSize++] = { \
fQueue.push({ \
this->transformDrawBounds(drawBounds), \
[=](const SkIRect& tileBounds) { \
SkRasterClip tileRC; \
SkDraw draw = ds.getThreadDraw(tileRC, tileBounds); \
draw.actualDrawCall; \
}, \
}; \
fScheduler->signal(); \
}); \
} while (false)
static inline SkRect get_fast_bounds(const SkRect& r, const SkPaint& p) {

View File

@ -8,34 +8,10 @@
#ifndef SkThreadedBMPDevice_DEFINED
#define SkThreadedBMPDevice_DEFINED
#include "SkTaskGroup.h"
#include "SkDraw.h"
#include "SkBitmapDevice.h"
#include "SkDraw.h"
#include "SkTaskGroup2D.h"
// Abstract scheduler for the tiled threaded device: the producer broadcasts
// each new draw to all tiles via signal(), and worker threads consume them
// tile-by-tile via next(). WorkFunc(tileIndex, drawIndex) runs one draw.
class TiledDrawScheduler {
public:
    using WorkFunc = std::function<void(int, int)>;

    virtual ~TiledDrawScheduler() {}

    virtual void signal() = 0; // signal that one more draw is available for all tiles

    // Tell scheduler that no more draw calls will be added (no signal will be called).
    virtual void finish() = 0;

    // Handle the next draw available. This method will block until
    // (1) the next draw is finished, or
    // (2) the finish is called
    // The method will return true for case (1) and false for case (2).
    // When there's no draw available and we haven't called finish, we will just wait.
    // In many cases, the parameter tileIndex specifies the tile that the next draw should happen.
    // However, for some schedulers, that tileIndex may only be a hint and the scheduler is free
    // to find another tile to draw. In that case, tileIndex will be changed to the actual tileIndex
    // where the draw happens.
    virtual bool next(int& tileIndex) = 0;
};
///////////////////////////////////////////////////////////////////////////////
class SkThreadedBMPDevice : public SkBitmapDevice {
public:
// When threads = 0, we make fThreadCnt = tiles. Otherwise fThreadCnt = threads.
@ -43,7 +19,7 @@ public:
SkThreadedBMPDevice(const SkBitmap& bitmap, int tiles, int threads = 0,
SkExecutor* executor = nullptr);
~SkThreadedBMPDevice() override { finishThreads(); }
~SkThreadedBMPDevice() override { fQueue.finish(); }
protected:
void drawPaint(const SkPaint& paint) override;
@ -67,23 +43,44 @@ protected:
void flush() override;
private:
struct DrawState;
struct DrawElement {
SkIRect fDrawBounds;
std::function<void(const SkIRect& threadBounds)> fDrawFn;
};
struct DrawState;
// Fixed-capacity queue of pending draws, executed asynchronously by an
// SkTaskGroup2D owned by the queue.
class DrawQueue {
public:
    static constexpr int MAX_QUEUE_SIZE = 100000;

    explicit DrawQueue(SkThreadedBMPDevice* device) : fDevice(device) {}

    // Finish any in-flight tasks, empty the queue, and start a fresh task
    // group (defined out of line).
    void reset();

    // For ~SkThreadedBMPDevice() to shutdown tasks, we use this instead of reset because reset
    // will start new tasks.
    void finish() { fTasks->finish(); }

    // Append one draw, flushing (reset) first if the queue is full, and
    // publish it to the workers as a new column.
    SK_ALWAYS_INLINE void push(DrawElement&& element) {
        if (fSize == MAX_QUEUE_SIZE) {
            this->reset();
        }
        SkASSERT(fSize < MAX_QUEUE_SIZE);
        fElements[fSize++] = std::move(element);
        fTasks->addColumn();
    }

private:
    SkThreadedBMPDevice* fDevice;
    std::unique_ptr<SkTaskGroup2D> fTasks;
    DrawElement fElements[MAX_QUEUE_SIZE];
    // In-class init: previously uninitialized and only zeroed by reset(), so a
    // push() before the first reset() would have read garbage.
    int fSize = 0;
};
SkIRect transformDrawBounds(const SkRect& drawBounds) const;
void startThreads();
void finishThreads();
static constexpr int MAX_QUEUE_SIZE = 100000;
const int fTileCnt;
const int fThreadCnt;
std::unique_ptr<TiledDrawScheduler> fScheduler;
SkTArray<SkIRect> fTileBounds;
/**
@ -95,10 +92,7 @@ private:
SkExecutor* fExecutor = nullptr;
std::unique_ptr<SkExecutor> fInternalExecutor;
std::unique_ptr<SkTaskGroup> fTaskGroup; // generated from fExecutor
DrawElement fQueue[MAX_QUEUE_SIZE];
int fQueueSize;
DrawQueue fQueue;
typedef SkBitmapDevice INHERITED;
};