diff --git a/src/compiler.h b/src/compiler.h
index b51b272bfc..de1ffbcc0d 100644
--- a/src/compiler.h
+++ b/src/compiler.h
@@ -208,6 +208,7 @@ class V8_EXPORT_PRIVATE CompilationJob {
   State state() const { return state_; }
   CompilationInfo* info() const { return info_; }
   Isolate* isolate() const;
+  virtual size_t AllocatedMemory() const { return 0; }
 
  protected:
   // Overridden by the actual implementation.
diff --git a/src/compiler/pipeline.cc b/src/compiler/pipeline.cc
index 306479c1c1..0802744aa0 100644
--- a/src/compiler/pipeline.cc
+++ b/src/compiler/pipeline.cc
@@ -690,6 +690,8 @@ class PipelineWasmCompilationJob final : public CompilationJob {
   Status FinalizeJobImpl() final;
 
  private:
+  size_t AllocatedMemory() const override;
+
   ZoneStats zone_stats_;
   std::unique_ptr<PipelineStatistics> pipeline_statistics_;
   PipelineData data_;
@@ -736,6 +738,10 @@ PipelineWasmCompilationJob::ExecuteJobImpl() {
   return SUCCEEDED;
 }
 
+size_t PipelineWasmCompilationJob::AllocatedMemory() const {
+  return pipeline_.data_->zone_stats()->GetCurrentAllocatedBytes();
+}
+
 PipelineWasmCompilationJob::Status
 PipelineWasmCompilationJob::FinalizeJobImpl() {
   pipeline_.AssembleCode(&linkage_);
diff --git a/src/compiler/wasm-compiler.cc b/src/compiler/wasm-compiler.cc
index 8b841a8ef9..23dfb88af2 100644
--- a/src/compiler/wasm-compiler.cc
+++ b/src/compiler/wasm-compiler.cc
@@ -3975,6 +3975,12 @@ void WasmCompilationUnit::ExecuteCompilation() {
         isolate_->counters()->wasm_compile_function_time());
   }
   ExecuteCompilationInternal();
+  // Record the memory cost this unit places on the system until
+  // it is finalized. That may be "0" in error cases.
+  if (job_) {
+    size_t cost = job_->AllocatedMemory();
+    set_memory_cost(cost);
+  }
 }
 
 void WasmCompilationUnit::ExecuteCompilationInternal() {
diff --git a/src/compiler/wasm-compiler.h b/src/compiler/wasm-compiler.h
index afda0156f9..47b65387e5 100644
--- a/src/compiler/wasm-compiler.h
+++ b/src/compiler/wasm-compiler.h
@@ -64,6 +64,9 @@ class WasmCompilationUnit final {
                                           wasm::ModuleBytesEnv* module_env,
                                           const wasm::WasmFunction* function);
 
+  void set_memory_cost(size_t memory_cost) { memory_cost_ = memory_cost; }
+  size_t memory_cost() const { return memory_cost_; }
+
  private:
   SourcePositionTable* BuildGraphForWasmFunction(double* decode_ms);
 
@@ -85,7 +88,7 @@ class WasmCompilationUnit final {
   int func_index_;
   wasm::Result<wasm::DecodeStruct*> graph_construction_result_;
   bool ok_ = true;
-
+  size_t memory_cost_ = 0;
   void ExecuteCompilationInternal();
 
   DISALLOW_COPY_AND_ASSIGN(WasmCompilationUnit);
diff --git a/src/compiler/zone-stats.cc b/src/compiler/zone-stats.cc
index 8942df5555..626ad4072c 100644
--- a/src/compiler/zone-stats.cc
+++ b/src/compiler/zone-stats.cc
@@ -68,11 +68,11 @@ ZoneStats::~ZoneStats() {
   DCHECK(stats_.empty());
 }
 
-size_t ZoneStats::GetMaxAllocatedBytes() {
+size_t ZoneStats::GetMaxAllocatedBytes() const {
   return std::max(max_allocated_bytes_, GetCurrentAllocatedBytes());
 }
 
-size_t ZoneStats::GetCurrentAllocatedBytes() {
+size_t ZoneStats::GetCurrentAllocatedBytes() const {
   size_t total = 0;
   for (Zone* zone : zones_) {
     total += static_cast<size_t>(zone->allocation_size());
@@ -80,7 +80,7 @@ size_t ZoneStats::GetCurrentAllocatedBytes() {
   return total;
 }
 
-size_t ZoneStats::GetTotalAllocatedBytes() {
+size_t ZoneStats::GetTotalAllocatedBytes() const {
   return total_deleted_bytes_ + GetCurrentAllocatedBytes();
 }
 
diff --git a/src/compiler/zone-stats.h b/src/compiler/zone-stats.h
index 39adca3693..6e0cd5fe4e 100644
--- a/src/compiler/zone-stats.h
+++ b/src/compiler/zone-stats.h
@@ -66,9 +66,9 @@ class V8_EXPORT_PRIVATE ZoneStats final {
   explicit ZoneStats(AccountingAllocator* allocator);
   ~ZoneStats();
 
-  size_t GetMaxAllocatedBytes();
-  size_t GetTotalAllocatedBytes();
-  size_t GetCurrentAllocatedBytes();
+  size_t GetMaxAllocatedBytes() const;
+  size_t GetTotalAllocatedBytes() const;
+  size_t GetCurrentAllocatedBytes() const;
 
  private:
   Zone* NewEmptyZone(const char* zone_name);
diff --git a/src/wasm/module-compiler.cc b/src/wasm/module-compiler.cc
index ad7d552765..7ac3ecb54b 100644
--- a/src/wasm/module-compiler.cc
+++ b/src/wasm/module-compiler.cc
@@ -39,14 +39,22 @@ namespace internal {
 namespace wasm {
 
 ModuleCompiler::CodeGenerationSchedule::CodeGenerationSchedule(
-    base::RandomNumberGenerator* random_number_generator)
-    : random_number_generator_(random_number_generator) {
+    base::RandomNumberGenerator* random_number_generator, size_t max_memory)
+    : random_number_generator_(random_number_generator),
+      max_memory_(max_memory) {
   DCHECK_NOT_NULL(random_number_generator_);
+  DCHECK_GT(max_memory_, 0);
 }
 
 void ModuleCompiler::CodeGenerationSchedule::Schedule(
     std::unique_ptr<compiler::WasmCompilationUnit>&& item) {
+  size_t cost = item->memory_cost();
   schedule_.push_back(std::move(item));
+  allocated_memory_.Increment(cost);
+}
+
+bool ModuleCompiler::CodeGenerationSchedule::CanAcceptWork() {
+  return (!throttle_ || allocated_memory_.Value() <= max_memory_);
 }
 
 std::unique_ptr<compiler::WasmCompilationUnit>
@@ -73,24 +81,66 @@ ModuleCompiler::ModuleCompiler(Isolate* isolate,
       module_(std::move(module)),
       counters_shared_(isolate->counters_shared()),
       is_sync_(is_sync),
-      executed_units_(isolate->random_number_generator()) {
+      executed_units_(
+          isolate->random_number_generator(),
+          (isolate->heap()->memory_allocator()->code_range()->valid()
+               ? isolate->heap()->memory_allocator()->code_range()->size()
+               : isolate->heap()->code_space()->Capacity()) /
+              2),
+      num_background_tasks_(
+          Min(static_cast<size_t>(FLAG_wasm_num_compilation_tasks),
+              V8::GetCurrentPlatform()->NumberOfAvailableBackgroundThreads())),
+      stopped_compilation_tasks_{num_background_tasks_} {
   counters_ = counters_shared_.get();
 }
 
+bool ModuleCompiler::GetNextUncompiledFunctionId(size_t* index) {
+  DCHECK_NOT_NULL(index);
+  // - 1 because AtomicIncrement returns the value after the atomic increment.
+  *index = next_unit_.Increment(1) - 1;
+  return *index < compilation_units_.size();
+}
+
 // The actual runnable task that performs compilations in the background.
 ModuleCompiler::CompilationTask::CompilationTask(ModuleCompiler* compiler)
     : CancelableTask(compiler->isolate_), compiler_(compiler) {}
 
 void ModuleCompiler::CompilationTask::RunInternal() {
-  while (compiler_->FetchAndExecuteCompilationUnit()) {
+  size_t index = 0;
+  while (compiler_->executed_units_.CanAcceptWork() &&
+         compiler_->GetNextUncompiledFunctionId(&index)) {
+    compiler_->CompileAndSchedule(index);
   }
-  compiler_->module_->pending_tasks.get()->Signal();
+  compiler_->OnBackgroundTaskStopped();
 }
 
-// Run by each compilation task and by the main thread. The
-// no_finisher_callback is called within the result_mutex_ lock when no
-// finishing task is running, i.e. when the finisher_is_running_ flag is not
-// set.
+void ModuleCompiler::OnBackgroundTaskStopped() {
+  size_t num_stopped = stopped_compilation_tasks_.Increment(1);
+  DCHECK_LE(num_stopped, num_background_tasks_);
+  if (num_stopped == num_background_tasks_) {
+    compilation_tasks_cv_.NotifyOne();
+  }
+}
+
+void ModuleCompiler::CompileAndSchedule(size_t index) {
+  DisallowHeapAllocation no_allocation;
+  DisallowHandleAllocation no_handles;
+  DisallowHandleDereference no_deref;
+  DisallowCodeDependencyChange no_dependency_change;
+  DCHECK_LT(index, compilation_units_.size());
+
+  std::unique_ptr<compiler::WasmCompilationUnit> unit =
+      std::move(compilation_units_.at(index));
+  unit->ExecuteCompilation();
+  {
+    base::LockGuard<base::Mutex> guard(&result_mutex_);
+    executed_units_.Schedule(std::move(unit));
+  }
+}
+
+// Run by each compilation task The no_finisher_callback is called
+// within the result_mutex_ lock when no finishing task is running,
+// i.e. when the finisher_is_running_ flag is not set.
 bool ModuleCompiler::FetchAndExecuteCompilationUnit(
     std::function<void()> no_finisher_callback) {
   DisallowHeapAllocation no_allocation;
@@ -99,6 +149,7 @@ bool ModuleCompiler::FetchAndExecuteCompilationUnit(
   DisallowCodeDependencyChange no_dependency_change;
 
   // - 1 because AtomicIncrement returns the value after the atomic increment.
+  // Bail out fast if there's no work to do.
   size_t index = next_unit_.Increment(1) - 1;
   if (index >= compilation_units_.size()) {
     return false;
@@ -136,41 +187,35 @@ size_t ModuleCompiler::InitializeParallelCompilation(
   return funcs_to_compile;
 }
 
-uint32_t* ModuleCompiler::StartCompilationTasks() {
-  num_background_tasks_ =
-      Min(static_cast<size_t>(FLAG_wasm_num_compilation_tasks),
-          V8::GetCurrentPlatform()->NumberOfAvailableBackgroundThreads());
-  uint32_t* task_ids = new uint32_t[num_background_tasks_];
-  for (size_t i = 0; i < num_background_tasks_; ++i) {
-    CompilationTask* task = new CompilationTask(this);
-    task_ids[i] = task->id();
+void ModuleCompiler::RestartCompilationTasks() {
+  size_t current_stopped_tasks = stopped_compilation_tasks_.Value();
+  stopped_compilation_tasks_.Decrement(current_stopped_tasks);
+  for (size_t i = 0; i < current_stopped_tasks; ++i) {
     V8::GetCurrentPlatform()->CallOnBackgroundThread(
-        task, v8::Platform::kShortRunningTask);
-  }
-  return task_ids;
-}
-
-void ModuleCompiler::WaitForCompilationTasks(uint32_t* task_ids) {
-  for (size_t i = 0; i < num_background_tasks_; ++i) {
-    // If the task has not started yet, then we abort it. Otherwise we wait
-    // for it to finish.
-    if (isolate_->cancelable_task_manager()->TryAbort(task_ids[i]) !=
-        CancelableTaskManager::kTaskAborted) {
-      module_->pending_tasks.get()->Wait();
-    }
+        new CompilationTask(this),
+        v8::Platform::ExpectedRuntime::kShortRunningTask);
   }
 }
 
-void ModuleCompiler::FinishCompilationUnits(std::vector<Handle<Code>>& results,
-                                            ErrorThrower* thrower) {
-  SetFinisherIsRunning(true);
+void ModuleCompiler::WaitForCompilationTasks() {
+  base::LockGuard<base::Mutex> guard(&cv_mutex_);
+  while (stopped_compilation_tasks_.Value() < num_background_tasks_) {
+    compilation_tasks_cv_.Wait(&cv_mutex_);
+  }
+}
+
+size_t ModuleCompiler::FinishCompilationUnits(
+    std::vector<Handle<Code>>& results, ErrorThrower* thrower) {
+  size_t finished = 0;
   while (true) {
     int func_index = -1;
     Handle<Code> result = FinishCompilationUnit(thrower, &func_index);
     if (func_index < 0) break;
     results[func_index] = result;
+    ++finished;
   }
-  SetFinisherIsRunning(false);
+  RestartCompilationTasks();
+  return finished;
 }
 
 void ModuleCompiler::SetFinisherIsRunning(bool value) {
@@ -221,30 +266,35 @@ void ModuleCompiler::CompileInParallel(ModuleBytesEnv* module_env,
   //    and stores them in the vector {compilation_units}.
   InitializeParallelCompilation(module->functions, *module_env);
 
-  // Objects for the synchronization with the background threads.
-  base::AtomicNumber<size_t> next_unit(
-      static_cast<size_t>(FLAG_skip_compiling_wasm_funcs));
+  executed_units_.EnableThrottling();
 
   // 2) The main thread spawns {CompilationTask} instances which run on
   //    the background threads.
-  std::unique_ptr<uint32_t[]> task_ids(StartCompilationTasks());
+  RestartCompilationTasks();
+
+  size_t finished_functions = 0;
+  while (finished_functions < compilation_units_.size()) {
+    // 3.a) The background threads and the main thread pick one compilation
+    //      unit at a time and execute the parallel phase of the compilation
+    //      unit. After finishing the execution of the parallel phase, the
+    //      result is enqueued in {executed_units}.
+    //      The foreground task bypasses waiting on memory threshold, because
+    //      its results will immediately be converted to code (below).
+    size_t index = 0;
+    if (GetNextUncompiledFunctionId(&index)) {
+      CompileAndSchedule(index);
+    }
 
-  // 3.a) The background threads and the main thread pick one compilation
-  //      unit at a time and execute the parallel phase of the compilation
-  //      unit. After finishing the execution of the parallel phase, the
-  //      result is enqueued in {executed_units}.
-  while (FetchAndExecuteCompilationUnit()) {
     // 3.b) If {executed_units} contains a compilation unit, the main thread
     //      dequeues it and finishes the compilation unit. Compilation units
     //      are finished concurrently to the background threads to save
     //      memory.
-    FinishCompilationUnits(results, thrower);
+    finished_functions += FinishCompilationUnits(results, thrower);
   }
   // 4) After the parallel phase of all compilation units has started, the
-  //    main thread waits for all {CompilationTask} instances to finish.
-  WaitForCompilationTasks(task_ids.get());
-  // Finish the compilation of the remaining compilation units.
-  FinishCompilationUnits(results, thrower);
+  //    main thread waits for all {CompilationTask} instances to finish - which
+  //    happens once they all realize there's no next work item to process.
+  WaitForCompilationTasks();
 }
 
 void ModuleCompiler::CompileSequentially(ModuleBytesEnv* module_env,
diff --git a/src/wasm/module-compiler.h b/src/wasm/module-compiler.h
index de4948f30e..467977e6ef 100644
--- a/src/wasm/module-compiler.h
+++ b/src/wasm/module-compiler.h
@@ -42,7 +42,8 @@ class ModuleCompiler {
   class CodeGenerationSchedule {
    public:
     explicit CodeGenerationSchedule(
-        base::RandomNumberGenerator* random_number_generator);
+        base::RandomNumberGenerator* random_number_generator,
+        size_t max_memory = 0);
 
     void Schedule(std::unique_ptr<compiler::WasmCompilationUnit>&& item);
 
@@ -50,11 +51,18 @@ class ModuleCompiler {
 
     std::unique_ptr<compiler::WasmCompilationUnit> GetNext();
 
+    bool CanAcceptWork();
+
+    void EnableThrottling() { throttle_ = true; }
+
    private:
     size_t GetRandomIndexInSchedule();
 
     base::RandomNumberGenerator* random_number_generator_ = nullptr;
     std::vector<std::unique_ptr<compiler::WasmCompilationUnit>> schedule_;
+    const size_t max_memory_;
+    bool throttle_ = false;
+    base::AtomicNumber<size_t> allocated_memory_{0};
   };
 
   Isolate* isolate_;
@@ -67,7 +75,7 @@ class ModuleCompiler {
   CodeGenerationSchedule executed_units_;
   base::Mutex result_mutex_;
   base::AtomicNumber<size_t> next_unit_;
-  size_t num_background_tasks_ = 0;
+  const size_t num_background_tasks_;
   // This flag should only be set while holding result_mutex_.
   bool finisher_is_running_ = false;
 
@@ -78,15 +86,19 @@ class ModuleCompiler {
   bool FetchAndExecuteCompilationUnit(
       std::function<void()> no_finisher_callback = [] {});
 
+  void CompileAndSchedule(size_t index);
+  bool GetNextUncompiledFunctionId(size_t* index);
+  void OnBackgroundTaskStopped();
+
   size_t InitializeParallelCompilation(
       const std::vector<WasmFunction>& functions, ModuleBytesEnv& module_env);
 
-  uint32_t* StartCompilationTasks();
+  void RestartCompilationTasks();
 
-  void WaitForCompilationTasks(uint32_t* task_ids);
+  void WaitForCompilationTasks();
 
-  void FinishCompilationUnits(std::vector<Handle<Code>>& results,
-                              ErrorThrower* thrower);
+  size_t FinishCompilationUnits(std::vector<Handle<Code>>& results,
+                                ErrorThrower* thrower);
 
   void SetFinisherIsRunning(bool value);
 
@@ -114,6 +126,10 @@ class ModuleCompiler {
       Vector<const byte> asm_js_offset_table_bytes, Factory* factory,
       WasmInstance* temp_instance, Handle<FixedArray>* function_tables,
       Handle<FixedArray>* signature_tables);
+
+  base::AtomicNumber<size_t> stopped_compilation_tasks_{0};
+  base::Mutex cv_mutex_;
+  base::ConditionVariable compilation_tasks_cv_;
 };
 
 class JSToWasmWrapperCache {