diff --git a/src/circular-queue-inl.h b/src/circular-queue-inl.h
index ffe8fb003e..962b069fb0 100644
--- a/src/circular-queue-inl.h
+++ b/src/circular-queue-inl.h
@@ -82,11 +82,10 @@ Record* CircularQueue<Record>::Next(Record* curr) {
 
 
 void* SamplingCircularQueue::Enqueue() {
-  Cell* enqueue_pos = reinterpret_cast<Cell*>(
-      Thread::GetThreadLocal(producer_key_));
-  WrapPositionIfNeeded(&enqueue_pos);
-  Thread::SetThreadLocal(producer_key_, enqueue_pos + record_size_);
-  return enqueue_pos;
+  WrapPositionIfNeeded(&producer_pos_->enqueue_pos);
+  void* result = producer_pos_->enqueue_pos;
+  producer_pos_->enqueue_pos += record_size_;
+  return result;
 }
 
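The new Enqueue is only correct because at most one producer is active at any moment (ticks are recorded either from a signal handler or with the sampled thread suspended), so a single shared position pointer can replace the per-thread TLS slot. A minimal standalone sketch of that single-producer pattern follows; the names and sizes (ToyProducer, kBufferSize, kRecordSize) are illustrative assumptions, not part of this patch:

    #include <cstdio>

    typedef long Cell;

    // Toy queue: the enqueue position lives in an ordinary struct shared
    // by all sampling threads, not in thread-local storage.
    struct ToyProducer {
      static const int kBufferSize = 8;  // cells (hypothetical size)
      static const int kRecordSize = 2;  // cells per record
      Cell buffer[kBufferSize];
      Cell* enqueue_pos;

      ToyProducer() : enqueue_pos(buffer) {}

      void* Enqueue() {
        // Wrap first, then hand out the slot and advance, mirroring the
        // rewritten SamplingCircularQueue::Enqueue above.
        if (enqueue_pos >= buffer + kBufferSize) enqueue_pos = buffer;
        void* result = enqueue_pos;
        enqueue_pos += kRecordSize;
        return result;
      }
    };

    int main() {
      ToyProducer q;
      for (int i = 0; i < 6; ++i) {
        Cell* rec = reinterpret_cast<Cell*>(q.Enqueue());
        rec[0] = i;  // fill the record in place
        std::printf("record %d at cell offset %d\n", i,
                    static_cast<int>(rec - q.buffer));
      }
      return 0;
    }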
diff --git a/src/circular-queue.cc b/src/circular-queue.cc
index 5f7a33eb3a..a7c25323e8 100644
--- a/src/circular-queue.cc
+++ b/src/circular-queue.cc
@@ -52,52 +52,44 @@ SamplingCircularQueue::SamplingCircularQueue(int record_size_in_bytes,
     buffer_[i] = kClear;
   }
   buffer_[buffer_size_] = kEnd;
+
+  // Lay out the producer and consumer position structs on separate
+  // cache lines to avoid cache-line thrashing due to simultaneous
+  // position updates from different processor cores.
+  const int positions_size =
+      RoundUp(1, kProcessorCacheLineSize) +
+      RoundUp(sizeof(ProducerPosition), kProcessorCacheLineSize) +
+      RoundUp(sizeof(ConsumerPosition), kProcessorCacheLineSize);
+  positions_ = NewArray<byte>(positions_size);
+
+  producer_pos_ = reinterpret_cast<ProducerPosition*>(
+      RoundUp(positions_, kProcessorCacheLineSize));
+  producer_pos_->enqueue_pos = buffer_;
+
+  consumer_pos_ = reinterpret_cast<ConsumerPosition*>(
+      reinterpret_cast<byte*>(producer_pos_) + kProcessorCacheLineSize);
+  ASSERT(reinterpret_cast<byte*>(consumer_pos_ + 1) <=
+         positions_ + positions_size);
+  consumer_pos_->dequeue_chunk_pos = buffer_;
+  consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_;
+  consumer_pos_->dequeue_pos = NULL;
 }
 
 
 SamplingCircularQueue::~SamplingCircularQueue() {
+  DeleteArray(positions_);
   DeleteArray(buffer_);
 }
 
 
-void SamplingCircularQueue::SetUpProducer() {
-  producer_key_ = Thread::CreateThreadLocalKey();
-  Thread::SetThreadLocal(producer_key_, buffer_);
-}
-
-
-void SamplingCircularQueue::TearDownProducer() {
-  Thread::DeleteThreadLocalKey(producer_key_);
-}
-
-
-void SamplingCircularQueue::SetUpConsumer() {
-  consumer_key_ = Thread::CreateThreadLocalKey();
-  ConsumerPosition* cp = new ConsumerPosition;
-  cp->dequeue_chunk_pos = buffer_;
-  cp->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_;
-  cp->dequeue_pos = NULL;
-  Thread::SetThreadLocal(consumer_key_, cp);
-}
-
-
-void SamplingCircularQueue::TearDownConsumer() {
-  delete reinterpret_cast<ConsumerPosition*>(
-      Thread::GetThreadLocal(consumer_key_));
-  Thread::DeleteThreadLocalKey(consumer_key_);
-}
-
-
 void* SamplingCircularQueue::StartDequeue() {
-  ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
-      Thread::GetThreadLocal(consumer_key_));
-  if (cp->dequeue_pos != NULL) {
-    return cp->dequeue_pos;
+  if (consumer_pos_->dequeue_pos != NULL) {
+    return consumer_pos_->dequeue_pos;
   } else {
-    if (*cp->dequeue_chunk_poll_pos != kClear) {
-      cp->dequeue_pos = cp->dequeue_chunk_pos;
-      cp->dequeue_end_pos = cp->dequeue_pos + chunk_size_;
-      return cp->dequeue_pos;
+    if (*consumer_pos_->dequeue_chunk_poll_pos != kClear) {
+      consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos;
+      consumer_pos_->dequeue_end_pos = consumer_pos_->dequeue_pos + chunk_size_;
+      return consumer_pos_->dequeue_pos;
     } else {
       return NULL;
     }
@@ -106,25 +98,21 @@ void* SamplingCircularQueue::StartDequeue() {
 
 
 void SamplingCircularQueue::FinishDequeue() {
-  ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
-      Thread::GetThreadLocal(consumer_key_));
-  cp->dequeue_pos += record_size_;
-  if (cp->dequeue_pos < cp->dequeue_end_pos) return;
+  consumer_pos_->dequeue_pos += record_size_;
+  if (consumer_pos_->dequeue_pos < consumer_pos_->dequeue_end_pos) return;
   // Move to next chunk.
-  cp->dequeue_pos = NULL;
-  *cp->dequeue_chunk_pos = kClear;
-  cp->dequeue_chunk_pos += chunk_size_;
-  WrapPositionIfNeeded(&cp->dequeue_chunk_pos);
-  cp->dequeue_chunk_poll_pos += chunk_size_;
-  WrapPositionIfNeeded(&cp->dequeue_chunk_poll_pos);
+  consumer_pos_->dequeue_pos = NULL;
+  *consumer_pos_->dequeue_chunk_pos = kClear;
+  consumer_pos_->dequeue_chunk_pos += chunk_size_;
+  WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_pos);
+  consumer_pos_->dequeue_chunk_poll_pos += chunk_size_;
+  WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_poll_pos);
 }
 
 
 void SamplingCircularQueue::FlushResidualRecords() {
-  ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
-      Thread::GetThreadLocal(consumer_key_));
   // Eliminate producer / consumer distance.
-  cp->dequeue_chunk_poll_pos = cp->dequeue_chunk_pos;
+  consumer_pos_->dequeue_chunk_poll_pos = consumer_pos_->dequeue_chunk_pos;
 }
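The constructor's layout trick — over-allocate a byte array, then round each position struct up to its own cache-line boundary so the producer's and consumer's writes never touch the same line (no false sharing) — can be sketched standalone as below. This is illustrative only, not V8 code: kCacheLineSize, the local RoundUp, and the struct contents are assumptions.

    #include <cassert>
    #include <cstdint>
    #include <cstdio>

    const int kCacheLineSize = 64;

    // Round x up to the next multiple of m (m must be a power of two).
    static uintptr_t RoundUp(uintptr_t x, uintptr_t m) {
      return (x + m - 1) & ~(m - 1);
    }

    struct ProducerPosition { int* enqueue_pos; };
    struct ConsumerPosition { int* dequeue_pos; };

    int main() {
      const int size = kCacheLineSize   // slack for aligning the base
          + kCacheLineSize              // one full line for the producer
          + kCacheLineSize;             // one full line for the consumer
      char* raw = new char[size];

      ProducerPosition* producer = reinterpret_cast<ProducerPosition*>(
          RoundUp(reinterpret_cast<uintptr_t>(raw), kCacheLineSize));
      ConsumerPosition* consumer = reinterpret_cast<ConsumerPosition*>(
          reinterpret_cast<char*>(producer) + kCacheLineSize);
      // Both structs must still fit inside the over-allocated array.
      assert(reinterpret_cast<char*>(consumer + 1) <= raw + size);

      std::printf("producer at %p, consumer at %p (%ld bytes apart)\n",
                  static_cast<void*>(producer), static_cast<void*>(consumer),
                  static_cast<long>(reinterpret_cast<char*>(consumer) -
                                    reinterpret_cast<char*>(producer)));
      delete[] raw;
      return 0;
    }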
diff --git a/src/circular-queue.h b/src/circular-queue.h
index 11159e0388..dce7fc2ad9 100644
--- a/src/circular-queue.h
+++ b/src/circular-queue.h
@@ -76,15 +76,11 @@ class SamplingCircularQueue {
                         int buffer_size_in_chunks);
   ~SamplingCircularQueue();
 
-  // Executed on the producer (sampler) or application thread.
-  void SetUpProducer();
   // Enqueue returns a pointer to a memory location for storing the next
   // record.
   INLINE(void* Enqueue());
-  void TearDownProducer();
 
   // Executed on the consumer (analyzer) thread.
-  void SetUpConsumer();
   // StartDequeue returns a pointer to a memory location for retrieving
   // the next record. After the record had been read by a consumer,
   // FinishDequeue must be called. Until that moment, subsequent calls
@@ -95,7 +91,6 @@ class SamplingCircularQueue {
   // the queue must be notified whether producing has been finished in order
   // to process remaining records from the buffer.
   void FlushResidualRecords();
-  void TearDownConsumer();
 
   typedef AtomicWord Cell;
   // Reserved values for the first cell of a record.
@@ -103,6 +98,9 @@ class SamplingCircularQueue {
   static const Cell kEnd = -1;  // Marks the end of the buffer.
 
  private:
+  struct ProducerPosition {
+    Cell* enqueue_pos;
+  };
   struct ConsumerPosition {
     Cell* dequeue_chunk_pos;
     Cell* dequeue_chunk_poll_pos;
@@ -118,10 +116,9 @@ class SamplingCircularQueue {
   const int buffer_size_;
   const int producer_consumer_distance_;
   Cell* buffer_;
-  // Store producer and consumer data in TLS to avoid modifying the
-  // same CPU cache line from two threads simultaneously.
-  Thread::LocalStorageKey consumer_key_;
-  Thread::LocalStorageKey producer_key_;
+  byte* positions_;
+  ProducerPosition* producer_pos_;
+  ConsumerPosition* consumer_pos_;
 };
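The comments in this header pin down the dequeue contract: StartDequeue keeps returning the same record until FinishDequeue is called, and returns NULL when nothing is ready. A toy model of just that contract — a hypothetical single-cell-record queue with none of V8's chunking or atomics — behaves like this:

    #include <cstddef>
    #include <cstdio>

    typedef long Cell;
    const Cell kClear = 0;

    struct ToyQueue {
      Cell buffer[4];
      int read_index;
      Cell* pending;  // record handed out by StartDequeue, not yet finished

      ToyQueue() : read_index(0), pending(NULL) {
        for (int i = 0; i < 4; ++i) buffer[i] = kClear;
      }

      Cell* StartDequeue() {
        if (pending != NULL) return pending;            // same record again
        if (buffer[read_index] == kClear) return NULL;  // nothing produced yet
        pending = &buffer[read_index];
        return pending;
      }

      void FinishDequeue() {
        *pending = kClear;  // release the slot back to the producer
        pending = NULL;
        read_index = (read_index + 1) % 4;
      }
    };

    int main() {
      ToyQueue q;
      q.buffer[0] = 42;  // stands in for a producer's Enqueue
      Cell* r1 = q.StartDequeue();
      Cell* r2 = q.StartDequeue();
      std::printf("same record twice: %s\n", r1 == r2 ? "yes" : "no");
      q.FinishDequeue();
      std::printf("after FinishDequeue: %p\n",
                  static_cast<void*>(q.StartDequeue()));  // NULL, queue empty
      return 0;
    }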
diff --git a/src/cpu-profiler.cc b/src/cpu-profiler.cc
index d36f511209..d16c17f4c0 100644
--- a/src/cpu-profiler.cc
+++ b/src/cpu-profiler.cc
@@ -176,7 +176,6 @@ bool ProfilerEventsProcessor::ProcessTicks(unsigned dequeue_order) {
 
 
 void ProfilerEventsProcessor::Run() {
-  ticks_buffer_.SetUpConsumer();
   unsigned dequeue_order = 0;
   running_ = true;
 
@@ -194,7 +193,6 @@ void ProfilerEventsProcessor::Run() {
   ticks_buffer_.FlushResidualRecords();
   // Perform processing until we have tick events, skip remaining code events.
   while (ProcessTicks(dequeue_order) && ProcessCodeEvent(&dequeue_order)) { }
-  ticks_buffer_.TearDownConsumer();
 }
 
diff --git a/src/cpu-profiler.h b/src/cpu-profiler.h
index ccfac5c5c7..8a7d2fdd31 100644
--- a/src/cpu-profiler.h
+++ b/src/cpu-profiler.h
@@ -154,14 +154,11 @@ class ProfilerEventsProcessor : public Thread {
   void FunctionMoveEvent(Address from, Address to);
   void FunctionDeleteEvent(Address from);
 
-  // Tick sampler registration. Called by sampler thread or signal handler.
-  inline void SetUpSamplesProducer() { ticks_buffer_.SetUpProducer(); }
   // Tick sample events are filled directly in the buffer of the circular
   // queue (because the structure is of fixed width, but usually not all
   // stack frame entries are filled.) This method returns a pointer to the
   // next record of the buffer.
   INLINE(TickSample* TickSampleEvent());
-  inline void TearDownSamplesProducer() { ticks_buffer_.TearDownProducer(); }
 
  private:
   union CodeEventsContainer {
 
diff --git a/src/globals.h b/src/globals.h
index cb7f27ee7d..90007e67bf 100644
--- a/src/globals.h
+++ b/src/globals.h
@@ -195,6 +195,10 @@ const Address kFromSpaceZapValue = reinterpret_cast<Address>(0xbeefdad);
 // gives 8K bytes per page.
 const int kPageSizeBits = 13;
 
+// On Intel architectures the cache line size is 64 bytes.
+// On ARM it may be smaller (32 bytes), but since this constant is only
+// used for aligning data, aligning on a greater value does no harm.
+const int kProcessorCacheLineSize = 64;
 
 // Constants relevant to double precision floating point numbers.
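kProcessorCacheLineSize is a compile-time guess, which is all the alignment code needs. As a hedged aside (not part of this patch, and glibc-specific), the actual L1 data cache line size on a Linux machine can be checked at run time via sysconf:

    #include <unistd.h>
    #include <cstdio>

    int main() {
      // Returns the L1 data cache line size in bytes, or 0/-1 when the
      // value is unknown to the C library.
      long line_size = sysconf(_SC_LEVEL1_DCACHE_LINESIZE);
      std::printf("L1 dcache line size: %ld bytes\n", line_size);
      return 0;
    }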
diff --git a/test/cctest/test-circular-queue.cc b/test/cctest/test-circular-queue.cc
index bb69c1bc02..3fa49bfaf3 100644
--- a/test/cctest/test-circular-queue.cc
+++ b/test/cctest/test-circular-queue.cc
@@ -61,8 +61,6 @@ TEST(SamplingCircularQueue) {
   SamplingCircularQueue scq(sizeof(Record),
                             kRecordsPerChunk * sizeof(Record),
                             3);
-  scq.SetUpProducer();
-  scq.SetUpConsumer();
 
   // Check that we are using non-reserved values.
   CHECK_NE(SamplingCircularQueue::kClear, 1);
@@ -121,7 +119,103 @@ TEST(SamplingCircularQueue) {
   // Consumption must still be possible as the first cell of the
   // last chunk is not clean.
   CHECK_NE(NULL, scq.StartDequeue());
-
-  scq.TearDownConsumer();
-  scq.TearDownProducer();
+}
+
+
+namespace {
+
+class ProducerThread: public i::Thread {
+ public:
+  typedef SamplingCircularQueue::Cell Record;
+
+  ProducerThread(SamplingCircularQueue* scq,
+                 int records_per_chunk,
+                 Record value,
+                 i::Semaphore* finished)
+      : scq_(scq),
+        records_per_chunk_(records_per_chunk),
+        value_(value),
+        finished_(finished) { }
+
+  virtual void Run() {
+    for (Record i = value_; i < value_ + records_per_chunk_; ++i) {
+      Record* rec = reinterpret_cast<Record*>(scq_->Enqueue());
+      CHECK_NE(NULL, rec);
+      *rec = i;
+    }
+
+    finished_->Signal();
+  }
+
+ private:
+  SamplingCircularQueue* scq_;
+  const int records_per_chunk_;
+  Record value_;
+  i::Semaphore* finished_;
+};
+
+}  // namespace
+
+TEST(SamplingCircularQueueMultithreading) {
+  // Emulate multiple VM threads working 'one thread at a time.'
+  // This test enqueues data from different threads. This corresponds
+  // to the case of profiling under Linux, where the signal handler
+  // that does sampling is called in the context of different VM threads.
+
+  typedef ProducerThread::Record Record;
+  const int kRecordsPerChunk = 4;
+  SamplingCircularQueue scq(sizeof(Record),
+                            kRecordsPerChunk * sizeof(Record),
+                            3);
+  i::Semaphore* semaphore = i::OS::CreateSemaphore(0);
+  // Don't poll ahead, making it possible to check data in the buffer
+  // immediately after enqueuing.
+  scq.FlushResidualRecords();
+
+  // Check that we are using non-reserved values.
+  CHECK_NE(SamplingCircularQueue::kClear, 1);
+  CHECK_NE(SamplingCircularQueue::kEnd, 1);
+  ProducerThread producer1(&scq, kRecordsPerChunk, 1, semaphore);
+  ProducerThread producer2(&scq, kRecordsPerChunk, 10, semaphore);
+  ProducerThread producer3(&scq, kRecordsPerChunk, 20, semaphore);
+
+  CHECK_EQ(NULL, scq.StartDequeue());
+  producer1.Start();
+  semaphore->Wait();
+  for (Record i = 1; i < 1 + kRecordsPerChunk; ++i) {
+    Record* rec = reinterpret_cast<Record*>(scq.StartDequeue());
+    CHECK_NE(NULL, rec);
+    CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec));
+    CHECK_EQ(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
+    scq.FinishDequeue();
+    CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
+  }
+
+  CHECK_EQ(NULL, scq.StartDequeue());
+  producer2.Start();
+  semaphore->Wait();
+  for (Record i = 10; i < 10 + kRecordsPerChunk; ++i) {
+    Record* rec = reinterpret_cast<Record*>(scq.StartDequeue());
+    CHECK_NE(NULL, rec);
+    CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec));
+    CHECK_EQ(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
+    scq.FinishDequeue();
+    CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
+  }
+
+  CHECK_EQ(NULL, scq.StartDequeue());
+  producer3.Start();
+  semaphore->Wait();
+  for (Record i = 20; i < 20 + kRecordsPerChunk; ++i) {
+    Record* rec = reinterpret_cast<Record*>(scq.StartDequeue());
+    CHECK_NE(NULL, rec);
+    CHECK_EQ(static_cast<int64_t>(i), static_cast<int64_t>(*rec));
+    CHECK_EQ(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
+    scq.FinishDequeue();
+    CHECK_NE(rec, reinterpret_cast<Record*>(scq.StartDequeue()));
+  }
+
+  CHECK_EQ(NULL, scq.StartDequeue());
+
+  delete semaphore;
 }
 
diff --git a/test/cctest/test-cpu-profiler.cc b/test/cctest/test-cpu-profiler.cc
index bd966fa21b..2fff4fae02 100644
--- a/test/cctest/test-cpu-profiler.cc
+++ b/test/cctest/test-cpu-profiler.cc
@@ -64,7 +64,6 @@ TEST(CodeEvents) {
   ProfileGenerator generator(&profiles);
   ProfilerEventsProcessor processor(&generator);
   processor.Start();
-  processor.SetUpSamplesProducer();
   while (!processor.running()) {
     i::Thread::YieldCPU();
   }
@@ -117,8 +116,6 @@ TEST(CodeEvents) {
   CodeEntry* entry5 = generator.code_map()->FindEntry(ToAddress(0x1700));
   CHECK_NE(NULL, entry5);
   CHECK_EQ(aaa_str, entry5->name());
-
-  processor.TearDownSamplesProducer();
 }
 
 
@@ -133,7 +130,6 @@ TEST(TickEvents) {
   ProfileGenerator generator(&profiles);
   ProfilerEventsProcessor processor(&generator);
   processor.Start();
-  processor.SetUpSamplesProducer();
   while (!processor.running()) {
     i::Thread::YieldCPU();
   }
@@ -197,6 +193,4 @@ TEST(TickEvents) {
   bottom_up_ddd_children.last()->GetChildren(&bottom_up_ddd_stub_children);
   CHECK_EQ(1, bottom_up_ddd_stub_children.length());
   CHECK_EQ("bbb", bottom_up_ddd_stub_children.last()->entry()->name());
-
-  processor.TearDownSamplesProducer();
 }
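For readers unfamiliar with the Signal/Wait handshake the test uses to run its producers strictly one at a time, the same sequencing pattern can be sketched with plain POSIX threads and semaphores instead of V8's i::Thread and i::Semaphore wrappers (a sketch under that assumption; sem_init is Linux-flavored and deprecated on Mac OS X):

    #include <pthread.h>
    #include <semaphore.h>
    #include <cstdio>

    sem_t finished;

    void* Producer(void* arg) {
      // A real producer would enqueue its records here before signaling,
      // mirroring ProducerThread::Run() above.
      std::printf("producer %ld done\n", reinterpret_cast<long>(arg));
      sem_post(&finished);  // like finished_->Signal()
      return NULL;
    }

    int main() {
      sem_init(&finished, 0, 0);
      for (long i = 1; i <= 3; ++i) {
        pthread_t t;
        pthread_create(&t, NULL, Producer, reinterpret_cast<void*>(i));
        sem_wait(&finished);  // consume only after this producer finished
        pthread_join(&t, NULL);
      }
      sem_destroy(&finished);
      return 0;
    }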