[wasm] Use NumOutstandingCompilations() in BackgroundCompileJob:GetMaxConcurrency()

This simplifies current_compile_job_ since ScheduleCompileJobForNewUnits
is only called on the main thread.

From pinpoint:
v8:wasm:sync_instantiate:wall_time: 19.1% improvement
v8-gc-incremental-step: 20.5% improvement
https://pinpoint-dot-chromeperf.appspot.com/job/152920d8520000

Change-Id: Id560080937f5439cf3321ce9306c7cae49e74798
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2442383
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: Clemens Backes <clemensb@chromium.org>
Cr-Commit-Position: refs/heads/master@{#70386}
This commit is contained in:
Etienne Pierre-doray 2020-10-07 10:08:42 -04:00 committed by Commit Bot
parent 18ac6024ea
commit 84eec6e920

View File

@ -540,39 +540,6 @@ class CompilationUnitQueues {
}
};
// {JobHandle} is not thread safe in general (at least both the
// {DefaultJobHandle} and chromium's {base::JobHandle} are not). Hence, protect
// concurrent accesses via a mutex.
class ThreadSafeJobHandle {
public:
explicit ThreadSafeJobHandle(std::shared_ptr<JobHandle> job_handle)
: job_handle_(std::move(job_handle)) {}
void NotifyConcurrencyIncrease() {
base::MutexGuard guard(&mutex_);
job_handle_->NotifyConcurrencyIncrease();
}
void Join() {
base::MutexGuard guard(&mutex_);
job_handle_->Join();
}
void Cancel() {
base::MutexGuard guard(&mutex_);
job_handle_->Cancel();
}
bool IsRunning() const {
base::MutexGuard guard(&mutex_);
return job_handle_->IsRunning();
}
private:
mutable base::Mutex mutex_;
std::shared_ptr<JobHandle> job_handle_;
};
// The {CompilationStateImpl} keeps track of the compilation state of the
// owning NativeModule, i.e. which functions are left to be compiled.
// It contains a task manager to allow parallel and asynchronous background
@ -635,7 +602,7 @@ class CompilationStateImpl {
void PublishDetectedFeatures(Isolate*);
// Ensure that a compilation job is running, and increase its concurrency if
// needed.
void ScheduleCompileJobForNewUnits(int new_units);
void ScheduleCompileJobForNewUnits();
size_t NumOutstandingCompilations() const;
@ -694,19 +661,6 @@ class CompilationStateImpl {
// using relaxed semantics.
std::atomic<bool> compile_failed_{false};
// The atomic counter is shared with the compilation job. It's increased if
// more units are added, and decreased when the queue drops to zero. Hence
// it's an approximation of the current number of available units in the
// queue, but it's not updated after popping a single unit, because that
// would create too much contention.
// This counter is not used for synchronization, hence relaxed memory ordering
// can be used. The thread that increases the counter is the same that calls
// {NotifyConcurrencyIncrease} later. The only reduction of the counter is a
// drop to zero after a worker does not find any unit in the queue, and after
// that drop another check is executed to ensure that any left-over units are
// still processed.
std::shared_ptr<std::atomic<int>> scheduled_units_approximation_ =
std::make_shared<std::atomic<int>>(0);
const int max_compile_concurrency_ = 0;
CompilationUnitQueues compilation_unit_queues_;
@ -725,7 +679,7 @@ class CompilationStateImpl {
//////////////////////////////////////////////////////////////////////////////
// Protected by {mutex_}:
std::shared_ptr<ThreadSafeJobHandle> current_compile_job_;
std::shared_ptr<JobHandle> current_compile_job_;
// Features detected to be used in this module. Features can be detected
// as a module is being compiled.
@ -1634,51 +1588,31 @@ void CompileNativeModule(Isolate* isolate,
// The runnable task that performs compilations in the background.
class BackgroundCompileJob : public JobTask {
public:
explicit BackgroundCompileJob(
std::shared_ptr<BackgroundCompileToken> token,
std::shared_ptr<Counters> async_counters,
std::shared_ptr<std::atomic<int>> scheduled_units_approximation,
size_t max_concurrency)
explicit BackgroundCompileJob(std::shared_ptr<BackgroundCompileToken> token,
std::shared_ptr<Counters> async_counters,
size_t max_concurrency)
: token_(std::move(token)),
async_counters_(std::move(async_counters)),
scheduled_units_approximation_(
std::move(scheduled_units_approximation)),
max_concurrency_(max_concurrency) {}
void Run(JobDelegate* delegate) override {
if (ExecuteCompilationUnits(token_, async_counters_.get(), delegate,
kBaselineOrTopTier) == kYield) {
return;
}
// Otherwise we didn't find any more units to execute. Reduce the atomic
// counter of the approximated number of available units to zero, but then
// check whether any more units were added in the meantime, and increase
// back if necessary.
scheduled_units_approximation_->store(0, std::memory_order_relaxed);
BackgroundCompileScope scope(token_);
if (scope.cancelled()) return;
size_t outstanding_units =
scope.compilation_state()->NumOutstandingCompilations();
if (outstanding_units == 0) return;
// On a race between this thread and the thread which scheduled the units,
// this might increase concurrency more than needed, which is fine. It
// will be reduced again when the first task finds no more work to do.
scope.compilation_state()->ScheduleCompileJobForNewUnits(
static_cast<int>(outstanding_units));
ExecuteCompilationUnits(token_, async_counters_.get(), delegate,
kBaselineOrTopTier);
}
size_t GetMaxConcurrency(size_t worker_count) const override {
// {current_concurrency_} does not reflect the units that running workers
// are processing, thus add the current worker count to that number.
return std::min(max_concurrency_,
worker_count + scheduled_units_approximation_->load());
BackgroundCompileScope scope(token_);
if (scope.cancelled()) return 0;
// NumOutstandingCompilations() does not reflect the units that running
// workers are processing, thus add the current worker count to that number.
return std::min<size_t>(
max_concurrency_,
worker_count + scope.compilation_state()->NumOutstandingCompilations());
}
private:
const std::shared_ptr<BackgroundCompileToken> token_;
const std::shared_ptr<Counters> async_counters_;
const std::shared_ptr<std::atomic<int>> scheduled_units_approximation_;
const size_t max_concurrency_;
};
@ -3003,10 +2937,7 @@ void CompilationStateImpl::AddCompilationUnits(
js_to_wasm_wrapper_units_.insert(js_to_wasm_wrapper_units_.end(),
js_to_wasm_wrapper_units.begin(),
js_to_wasm_wrapper_units.end());
size_t total_units = baseline_units.size() + top_tier_units.size() +
js_to_wasm_wrapper_units.size();
ScheduleCompileJobForNewUnits(static_cast<int>(total_units));
ScheduleCompileJobForNewUnits();
}
void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) {
@ -3016,7 +2947,7 @@ void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) {
void CompilationStateImpl::AddTopTierPriorityCompilationUnit(
WasmCompilationUnit unit, size_t priority) {
compilation_unit_queues_.AddTopTierPriorityUnit(unit, priority);
ScheduleCompileJobForNewUnits(1);
ScheduleCompileJobForNewUnits();
}
std::shared_ptr<JSToWasmWrapperCompilationUnit>
@ -3241,35 +3172,20 @@ void CompilationStateImpl::PublishDetectedFeatures(Isolate* isolate) {
UpdateFeatureUseCounts(isolate, detected_features_);
}
void CompilationStateImpl::ScheduleCompileJobForNewUnits(int new_units) {
// Increase the {scheduled_units_approximation_} counter and remember the old
// value to check whether it increased towards {max_compile_concurrency_}.
// In that case, we need to notify the compile job about the increased
// concurrency.
DCHECK_LT(0, new_units);
int old_units = scheduled_units_approximation_->fetch_add(
new_units, std::memory_order_relaxed);
bool concurrency_increased = old_units < max_compile_concurrency_;
base::MutexGuard guard(&mutex_);
void CompilationStateImpl::ScheduleCompileJobForNewUnits() {
if (current_compile_job_ && current_compile_job_->IsRunning()) {
if (concurrency_increased) {
current_compile_job_->NotifyConcurrencyIncrease();
}
current_compile_job_->NotifyConcurrencyIncrease();
return;
}
if (failed()) return;
std::unique_ptr<JobTask> new_compile_job =
std::make_unique<BackgroundCompileJob>(
background_compile_token_, async_counters_,
scheduled_units_approximation_, max_compile_concurrency_);
background_compile_token_, async_counters_, max_compile_concurrency_);
// TODO(wasm): Lower priority for TurboFan-only jobs.
std::shared_ptr<JobHandle> handle = V8::GetCurrentPlatform()->PostJob(
current_compile_job_ = V8::GetCurrentPlatform()->PostJob(
TaskPriority::kUserVisible, std::move(new_compile_job));
native_module_->engine()->ShepherdCompileJobHandle(handle);
current_compile_job_ =
std::make_unique<ThreadSafeJobHandle>(std::move(handle));
native_module_->engine()->ShepherdCompileJobHandle(current_compile_job_);
}
size_t CompilationStateImpl::NumOutstandingCompilations() const {