Use mutex/condition variables to synchronize threads

Instead of semaphores and atomics

BUG=v8:3608
R=yangguo@chromium.org
LOG=n

Review URL: https://codereview.chromium.org/950323002

Cr-Commit-Position: refs/heads/master@{#26823}
This commit is contained in:
jochen 2015-02-24 05:35:13 -08:00 committed by Commit bot
parent 675f7d2d5a
commit 3a98175469
2 changed files with 44 additions and 90 deletions

View File

@@ -43,8 +43,9 @@ void DisposeOptimizedCompileJob(OptimizedCompileJob* job,
class OptimizingCompilerThread::CompileTask : public v8::Task {
public:
explicit CompileTask(Isolate* isolate) : isolate_(isolate) {
base::NoBarrier_AtomicIncrement(
&isolate_->optimizing_compiler_thread()->ref_count_, 1);
OptimizingCompilerThread* thread = isolate_->optimizing_compiler_thread();
base::LockGuard<base::Mutex> lock_guard(&thread->ref_count_mutex_);
++thread->ref_count_;
}
virtual ~CompileTask() {}
@@ -57,8 +58,6 @@ class OptimizingCompilerThread::CompileTask : public v8::Task {
DisallowHandleDereference no_deref;
OptimizingCompilerThread* thread = isolate_->optimizing_compiler_thread();
StopFlag flag = CONTINUE;
{
TimerEventScope<TimerEventRecompileConcurrent> timer(isolate_);
@@ -66,32 +65,14 @@ class OptimizingCompilerThread::CompileTask : public v8::Task {
base::OS::Sleep(thread->recompilation_delay_);
}
OptimizedCompileJob* job = thread->NextInput(&flag);
switch (flag) {
case CONTINUE:
thread->CompileNext(job);
break;
case STOP:
case FLUSH: {
AllowHandleDereference allow_handle_dereference;
if (!job->info()->is_osr()) {
DisposeOptimizedCompileJob(job, true);
}
break;
}
thread->CompileNext(thread->NextInput(true));
}
{
base::LockGuard<base::Mutex> lock_guard(&thread->ref_count_mutex_);
if (--thread->ref_count_ == 0) {
thread->ref_count_zero_.NotifyOne();
}
}
if (flag == STOP) {
base::Release_Store(&thread->stop_thread_,
static_cast<base::AtomicWord>(CONTINUE));
thread->stop_semaphore_.Signal();
}
if (base::NoBarrier_AtomicIncrement(&thread->ref_count_, -1) == 0) {
thread->stop_semaphore_.Signal();
}
}
Isolate* isolate_;
@@ -101,9 +82,12 @@ class OptimizingCompilerThread::CompileTask : public v8::Task {
OptimizingCompilerThread::~OptimizingCompilerThread() {
if (base::NoBarrier_AtomicIncrement(&ref_count_, -1) > 0) {
stop_semaphore_.Wait();
#ifdef DEBUG
{
base::LockGuard<base::Mutex> lock_guard(&ref_count_mutex_);
DCHECK_EQ(0, ref_count_);
}
#endif
DCHECK_EQ(0, input_queue_length_);
DeleteArray(input_queue_);
if (FLAG_concurrent_osr) {
@@ -175,37 +159,18 @@ void OptimizingCompilerThread::Run() {
}
OptimizedCompileJob* OptimizingCompilerThread::NextInput(StopFlag* flag) {
OptimizedCompileJob* OptimizingCompilerThread::NextInput(
bool check_if_flushing) {
base::LockGuard<base::Mutex> access_input_queue_(&input_queue_mutex_);
if (input_queue_length_ == 0) {
if (flag) {
UNREACHABLE();
*flag = CONTINUE;
}
return NULL;
}
if (input_queue_length_ == 0) return NULL;
OptimizedCompileJob* job = input_queue_[InputQueueIndex(0)];
DCHECK_NOT_NULL(job);
input_queue_shift_ = InputQueueIndex(1);
input_queue_length_--;
if (flag) {
switch (static_cast<StopFlag>(base::Acquire_Load(&stop_thread_))) {
case CONTINUE:
*flag = CONTINUE;
break;
case FLUSH:
if (input_queue_length_ == 0) {
*flag = STOP;
} else {
*flag = FLUSH;
}
break;
case STOP:
UNREACHABLE();
*flag = CONTINUE;
break;
if (check_if_flushing) {
if (static_cast<StopFlag>(base::Acquire_Load(&stop_thread_)) != CONTINUE) {
if (!job->info()->is_osr()) DisposeOptimizedCompileJob(job, true);
return NULL;
}
}
return job;
@@ -213,7 +178,7 @@ OptimizedCompileJob* OptimizingCompilerThread::NextInput(StopFlag* flag) {
void OptimizingCompilerThread::CompileNext(OptimizedCompileJob* job) {
DCHECK_NOT_NULL(job);
if (!job) return;
// The function may have already been optimized by OSR. Simply continue.
OptimizedCompileJob::Status status = job->OptimizeGraph();
@@ -269,23 +234,16 @@ void OptimizingCompilerThread::FlushOsrBuffer(bool restore_function_code) {
void OptimizingCompilerThread::Flush() {
DCHECK(!IsOptimizerThread());
bool block = true;
if (job_based_recompilation_) {
if (FLAG_block_concurrent_recompilation) Unblock();
{
base::LockGuard<base::Mutex> lock(&input_queue_mutex_);
block = input_queue_length_ > 0;
if (block) {
base::Release_Store(&stop_thread_,
static_cast<base::AtomicWord>(FLUSH));
}
}
base::Release_Store(&stop_thread_, static_cast<base::AtomicWord>(FLUSH));
if (FLAG_block_concurrent_recompilation) Unblock();
if (!job_based_recompilation_) {
input_queue_semaphore_.Signal();
stop_semaphore_.Wait();
} else {
base::Release_Store(&stop_thread_, static_cast<base::AtomicWord>(FLUSH));
if (FLAG_block_concurrent_recompilation) Unblock();
base::LockGuard<base::Mutex> lock_guard(&ref_count_mutex_);
while (ref_count_ > 0) ref_count_zero_.Wait(&ref_count_mutex_);
base::Release_Store(&stop_thread_, static_cast<base::AtomicWord>(CONTINUE));
}
if (!job_based_recompilation_) input_queue_semaphore_.Signal();
if (block) stop_semaphore_.Wait();
FlushOutputQueue(true);
if (FLAG_concurrent_osr) FlushOsrBuffer(true);
if (tracing_enabled_) {
@@ -296,23 +254,16 @@ void OptimizingCompilerThread::Flush() {
void OptimizingCompilerThread::Stop() {
DCHECK(!IsOptimizerThread());
bool block = true;
if (job_based_recompilation_) {
if (FLAG_block_concurrent_recompilation) Unblock();
{
base::LockGuard<base::Mutex> lock(&input_queue_mutex_);
block = input_queue_length_ > 0;
if (block) {
base::Release_Store(&stop_thread_,
static_cast<base::AtomicWord>(FLUSH));
}
}
base::Release_Store(&stop_thread_, static_cast<base::AtomicWord>(STOP));
if (FLAG_block_concurrent_recompilation) Unblock();
if (!job_based_recompilation_) {
input_queue_semaphore_.Signal();
stop_semaphore_.Wait();
} else {
base::Release_Store(&stop_thread_, static_cast<base::AtomicWord>(STOP));
if (FLAG_block_concurrent_recompilation) Unblock();
base::LockGuard<base::Mutex> lock_guard(&ref_count_mutex_);
while (ref_count_ > 0) ref_count_zero_.Wait(&ref_count_mutex_);
base::Release_Store(&stop_thread_, static_cast<base::AtomicWord>(CONTINUE));
}
if (!job_based_recompilation_) input_queue_semaphore_.Signal();
if (block) stop_semaphore_.Wait();
if (recompilation_delay_ != 0) {
// At this point the optimizing compiler thread's event loop has stopped.

View File

@@ -6,6 +6,7 @@
#define V8_OPTIMIZING_COMPILER_THREAD_H_
#include "src/base/atomicops.h"
#include "src/base/platform/condition-variable.h"
#include "src/base/platform/mutex.h"
#include "src/base/platform/platform.h"
#include "src/base/platform/time.h"
@@ -38,7 +39,7 @@ class OptimizingCompilerThread : public base::Thread {
osr_hits_(0),
osr_attempts_(0),
blocked_jobs_(0),
ref_count_(1),
ref_count_(0),
tracing_enabled_(FLAG_trace_concurrent_recompilation),
job_based_recompilation_(FLAG_job_based_recompilation),
recompilation_delay_(FLAG_concurrent_recompilation_delay) {
@@ -96,7 +97,7 @@ class OptimizingCompilerThread : public base::Thread {
void FlushOutputQueue(bool restore_function_code);
void FlushOsrBuffer(bool restore_function_code);
void CompileNext(OptimizedCompileJob* job);
OptimizedCompileJob* NextInput(StopFlag* flag = NULL);
OptimizedCompileJob* NextInput(bool check_if_flushing = false);
// Add a recompilation task for OSR to the cyclic buffer, awaiting OSR entry.
// Tasks evicted from the cyclic buffer are discarded.
@@ -145,7 +146,9 @@ class OptimizingCompilerThread : public base::Thread {
int blocked_jobs_;
volatile base::AtomicWord ref_count_;
int ref_count_;
base::Mutex ref_count_mutex_;
base::ConditionVariable ref_count_zero_;
// Copies of FLAG_trace_concurrent_recompilation,
// FLAG_concurrent_recompilation_delay and