From b8dd056fa203bf05876c78702b937678b7b9457d Mon Sep 17 00:00:00 2001 From: "yangguo@chromium.org" Date: Wed, 16 Oct 2013 14:47:20 +0000 Subject: [PATCH] Improve queuing for concurrent OSR. Specifically, this includes: - Encapsulate the data structure for the OSR buffer into CyclicBuffer - Use the new CyclicQueue instead of UnboundQueue to queue new jobs. We can enqueue and dequeue a CyclicQueue on both ends in O(1). This allows us to add OSR jobs to the front for lower compile latency. - Dispose of one stale job in the OSR buffer per GC to avoid a leak R=titzer@chromium.org BUG= Review URL: https://codereview.chromium.org/25505002 git-svn-id: http://v8.googlecode.com/svn/branches/bleeding_edge@17244 ce2b1a6d-e550-0410-aec6-3dcde31c8c00 --- src/heap.cc | 4 + src/optimizing-compiler-thread.cc | 120 +++++++++++++++++------------- src/optimizing-compiler-thread.h | 69 ++++++++++------- 3 files changed, 113 insertions(+), 80 deletions(-) diff --git a/src/heap.cc b/src/heap.cc index fec45d433b..83da35c9cd 100644 --- a/src/heap.cc +++ b/src/heap.cc @@ -450,6 +450,10 @@ void Heap::GarbageCollectionPrologue() { #endif // DEBUG store_buffer()->GCPrologue(); + + if (FLAG_concurrent_osr) { + isolate()->optimizing_compiler_thread()->AgeBufferedOsrJobs(); + } } diff --git a/src/optimizing-compiler-thread.cc b/src/optimizing-compiler-thread.cc index 0053148a02..fa2da9761a 100644 --- a/src/optimizing-compiler-thread.cc +++ b/src/optimizing-compiler-thread.cc @@ -93,12 +93,20 @@ void OptimizingCompilerThread::Run() { } +RecompileJob* OptimizingCompilerThread::NextInput() { + LockGuard access_input_queue_(&input_queue_mutex_); + if (input_queue_length_ == 0) return NULL; + RecompileJob* job = input_queue_[InputQueueIndex(0)]; + ASSERT_NE(NULL, job); + input_queue_shift_ = InputQueueIndex(1); + input_queue_length_--; + return job; +} + + void OptimizingCompilerThread::CompileNext() { - RecompileJob* job = NULL; - bool result = input_queue_.Dequeue(&job); - USE(result); - ASSERT(result); - 
Barrier_AtomicIncrement(&queue_length_, static_cast(-1)); + RecompileJob* job = NextInput(); + ASSERT_NE(NULL, job); // The function may have already been optimized by OSR. Simply continue. RecompileJob::Status status = job->OptimizeGraph(); @@ -131,7 +139,7 @@ static void DisposeRecompileJob(RecompileJob* job, void OptimizingCompilerThread::FlushInputQueue(bool restore_function_code) { RecompileJob* job; - while (input_queue_.Dequeue(&job)) { + while ((job = NextInput())) { // This should not block, since we have one signal on the input queue // semaphore corresponding to each element in the input queue. input_queue_semaphore_.Wait(); @@ -140,7 +148,6 @@ void OptimizingCompilerThread::FlushInputQueue(bool restore_function_code) { DisposeRecompileJob(job, restore_function_code); } } - Release_Store(&queue_length_, static_cast(0)); } @@ -156,12 +163,12 @@ void OptimizingCompilerThread::FlushOutputQueue(bool restore_function_code) { void OptimizingCompilerThread::FlushOsrBuffer(bool restore_function_code) { - RecompileJob* job; - for (int i = 0; i < osr_buffer_size_; i++) { - job = osr_buffer_[i]; - if (job != NULL) DisposeRecompileJob(job, restore_function_code); + for (int i = 0; i < osr_buffer_capacity_; i++) { + if (osr_buffer_[i] != NULL) { + DisposeRecompileJob(osr_buffer_[i], restore_function_code); + osr_buffer_[i] = NULL; + } } - osr_cursor_ = 0; } @@ -187,10 +194,9 @@ void OptimizingCompilerThread::Stop() { stop_semaphore_.Wait(); if (FLAG_concurrent_recompilation_delay != 0) { - // Barrier when loading queue length is not necessary since the write - // happens in CompileNext on the same thread. - // This is used only for testing. - while (NoBarrier_Load(&queue_length_) > 0) CompileNext(); + // At this point the optimizing compiler thread's event loop has stopped. + // There is no need for a mutex when reading input_queue_length_. 
+ while (input_queue_length_ > 0) CompileNext(); InstallOptimizedFunctions(); } else { FlushInputQueue(false); @@ -239,7 +245,6 @@ void OptimizingCompilerThread::InstallOptimizedFunctions() { void OptimizingCompilerThread::QueueForOptimization(RecompileJob* job) { ASSERT(IsQueueAvailable()); ASSERT(!IsOptimizerThread()); - Barrier_AtomicIncrement(&queue_length_, static_cast(1)); CompilationInfo* info = job->info(); if (info->is_osr()) { if (FLAG_trace_concurrent_recompilation) { @@ -247,13 +252,24 @@ void OptimizingCompilerThread::QueueForOptimization(RecompileJob* job) { info->closure()->PrintName(); PrintF(" for concurrent on-stack replacement.\n"); } - AddToOsrBuffer(job); osr_attempts_++; BackEdgeTable::AddStackCheck(info); + AddToOsrBuffer(job); + // Add job to the front of the input queue. + LockGuard access_input_queue(&input_queue_mutex_); + ASSERT_LT(input_queue_length_, input_queue_capacity_); + // Move shift_ back by one. + input_queue_shift_ = InputQueueIndex(input_queue_capacity_ - 1); + input_queue_[InputQueueIndex(0)] = job; + input_queue_length_++; } else { info->closure()->MarkInRecompileQueue(); + // Add job to the back of the input queue. 
+ LockGuard access_input_queue(&input_queue_mutex_); + ASSERT_LT(input_queue_length_, input_queue_capacity_); + input_queue_[InputQueueIndex(input_queue_length_)] = job; + input_queue_length_++; } - input_queue_.Enqueue(job); if (FLAG_block_concurrent_recompilation) { blocked_jobs_++; } else { @@ -274,15 +290,14 @@ void OptimizingCompilerThread::Unblock() { RecompileJob* OptimizingCompilerThread::FindReadyOSRCandidate( Handle function, uint32_t osr_pc_offset) { ASSERT(!IsOptimizerThread()); - RecompileJob* result = NULL; - for (int i = 0; i < osr_buffer_size_; i++) { - result = osr_buffer_[i]; - if (result == NULL) continue; - if (result->IsWaitingForInstall() && - result->info()->HasSameOsrEntry(function, osr_pc_offset)) { + for (int i = 0; i < osr_buffer_capacity_; i++) { + RecompileJob* current = osr_buffer_[i]; + if (current != NULL && + current->IsWaitingForInstall() && + current->info()->HasSameOsrEntry(function, osr_pc_offset)) { osr_hits_++; osr_buffer_[i] = NULL; - return result; + return current; } } return NULL; @@ -292,10 +307,11 @@ RecompileJob* OptimizingCompilerThread::FindReadyOSRCandidate( bool OptimizingCompilerThread::IsQueuedForOSR(Handle function, uint32_t osr_pc_offset) { ASSERT(!IsOptimizerThread()); - for (int i = 0; i < osr_buffer_size_; i++) { - if (osr_buffer_[i] != NULL && - osr_buffer_[i]->info()->HasSameOsrEntry(function, osr_pc_offset)) { - return !osr_buffer_[i]->IsWaitingForInstall(); + for (int i = 0; i < osr_buffer_capacity_; i++) { + RecompileJob* current = osr_buffer_[i]; + if (current != NULL && + current->info()->HasSameOsrEntry(function, osr_pc_offset)) { + return !current->IsWaitingForInstall(); } } return false; @@ -304,10 +320,10 @@ bool OptimizingCompilerThread::IsQueuedForOSR(Handle function, bool OptimizingCompilerThread::IsQueuedForOSR(JSFunction* function) { ASSERT(!IsOptimizerThread()); - for (int i = 0; i < osr_buffer_size_; i++) { - if (osr_buffer_[i] != NULL && - *osr_buffer_[i]->info()->closure() == function) { - 
return !osr_buffer_[i]->IsWaitingForInstall(); + for (int i = 0; i < osr_buffer_capacity_; i++) { + RecompileJob* current = osr_buffer_[i]; + if (current != NULL && *current->info()->closure() == function) { + return !current->IsWaitingForInstall(); } } return false; @@ -316,27 +332,27 @@ bool OptimizingCompilerThread::IsQueuedForOSR(JSFunction* function) { void OptimizingCompilerThread::AddToOsrBuffer(RecompileJob* job) { ASSERT(!IsOptimizerThread()); - // Store into next empty slot or replace next stale OSR job that's waiting - // in vain. Dispose in the latter case. - RecompileJob* stale; + // Find the next slot that is empty or has a stale job. while (true) { - stale = osr_buffer_[osr_cursor_]; - if (stale == NULL) break; - if (stale->IsWaitingForInstall()) { - CompilationInfo* info = stale->info(); - if (FLAG_trace_osr) { - PrintF("[COSR - Discarded "); - info->closure()->PrintName(); - PrintF(", AST id %d]\n", info->osr_ast_id().ToInt()); - } - DisposeRecompileJob(stale, false); - break; - } - AdvanceOsrCursor(); + RecompileJob* stale = osr_buffer_[osr_buffer_cursor_]; + if (stale == NULL || stale->IsWaitingForInstall()) break; + osr_buffer_cursor_ = (osr_buffer_cursor_ + 1) % osr_buffer_capacity_; } - osr_buffer_[osr_cursor_] = job; - AdvanceOsrCursor(); + // Add to found slot and dispose the evicted job. 
+ RecompileJob* evicted = osr_buffer_[osr_buffer_cursor_]; + if (evicted != NULL) { + ASSERT(evicted->IsWaitingForInstall()); + CompilationInfo* info = evicted->info(); + if (FLAG_trace_osr) { + PrintF("[COSR - Discarded "); + info->closure()->PrintName(); + PrintF(", AST id %d]\n", info->osr_ast_id().ToInt()); + } + DisposeRecompileJob(evicted, false); + } + osr_buffer_[osr_buffer_cursor_] = job; + osr_buffer_cursor_ = (osr_buffer_cursor_ + 1) % osr_buffer_capacity_; } diff --git a/src/optimizing-compiler-thread.h b/src/optimizing-compiler-thread.h index 89921423ab..a9e108c5cb 100644 --- a/src/optimizing-compiler-thread.h +++ b/src/optimizing-compiler-thread.h @@ -53,21 +53,29 @@ class OptimizingCompilerThread : public Thread { isolate_(isolate), stop_semaphore_(0), input_queue_semaphore_(0), - osr_cursor_(0), + input_queue_capacity_(FLAG_concurrent_recompilation_queue_length), + input_queue_length_(0), + input_queue_shift_(0), + osr_buffer_capacity_(FLAG_concurrent_recompilation_queue_length + 4), + osr_buffer_cursor_(0), osr_hits_(0), osr_attempts_(0), blocked_jobs_(0) { NoBarrier_Store(&stop_thread_, static_cast(CONTINUE)); - NoBarrier_Store(&queue_length_, static_cast(0)); - if (FLAG_concurrent_osr) { - osr_buffer_size_ = FLAG_concurrent_recompilation_queue_length + 4; - osr_buffer_ = NewArray(osr_buffer_size_); - for (int i = 0; i < osr_buffer_size_; i++) osr_buffer_[i] = NULL; - } + input_queue_ = NewArray(input_queue_capacity_); + osr_buffer_ = NewArray(osr_buffer_capacity_); + // Mark OSR buffer slots as empty. 
+ for (int i = 0; i < osr_buffer_capacity_; i++) osr_buffer_[i] = NULL; } ~OptimizingCompilerThread() { - if (FLAG_concurrent_osr) DeleteArray(osr_buffer_); + ASSERT_EQ(0, input_queue_length_); +#ifdef DEBUG + for (int i = 0; i < osr_buffer_capacity_; i++) { + CHECK_EQ(NULL, osr_buffer_[i]); + } +#endif + DeleteArray(osr_buffer_); } void Run(); @@ -83,17 +91,15 @@ class OptimizingCompilerThread : public Thread { bool IsQueuedForOSR(JSFunction* function); inline bool IsQueueAvailable() { - // We don't need a barrier since we have a data dependency right - // after. - Atomic32 current_length = NoBarrier_Load(&queue_length_); + LockGuard access_input_queue(&input_queue_mutex_); + return input_queue_length_ < input_queue_capacity_; + } - // This can be queried only from the execution thread. - ASSERT(!IsOptimizerThread()); - // Since only the execution thread increments queue_length_ and - // only one thread can run inside an Isolate at one time, a direct - // doesn't introduce a race -- queue_length_ may decreased in - // meantime, but not increased. - return (current_length < FLAG_concurrent_recompilation_queue_length); + inline void AgeBufferedOsrJobs() { + // Advance cursor of the cyclic buffer to next empty slot or stale OSR job. + // Dispose said OSR job in the latter case. Calling this on every GC + // should make sure that we do not hold onto stale jobs indefinitely. + AddToOsrBuffer(NULL); } #ifdef DEBUG @@ -107,12 +113,17 @@ class OptimizingCompilerThread : public Thread { void FlushOutputQueue(bool restore_function_code); void FlushOsrBuffer(bool restore_function_code); void CompileNext(); + RecompileJob* NextInput(); // Add a recompilation task for OSR to the cyclic buffer, awaiting OSR entry. // Tasks evicted from the cyclic buffer are discarded. 
void AddToOsrBuffer(RecompileJob* compiler); - void AdvanceOsrCursor() { - osr_cursor_ = (osr_cursor_ + 1) % osr_buffer_size_; + + inline int InputQueueIndex(int i) { + int result = (i + input_queue_shift_) % input_queue_capacity_; + ASSERT_LE(0, result); + ASSERT_LT(result, input_queue_capacity_); + return result; } #ifdef DEBUG @@ -124,20 +135,22 @@ class OptimizingCompilerThread : public Thread { Semaphore stop_semaphore_; Semaphore input_queue_semaphore_; - // Queue of incoming recompilation tasks (including OSR). - UnboundQueue input_queue_; + // Circular queue of incoming recompilation tasks (including OSR). + RecompileJob** input_queue_; + int input_queue_capacity_; + int input_queue_length_; + int input_queue_shift_; + Mutex input_queue_mutex_; + // Queue of recompilation tasks ready to be installed (excluding OSR). UnboundQueue output_queue_; + // Cyclic buffer of recompilation tasks for OSR. - // TODO(yangguo): This may keep zombie tasks indefinitely, holding on to - // a lot of memory. Fix this. RecompileJob** osr_buffer_; - // Cursor for the cyclic buffer. - int osr_cursor_; - int osr_buffer_size_; + int osr_buffer_capacity_; + int osr_buffer_cursor_; volatile AtomicWord stop_thread_; - volatile Atomic32 queue_length_; TimeDelta time_spent_compiling_; TimeDelta time_spent_total_;