factor Engine out of ok core
This makes Engines (task execution strategies: serial, thread, fork) pluggable just like most of the rest of ok. It removes the thread and process limits, as I find myself rarely caring about what they are exactly. Instead of limiting to num-cores, we just allow any number of concurrent threads, and any number of concurrent child processes subject to OS limitations. Change-Id: Icef49d86818fe9a4b7380efb60e73e40bc2e6b73 Reviewed-on: https://skia-review.googlesource.com/27140 Reviewed-by: Mike Klein <mtklein@chromium.org> Commit-Queue: Mike Klein <mtklein@chromium.org>
This commit is contained in:
parent
2890fbfe14
commit
154e6dadea
5
BUILD.gn
5
BUILD.gn
@ -891,14 +891,14 @@ if (skia_enable_tools) {
|
||||
if (defined(invoker.is_shared_library) && invoker.is_shared_library) {
|
||||
shared_library("lib" + target_name) {
|
||||
forward_variables_from(invoker, "*", [ "is_shared_library" ])
|
||||
configs += [ ":skia_private", ]
|
||||
configs += [ ":skia_private" ]
|
||||
testonly = true
|
||||
}
|
||||
} else {
|
||||
_executable = target_name
|
||||
executable(_executable) {
|
||||
forward_variables_from(invoker, "*", [ "is_shared_library" ])
|
||||
configs += [ ":skia_private", ]
|
||||
configs += [ ":skia_private" ]
|
||||
testonly = true
|
||||
}
|
||||
}
|
||||
@ -1268,6 +1268,7 @@ if (skia_enable_tools) {
|
||||
sources = [
|
||||
"tools/ok.cpp",
|
||||
"tools/ok_dsts.cpp",
|
||||
"tools/ok_engines.cpp",
|
||||
"tools/ok_srcs.cpp",
|
||||
"tools/ok_test.cpp",
|
||||
"tools/ok_vias.cpp",
|
||||
|
151
tools/ok.cpp
151
tools/ok.cpp
@ -13,11 +13,9 @@
|
||||
#include "SkImage.h"
|
||||
#include "ok.h"
|
||||
#include <chrono>
|
||||
#include <future>
|
||||
#include <list>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#if !defined(__has_include)
|
||||
@ -97,80 +95,11 @@ static thread_local const char* tls_currently_running = "";
|
||||
}
|
||||
#endif
|
||||
|
||||
struct Engine {
|
||||
virtual ~Engine() {}
|
||||
virtual bool spawn(std::function<Status(void)>) = 0;
|
||||
virtual Status wait_one() = 0;
|
||||
struct EngineType {
|
||||
const char *name, *help;
|
||||
std::unique_ptr<Engine> (*factory)(Options);
|
||||
};
|
||||
|
||||
struct SerialEngine : Engine {
|
||||
Status last = Status::None;
|
||||
|
||||
bool spawn(std::function<Status(void)> fn) override {
|
||||
last = fn();
|
||||
return true;
|
||||
}
|
||||
|
||||
Status wait_one() override {
|
||||
Status s = last;
|
||||
last = Status::None;
|
||||
return s;
|
||||
}
|
||||
};
|
||||
|
||||
struct ThreadEngine : Engine {
|
||||
std::list<std::future<Status>> live;
|
||||
const std::chrono::steady_clock::time_point the_past = std::chrono::steady_clock::now();
|
||||
|
||||
bool spawn(std::function<Status(void)> fn) override {
|
||||
live.push_back(std::async(std::launch::async, fn));
|
||||
return true;
|
||||
}
|
||||
|
||||
Status wait_one() override {
|
||||
if (live.empty()) {
|
||||
return Status::None;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
for (auto it = live.begin(); it != live.end(); it++) {
|
||||
if (it->wait_until(the_past) == std::future_status::ready) {
|
||||
Status s = it->get();
|
||||
live.erase(it);
|
||||
return s;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
#if defined(_MSC_VER)
|
||||
using ForkEngine = ThreadEngine;
|
||||
#else
|
||||
#include <sys/wait.h>
|
||||
#include <unistd.h>
|
||||
|
||||
struct ForkEngine : Engine {
|
||||
bool spawn(std::function<Status(void)> fn) override {
|
||||
switch (fork()) {
|
||||
case 0: _exit((int)fn());
|
||||
case -1: return false;
|
||||
default: return true;
|
||||
}
|
||||
}
|
||||
|
||||
Status wait_one() override {
|
||||
do {
|
||||
int status;
|
||||
if (wait(&status) > 0) {
|
||||
return WIFEXITED(status) ? (Status)WEXITSTATUS(status)
|
||||
: Status::Crashed;
|
||||
}
|
||||
} while (errno == EINTR);
|
||||
return Status::None;
|
||||
}
|
||||
};
|
||||
#endif
|
||||
static std::vector<EngineType> engine_types;
|
||||
|
||||
struct StreamType {
|
||||
const char *name, *help;
|
||||
@ -206,7 +135,7 @@ int main(int argc, char** argv) {
|
||||
SkGraphics::Init();
|
||||
setup_crash_handler();
|
||||
|
||||
int jobs{1};
|
||||
std::unique_ptr<Engine> engine;
|
||||
std::unique_ptr<Stream> stream;
|
||||
std::function<std::unique_ptr<Dst>(void)> dst_factory = []{
|
||||
// A default Dst that's enough for unit tests and not much else.
|
||||
@ -218,28 +147,36 @@ int main(int argc, char** argv) {
|
||||
};
|
||||
|
||||
auto help = [&] {
|
||||
std::string stream_help = help_for(stream_types),
|
||||
std::string engine_help = help_for(engine_types),
|
||||
stream_help = help_for(stream_types),
|
||||
dst_help = help_for( dst_types),
|
||||
via_help = help_for( via_types);
|
||||
|
||||
printf("%s [-j N] src[:k=v,...] dst[:k=v,...] [via[:k=v,...] ...] \n"
|
||||
" -j: Run at most N processes at any time. \n"
|
||||
" If <0, use -N threads instead. \n"
|
||||
" If 0, use one thread in one process. \n"
|
||||
" If 1 (default) or -1, auto-detect N. \n"
|
||||
printf("%s [engine] src[:k=v,...] dst[:k=v,...] [via[:k=v,...] ...] \n"
|
||||
" engine: how to execute tasks%s \n"
|
||||
" src: content to draw%s \n"
|
||||
" dst: how to draw that content%s \n"
|
||||
" via: wrappers around dst%s \n"
|
||||
" Most srcs, dsts and vias have options, e.g. skp:dir=skps sw:ct=565 \n",
|
||||
argv[0], stream_help.c_str(), dst_help.c_str(), via_help.c_str());
|
||||
argv[0],
|
||||
engine_help.c_str(), stream_help.c_str(), dst_help.c_str(), via_help.c_str());
|
||||
return 1;
|
||||
};
|
||||
|
||||
for (int i = 1; i < argc; i++) {
|
||||
if (0 == strcmp("-j", argv[i])) { jobs = atoi(argv[++i]); }
|
||||
if (0 == strcmp("-h", argv[i])) { return help(); }
|
||||
if (0 == strcmp("--help", argv[i])) { return help(); }
|
||||
|
||||
for (auto e : engine_types) {
|
||||
size_t len = strlen(e.name);
|
||||
if (0 == strncmp(e.name, argv[i], len)) {
|
||||
switch (argv[i][len]) {
|
||||
case ':': len++;
|
||||
case '\0': engine = e.factory(Options{argv[i]+len});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (auto s : stream_types) {
|
||||
size_t len = strlen(s.name);
|
||||
if (0 == strncmp(s.name, argv[i], len)) {
|
||||
@ -275,12 +212,12 @@ int main(int argc, char** argv) {
|
||||
}
|
||||
if (!stream) { return help(); }
|
||||
|
||||
std::unique_ptr<Engine> engine;
|
||||
if (jobs == 0) { engine.reset(new SerialEngine); }
|
||||
if (jobs > 0) { engine.reset(new ForkEngine); defer_logging(); }
|
||||
if (jobs < 0) { engine.reset(new ThreadEngine); jobs = -jobs; }
|
||||
if (!engine) { engine = engine_types.back().factory(Options{}); }
|
||||
|
||||
if (jobs == 1) { jobs = std::thread::hardware_concurrency(); }
|
||||
// If we know engine->spawn() will never crash, we can defer logging until we exit.
|
||||
if (engine->crashproof()) {
|
||||
defer_logging();
|
||||
}
|
||||
|
||||
int ok = 0, failed = 0, crashed = 0, skipped = 0;
|
||||
|
||||
@ -306,13 +243,35 @@ int main(int argc, char** argv) {
|
||||
fflush(stdout);
|
||||
};
|
||||
|
||||
std::list<std::future<Status>> live;
|
||||
const auto the_past = std::chrono::steady_clock::now();
|
||||
|
||||
auto wait_one = [&] {
|
||||
if (live.empty()) {
|
||||
return Status::None;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
for (auto it = live.begin(); it != live.end(); it++) {
|
||||
if (it->wait_until(the_past) != std::future_status::timeout) {
|
||||
Status s = it->get();
|
||||
live.erase(it);
|
||||
return s;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
auto spawn = [&](std::function<Status(void)> fn) {
|
||||
if (--jobs < 0) {
|
||||
update_stats(engine->wait_one());
|
||||
std::future<Status> status;
|
||||
for (;;) {
|
||||
status = engine->spawn(fn);
|
||||
if (status.valid()) {
|
||||
break;
|
||||
}
|
||||
while (!engine->spawn(fn)) {
|
||||
update_stats(engine->wait_one());
|
||||
update_stats(wait_one());
|
||||
}
|
||||
live.push_back(std::move(status));
|
||||
};
|
||||
|
||||
for (std::unique_ptr<Src> owned = stream->next(); owned; owned = stream->next()) {
|
||||
@ -328,7 +287,7 @@ int main(int argc, char** argv) {
|
||||
}
|
||||
|
||||
for (Status s = Status::OK; s != Status::None; ) {
|
||||
s = engine->wait_one();
|
||||
s = wait_one();
|
||||
update_stats(s);
|
||||
}
|
||||
printf("\n");
|
||||
@ -336,6 +295,10 @@ int main(int argc, char** argv) {
|
||||
}
|
||||
|
||||
|
||||
Register::Register(const char* name, const char* help,
|
||||
std::unique_ptr<Engine> (*factory)(Options)) {
|
||||
engine_types.push_back(EngineType{name, help, factory});
|
||||
}
|
||||
Register::Register(const char* name, const char* help,
|
||||
std::unique_ptr<Stream> (*factory)(Options)) {
|
||||
stream_types.push_back(StreamType{name, help, factory});
|
||||
|
@ -10,6 +10,7 @@
|
||||
|
||||
#include "SkCanvas.h"
|
||||
#include <functional>
|
||||
#include <future>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
@ -24,6 +25,12 @@ void ok_log(const char*);
|
||||
|
||||
enum class Status { OK, Failed, Crashed, Skipped, None };
|
||||
|
||||
struct Engine {
|
||||
virtual ~Engine() {}
|
||||
virtual bool crashproof() = 0;
|
||||
virtual std::future<Status> spawn(std::function<Status(void)>) = 0;
|
||||
};
|
||||
|
||||
struct Src {
|
||||
virtual ~Src() {}
|
||||
virtual std::string name() = 0;
|
||||
@ -52,6 +59,7 @@ public:
|
||||
|
||||
// Create globals to register your new type of Stream or Dst.
|
||||
struct Register {
|
||||
Register(const char* name, const char* help, std::unique_ptr<Engine> (*factory)(Options));
|
||||
Register(const char* name, const char* help, std::unique_ptr<Stream> (*factory)(Options));
|
||||
Register(const char* name, const char* help, std::unique_ptr<Dst> (*factory)(Options));
|
||||
Register(const char* name, const char* help,
|
||||
|
85
tools/ok_engines.cpp
Normal file
85
tools/ok_engines.cpp
Normal file
@ -0,0 +1,85 @@
|
||||
/*
|
||||
* Copyright 2017 Google Inc.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license that can be
|
||||
* found in the LICENSE file.
|
||||
*/
|
||||
|
||||
#include "ok.h"
|
||||
|
||||
struct SerialEngine : Engine {
|
||||
static std::unique_ptr<Engine> Factory(Options) {
|
||||
SerialEngine engine;
|
||||
return move_unique(engine);
|
||||
}
|
||||
|
||||
bool crashproof() override { return false; }
|
||||
|
||||
std::future<Status> spawn(std::function<Status(void)> fn) override {
|
||||
return std::async(std::launch::deferred, fn);
|
||||
}
|
||||
};
|
||||
static Register serial("serial",
|
||||
"Run tasks serially on the main thread of a single process.",
|
||||
SerialEngine::Factory);
|
||||
|
||||
struct ThreadEngine : Engine {
|
||||
static std::unique_ptr<Engine> Factory(Options) {
|
||||
ThreadEngine engine;
|
||||
return move_unique(engine);
|
||||
}
|
||||
|
||||
bool crashproof() override { return false; }
|
||||
|
||||
std::future<Status> spawn(std::function<Status(void)> fn) override {
|
||||
return std::async(std::launch::async, fn);
|
||||
}
|
||||
};
|
||||
static Register thread("thread",
|
||||
"Run each task on its own thread of a single process.",
|
||||
ThreadEngine::Factory);
|
||||
|
||||
#if !defined(_MSC_VER)
|
||||
#include <sys/wait.h>
|
||||
#include <unistd.h>
|
||||
|
||||
struct ForkEngine : Engine {
|
||||
static std::unique_ptr<Engine> Factory(Options) {
|
||||
ForkEngine engine;
|
||||
return move_unique(engine);
|
||||
}
|
||||
|
||||
bool crashproof() override { return true; }
|
||||
|
||||
std::future<Status> spawn(std::function<Status(void)> fn) override {
|
||||
switch (fork()) {
|
||||
case 0:
|
||||
// We are the spawned child process.
|
||||
// Run fn() and exit() with its Status as our return code.
|
||||
_exit((int)fn());
|
||||
|
||||
case -1:
|
||||
// The OS won't let us fork() another process right now.
|
||||
// We'll need to wait for at least one live task to finish and try again.
|
||||
return std::future<Status>();
|
||||
|
||||
default:
|
||||
// We succesfully spawned a child process!
|
||||
// This will wait for any spawned process to finish and return its Status.
|
||||
return std::async(std::launch::deferred, [] {
|
||||
do {
|
||||
int status;
|
||||
if (wait(&status) > 0) {
|
||||
return WIFEXITED(status) ? (Status)WEXITSTATUS(status)
|
||||
: Status::Crashed;
|
||||
}
|
||||
} while (errno == EINTR);
|
||||
return Status::None;
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
static Register _fork("fork",
|
||||
"Run each task in an independent process with fork().",
|
||||
ForkEngine::Factory);
|
||||
#endif
|
Loading…
Reference in New Issue
Block a user