// Copyright 2020 the V8 project authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "src/libplatform/default-job.h" #include "src/base/platform/condition-variable.h" #include "src/base/platform/platform.h" #include "src/libplatform/default-platform.h" #include "testing/gtest/include/gtest/gtest.h" namespace v8 { namespace platform { namespace default_job_unittest { // Verify that Cancel() on a job stops running the worker task and causes // current workers to yield. TEST(DefaultJobTest, CancelJob) { static constexpr size_t kTooManyTasks = 1000; static constexpr size_t kMaxTask = 4; DefaultPlatform platform(kMaxTask); // This Job notifies |threads_running| once started and loops until // ShouldYield() returns true, and then returns. class JobTest : public JobTask { public: ~JobTest() override = default; void Run(JobDelegate* delegate) override { { base::MutexGuard guard(&mutex); worker_count++; } threads_running.NotifyOne(); while (!delegate->ShouldYield()) { } } size_t GetMaxConcurrency() const override { return max_concurrency.load(std::memory_order_relaxed); } base::Mutex mutex; base::ConditionVariable threads_running; size_t worker_count = 0; std::atomic_size_t max_concurrency{kTooManyTasks}; }; auto job = std::make_unique(); JobTest* job_raw = job.get(); auto state = std::make_shared( &platform, std::move(job), TaskPriority::kUserVisible, kMaxTask); state->NotifyConcurrencyIncrease(); { base::MutexGuard guard(&job_raw->mutex); while (job_raw->worker_count < kMaxTask) { job_raw->threads_running.Wait(&job_raw->mutex); } EXPECT_EQ(kMaxTask, job_raw->worker_count); } state->CancelAndWait(); // Workers should return and this test should not hang. } // Verify that Join() on a job contributes to max concurrency and waits for all // workers to return. TEST(DefaultJobTest, JoinJobContributes) { static constexpr size_t kMaxTask = 4; DefaultPlatform platform(kMaxTask); // This Job notifies |threads_running| once started and blocks on a barrier // until kMaxTask + 1 threads reach that point, and then returns. class JobTest : public JobTask { public: ~JobTest() override = default; void Run(JobDelegate* delegate) override { base::MutexGuard guard(&mutex); worker_count++; threads_running.NotifyAll(); while (worker_count < kMaxTask + 1) threads_running.Wait(&mutex); --max_concurrency; } size_t GetMaxConcurrency() const override { return max_concurrency.load(std::memory_order_relaxed); } base::Mutex mutex; base::ConditionVariable threads_running; size_t worker_count = 0; std::atomic_size_t max_concurrency{kMaxTask + 1}; }; auto job = std::make_unique(); JobTest* job_raw = job.get(); auto state = std::make_shared( &platform, std::move(job), TaskPriority::kUserVisible, kMaxTask); state->NotifyConcurrencyIncrease(); // The main thread contributing is necessary for |worker_count| to reach // kMaxTask + 1 thus, Join() should not hang. state->Join(); EXPECT_EQ(0U, job_raw->max_concurrency); } // Verify that calling NotifyConcurrencyIncrease() (re-)schedules tasks with the // intended concurrency. TEST(DefaultJobTest, JobNotifyConcurrencyIncrease) { static constexpr size_t kMaxTask = 4; DefaultPlatform platform(kMaxTask); // This Job notifies |threads_running| once started and blocks on a barrier // until kMaxTask threads reach that point, and then returns. class JobTest : public JobTask { public: ~JobTest() override = default; void Run(JobDelegate* delegate) override { base::MutexGuard guard(&mutex); worker_count++; threads_running.NotifyAll(); // Wait synchronously until |kMaxTask| workers reach this point. while (worker_count < kMaxTask) threads_running.Wait(&mutex); --max_concurrency; } size_t GetMaxConcurrency() const override { return max_concurrency.load(std::memory_order_relaxed); } base::Mutex mutex; base::ConditionVariable threads_running; bool continue_flag = false; size_t worker_count = 0; std::atomic_size_t max_concurrency{kMaxTask / 2}; }; auto job = std::make_unique(); JobTest* job_raw = job.get(); auto state = std::make_shared( &platform, std::move(job), TaskPriority::kUserVisible, kMaxTask); state->NotifyConcurrencyIncrease(); { base::MutexGuard guard(&job_raw->mutex); while (job_raw->worker_count < kMaxTask / 2) job_raw->threads_running.Wait(&job_raw->mutex); EXPECT_EQ(kMaxTask / 2, job_raw->worker_count); job_raw->max_concurrency = kMaxTask; } state->NotifyConcurrencyIncrease(); // Workers should reach |continue_flag| and eventually return thus, Join() // should not hang. state->Join(); EXPECT_EQ(0U, job_raw->max_concurrency); } // Verify that Join() doesn't contribute if the Job is already finished. TEST(DefaultJobTest, FinishBeforeJoin) { static constexpr size_t kMaxTask = 4; DefaultPlatform platform(kMaxTask); // This Job notifies |threads_running| once started and returns. class JobTest : public JobTask { public: ~JobTest() override = default; void Run(JobDelegate* delegate) override { base::MutexGuard guard(&mutex); worker_count++; // Assert that main thread doesn't contribute in this test. EXPECT_NE(main_thread_id, base::OS::GetCurrentThreadId()); worker_ran.NotifyAll(); --max_concurrency; } size_t GetMaxConcurrency() const override { return max_concurrency.load(std::memory_order_relaxed); } const int main_thread_id = base::OS::GetCurrentThreadId(); base::Mutex mutex; base::ConditionVariable worker_ran; size_t worker_count = 0; std::atomic_size_t max_concurrency{kMaxTask * 5}; }; auto job = std::make_unique(); JobTest* job_raw = job.get(); auto state = std::make_shared( &platform, std::move(job), TaskPriority::kUserVisible, kMaxTask); state->NotifyConcurrencyIncrease(); { base::MutexGuard guard(&job_raw->mutex); while (job_raw->worker_count < kMaxTask * 5) job_raw->worker_ran.Wait(&job_raw->mutex); EXPECT_EQ(kMaxTask * 5, job_raw->worker_count); } state->Join(); EXPECT_EQ(0U, job_raw->max_concurrency); } // Verify that destroying a DefaultJobHandle triggers a DCHECK if neither Join() // or Cancel() was called. TEST(DefaultJobTest, LeakHandle) { class JobTest : public JobTask { public: ~JobTest() override = default; void Run(JobDelegate* delegate) override {} size_t GetMaxConcurrency() const override { return 0; } }; DefaultPlatform platform(0); auto job = std::make_unique(); auto state = std::make_shared(&platform, std::move(job), TaskPriority::kUserVisible, 1); auto handle = std::make_unique(std::move(state)); #ifdef DEBUG EXPECT_DEATH_IF_SUPPORTED({ handle.reset(); }, ""); #endif // DEBUG handle->Join(); } } // namespace default_job_unittest } // namespace platform } // namespace v8