From bd18ee7d520fbea49eae91270b79dd59ab592bd1 Mon Sep 17 00:00:00 2001 From: Camillo Bruni Date: Thu, 16 Dec 2021 18:59:27 +0100 Subject: [PATCH] [d8] Fix worker state race conditions We clear the worker state in the worker thread after processing all messages (and getting the terminate signal). This could cause a race condition when interacting with the worker from the main thread. This was previously working and broke with https://crrev.com/c/3318669 - Add is_joined_ variable which is mutex guarded - Simplify Worker::State - Mutex guard task_runner_ access Bug: v8:12487 Change-Id: Ib53e5a1a636cb29db50efdb63526b0023a5ea768 Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/3345005 Reviewed-by: Leszek Swirski Commit-Queue: Camillo Bruni Cr-Commit-Position: refs/heads/main@{#78415} --- src/d8/d8.cc | 83 +++++++++++++++++++--------------------------------- src/d8/d8.h | 4 +-- 2 files changed, 32 insertions(+), 55 deletions(-) diff --git a/src/d8/d8.cc b/src/d8/d8.cc index 050cbdc78d..16bc8946ea 100644 --- a/src/d8/d8.cc +++ b/src/d8/d8.cc @@ -4007,8 +4007,8 @@ Worker::Worker(const char* script) : script_(i::StrDup(script)) { } Worker::~Worker() { + CHECK(state_.load() == State::kTerminated); DCHECK_NULL(isolate_); - delete thread_; thread_ = nullptr; delete[] script_; @@ -4019,16 +4019,15 @@ bool Worker::is_running() const { return state_.load() == State::kRunning; } bool Worker::StartWorkerThread(std::shared_ptr worker) { auto expected = State::kReady; - CHECK(worker->state_.compare_exchange_strong(expected, State::kRunning)); + CHECK( + worker->state_.compare_exchange_strong(expected, State::kPrepareRunning)); auto thread = new WorkerThread(worker); worker->thread_ = thread; - if (thread->Start()) { - // Wait until the worker is ready to receive messages. - worker->started_semaphore_.Wait(); - Shell::AddRunningWorker(std::move(worker)); - return true; - } - return false; + if (!thread->Start()) return false; + // Wait until the worker is ready to receive messages. + worker->started_semaphore_.Wait(); + Shell::AddRunningWorker(std::move(worker)); + return true; } void Worker::WorkerThread::Run() { @@ -4060,10 +4059,8 @@ class ProcessMessageTask : public i::CancelableTask { }; void Worker::PostMessage(std::unique_ptr data) { - if (!is_running()) return; - // Hold the worker_mutex_ so that the worker thread can't delete task_runner_ - // after we've checked is_running(). base::MutexGuard lock_guard(&worker_mutex_); + if (!is_running()) return; std::unique_ptr task(new ProcessMessageTask( task_manager_, shared_from_this(), std::move(data))); task_runner_->PostNonNestableTask(std::move(task)); @@ -4077,11 +4074,8 @@ class TerminateTask : public i::CancelableTask { void RunInternal() override { auto expected = Worker::State::kTerminating; - if (!worker_->state_.compare_exchange_strong(expected, - Worker::State::kTerminated)) { - // Thread was joined in the meantime. - CHECK_EQ(worker_->state_.load(), Worker::State::kTerminatingAndJoining); - } + CHECK(worker_->state_.compare_exchange_strong(expected, + Worker::State::kTerminated)); } private: @@ -4101,34 +4095,19 @@ std::unique_ptr Worker::GetMessage() { void Worker::TerminateAndWaitForThread() { Terminate(); - // Don't double-join a terminated thread. - auto expected = State::kTerminating; - if (!state_.compare_exchange_strong(expected, - State::kTerminatingAndJoining)) { - expected = State::kTerminated; - if (!state_.compare_exchange_strong(expected, - State::kTerminatingAndJoining)) { - // Avoid double-joining thread. - DCHECK(state_.load() == State::kTerminatingAndJoining || - state_.load() == State::kTerminatedAndJoined); - return; - } + { + base::MutexGuard lock_guard(&worker_mutex_); + // Prevent double-joining. + if (is_joined_) return; + is_joined_ = true; } - thread_->Join(); - expected = State::kTerminatingAndJoining; - CHECK(state_.compare_exchange_strong(expected, State::kTerminatedAndJoined)); } void Worker::Terminate() { + base::MutexGuard lock_guard(&worker_mutex_); auto expected = State::kRunning; if (!state_.compare_exchange_strong(expected, State::kTerminating)) return; - // Hold the worker_mutex_ so that the worker thread can't delete task_runner_ - // after we've checked state_. - base::MutexGuard lock_guard(&worker_mutex_); - CHECK(state_.load() == State::kTerminating || - state_.load() == State::kTerminatingAndJoining); - // Post a task to wake up the worker thread. std::unique_ptr task( new TerminateTask(task_manager_, shared_from_this())); task_runner_->PostTask(std::move(task)); @@ -4147,9 +4126,7 @@ void Worker::ProcessMessage(std::unique_ptr data) { context, String::NewFromUtf8Literal(isolate_, "onmessage", NewStringType::kInternalized)); Local onmessage; - if (!maybe_onmessage.ToLocal(&onmessage) || !onmessage->IsFunction()) { - return; - } + if (!maybe_onmessage.ToLocal(&onmessage) || !onmessage->IsFunction()) return; Local onmessage_fun = onmessage.As(); v8::TryCatch try_catch(isolate_); @@ -4180,12 +4157,14 @@ void Worker::ExecuteInThread() { create_params.array_buffer_allocator = Shell::array_buffer_allocator; create_params.experimental_attach_to_shared_isolate = Shell::shared_isolate; isolate_ = Isolate::New(create_params); - { - base::MutexGuard lock_guard(&worker_mutex_); - task_runner_ = g_default_platform->GetForegroundTaskRunner(isolate_); - task_manager_ = - reinterpret_cast(isolate_)->cancelable_task_manager(); - } + + task_runner_ = g_default_platform->GetForegroundTaskRunner(isolate_); + task_manager_ = + reinterpret_cast(isolate_)->cancelable_task_manager(); + + auto expected = State::kPrepareRunning; + CHECK(state_.compare_exchange_strong(expected, State::kRunning)); + // The Worker is now ready to receive messages. started_semaphore_.Signal(); @@ -4242,17 +4221,15 @@ void Worker::ExecuteInThread() { } Shell::CollectGarbage(isolate_); } - // TODO(cbruni): Check for unhandled promises here. + { - // Hold the mutex to ensure task_runner_ changes state - // atomically (see Worker::PostMessage which reads them). base::MutexGuard lock_guard(&worker_mutex_); - // Mark worker as terminated if it's still running. - auto expected = State::kRunning; - state_.compare_exchange_strong(expected, State::kTerminated); + state_.store(State::kTerminated); + CHECK(!is_running()); task_runner_.reset(); task_manager_ = nullptr; } + context_.Reset(); platform::NotifyIsolateShutdown(g_default_platform, isolate_); isolate_->Dispose(); diff --git a/src/d8/d8.h b/src/d8/d8.h index 61f44455fb..bbf317fcbc 100644 --- a/src/d8/d8.h +++ b/src/d8/d8.h @@ -211,11 +211,10 @@ class Worker : public std::enable_shared_from_this { enum class State { kReady, + kPrepareRunning, kRunning, kTerminating, kTerminated, - kTerminatingAndJoining, - kTerminatedAndJoined }; bool is_running() const; @@ -242,6 +241,7 @@ class Worker : public std::enable_shared_from_this { base::Thread* thread_ = nullptr; char* script_; std::atomic state_; + bool is_joined_ = false; // For signalling that the worker has started. base::Semaphore started_semaphore_{0};