From a75fb00b672ae3837eac7e8e36da4236009a2f33 Mon Sep 17 00:00:00 2001 From: Clemens Backes Date: Thu, 17 Feb 2022 11:34:36 +0100 Subject: [PATCH] [wasm] Merge deserialization tasks In order to create fewer tasks that each need to swap permissions for writing to the code space, merge the two {CopyAndRelocTask} and {PublishTask} into a single {DeserializeCodeTask}. This also makes the code a lot shorter, and removes stress from the scheduler. R=ahaas@chromium.org Bug: v8:11974, chromium:1297999 Change-Id: I8866bf7225b0bc2dd4caef79e64cacca9de15519 Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/3468902 Reviewed-by: Thibaud Michaud Commit-Queue: Clemens Backes Cr-Commit-Position: refs/heads/main@{#79139} --- src/wasm/wasm-serialization.cc | 102 +++++++++++++++------------------ 1 file changed, 45 insertions(+), 57 deletions(-) diff --git a/src/wasm/wasm-serialization.cc b/src/wasm/wasm-serialization.cc index c2c3bd4465..a8a31a3a1d 100644 --- a/src/wasm/wasm-serialization.cc +++ b/src/wasm/wasm-serialization.cc @@ -533,13 +533,13 @@ class DeserializationQueue { return units; } - size_t NumBatches() { + size_t NumBatches() const { base::MutexGuard guard(&mutex_); return queue_.size(); } private: - base::Mutex mutex_; + mutable base::Mutex mutex_; std::queue> queue_; }; @@ -560,8 +560,7 @@ class V8_EXPORT_PRIVATE NativeModuleDeserializer { } private: - friend class CopyAndRelocTask; - friend class PublishTask; + friend class DeserializeCodeTask; void ReadHeader(Reader* reader); DeserializationUnit ReadCode(int fn_index, Reader* reader); @@ -581,67 +580,64 @@ class V8_EXPORT_PRIVATE NativeModuleDeserializer { std::vector liftoff_functions_; }; -class CopyAndRelocTask : public JobTask { +class DeserializeCodeTask : public JobTask { public: - CopyAndRelocTask(NativeModuleDeserializer* deserializer, - DeserializationQueue* from_queue, - DeserializationQueue* to_queue, - std::shared_ptr publish_handle) - : deserializer_(deserializer), - from_queue_(from_queue), 
- to_queue_(to_queue), - publish_handle_(std::move(publish_handle)) {} + DeserializeCodeTask(NativeModuleDeserializer* deserializer, + DeserializationQueue* reloc_queue) + : deserializer_(deserializer), reloc_queue_(reloc_queue) {} void Run(JobDelegate* delegate) override { CodeSpaceWriteScope code_space_write_scope(deserializer_->native_module_); do { - auto batch = from_queue_->Pop(); + // Repeatedly publish everything that was copied already. + TryPublishing(delegate); + + auto batch = reloc_queue_->Pop(); if (batch.empty()) break; for (const auto& unit : batch) { deserializer_->CopyAndRelocate(unit); } - to_queue_->Add(std::move(batch)); - publish_handle_->NotifyConcurrencyIncrease(); + publish_queue_.Add(std::move(batch)); + delegate->NotifyConcurrencyIncrease(); } while (!delegate->ShouldYield()); } size_t GetMaxConcurrency(size_t /* worker_count */) const override { - return from_queue_->NumBatches(); + // Number of copy&reloc batches, plus 1 if there is also something to + // publish. + bool publish = publishing_.load(std::memory_order_relaxed) == false && + publish_queue_.NumBatches() > 0; + return reloc_queue_->NumBatches() + (publish ? 1 : 0); } private: - NativeModuleDeserializer* const deserializer_; - DeserializationQueue* const from_queue_; - DeserializationQueue* const to_queue_; - std::shared_ptr const publish_handle_; -}; + void TryPublishing(JobDelegate* delegate) { + // Publishing is sequential, so only start publishing if no one else is. 
+ if (publishing_.exchange(true, std::memory_order_relaxed)) return; -class PublishTask : public JobTask { - public: - PublishTask(NativeModuleDeserializer* deserializer, - DeserializationQueue* from_queue) - : deserializer_(deserializer), from_queue_(from_queue) {} - - void Run(JobDelegate* delegate) override { - CodeSpaceWriteScope code_space_write_scope(deserializer_->native_module_); WasmCodeRefScope code_scope; - do { - auto to_publish = from_queue_->PopAll(); - if (to_publish.empty()) break; - deserializer_->Publish(std::move(to_publish)); - } while (!delegate->ShouldYield()); + while (true) { + bool yield = false; + while (!yield) { + auto to_publish = publish_queue_.PopAll(); + if (to_publish.empty()) break; + deserializer_->Publish(std::move(to_publish)); + yield = delegate->ShouldYield(); + } + publishing_.store(false, std::memory_order_relaxed); + if (yield) break; + // After finishing publishing, check again if new work arrived in the mean + // time. If so, continue publishing. + if (publish_queue_.NumBatches() == 0) break; + if (publishing_.exchange(true, std::memory_order_relaxed)) break; + // We successfully reset {publishing_} from {false} to {true}. + } } - size_t GetMaxConcurrency(size_t worker_count) const override { - // Publishing is sequential anyway, so never return more than 1. If a - // worker is already running, don't spawn a second one. 
- if (worker_count > 0) return 0; - return std::min(size_t{1}, from_queue_->NumBatches()); - } - - private: NativeModuleDeserializer* const deserializer_; - DeserializationQueue* const from_queue_; + DeserializationQueue* const reloc_queue_; + DeserializationQueue publish_queue_; + std::atomic publishing_{false}; }; NativeModuleDeserializer::NativeModuleDeserializer(NativeModule* native_module) @@ -660,17 +656,10 @@ bool NativeModuleDeserializer::Read(Reader* reader) { WasmCodeRefScope wasm_code_ref_scope; DeserializationQueue reloc_queue; - DeserializationQueue publish_queue; - std::shared_ptr publish_handle = V8::GetCurrentPlatform()->PostJob( + std::unique_ptr job_handle = V8::GetCurrentPlatform()->PostJob( TaskPriority::kUserVisible, - std::make_unique(this, &publish_queue)); - - std::unique_ptr copy_and_reloc_handle = - V8::GetCurrentPlatform()->PostJob( - TaskPriority::kUserVisible, - std::make_unique(this, &reloc_queue, &publish_queue, - publish_handle)); + std::make_unique(this, &reloc_queue)); std::vector batch; const byte* batch_start = reader->current_location(); @@ -685,7 +674,7 @@ bool NativeModuleDeserializer::Read(Reader* reader) { reloc_queue.Add(std::move(batch)); DCHECK(batch.empty()); batch_start = reader->current_location(); - copy_and_reloc_handle->NotifyConcurrencyIncrease(); + job_handle->NotifyConcurrencyIncrease(); } } @@ -696,12 +685,11 @@ bool NativeModuleDeserializer::Read(Reader* reader) { if (!batch.empty()) { reloc_queue.Add(std::move(batch)); - copy_and_reloc_handle->NotifyConcurrencyIncrease(); + job_handle->NotifyConcurrencyIncrease(); } // Wait for all tasks to finish, while participating in their work. - copy_and_reloc_handle->Join(); - publish_handle->Join(); + job_handle->Join(); return reader->current_size() == 0; }