[Jobs API]: Implement missing Jobs pieces in the various v8 implementations.

- JobHandle::IsCompleted()
- JobDelegate::GetTaskId()
- worker_count passed as argument to GetMaxConcurrency().
  Jobs implementation must call the new GetMaxConcurrency(), but Jobs
  users aren't migrated yet.

Bug: chromium:1114823
Change-Id: I0f4295ccaf9eba866dd771f30e2e49aa3eae9551
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2352484
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: Ulan Degenbaev <ulan@chromium.org>
Cr-Commit-Position: refs/heads/master@{#69553}
This commit is contained in:
Etienne Pierre-doray 2020-08-13 17:14:37 -04:00 committed by Commit Bot
parent 718c110866
commit d77e12bcb9
9 changed files with 186 additions and 24 deletions

View File

@ -247,7 +247,7 @@ class JobTask {
* TODO(etiennep): Replace the version above by this once custom embedders are
* migrated.
*/
size_t GetMaxConcurrency(size_t worker_count) const {
virtual size_t GetMaxConcurrency(size_t worker_count) const {
return GetMaxConcurrency();
}
};

View File

@ -280,6 +280,10 @@ class DelayedTasksPlatform final : public Platform {
return job_task_->GetMaxConcurrency();
}
size_t GetMaxConcurrency(size_t worker_count) const override {
return job_task_->GetMaxConcurrency(worker_count);
}
private:
std::unique_ptr<JobTask> job_task_;
int32_t delay_ms_;

View File

@ -24,6 +24,7 @@ class DefaultJobHandle : public JobHandle {
if (thread_->joinable()) thread_->join();
}
void Cancel() override { Join(); }
bool IsCompleted() override { return !IsRunning(); }
bool IsRunning() override { return thread_->joinable(); }
private:
@ -111,6 +112,7 @@ std::unique_ptr<cppgc::JobHandle> DefaultPlatform::PostJob(
public:
bool ShouldYield() override { return false; }
void NotifyConcurrencyIncrease() override {}
uint8_t GetTaskId() override { return 0; }
} delegate;
if (task) task->Run(&delegate);

View File

@ -4,8 +4,38 @@
#include "src/libplatform/default-job.h"
#include "src/base/bits.h"
#include "src/base/macros.h"
namespace v8 {
namespace platform {
namespace {
// Capped to allow assigning task_ids from a bitfield.
constexpr size_t kMaxWorkersPerJob = 32;
} // namespace
DefaultJobState::JobDelegate::~JobDelegate() {
static_assert(kInvalidTaskId >= kMaxWorkersPerJob,
"kInvalidTaskId must be outside of the range of valid task_ids "
"[0, kMaxWorkersPerJob)");
if (task_id_ != kInvalidTaskId) outer_->ReleaseTaskId(task_id_);
}
uint8_t DefaultJobState::JobDelegate::GetTaskId() {
if (task_id_ == kInvalidTaskId) task_id_ = outer_->AcquireTaskId();
return task_id_;
}
DefaultJobState::DefaultJobState(Platform* platform,
std::unique_ptr<JobTask> job_task,
TaskPriority priority,
size_t num_worker_threads)
: platform_(platform),
job_task_(std::move(job_task)),
priority_(priority),
num_worker_threads_(std::min(num_worker_threads, kMaxWorkersPerJob)) {}
DefaultJobState::~DefaultJobState() { DCHECK_EQ(0U, active_workers_); }
@ -15,7 +45,7 @@ void DefaultJobState::NotifyConcurrencyIncrease() {
size_t num_tasks_to_post = 0;
{
base::MutexGuard guard(&mutex_);
const size_t max_concurrency = CappedMaxConcurrency();
const size_t max_concurrency = CappedMaxConcurrency(active_workers_);
// Consider |pending_tasks_| to avoid posting too many tasks.
if (max_concurrency > (active_workers_ + pending_tasks_)) {
num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
@ -29,6 +59,32 @@ void DefaultJobState::NotifyConcurrencyIncrease() {
}
}
uint8_t DefaultJobState::AcquireTaskId() {
static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8,
"TaskId bitfield isn't big enough to fit kMaxWorkersPerJob.");
uint32_t assigned_task_ids =
assigned_task_ids_.load(std::memory_order_relaxed);
DCHECK_LE(v8::base::bits::CountPopulation(assigned_task_ids) + 1,
kMaxWorkersPerJob);
uint32_t new_assigned_task_ids = 0;
uint8_t task_id = 0;
do {
// Count trailing one bits. This is the id of the right-most 0-bit in
// |assigned_task_ids|.
task_id = v8::base::bits::CountTrailingZeros32(~assigned_task_ids);
new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id);
} while (!assigned_task_ids_.compare_exchange_weak(
assigned_task_ids, new_assigned_task_ids, std::memory_order_relaxed));
return task_id;
}
void DefaultJobState::ReleaseTaskId(uint8_t task_id) {
uint32_t previous_task_ids =
assigned_task_ids_.fetch_and(~(uint32_t(1) << task_id));
DCHECK(previous_task_ids & (uint32_t(1) << task_id));
USE(previous_task_ids);
}
void DefaultJobState::Join() {
bool can_run = false;
{
@ -42,7 +98,8 @@ void DefaultJobState::Join() {
can_run = WaitForParticipationOpportunityLockRequired();
}
while (can_run) {
job_task_->Run(this);
DefaultJobState::JobDelegate delegate(this);
job_task_->Run(&delegate);
base::MutexGuard guard(&mutex_);
can_run = WaitForParticipationOpportunityLockRequired();
}
@ -58,12 +115,18 @@ void DefaultJobState::CancelAndWait() {
}
}
bool DefaultJobState::IsCompleted() {
base::MutexGuard guard(&mutex_);
return job_task_->GetMaxConcurrency(active_workers_) == 0 &&
active_workers_ == 0;
}
bool DefaultJobState::CanRunFirstTask() {
base::MutexGuard guard(&mutex_);
--pending_tasks_;
if (is_canceled_.load(std::memory_order_relaxed)) return false;
if (active_workers_ >=
std::min(job_task_->GetMaxConcurrency(), num_worker_threads_)) {
if (active_workers_ >= std::min(job_task_->GetMaxConcurrency(active_workers_),
num_worker_threads_)) {
return false;
}
// Acquire current worker.
@ -75,7 +138,7 @@ bool DefaultJobState::DidRunTask() {
size_t num_tasks_to_post = 0;
{
base::MutexGuard guard(&mutex_);
const size_t max_concurrency = CappedMaxConcurrency();
const size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
if (is_canceled_.load(std::memory_order_relaxed) ||
active_workers_ > max_concurrency) {
// Release current worker and notify.
@ -102,10 +165,10 @@ bool DefaultJobState::DidRunTask() {
}
bool DefaultJobState::WaitForParticipationOpportunityLockRequired() {
size_t max_concurrency = CappedMaxConcurrency();
size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
while (active_workers_ > max_concurrency && active_workers_ > 1) {
worker_released_condition_.Wait(&mutex_);
max_concurrency = CappedMaxConcurrency();
max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
}
if (active_workers_ <= max_concurrency) return true;
DCHECK_EQ(1U, active_workers_);
@ -115,8 +178,9 @@ bool DefaultJobState::WaitForParticipationOpportunityLockRequired() {
return false;
}
size_t DefaultJobState::CappedMaxConcurrency() const {
return std::min(job_task_->GetMaxConcurrency(), num_worker_threads_);
size_t DefaultJobState::CappedMaxConcurrency(size_t worker_count) const {
return std::min(job_task_->GetMaxConcurrency(worker_count),
num_worker_threads_);
}
void DefaultJobState::CallOnWorkerThread(std::unique_ptr<Task> task) {
@ -146,5 +210,7 @@ void DefaultJobHandle::Cancel() {
state_ = nullptr;
}
bool DefaultJobHandle::IsCompleted() { return state_->IsCompleted(); }
} // namespace platform
} // namespace v8

View File

@ -17,25 +17,41 @@ namespace v8 {
namespace platform {
class V8_PLATFORM_EXPORT DefaultJobState
: NON_EXPORTED_BASE(public JobDelegate),
public std::enable_shared_from_this<DefaultJobState> {
: public std::enable_shared_from_this<DefaultJobState> {
public:
class JobDelegate : public v8::JobDelegate {
public:
explicit JobDelegate(DefaultJobState* outer) : outer_(outer) {}
~JobDelegate();
void NotifyConcurrencyIncrease() override {
outer_->NotifyConcurrencyIncrease();
}
bool ShouldYield() override {
// Thread-safe but may return an outdated result.
return outer_->is_canceled_.load(std::memory_order_relaxed);
}
uint8_t GetTaskId() override;
private:
static constexpr uint8_t kInvalidTaskId =
std::numeric_limits<uint8_t>::max();
DefaultJobState* outer_;
uint8_t task_id_ = kInvalidTaskId;
};
DefaultJobState(Platform* platform, std::unique_ptr<JobTask> job_task,
TaskPriority priority, size_t num_worker_threads)
: platform_(platform),
job_task_(std::move(job_task)),
priority_(priority),
num_worker_threads_(num_worker_threads) {}
TaskPriority priority, size_t num_worker_threads);
virtual ~DefaultJobState();
void NotifyConcurrencyIncrease() override;
bool ShouldYield() override {
// Thread-safe but may return an outdated result.
return is_canceled_.load(std::memory_order_relaxed);
}
void NotifyConcurrencyIncrease();
uint8_t AcquireTaskId();
void ReleaseTaskId(uint8_t task_id);
void Join();
void CancelAndWait();
bool IsCompleted();
// Must be called before running |job_task_| for the first time. If it returns
// true, then the worker thread must contribute and must call DidRunTask(), or
@ -55,7 +71,7 @@ class V8_PLATFORM_EXPORT DefaultJobState
// Returns GetMaxConcurrency() capped by the number of threads used by this
// job.
size_t CappedMaxConcurrency() const;
size_t CappedMaxConcurrency(size_t worker_count) const;
void CallOnWorkerThread(std::unique_ptr<Task> task);
@ -75,6 +91,8 @@ class V8_PLATFORM_EXPORT DefaultJobState
size_t num_worker_threads_;
// Signaled when a worker returns.
base::ConditionVariable worker_released_condition_;
std::atomic<uint32_t> assigned_task_ids_{0};
};
class V8_PLATFORM_EXPORT DefaultJobHandle : public JobHandle {
@ -88,6 +106,7 @@ class V8_PLATFORM_EXPORT DefaultJobHandle : public JobHandle {
void Join() override;
void Cancel() override;
bool IsCompleted() override;
bool IsRunning() override { return state_ != nullptr; }
private:
@ -105,9 +124,10 @@ class DefaultJobWorker : public Task {
void Run() override {
auto shared_state = state_.lock();
if (!shared_state) return;
DefaultJobState::JobDelegate delegate(shared_state.get());
if (!shared_state->CanRunFirstTask()) return;
do {
job_task_->Run(shared_state.get());
job_task_->Run(&delegate);
} while (shared_state->DidRunTask());
}

View File

@ -111,6 +111,7 @@ class MockPlatform final : public TestPlatform {
}
void Join() override { orig_handle_->Join(); }
void Cancel() override { orig_handle_->Cancel(); }
bool IsCompleted() override { return orig_handle_->IsCompleted(); }
bool IsRunning() override { return orig_handle_->IsRunning(); }
private:

View File

@ -73,6 +73,7 @@ class TestPlatform::TestJobHandle : public v8::JobHandle {
void NotifyConcurrencyIncrease() override {}
void Join() override { thread_->Join(); }
void Cancel() override { Join(); }
bool IsCompleted() override { return true; }
bool IsRunning() override { return true; }
private:

View File

@ -91,6 +91,7 @@ class TestPlatform : public Platform {
public:
bool ShouldYield() override { return false; }
void NotifyConcurrencyIncrease() override {}
uint8_t GetTaskId() override { return 0; }
} delegate;
if (task_) task_->Run(&delegate);

View File

@ -105,6 +105,46 @@ TEST(DefaultJobTest, JoinJobContributes) {
EXPECT_EQ(0U, job_raw->max_concurrency);
}
// Verify that Join() on a job that uses |worker_count| eventually converges
// and doesn't hang.
TEST(DefaultJobTest, WorkerCount) {
static constexpr size_t kMaxTask = 4;
DefaultPlatform platform(kMaxTask);
// This Job spawns a workers until the first worker task completes.
class JobTest : public JobTask {
public:
~JobTest() override = default;
void Run(JobDelegate* delegate) override {
base::MutexGuard guard(&mutex);
if (max_concurrency > 0) --max_concurrency;
}
size_t GetMaxConcurrency() const override {
DCHECK(false); // not called.
return 0;
}
size_t GetMaxConcurrency(size_t worker_count) const override {
return worker_count + max_concurrency.load(std::memory_order_relaxed);
}
base::Mutex mutex;
std::atomic_size_t max_concurrency{kMaxTask};
};
auto job = std::make_unique<JobTest>();
JobTest* job_raw = job.get();
auto state = std::make_shared<DefaultJobState>(
&platform, std::move(job), TaskPriority::kUserVisible, kMaxTask);
state->NotifyConcurrencyIncrease();
// GetMaxConcurrency() eventually returns 0 thus, Join() should not hang.
state->Join();
EXPECT_EQ(0U, job_raw->max_concurrency);
}
// Verify that calling NotifyConcurrencyIncrease() (re-)schedules tasks with the
// intended concurrency.
TEST(DefaultJobTest, JobNotifyConcurrencyIncrease) {
@ -228,6 +268,33 @@ TEST(DefaultJobTest, LeakHandle) {
handle->Join();
}
TEST(DefaultJobTest, AcquireTaskId) {
class JobTest : public JobTask {
public:
~JobTest() override = default;
void Run(JobDelegate* delegate) override {}
size_t GetMaxConcurrency() const override { return 0; }
};
DefaultPlatform platform(0);
auto job = std::make_unique<JobTest>();
auto state = std::make_shared<DefaultJobState>(&platform, std::move(job),
TaskPriority::kUserVisible, 1);
EXPECT_EQ(0U, state->AcquireTaskId());
EXPECT_EQ(1U, state->AcquireTaskId());
EXPECT_EQ(2U, state->AcquireTaskId());
EXPECT_EQ(3U, state->AcquireTaskId());
EXPECT_EQ(4U, state->AcquireTaskId());
state->ReleaseTaskId(1);
state->ReleaseTaskId(3);
EXPECT_EQ(1U, state->AcquireTaskId());
EXPECT_EQ(3U, state->AcquireTaskId());
EXPECT_EQ(5U, state->AcquireTaskId());
}
} // namespace default_job_unittest
} // namespace platform
} // namespace v8