[wasm] Merge deserialization tasks

In order to create fewer tasks that each need to swap permissions for
writing to the code space, merge the two tasks {CopyAndRelocTask} and
{PublishTask} into a single {DeserializeCodeTask}.

This also makes the code a lot shorter and reduces load on the
scheduler.
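
To see the resulting shape in one place: below is a minimal, runnable
sketch of the merged-task pattern in portable C++. It mirrors the logic
of {DeserializeCodeTask} in the diff below, but {Batch}, {BatchQueue},
{MergedWorker} and the fake transform/publish steps are illustrative
stand-ins, not V8 code.

// Sketch only: portable-C++ stand-ins (Batch, BatchQueue, MergedWorker) for
// the V8 types in the diff below; the transform and publish steps are fakes.
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

using Batch = std::vector<int>;

class BatchQueue {  // plays the role of {DeserializationQueue}
 public:
  void Add(Batch batch) {
    std::lock_guard<std::mutex> guard(mutex_);
    queue_.push(std::move(batch));
  }
  Batch Pop() {
    std::lock_guard<std::mutex> guard(mutex_);
    if (queue_.empty()) return {};
    Batch batch = std::move(queue_.front());
    queue_.pop();
    return batch;
  }
  size_t NumBatches() const {
    std::lock_guard<std::mutex> guard(mutex_);
    return queue_.size();
  }

 private:
  mutable std::mutex mutex_;
  std::queue<Batch> queue_;
};

class MergedWorker {  // plays the role of {DeserializeCodeTask}
 public:
  explicit MergedWorker(BatchQueue* reloc_queue) : reloc_queue_(reloc_queue) {}

  // One body for both pipeline stages; in V8 this is what lets a single
  // CodeSpaceWriteScope (one permission switch) cover a whole Run().
  bool RunOnce() {
    TryPublishing();  // stage 2: publish whatever is already transformed
    Batch batch = reloc_queue_->Pop();  // stage 1: copy & relocate
    if (batch.empty()) return false;
    for (int& value : batch) value += 1;  // stand-in for CopyAndRelocate()
    publish_queue_.Add(std::move(batch));
    return true;
  }

  size_t published() const { return published_.load(); }

 private:
  void TryPublishing() {
    // Publishing is sequential: proceed only if nobody else is publishing.
    if (publishing_.exchange(true, std::memory_order_relaxed)) return;
    while (true) {
      for (Batch b = publish_queue_.Pop(); !b.empty();
           b = publish_queue_.Pop()) {
        published_ += b.size();  // stand-in for Publish()
      }
      publishing_.store(false, std::memory_order_relaxed);
      // Re-check: a batch may have been queued by a worker that skipped
      // publishing because we still held the flag.
      if (publish_queue_.NumBatches() == 0) break;
      if (publishing_.exchange(true, std::memory_order_relaxed)) break;
    }
  }

  BatchQueue* const reloc_queue_;
  BatchQueue publish_queue_;
  std::atomic<bool> publishing_{false};
  std::atomic<size_t> published_{0};
};

int main() {
  BatchQueue reloc_queue;
  for (int i = 0; i < 8; ++i) reloc_queue.Add(Batch(100, i));
  MergedWorker worker(&reloc_queue);
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i) {
    threads.emplace_back([&worker] { while (worker.RunOnce()) {} });
  }
  for (std::thread& t : threads) t.join();
  assert(worker.published() == 8 * 100);  // every batch got published
}

The exchange/store/re-check sequence in {TryPublishing} is what makes a
dedicated publish task unnecessary: whichever worker wins the
{publishing_} flag drains the publish queue sequentially, and the
re-check after releasing the flag closes the window in which another
worker queued a batch but skipped publishing because the flag was still
held.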

R=ahaas@chromium.org

Bug: v8:11974, chromium:1297999
Change-Id: I8866bf7225b0bc2dd4caef79e64cacca9de15519
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/3468902
Reviewed-by: Thibaud Michaud <thibaudm@chromium.org>
Commit-Queue: Clemens Backes <clemensb@chromium.org>
Cr-Commit-Position: refs/heads/main@{#79139}
Clemens Backes 2022-02-17 11:34:36 +01:00 committed by V8 LUCI CQ
parent f60ae6ed71
commit a75fb00b67


@@ -533,13 +533,13 @@ class DeserializationQueue {
     return units;
   }
 
-  size_t NumBatches() {
+  size_t NumBatches() const {
     base::MutexGuard guard(&mutex_);
     return queue_.size();
  }
 
  private:
-  base::Mutex mutex_;
+  mutable base::Mutex mutex_;
   std::queue<std::vector<DeserializationUnit>> queue_;
 };
@@ -560,8 +560,7 @@ class V8_EXPORT_PRIVATE NativeModuleDeserializer {
   }
 
  private:
-  friend class CopyAndRelocTask;
-  friend class PublishTask;
+  friend class DeserializeCodeTask;
 
   void ReadHeader(Reader* reader);
   DeserializationUnit ReadCode(int fn_index, Reader* reader);
@@ -581,67 +580,64 @@ class V8_EXPORT_PRIVATE NativeModuleDeserializer {
   std::vector<int> liftoff_functions_;
 };
 
-class CopyAndRelocTask : public JobTask {
+class DeserializeCodeTask : public JobTask {
  public:
-  CopyAndRelocTask(NativeModuleDeserializer* deserializer,
-                   DeserializationQueue* from_queue,
-                   DeserializationQueue* to_queue,
-                   std::shared_ptr<JobHandle> publish_handle)
-      : deserializer_(deserializer),
-        from_queue_(from_queue),
-        to_queue_(to_queue),
-        publish_handle_(std::move(publish_handle)) {}
+  DeserializeCodeTask(NativeModuleDeserializer* deserializer,
+                      DeserializationQueue* reloc_queue)
+      : deserializer_(deserializer), reloc_queue_(reloc_queue) {}
 
   void Run(JobDelegate* delegate) override {
     CodeSpaceWriteScope code_space_write_scope(deserializer_->native_module_);
     do {
-      auto batch = from_queue_->Pop();
+      // Repeatedly publish everything that was copied already.
+      TryPublishing(delegate);
+
+      auto batch = reloc_queue_->Pop();
       if (batch.empty()) break;
       for (const auto& unit : batch) {
         deserializer_->CopyAndRelocate(unit);
       }
-      to_queue_->Add(std::move(batch));
-      publish_handle_->NotifyConcurrencyIncrease();
+      publish_queue_.Add(std::move(batch));
+      delegate->NotifyConcurrencyIncrease();
     } while (!delegate->ShouldYield());
   }
 
   size_t GetMaxConcurrency(size_t /* worker_count */) const override {
-    return from_queue_->NumBatches();
+    // Number of copy&reloc batches, plus 1 if there is also something to
+    // publish.
+    bool publish = publishing_.load(std::memory_order_relaxed) == false &&
+                   publish_queue_.NumBatches() > 0;
+    return reloc_queue_->NumBatches() + (publish ? 1 : 0);
   }
 
  private:
-  NativeModuleDeserializer* const deserializer_;
-  DeserializationQueue* const from_queue_;
-  DeserializationQueue* const to_queue_;
-  std::shared_ptr<JobHandle> const publish_handle_;
-};
+  void TryPublishing(JobDelegate* delegate) {
+    // Publishing is sequential, so only start publishing if no one else is.
+    if (publishing_.exchange(true, std::memory_order_relaxed)) return;
 
-class PublishTask : public JobTask {
- public:
-  PublishTask(NativeModuleDeserializer* deserializer,
-              DeserializationQueue* from_queue)
-      : deserializer_(deserializer), from_queue_(from_queue) {}
-
-  void Run(JobDelegate* delegate) override {
-    CodeSpaceWriteScope code_space_write_scope(deserializer_->native_module_);
     WasmCodeRefScope code_scope;
-    do {
-      auto to_publish = from_queue_->PopAll();
-      if (to_publish.empty()) break;
-      deserializer_->Publish(std::move(to_publish));
-    } while (!delegate->ShouldYield());
+    while (true) {
+      bool yield = false;
+      while (!yield) {
+        auto to_publish = publish_queue_.PopAll();
+        if (to_publish.empty()) break;
+        deserializer_->Publish(std::move(to_publish));
+        yield = delegate->ShouldYield();
+      }
+      publishing_.store(false, std::memory_order_relaxed);
+      if (yield) break;
+      // After finishing publishing, check again if new work arrived in the
+      // mean time. If so, continue publishing.
+      if (publish_queue_.NumBatches() == 0) break;
+      if (publishing_.exchange(true, std::memory_order_relaxed)) break;
+      // We successfully reset {publishing_} from {false} to {true}.
+    }
   }
 
-  size_t GetMaxConcurrency(size_t worker_count) const override {
-    // Publishing is sequential anyway, so never return more than 1. If a
-    // worker is already running, don't spawn a second one.
-    if (worker_count > 0) return 0;
-    return std::min(size_t{1}, from_queue_->NumBatches());
-  }
-
- private:
   NativeModuleDeserializer* const deserializer_;
-  DeserializationQueue* const from_queue_;
+  DeserializationQueue* const reloc_queue_;
+  DeserializationQueue publish_queue_;
+  std::atomic<bool> publishing_{false};
 };
 
 NativeModuleDeserializer::NativeModuleDeserializer(NativeModule* native_module)
@@ -660,17 +656,10 @@ bool NativeModuleDeserializer::Read(Reader* reader) {
   WasmCodeRefScope wasm_code_ref_scope;
 
   DeserializationQueue reloc_queue;
-  DeserializationQueue publish_queue;
 
-  std::shared_ptr<JobHandle> publish_handle = V8::GetCurrentPlatform()->PostJob(
+  std::unique_ptr<JobHandle> job_handle = V8::GetCurrentPlatform()->PostJob(
       TaskPriority::kUserVisible,
-      std::make_unique<PublishTask>(this, &publish_queue));
-
-  std::unique_ptr<JobHandle> copy_and_reloc_handle =
-      V8::GetCurrentPlatform()->PostJob(
-          TaskPriority::kUserVisible,
-          std::make_unique<CopyAndRelocTask>(this, &reloc_queue, &publish_queue,
-                                             publish_handle));
+      std::make_unique<DeserializeCodeTask>(this, &reloc_queue));
 
   std::vector<DeserializationUnit> batch;
   const byte* batch_start = reader->current_location();
@@ -685,7 +674,7 @@ bool NativeModuleDeserializer::Read(Reader* reader) {
       reloc_queue.Add(std::move(batch));
       DCHECK(batch.empty());
       batch_start = reader->current_location();
-      copy_and_reloc_handle->NotifyConcurrencyIncrease();
+      job_handle->NotifyConcurrencyIncrease();
     }
   }
@@ -696,12 +685,11 @@ bool NativeModuleDeserializer::Read(Reader* reader) {
 
   if (!batch.empty()) {
     reloc_queue.Add(std::move(batch));
-    copy_and_reloc_handle->NotifyConcurrencyIncrease();
+    job_handle->NotifyConcurrencyIncrease();
   }
 
   // Wait for all tasks to finish, while participating in their work.
-  copy_and_reloc_handle->Join();
-  publish_handle->Join();
+  job_handle->Join();
 
   return reader->current_size() == 0;
 }
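
A note on the single {Join()} above: the Jobs API repeatedly consults
{GetMaxConcurrency()} and lets the joining thread run the job itself;
once the returned value drops to zero, the job is done and {Join()}
returns. A small compilable sketch of that accounting, simplified from
the {GetMaxConcurrency} override in the diff (the free function
{MaxConcurrency} and its parameters are illustrative, not V8 code):

// Illustrative model (not V8 source) of the accounting in
// DeserializeCodeTask::GetMaxConcurrency() above.
#include <cassert>
#include <cstddef>

size_t MaxConcurrency(size_t reloc_batches, size_t publish_batches,
                      bool someone_publishing) {
  bool want_publisher = !someone_publishing && publish_batches > 0;
  return reloc_batches + (want_publisher ? 1 : 0);
}

int main() {
  assert(MaxConcurrency(3, 2, false) == 4);  // 3 reloc workers + 1 publisher
  assert(MaxConcurrency(3, 2, true) == 3);   // publishing already covered
  assert(MaxConcurrency(0, 0, false) == 0);  // no work left: Join() returns
}

Counting at most one publisher slot matches the sequential-publishing
invariant: waking a second would-be publisher would only make it bail
out of {TryPublishing} immediately.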