[Heap]: Evacuate uses Jobs

Replaces ItemParallelJob with a std::vector to hold work items.
An IndexGenerator is used to iterate over the evacuation items.
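For orientation before the diff: each worker gets a starting index from an IndexGenerator, then claims consecutive items via an atomic flag per item until it hits one another worker already owns. Below is a minimal standalone sketch of that acquire-and-advance loop using only standard-library types. The ParallelWorkItem and IndexGenerator classes here are simplified stand-ins for the V8-internal ones (the real IndexGenerator spreads starting indices across the range to reduce contention, rather than counting up), and the printf stands in for Evacuator::EvacuatePage; only the loop structure mirrors the committed ProcessItems.

#include <atomic>
#include <cstddef>
#include <cstdio>
#include <functional>
#include <optional>
#include <thread>
#include <vector>

// Stand-in for v8::internal::ParallelWorkItem: one atomic flag per item.
class ParallelWorkItem {
 public:
  bool TryAcquire() {
    return !acquired_.exchange(true, std::memory_order_relaxed);
  }

 private:
  std::atomic<bool> acquired_{false};
};

// Stand-in for v8::internal::IndexGenerator. A plain counter keeps the
// sketch short; the real class hands out start points spread across the
// range so workers rarely collide.
class IndexGenerator {
 public:
  explicit IndexGenerator(std::size_t size) : size_(size) {}
  std::optional<std::size_t> GetNext() {
    std::size_t next = next_.fetch_add(1, std::memory_order_relaxed);
    if (next >= size_) return std::nullopt;
    return next;
  }

 private:
  std::atomic<std::size_t> next_{0};
  const std::size_t size_;
};

// Mirrors PageEvacuationJob::ProcessItems: start at a generator-supplied
// index, claim consecutive items until one is already taken, then ask the
// generator for a fresh start point.
void ProcessItems(std::vector<ParallelWorkItem>& items,
                  std::atomic<std::size_t>& remaining,
                  IndexGenerator& generator, int worker_id) {
  while (remaining.load(std::memory_order_relaxed) > 0) {
    std::optional<std::size_t> index = generator.GetNext();
    if (!index) return;
    for (std::size_t i = *index; i < items.size(); ++i) {
      if (!items[i].TryAcquire()) break;  // another worker owns this run
      std::printf("worker %d evacuates item %zu\n", worker_id, i);
      if (remaining.fetch_sub(1, std::memory_order_relaxed) <= 1) return;
    }
  }
}

int main() {
  constexpr std::size_t kItems = 16;
  std::vector<ParallelWorkItem> items(kItems);
  std::atomic<std::size_t> remaining{kItems};
  IndexGenerator generator(kItems);
  std::vector<std::thread> workers;
  for (int id = 0; id < 4; ++id) {
    workers.emplace_back(ProcessItems, std::ref(items), std::ref(remaining),
                         std::ref(generator), id);
  }
  for (auto& t : workers) t.join();
}

Concurrency is then sized by the job itself rather than fixed up front: GetMaxConcurrency in the diff requests ceil(remaining_items / kItemsPerWorker) workers, with kItemsPerWorker = MB / Page::kPageSize, capped by the number of pre-created evacuators, so the worker count shrinks as the items drain.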

Change-Id: I63ea246f267d8cbe140c47c022b95b3873bc957a
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/2425339
Commit-Queue: Etienne Pierre-Doray <etiennep@chromium.org>
Reviewed-by: Ulan Degenbaev <ulan@chromium.org>
Cr-Commit-Position: refs/heads/master@{#70242}
Etienne Pierre-doray 2020-09-29 14:04:44 -04:00 committed by Commit Bot
parent 19e6ead05c
commit 0763e924a8
2 changed files with 92 additions and 72 deletions

src/heap/mark-compact.cc

@@ -6,6 +6,7 @@
 #include <unordered_map>
 
+#include "src/base/optional.h"
 #include "src/base/utils/random-number-generator.h"
 #include "src/codegen/compilation-cache.h"
 #include "src/deoptimizer/deoptimizer.h"
@@ -19,6 +20,7 @@
 #include "src/heap/code-object-registry.h"
 #include "src/heap/gc-tracer.h"
 #include "src/heap/incremental-marking-inl.h"
+#include "src/heap/index-generator.h"
 #include "src/heap/invalidated-slots-inl.h"
 #include "src/heap/item-parallel-job.h"
 #include "src/heap/large-spaces.h"
@@ -390,11 +392,8 @@ int NumberOfAvailableCores() {
 }  // namespace
 
-int MarkCompactCollectorBase::NumberOfParallelCompactionTasks(int pages) {
-  DCHECK_GT(pages, 0);
-  int tasks = FLAG_parallel_compaction ? Min(NumberOfAvailableCores(),
-                                             pages / (MB / Page::kPageSize) + 1)
-                                       : 1;
+int MarkCompactCollectorBase::NumberOfParallelCompactionTasks() {
+  int tasks = FLAG_parallel_compaction ? NumberOfAvailableCores() : 1;
   if (!heap_->CanPromoteYoungAndExpandOldGeneration(
           static_cast<size_t>(tasks * Page::kPageSize))) {
     // Optimize for memory usage near the heap limit.
@@ -2953,7 +2952,7 @@ class Evacuator : public Malloced {
   // Merge back locally cached info sequentially. Note that this method needs
   // to be called from the main thread.
-  inline void Finalize();
+  virtual void Finalize();
 
   virtual GCTracer::BackgroundScope::ScopeId GetBackgroundTracingScope() = 0;
   virtual GCTracer::Scope::ScopeId GetTracingScope() = 0;
@@ -3052,7 +3051,7 @@ class FullEvacuator : public Evacuator {
     return GCTracer::Scope::MC_EVACUATE_COPY_PARALLEL;
   }
 
-  inline void Finalize() {
+  void Finalize() override {
     Evacuator::Finalize();
 
     for (auto it = ephemeron_remembered_set_.begin();
@@ -3121,48 +3120,68 @@ void FullEvacuator::RawEvacuatePage(MemoryChunk* chunk, intptr_t* live_bytes) {
   }
 }
 
-class EvacuationItem : public ItemParallelJob::Item {
+class PageEvacuationJob : public v8::JobTask {
  public:
-  explicit EvacuationItem(MemoryChunk* chunk) : chunk_(chunk) {}
-  ~EvacuationItem() override = default;
-  MemoryChunk* chunk() const { return chunk_; }
-
- private:
-  MemoryChunk* chunk_;
-};
-
-class PageEvacuationTask : public ItemParallelJob::Task {
- public:
-  PageEvacuationTask(Isolate* isolate, Evacuator* evacuator)
-      : ItemParallelJob::Task(isolate),
-        evacuator_(evacuator),
+  PageEvacuationJob(
+      Isolate* isolate, std::vector<std::unique_ptr<Evacuator>>* evacuators,
+      std::vector<std::pair<ParallelWorkItem, MemoryChunk*>> evacuation_items)
+      : evacuators_(evacuators),
+        evacuation_items_(std::move(evacuation_items)),
+        remaining_evacuation_items_(evacuation_items_.size()),
+        generator_(evacuation_items_.size()),
         tracer_(isolate->heap()->tracer()) {}
 
-  void RunInParallel(Runner runner) override {
-    if (runner == Runner::kForeground) {
-      TRACE_GC(tracer_, evacuator_->GetTracingScope());
-      ProcessItems();
+  void Run(JobDelegate* delegate) override {
+    Evacuator* evacuator = (*evacuators_)[delegate->GetTaskId()].get();
+    if (delegate->IsJoiningThread()) {
+      TRACE_GC(tracer_, evacuator->GetTracingScope());
+      ProcessItems(delegate, evacuator);
     } else {
-      TRACE_BACKGROUND_GC(tracer_, evacuator_->GetBackgroundTracingScope());
-      ProcessItems();
+      TRACE_BACKGROUND_GC(tracer_, evacuator->GetBackgroundTracingScope());
+      ProcessItems(delegate, evacuator);
     }
   }
 
- private:
-  void ProcessItems() {
-    EvacuationItem* item = nullptr;
-    while ((item = GetItem<EvacuationItem>()) != nullptr) {
-      evacuator_->EvacuatePage(item->chunk());
-      item->MarkFinished();
+  void ProcessItems(JobDelegate* delegate, Evacuator* evacuator) {
+    while (remaining_evacuation_items_.load(std::memory_order_relaxed) > 0) {
+      base::Optional<size_t> index = generator_.GetNext();
+      if (!index) return;
+      for (size_t i = *index; i < evacuation_items_.size(); ++i) {
+        auto& work_item = evacuation_items_[i];
+        if (!work_item.first.TryAcquire()) break;
+        evacuator->EvacuatePage(work_item.second);
+        if (remaining_evacuation_items_.fetch_sub(
+                1, std::memory_order_relaxed) <= 1) {
+          return;
+        }
+      }
     }
   }
 
-  Evacuator* evacuator_;
+  size_t GetMaxConcurrency(size_t worker_count) const override {
+    const size_t kItemsPerWorker = MB / Page::kPageSize;
+    // Ceiling division to ensure enough workers for all
+    // |remaining_evacuation_items_|
+    const size_t wanted_num_workers =
+        (remaining_evacuation_items_.load(std::memory_order_relaxed) +
+         kItemsPerWorker - 1) /
+        kItemsPerWorker;
+    return std::min<size_t>(wanted_num_workers, evacuators_->size());
+  }
+
+ private:
+  std::vector<std::unique_ptr<Evacuator>>* evacuators_;
+  std::vector<std::pair<ParallelWorkItem, MemoryChunk*>> evacuation_items_;
+  std::atomic<size_t> remaining_evacuation_items_{0};
+  IndexGenerator generator_;
+
   GCTracer* tracer_;
 };
 
 template <class Evacuator, class Collector>
 void MarkCompactCollectorBase::CreateAndExecuteEvacuationTasks(
-    Collector* collector, ItemParallelJob* job,
+    Collector* collector,
+    std::vector<std::pair<ParallelWorkItem, MemoryChunk*>> evacuation_items,
     MigrationObserver* migration_observer, const intptr_t live_bytes) {
   // Used for trace summary.
   double compaction_speed = 0;
@@ -3173,31 +3192,33 @@ void MarkCompactCollectorBase::CreateAndExecuteEvacuationTasks(
   const bool profiling = isolate()->LogObjectRelocation();
   ProfilingMigrationObserver profiling_observer(heap());
 
-  const int wanted_num_tasks =
-      NumberOfParallelCompactionTasks(job->NumberOfItems());
-  Evacuator** evacuators = new Evacuator*[wanted_num_tasks];
+  const size_t pages_count = evacuation_items.size();
+  std::vector<std::unique_ptr<v8::internal::Evacuator>> evacuators;
+  const int wanted_num_tasks = NumberOfParallelCompactionTasks();
   for (int i = 0; i < wanted_num_tasks; i++) {
-    evacuators[i] = new Evacuator(collector);
-    if (profiling) evacuators[i]->AddObserver(&profiling_observer);
+    auto evacuator = std::make_unique<Evacuator>(collector);
+    if (profiling) evacuator->AddObserver(&profiling_observer);
     if (migration_observer != nullptr)
-      evacuators[i]->AddObserver(migration_observer);
-    job->AddTask(new PageEvacuationTask(heap()->isolate(), evacuators[i]));
+      evacuator->AddObserver(migration_observer);
+    evacuators.push_back(std::move(evacuator));
   }
-  job->Run();
-  for (int i = 0; i < wanted_num_tasks; i++) {
-    evacuators[i]->Finalize();
-    delete evacuators[i];
-  }
-  delete[] evacuators;
+  V8::GetCurrentPlatform()
+      ->PostJob(v8::TaskPriority::kUserBlocking,
+                std::make_unique<PageEvacuationJob>(
+                    isolate(), &evacuators, std::move(evacuation_items)))
+      ->Join();
+
+  for (auto& evacuator : evacuators) evacuator->Finalize();
+  evacuators.clear();
 
   if (FLAG_trace_evacuation) {
     PrintIsolate(isolate(),
-                 "%8.0f ms: evacuation-summary: parallel=%s pages=%d "
-                 "wanted_tasks=%d tasks=%d cores=%d live_bytes=%" V8PRIdPTR
+                 "%8.0f ms: evacuation-summary: parallel=%s pages=%zu "
+                 "wanted_tasks=%d cores=%d live_bytes=%" V8PRIdPTR
                  " compaction_speed=%.f\n",
                  isolate()->time_millis_since_init(),
-                 FLAG_parallel_compaction ? "yes" : "no", job->NumberOfItems(),
-                 wanted_num_tasks, job->NumberOfTasks(),
+                 FLAG_parallel_compaction ? "yes" : "no", pages_count,
+                 wanted_num_tasks,
                  V8::GetCurrentPlatform()->NumberOfWorkerThreads() + 1,
                  live_bytes, compaction_speed);
   }
@@ -3214,8 +3235,7 @@ bool MarkCompactCollectorBase::ShouldMovePage(Page* p, intptr_t live_bytes,
 }
 
 void MarkCompactCollector::EvacuatePagesInParallel() {
-  ItemParallelJob evacuation_job(isolate()->cancelable_task_manager(),
-                                 &page_parallel_job_semaphore_);
+  std::vector<std::pair<ParallelWorkItem, MemoryChunk*>> evacuation_items;
   intptr_t live_bytes = 0;
 
   // Evacuation of new space pages cannot be aborted, so it needs to run
@@ -3238,12 +3258,12 @@ void MarkCompactCollector::EvacuatePagesInParallel() {
         EvacuateNewSpacePageVisitor<NEW_TO_NEW>::Move(page);
       }
     }
-    evacuation_job.AddItem(new EvacuationItem(page));
+    evacuation_items.emplace_back(ParallelWorkItem{}, page);
   }
 
   for (Page* page : old_space_evacuation_pages_) {
     live_bytes += non_atomic_marking_state()->live_bytes(page);
-    evacuation_job.AddItem(new EvacuationItem(page));
+    evacuation_items.emplace_back(ParallelWorkItem{}, page);
   }
 
   // Promote young generation large objects.
@@ -3259,18 +3279,18 @@ void MarkCompactCollector::EvacuatePagesInParallel() {
     if (marking_state->IsBlack(object)) {
       heap_->lo_space()->PromoteNewLargeObject(current);
       current->SetFlag(Page::PAGE_NEW_OLD_PROMOTION);
-      evacuation_job.AddItem(new EvacuationItem(current));
+      evacuation_items.emplace_back(ParallelWorkItem{}, current);
     }
   }
 
-  if (evacuation_job.NumberOfItems() == 0) return;
+  if (evacuation_items.empty()) return;
 
   TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("v8.gc"),
                "MarkCompactCollector::EvacuatePagesInParallel", "pages",
-               evacuation_job.NumberOfItems());
+               evacuation_items.size());
 
-  CreateAndExecuteEvacuationTasks<FullEvacuator>(this, &evacuation_job, nullptr,
-                                                 live_bytes);
+  CreateAndExecuteEvacuationTasks<FullEvacuator>(
+      this, std::move(evacuation_items), nullptr, live_bytes);
 
   // After evacuation there might still be swept pages that weren't
   // added to one of the compaction space but still reside in the
@@ -5161,8 +5181,7 @@ void YoungGenerationEvacuator::RawEvacuatePage(MemoryChunk* chunk,
 }  // namespace
 
 void MinorMarkCompactCollector::EvacuatePagesInParallel() {
-  ItemParallelJob evacuation_job(isolate()->cancelable_task_manager(),
-                                 &page_parallel_job_semaphore_);
+  std::vector<std::pair<ParallelWorkItem, MemoryChunk*>> evacuation_items;
   intptr_t live_bytes = 0;
 
   for (Page* page : new_space_evacuation_pages_) {
@@ -5176,7 +5195,7 @@ void MinorMarkCompactCollector::EvacuatePagesInParallel() {
         EvacuateNewSpacePageVisitor<NEW_TO_NEW>::Move(page);
       }
     }
-    evacuation_job.AddItem(new EvacuationItem(page));
+    evacuation_items.emplace_back(ParallelWorkItem{}, page);
   }
 
   // Promote young generation large objects.
@@ -5189,15 +5208,15 @@ void MinorMarkCompactCollector::EvacuatePagesInParallel() {
     if (non_atomic_marking_state_.IsGrey(object)) {
      heap_->lo_space()->PromoteNewLargeObject(current);
       current->SetFlag(Page::PAGE_NEW_OLD_PROMOTION);
-      evacuation_job.AddItem(new EvacuationItem(current));
+      evacuation_items.emplace_back(ParallelWorkItem{}, current);
     }
   }
 
-  if (evacuation_job.NumberOfItems() == 0) return;
+  if (evacuation_items.empty()) return;
 
   YoungGenerationMigrationObserver observer(heap(),
                                             heap()->mark_compact_collector());
   CreateAndExecuteEvacuationTasks<YoungGenerationEvacuator>(
-      this, &evacuation_job, &observer, live_bytes);
+      this, std::move(evacuation_items), &observer, live_bytes);
 }
#endif // ENABLE_MINOR_MC

src/heap/mark-compact.h

@@ -13,6 +13,7 @@
 #include "src/heap/marking-worklist.h"
 #include "src/heap/marking.h"
 #include "src/heap/memory-measurement.h"
+#include "src/heap/parallel-work-item.h"
 #include "src/heap/spaces.h"
 #include "src/heap/sweeper.h"
@@ -220,10 +221,10 @@ class MarkCompactCollectorBase {
       MemoryChunk* chunk, RememberedSetUpdatingMode updating_mode) = 0;
 
   template <class Evacuator, class Collector>
-  void CreateAndExecuteEvacuationTasks(Collector* collector,
-                                       ItemParallelJob* job,
-                                       MigrationObserver* migration_observer,
-                                       const intptr_t live_bytes);
+  void CreateAndExecuteEvacuationTasks(
+      Collector* collector,
+      std::vector<std::pair<ParallelWorkItem, MemoryChunk*>> evacuation_items,
+      MigrationObserver* migration_observer, const intptr_t live_bytes);
 
   // Returns whether this page should be moved according to heuristics.
   bool ShouldMovePage(Page* p, intptr_t live_bytes, bool promote_young);
@@ -234,7 +235,7 @@ class MarkCompactCollectorBase {
                                      IterateableSpace* space,
                                      RememberedSetUpdatingMode mode);
 
-  int NumberOfParallelCompactionTasks(int pages);
+  int NumberOfParallelCompactionTasks();
   int NumberOfParallelPointerUpdateTasks(int pages, int slots);
   int NumberOfParallelToSpacePointerUpdateTasks(int pages);