From 443061331390306ae436fe134897031efddb9f02 Mon Sep 17 00:00:00 2001 From: Sigurd Schneider Date: Tue, 2 Apr 2019 18:48:06 +0000 Subject: [PATCH] Revert "[wasm] Use work-stealing queues for background compilation" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit d746be9cebe72ea8497097bfe21b86a1bbc61dfd. Reason for revert: Speculative revert because this CL *might* cause timeouts on several bots: https://ci.chromium.org/p/v8/builders/ci/V8%20Linux%20-%20predictable/23351 Original change's description: > [wasm] Use work-stealing queues for background compilation > > This reduces contention on the mutex protecting the {CompilationState} > by splitting the compilation unit queues into several queues (one per > background task). Each task executes its own queue first, and steals > from other queues once it runs out of work. > The implementation of the set of work-stealing queues is encapsulated > in the new {CompilationUnitQueues} class in module-compiler.cc. > > R=​titzer@chromium.org > > Bug: v8:8916 > Change-Id: I5a40314917e7d4a35d7ff9e8ec124ec212beacab > Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/1543350 > Commit-Queue: Clemens Hammacher > Reviewed-by: Michael Starzinger > Cr-Commit-Position: refs/heads/master@{#60572} TBR=mstarzinger@chromium.org,titzer@chromium.org,clemensh@chromium.org Change-Id: I92d8862ec0ff0002160a62ed9df9b044fc95c93d No-Presubmit: true No-Tree-Checks: true No-Try: true Bug: v8:8916 Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/1549166 Reviewed-by: Sigurd Schneider Commit-Queue: Sigurd Schneider Cr-Commit-Position: refs/heads/master@{#60576} --- src/wasm/module-compiler.cc | 380 ++++++++++++------------------------ 1 file changed, 125 insertions(+), 255 deletions(-) diff --git a/src/wasm/module-compiler.cc b/src/wasm/module-compiler.cc index ff228771f5..0c4211c51e 100644 --- a/src/wasm/module-compiler.cc +++ b/src/wasm/module-compiler.cc @@ -4,13 +4,10 @@ #include "src/wasm/module-compiler.h" -#include - #include "src/api.h" #include "src/asmjs/asm-js.h" #include "src/base/enum-set.h" #include "src/base/optional.h" -#include "src/base/platform/mutex.h" #include "src/base/template-utils.h" #include "src/base/utils/random-number-generator.h" #include "src/compiler/wasm-compiler.h" @@ -114,170 +111,6 @@ class BackgroundCompileScope { std::shared_ptr const native_module_; }; -// A set of work-stealing queues (vectors of units). Each background compile -// task owns one of the queues and steals from all others once its own queue -// runs empty. -class CompilationUnitQueues { - public: - explicit CompilationUnitQueues(int max_tasks) : queues_(max_tasks) { - DCHECK_LT(0, max_tasks); - for (int task_id = 0; task_id < max_tasks; ++task_id) { - queues_[task_id].next_steal_task_id_ = next_task_id(task_id); - } - for (auto& atomic_counter : num_units_) { - std::atomic_init(&atomic_counter, size_t{0}); - } - } - - std::unique_ptr GetNextUnit(int task_id) { - DCHECK_LE(0, task_id); - DCHECK_GT(queues_.size(), task_id); - - // As long as any lower-tier units are outstanding we need to steal them - // before executing own higher-tier units. - for (int tier = GetLowestTierWithUnits(); tier < kNumTiers; ++tier) { - Queue* queue = &queues_[task_id]; - // First, check whether our own queue has a unit of the wanted tier. If - // so, return it, otherwise get the task id to steal from. - int steal_task_id; - { - base::MutexGuard mutex_guard(&queue->mutex_); - if (!queue->units_[tier].empty()) { - auto unit = std::move(queue->units_[tier].back()); - queue->units_[tier].pop_back(); - DecrementUnitCount(tier); - return unit; - } - steal_task_id = queue->next_steal_task_id_; - } - - // Try to steal from all other queues. If none of this succeeds, the outer - // loop increases the tier and retries. - size_t steal_trials = queues_.size(); - for (; steal_trials > 0; - --steal_trials, steal_task_id = next_task_id(steal_task_id)) { - if (steal_task_id == task_id) continue; - if (auto unit = StealUnitsAndGetFirst(task_id, steal_task_id, tier)) { - DecrementUnitCount(tier); - return unit; - } - } - } - return {}; - } - - void AddUnits(Vector> baseline_units, - Vector> top_tier_units) { - DCHECK_LT(0, baseline_units.size() + top_tier_units.size()); - // Add to the individual queues in a round-robin fashion. No special care is - // taken to balance them; they will be balanced by work stealing. - int queue_to_add = next_queue_to_add.load(std::memory_order_relaxed); - while (!next_queue_to_add.compare_exchange_weak( - queue_to_add, next_task_id(queue_to_add), std::memory_order_relaxed)) { - // Retry with updated {queue_to_add}. - } - - Queue* queue = &queues_[queue_to_add]; - base::MutexGuard guard(&queue->mutex_); - if (!baseline_units.is_empty()) { - queue->units_[kBaseline].insert( - queue->units_[kBaseline].end(), - std::make_move_iterator(baseline_units.begin()), - std::make_move_iterator(baseline_units.end())); - num_units_[kBaseline].fetch_add(baseline_units.size(), - std::memory_order_relaxed); - } - if (!top_tier_units.is_empty()) { - queue->units_[kTopTier].insert( - queue->units_[kTopTier].end(), - std::make_move_iterator(top_tier_units.begin()), - std::make_move_iterator(top_tier_units.end())); - num_units_[kTopTier].fetch_add(top_tier_units.size(), - std::memory_order_relaxed); - } - } - - // Get the current total number of units in all queues. This is only a - // momentary snapshot, it's not guaranteed that {GetNextUnit} returns a unit - // if this method returns non-zero. - size_t GetTotalSize() const { - size_t total = 0; - for (auto& atomic_counter : num_units_) { - total += atomic_counter.load(std::memory_order_relaxed); - } - return total; - } - - private: - // Store tier in int so we can easily loop over it: - static constexpr int kBaseline = 0; - static constexpr int kTopTier = 1; - static constexpr int kNumTiers = kTopTier + 1; - - struct Queue { - base::Mutex mutex_; - - // Protected by {mutex_}: - std::vector> units_[kNumTiers]; - int next_steal_task_id_; - // End of fields protected by {mutex_}. - }; - - std::vector queues_; - - std::atomic num_units_[kNumTiers]; - std::atomic next_queue_to_add{0}; - - int next_task_id(int task_id) const { - int next = task_id + 1; - return next == static_cast(queues_.size()) ? 0 : next; - } - - int GetLowestTierWithUnits() const { - for (int tier = 0; tier < kNumTiers; ++tier) { - if (num_units_[tier].load(std::memory_order_relaxed) > 0) return tier; - } - return kNumTiers; - } - - void DecrementUnitCount(int tier) { - size_t old_units_count = num_units_[tier].fetch_sub(1); - DCHECK_LE(1, old_units_count); - USE(old_units_count); - } - - // Steal units of {wanted_tier} from {steal_from_task_id} to {task_id}. Return - // first stolen unit (rest put in queue of {task_id}), or {nullptr} if - // {steal_from_task_id} had no units of {wanted_tier}. - std::unique_ptr StealUnitsAndGetFirst( - int task_id, int steal_from_task_id, int wanted_tier) { - DCHECK_NE(task_id, steal_from_task_id); - std::vector> stolen; - { - Queue* steal_queue = &queues_[steal_from_task_id]; - base::MutexGuard guard(&steal_queue->mutex_); - if (steal_queue->units_[wanted_tier].empty()) return {}; - auto* steal_from_vector = &steal_queue->units_[wanted_tier]; - size_t remaining = steal_from_vector->size() / 2; - stolen.assign( - std::make_move_iterator(steal_from_vector->begin()) + remaining, - std::make_move_iterator(steal_from_vector->end())); - steal_from_vector->resize(remaining); - } - DCHECK(!stolen.empty()); - auto returned_unit = std::move(stolen.back()); - stolen.pop_back(); - Queue* queue = &queues_[task_id]; - base::MutexGuard guard(&queue->mutex_); - auto* target_queue = &queue->units_[wanted_tier]; - target_queue->insert(target_queue->end(), - std::make_move_iterator(stolen.begin()), - std::make_move_iterator(stolen.end())); - queue->next_steal_task_id_ = next_task_id(steal_from_task_id); - return returned_unit; - } -}; - // The {CompilationStateImpl} keeps track of the compilation state of the // owning NativeModule, i.e. which functions are left to be compiled. // It contains a task manager to allow parallel and asynchronous background @@ -304,18 +137,19 @@ class CompilationStateImpl { // Inserts new functions to compile and kicks off compilation. void AddCompilationUnits( - Vector> baseline_units, - Vector> top_tier_units); + std::vector>& baseline_units, + std::vector>& top_tier_units); void AddTopTierCompilationUnit(std::unique_ptr); - std::unique_ptr GetNextCompilationUnit(int task_id); + std::unique_ptr GetNextCompilationUnit(); void FinishUnit(WasmCompilationResult); void FinishUnits(Vector); - void OnBackgroundTaskStopped(int task_id, const WasmFeatures& detected); + void ReportDetectedFeatures(const WasmFeatures& detected); + void OnBackgroundTaskStopped(const WasmFeatures& detected); void PublishDetectedFeatures(Isolate* isolate, const WasmFeatures& detected); - // Hold {mutex_} when calling {RestartBackgroundTasksLocked}. - void RestartBackgroundTasksLocked(); + void RestartBackgroundCompileTask(); + void RestartBackgroundTasks(); void SetError(); @@ -354,10 +188,6 @@ class CompilationStateImpl { // using relaxed semantics. std::atomic compile_failed_{false}; - const int max_background_tasks_ = 0; - - CompilationUnitQueues compilation_unit_queues_; - // This mutex protects all information of this {CompilationStateImpl} which is // being accessed concurrently. mutable base::Mutex mutex_; @@ -365,8 +195,10 @@ class CompilationStateImpl { ////////////////////////////////////////////////////////////////////////////// // Protected by {mutex_}: - // Set of unused task ids; <= {max_background_tasks_} many. - std::vector available_task_ids_; + std::vector> baseline_compilation_units_; + std::vector> top_tier_compilation_units_; + + int num_background_tasks_ = 0; // Features detected to be used in this module. Features can be detected // as a module is being compiled. @@ -396,6 +228,8 @@ class CompilationStateImpl { // End of fields protected by {callbacks_mutex_}. ////////////////////////////////////////////////////////////////////////////// + + const int max_background_tasks_ = 0; }; CompilationStateImpl* Impl(CompilationState* compilation_state) { @@ -572,8 +406,7 @@ class CompilationUnitBuilder { bool Commit() { if (baseline_units_.empty() && tiering_units_.empty()) return false; - compilation_state()->AddCompilationUnits(VectorOf(baseline_units_), - VectorOf(tiering_units_)); + compilation_state()->AddCompilationUnits(baseline_units_, tiering_units_); Clear(); return true; } @@ -682,8 +515,8 @@ double MonotonicallyIncreasingTimeInMs() { base::Time::kMillisecondsPerSecond; } -// Run by the main thread to take part in compilation. Only used for synchronous -// compilation. +// Run by each compilation task and by the main thread (i.e. in both +// foreground and background threads). bool FetchAndExecuteCompilationUnit(CompilationEnv* env, NativeModule* native_module, CompilationStateImpl* compilation_state, @@ -691,12 +524,9 @@ bool FetchAndExecuteCompilationUnit(CompilationEnv* env, Counters* counters) { DisallowHeapAccess no_heap_access; - // The main thread uses task id 0, which might collide with one of the - // background tasks. This is fine, as it will only cause some contention on - // the one queue, but work otherwise. - constexpr int kMainThreadTaskId = 0; std::unique_ptr unit = - compilation_state->GetNextCompilationUnit(kMainThreadTaskId); + compilation_state->GetNextCompilationUnit(); + if (unit == nullptr) return false; WasmCompilationResult result = unit->ExecuteCompilation( @@ -810,7 +640,7 @@ void CompileInParallel(Isolate* isolate, NativeModule* native_module) { // 1) The main thread allocates a compilation unit for each wasm function // and stores them in the vector {compilation_units} within the // {compilation_state}. By adding units to the {compilation_state}, new - // {BackgroundCompileTask} instances are spawned which run on + // {BackgroundCompileTasks} instances are spawned which run on // the background threads. // 2) The background threads and the main thread pick one compilation unit at // a time and execute the parallel phase of the compilation unit. @@ -920,12 +750,10 @@ class BackgroundCompileTask : public CancelableTask { public: explicit BackgroundCompileTask(CancelableTaskManager* manager, std::shared_ptr token, - std::shared_ptr async_counters, - int task_id) + std::shared_ptr async_counters) : CancelableTask(manager), token_(std::move(token)), - async_counters_(std::move(async_counters)), - task_id_(task_id) {} + async_counters_(std::move(async_counters)) {} void RunInternal() override { TRACE_COMPILE("(3b) Compiling...\n"); @@ -950,11 +778,10 @@ class BackgroundCompileTask : public CancelableTask { env.emplace(compile_scope.native_module()->CreateCompilationEnv()); wire_bytes = compile_scope.compilation_state()->GetWireBytesStorage(); module = compile_scope.native_module()->shared_module(); - unit = - compile_scope.compilation_state()->GetNextCompilationUnit(task_id_); + unit = compile_scope.compilation_state()->GetNextCompilationUnit(); if (unit == nullptr) { compile_scope.compilation_state()->OnBackgroundTaskStopped( - task_id_, detected_features); + detected_features); return; } } @@ -985,7 +812,7 @@ class BackgroundCompileTask : public CancelableTask { // Compile error. compile_scope.compilation_state()->SetError(); compile_scope.compilation_state()->OnBackgroundTaskStopped( - task_id_, detected_features); + detected_features); compilation_failed = true; break; } @@ -994,18 +821,20 @@ class BackgroundCompileTask : public CancelableTask { publish_results(&compile_scope); } - // Get next unit. if (deadline < MonotonicallyIncreasingTimeInMs()) { - unit = nullptr; - } else { - unit = compile_scope.compilation_state()->GetNextCompilationUnit( - task_id_); + publish_results(&compile_scope); + compile_scope.compilation_state()->ReportDetectedFeatures( + detected_features); + compile_scope.compilation_state()->RestartBackgroundCompileTask(); + return; } + // Get next unit. + unit = compile_scope.compilation_state()->GetNextCompilationUnit(); if (unit == nullptr) { publish_results(&compile_scope); compile_scope.compilation_state()->OnBackgroundTaskStopped( - task_id_, detected_features); + detected_features); return; } } @@ -1019,7 +848,6 @@ class BackgroundCompileTask : public CancelableTask { private: std::shared_ptr token_; std::shared_ptr async_counters_; - const int task_id_; }; } // namespace @@ -1842,15 +1670,7 @@ CompilationStateImpl::CompilationStateImpl( async_counters_(std::move(async_counters)), max_background_tasks_(std::max( 1, std::min(FLAG_wasm_num_compilation_tasks, - V8::GetCurrentPlatform()->NumberOfWorkerThreads()))), - compilation_unit_queues_(max_background_tasks_), - available_task_ids_(max_background_tasks_) { - for (int i = 0; i < max_background_tasks_; ++i) { - // Ids are popped on task creation, so reverse this list. This ensures that - // the first background task gets id 0. - available_task_ids_[i] = max_background_tasks_ - 1 - i; - } -} + V8::GetCurrentPlatform()->NumberOfWorkerThreads()))) {} void CompilationStateImpl::AbortCompilation() { background_compile_token_->Cancel(); @@ -1875,22 +1695,63 @@ void CompilationStateImpl::AddCallback(CompilationState::callback_t callback) { } void CompilationStateImpl::AddCompilationUnits( - Vector> baseline_units, - Vector> top_tier_units) { - compilation_unit_queues_.AddUnits(baseline_units, top_tier_units); + std::vector>& baseline_units, + std::vector>& top_tier_units) { + { + base::MutexGuard guard(&mutex_); - base::MutexGuard guard(&mutex_); - RestartBackgroundTasksLocked(); + DCHECK_IMPLIES(compile_mode_ == CompileMode::kRegular, + top_tier_compilation_units_.empty()); + + baseline_compilation_units_.insert( + baseline_compilation_units_.end(), + std::make_move_iterator(baseline_units.begin()), + std::make_move_iterator(baseline_units.end())); + if (!top_tier_units.empty()) { + top_tier_compilation_units_.insert( + top_tier_compilation_units_.end(), + std::make_move_iterator(top_tier_units.begin()), + std::make_move_iterator(top_tier_units.end())); + } + } + + RestartBackgroundTasks(); } void CompilationStateImpl::AddTopTierCompilationUnit( std::unique_ptr unit) { - AddCompilationUnits({}, {&unit, 1}); + { + base::MutexGuard guard(&mutex_); + + DCHECK_EQ(compile_mode_, CompileMode::kTiering); + DCHECK(FLAG_wasm_lazy_compilation || FLAG_asm_wasm_lazy_compilation || + native_module_->enabled_features().compilation_hints); + + top_tier_compilation_units_.emplace_back(std::move(unit)); + } + + RestartBackgroundTasks(); } std::unique_ptr -CompilationStateImpl::GetNextCompilationUnit(int task_id) { - return compilation_unit_queues_.GetNextUnit(task_id); +CompilationStateImpl::GetNextCompilationUnit() { + base::MutexGuard guard(&mutex_); + + std::vector>* units = nullptr; + + if (!baseline_compilation_units_.empty()) { + units = &baseline_compilation_units_; + } else if (!top_tier_compilation_units_.empty()) { + units = &top_tier_compilation_units_; + } else { + return std::unique_ptr(); + } + DCHECK_NOT_NULL(units); + DCHECK(!units->empty()); + + std::unique_ptr unit = std::move(units->back()); + units->pop_back(); + return unit; } void CompilationStateImpl::FinishUnit(WasmCompilationResult result) { @@ -1978,19 +1839,31 @@ void CompilationStateImpl::FinishUnits( } } -void CompilationStateImpl::OnBackgroundTaskStopped( - int task_id, const WasmFeatures& detected) { - base::MutexGuard guard(&mutex_); - DCHECK_EQ(0, std::count(available_task_ids_.begin(), - available_task_ids_.end(), task_id)); - DCHECK_GT(max_background_tasks_, available_task_ids_.size()); - available_task_ids_.push_back(task_id); - UnionFeaturesInto(&detected_features_, detected); +void CompilationStateImpl::RestartBackgroundCompileTask() { + auto task = + native_module_->engine()->NewBackgroundCompileTask( + background_compile_token_, async_counters_); - // The background task could have stopped while we were adding new units, or - // because it reached its deadline. In both cases we need to restart tasks to - // avoid a potential deadlock. - RestartBackgroundTasksLocked(); + if (baseline_compilation_finished()) { + V8::GetCurrentPlatform()->CallLowPriorityTaskOnWorkerThread( + std::move(task)); + } else { + V8::GetCurrentPlatform()->CallOnWorkerThread(std::move(task)); + } +} + +void CompilationStateImpl::ReportDetectedFeatures( + const WasmFeatures& detected) { + base::MutexGuard guard(&mutex_); + UnionFeaturesInto(&detected_features_, detected); +} + +void CompilationStateImpl::OnBackgroundTaskStopped( + const WasmFeatures& detected) { + base::MutexGuard guard(&mutex_); + DCHECK_LE(1, num_background_tasks_); + --num_background_tasks_; + UnionFeaturesInto(&detected_features_, detected); } void CompilationStateImpl::PublishDetectedFeatures( @@ -2003,30 +1876,27 @@ void CompilationStateImpl::PublishDetectedFeatures( UpdateFeatureUseCounts(isolate, detected_features_); } -void CompilationStateImpl::RestartBackgroundTasksLocked() { - // The caller must hold {mutex_}. - DCHECK(!mutex_.TryLock()); - // Explicit fast path (quite common): If no more task ids are available (i.e. - // {max_background_tasks_} tasks are already running), spawn nothing. - if (available_task_ids_.empty()) return; - // No need to restart tasks if compilation already failed. - if (failed()) return; +void CompilationStateImpl::RestartBackgroundTasks() { + int num_restart; + { + base::MutexGuard guard(&mutex_); + // No need to restart tasks if compilation already failed. + if (failed()) return; - size_t max_num_restart = compilation_unit_queues_.GetTotalSize(); - - while (!available_task_ids_.empty() && max_num_restart-- > 0) { - int task_id = available_task_ids_.back(); - available_task_ids_.pop_back(); - auto task = native_module_->engine() - ->NewBackgroundCompileTask( - background_compile_token_, async_counters_, task_id); - - if (baseline_compilation_finished()) { - V8::GetCurrentPlatform()->CallLowPriorityTaskOnWorkerThread( - std::move(task)); - } else { - V8::GetCurrentPlatform()->CallOnWorkerThread(std::move(task)); + DCHECK_LE(num_background_tasks_, max_background_tasks_); + if (num_background_tasks_ == max_background_tasks_) return; + size_t num_compilation_units = + baseline_compilation_units_.size() + top_tier_compilation_units_.size(); + num_restart = max_background_tasks_ - num_background_tasks_; + DCHECK_LE(0, num_restart); + if (num_compilation_units < static_cast(num_restart)) { + num_restart = static_cast(num_compilation_units); } + num_background_tasks_ += num_restart; + } + + for (; num_restart > 0; --num_restart) { + RestartBackgroundCompileTask(); } }