Reland "[d8] Remove maximum workers limitation"

This is a reland of a0728e869b

Original change's description:
> [d8] Remove maximum workers limitation
> 
> This CL refactors the lifetime management of the v8::Worker C++ object
> and in the process lifts the 100 maximum worker limitation. To do this,
> it uses a Managed<v8::Worker> heap object and attaches the managed to
> the API worker object.
> 
> R=mstarzinger@chromium.org
> BUG=v8:9524
> 
> Change-Id: I279b7aeb6645a87f9108ee6f572105739721cef4
> Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/1715453
> Commit-Queue: Ben Titzer <titzer@chromium.org>
> Reviewed-by: Clemens Hammacher <clemensh@chromium.org>
> Reviewed-by: Michael Starzinger <mstarzinger@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#62932}

Bug: v8:9524
Change-Id: I7d903fb12ddb00909a9429455f46c55db2fd02de
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/1722562
Reviewed-by: Michael Lippautz <mlippautz@chromium.org>
Reviewed-by: Clemens Hammacher <clemensh@chromium.org>
Commit-Queue: Ben Titzer <titzer@chromium.org>
Cr-Commit-Position: refs/heads/master@{#62974}
Author: Ben L. Titzer, 2019-07-29 15:09:02 +02:00 (committed by Commit Bot)
parent d978b5c00c
commit e0b18b9022
37 changed files with 393 additions and 156 deletions
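
Before the per-file hunks, a condensed sketch of the ownership model this CL introduces may help. It uses plain std::shared_ptr/std::thread rather than the real d8/V8 classes (Managed<Worker>, base::Thread), so every name below is illustrative: the JS-side handle, the shell's running set, and the worker thread each hold a reference, and the C++ Worker is destroyed only when the last of them lets go — which is why no fixed worker cap is needed anymore.

// Illustrative only: shared ownership of a Worker between a "JS handle"
// (standing in for Managed<Worker>), a running set, and the thread itself.
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>

class Worker {
 public:
  explicit Worker(std::string script) : script_(std::move(script)) {}
  void ExecuteInThread() { /* run script_ on a worker isolate */ }

 private:
  std::string script_;
};

class Shell {
 public:
  static void StartWorkerThread(std::shared_ptr<Worker> worker) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      running_.insert(worker);  // cf. Shell::AddRunningWorker
    }
    // The lambda's copy keeps the Worker alive for as long as the thread runs.
    threads_.emplace_back([worker] {
      worker->ExecuteInThread();
      std::lock_guard<std::mutex> guard(mutex_);
      running_.erase(worker);  // cf. Shell::RemoveRunningWorker
    });
  }

  static void WaitForRunningWorkers() {
    for (auto& thread : threads_) thread.join();
  }

 private:
  static std::mutex mutex_;
  static std::unordered_set<std::shared_ptr<Worker>> running_;
  static std::vector<std::thread> threads_;
};

std::mutex Shell::mutex_;
std::unordered_set<std::shared_ptr<Worker>> Shell::running_;
std::vector<std::thread> Shell::threads_;

int main() {
  // `handle` plays the role of the Managed<Worker> stored in the JS object's
  // internal field; any number of these can be created.
  auto handle = std::make_shared<Worker>("postMessage(42)");
  Shell::StartWorkerThread(handle);
  handle.reset();                  // "GC" of the JS object: the worker lives on
  Shell::WaitForRunningWorkers();  // shell shutdown still waits for the thread
}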


@ -67,6 +67,8 @@ class V8_BASE_EXPORT Mutex final {
return native_handle_;
}
V8_INLINE void AssertHeld() { DCHECK_EQ(1, level_); }
private:
NativeHandle native_handle_;
#ifdef DEBUG


@ -761,13 +761,12 @@ void Thread::set_name(const char* name) {
name_[sizeof(name_) - 1] = '\0';
}
void Thread::Start() {
bool Thread::Start() {
int result;
pthread_attr_t attr;
memset(&attr, 0, sizeof(attr));
result = pthread_attr_init(&attr);
DCHECK_EQ(0, result);
if (result != 0) return false;
size_t stack_size = stack_size_;
if (stack_size == 0) {
#if V8_OS_MACOSX
@ -780,17 +779,17 @@ void Thread::Start() {
}
if (stack_size > 0) {
result = pthread_attr_setstacksize(&attr, stack_size);
DCHECK_EQ(0, result);
if (result != 0) return pthread_attr_destroy(&attr), false;
}
{
MutexGuard lock_guard(&data_->thread_creation_mutex_);
result = pthread_create(&data_->thread_, &attr, ThreadEntry, this);
if (result != 0 || data_->thread_ == kNoThread) {
return pthread_attr_destroy(&attr), false;
}
}
DCHECK_EQ(0, result);
result = pthread_attr_destroy(&attr);
DCHECK_EQ(0, result);
DCHECK_NE(data_->thread_, kNoThread);
USE(result);
return result == 0;
}
void Thread::Join() { pthread_join(data_->thread_, nullptr); }


@ -1352,13 +1352,13 @@ Thread::~Thread() {
// Create a new thread. It is important to use _beginthreadex() instead of
// the Win32 function CreateThread(), because the CreateThread() does not
// initialize thread specific structures in the C runtime library.
void Thread::Start() {
data_->thread_ = reinterpret_cast<HANDLE>(
_beginthreadex(nullptr, static_cast<unsigned>(stack_size_), ThreadEntry,
this, 0, &data_->thread_id_));
bool Thread::Start() {
uintptr_t result = _beginthreadex(nullptr, static_cast<unsigned>(stack_size_),
ThreadEntry, this, 0, &data_->thread_id_);
data_->thread_ = reinterpret_cast<HANDLE>(result);
return result != 0;
}
// Wait for thread to terminate.
void Thread::Join() {
if (data_->thread_id_ != GetCurrentThreadId()) {


@ -333,15 +333,16 @@ class V8_BASE_EXPORT Thread {
virtual ~Thread();
// Start new thread by calling the Run() method on the new thread.
void Start();
V8_WARN_UNUSED_RESULT bool Start();
// Start new thread and wait until Run() method is called on the new thread.
void StartSynchronously() {
bool StartSynchronously() {
start_semaphore_ = new Semaphore(0);
Start();
if (!Start()) return false;
start_semaphore_->Wait();
delete start_semaphore_;
start_semaphore_ = nullptr;
return true;
}
// Wait until thread terminates.


@ -36,6 +36,7 @@
#include "src/init/v8.h"
#include "src/interpreter/interpreter.h"
#include "src/logging/counters.h"
#include "src/objects/managed.h"
#include "src/objects/objects-inl.h"
#include "src/objects/objects.h"
#include "src/parsing/parse-info.h"
@ -76,7 +77,6 @@ namespace {
const int kMB = 1024 * 1024;
const int kMaxWorkers = 100;
const int kMaxSerializerMemoryUsage =
1 * kMB; // Arbitrary maximum for testing.
@ -227,14 +227,13 @@ Worker* GetWorkerFromInternalField(Isolate* isolate, Local<Object> object) {
return nullptr;
}
Worker* worker =
static_cast<Worker*>(object->GetAlignedPointerFromInternalField(0));
if (worker == nullptr) {
i::Handle<i::Object> handle = Utils::OpenHandle(*object->GetInternalField(0));
if (handle->IsSmi()) {
Throw(isolate, "Worker is defunct because main thread is terminating");
return nullptr;
}
return worker;
auto managed = i::Handle<i::Managed<Worker>>::cast(handle);
return managed->raw();
}
base::Thread::Options GetThreadOptions(const char* name) {
@ -333,7 +332,7 @@ const base::TimeTicks Shell::kInitialTicks =
Global<Function> Shell::stringify_function_;
base::LazyMutex Shell::workers_mutex_;
bool Shell::allow_new_workers_ = true;
std::vector<Worker*> Shell::workers_;
std::unordered_set<std::shared_ptr<Worker>> Shell::running_workers_;
std::vector<ExternalizedContents> Shell::externalized_contents_;
std::atomic<bool> Shell::script_executed_{false};
base::LazyMutex Shell::isolate_status_lock_;
@ -485,7 +484,7 @@ bool Shell::ExecuteString(Isolate* isolate, Local<String> source,
} else if (options.stress_background_compile) {
// Start a background thread compiling the script.
BackgroundCompileThread background_compile_thread(isolate, source);
background_compile_thread.Start();
CHECK(background_compile_thread.Start());
// In parallel, compile on the main thread to flush out any data races.
{
@ -1392,30 +1391,36 @@ void Shell::WorkerNew(const v8::FunctionCallbackInfo<v8::Value>& args) {
return;
}
// Initialize the embedder field to 0; if we return early without
// creating a new Worker (because the main thread is terminating) we can
// early-out from the instance calls.
args.Holder()->SetInternalField(0, v8::Integer::New(isolate, 0));
{
// Don't allow workers to create more workers if the main thread
// is waiting for existing running workers to terminate.
base::MutexGuard lock_guard(workers_mutex_.Pointer());
if (workers_.size() >= kMaxWorkers) {
Throw(args.GetIsolate(), "Too many workers, I won't let you create more");
return;
}
// Initialize the embedder field to nullptr; if we return early without
// creating a new Worker (because the main thread is terminating) we can
// early-out from the instance calls.
args.Holder()->SetAlignedPointerInInternalField(0, nullptr);
if (!allow_new_workers_) return;
Worker* worker = new Worker;
args.Holder()->SetAlignedPointerInInternalField(0, worker);
workers_.push_back(worker);
String::Utf8Value script(args.GetIsolate(), source);
if (!*script) {
Throw(args.GetIsolate(), "Can't get worker script");
return;
}
worker->StartExecuteInThread(*script);
// The C++ worker object's lifetime is shared between the Managed<Worker>
// object on the heap, which the JavaScript object points to, and an
// internal std::shared_ptr in the worker thread itself.
auto worker = std::make_shared<Worker>(*script);
i::Isolate* i_isolate = reinterpret_cast<i::Isolate*>(isolate);
const size_t kWorkerSizeEstimate = 4 * 1024 * 1024; // stack + heap.
i::Handle<i::Object> managed = i::Managed<Worker>::FromSharedPtr(
i_isolate, kWorkerSizeEstimate, worker);
args.Holder()->SetInternalField(0, Utils::ToLocal(managed));
if (!Worker::StartWorkerThread(std::move(worker))) {
Throw(args.GetIsolate(), "Can't start thread");
return;
}
}
}
@ -1475,7 +1480,7 @@ void Shell::QuitOnce(v8::FunctionCallbackInfo<v8::Value>* args) {
int exit_code = (*args)[0]
->Int32Value(args->GetIsolate()->GetCurrentContext())
.FromMaybe(0);
CleanupWorkers();
WaitForRunningWorkers();
args->GetIsolate()->Exit();
OnExit(args->GetIsolate());
base::OS::ExitProcess(exit_code);
@ -2504,7 +2509,7 @@ void SourceGroup::ExecuteInThread() {
void SourceGroup::StartExecuteInThread() {
if (thread_ == nullptr) {
thread_ = new IsolateThread(this);
thread_->Start();
CHECK(thread_->Start());
}
next_semaphore_.Signal();
}
@ -2550,11 +2555,11 @@ void SerializationDataQueue::Clear() {
data_.clear();
}
Worker::Worker()
Worker::Worker(const char* script)
: in_semaphore_(0),
out_semaphore_(0),
thread_(nullptr),
script_(nullptr),
script_(i::StrDup(script)),
running_(false) {}
Worker::~Worker() {
@ -2562,15 +2567,29 @@ Worker::~Worker() {
thread_ = nullptr;
delete[] script_;
script_ = nullptr;
in_queue_.Clear();
out_queue_.Clear();
}
void Worker::StartExecuteInThread(const char* script) {
running_ = true;
script_ = i::StrDup(script);
thread_ = new WorkerThread(this);
thread_->Start();
bool Worker::StartWorkerThread(std::shared_ptr<Worker> worker) {
worker->running_ = true;
auto thread = new WorkerThread(worker);
worker->thread_ = thread;
if (thread->Start()) {
Shell::AddRunningWorker(std::move(worker));
return true;
}
return false;
}
void Worker::WorkerThread::Run() {
// Prevent a lifetime cycle from Worker -> WorkerThread -> Worker.
// We must clear the worker_ field of the thread, but we keep the
// worker alive via a stack root until the thread finishes execution
// and removes itself from the running set. Thereafter the only
// remaining reference can be from a JavaScript object via a Managed.
auto worker = std::move(worker_);
worker_ = nullptr;
worker->ExecuteInThread();
Shell::RemoveRunningWorker(worker);
}
void Worker::PostMessage(std::unique_ptr<SerializationData> data) {
@ -2936,7 +2955,7 @@ int Shell::RunMain(Isolate* isolate, int argc, char* argv[], bool last_run) {
options.isolate_sources[i].WaitForThread();
}
}
CleanupWorkers();
WaitForRunningWorkers();
// In order to finish successfully, success must be != expected_to_throw.
return success == Shell::options.expected_to_throw ? 1 : 0;
}
@ -3267,24 +3286,35 @@ MaybeLocal<Value> Shell::DeserializeValue(
return deserializer.ReadValue(context);
}
void Shell::CleanupWorkers() {
// Make a copy of workers_, because we don't want to call Worker::Terminate
// while holding the workers_mutex_ lock. Otherwise, if a worker is about to
// create a new Worker, it would deadlock.
std::vector<Worker*> workers_copy;
void Shell::AddRunningWorker(std::shared_ptr<Worker> worker) {
workers_mutex_.Pointer()->AssertHeld(); // caller should hold the mutex.
running_workers_.insert(worker);
}
void Shell::RemoveRunningWorker(const std::shared_ptr<Worker>& worker) {
base::MutexGuard lock_guard(workers_mutex_.Pointer());
auto it = running_workers_.find(worker);
if (it != running_workers_.end()) running_workers_.erase(it);
}
void Shell::WaitForRunningWorkers() {
// Make a copy of running_workers_, because we don't want to call
// Worker::Terminate while holding the workers_mutex_ lock. Otherwise, if a
// worker is about to create a new Worker, it would deadlock.
std::unordered_set<std::shared_ptr<Worker>> workers_copy;
{
base::MutexGuard lock_guard(workers_mutex_.Pointer());
allow_new_workers_ = false;
workers_copy.swap(workers_);
workers_copy.swap(running_workers_);
}
for (Worker* worker : workers_copy) {
for (auto& worker : workers_copy) {
worker->WaitForThread();
delete worker;
}
// Now that all workers are terminated, we can re-enable Worker creation.
base::MutexGuard lock_guard(workers_mutex_.Pointer());
DCHECK(running_workers_.empty());
allow_new_workers_ = true;
externalized_contents_.clear();
}
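
As an aside on the shutdown path above: the copy-then-wait shape of WaitForRunningWorkers is what avoids the deadlock described in its comment. A minimal standalone sketch of that lock discipline (assumed names, not the d8 code):

#include <memory>
#include <mutex>
#include <unordered_set>

struct Worker {
  void WaitForThread() { /* join the underlying thread */ }
};

std::mutex workers_mutex;
bool allow_new_workers = true;
std::unordered_set<std::shared_ptr<Worker>> running_workers;

void WaitForRunningWorkers() {
  std::unordered_set<std::shared_ptr<Worker>> copy;
  {
    std::lock_guard<std::mutex> guard(workers_mutex);
    allow_new_workers = false;  // Worker creation early-outs from here on
    copy.swap(running_workers);
  }
  // Block on the workers without holding the mutex, so a worker that is
  // itself inside "new Worker()" can still acquire it and bail out.
  for (auto& worker : copy) worker->WaitForThread();
  std::lock_guard<std::mutex> guard(workers_mutex);
  allow_new_workers = true;
}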


@ -11,6 +11,7 @@
#include <queue>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "src/base/once.h"
@ -207,12 +208,9 @@ class SerializationDataQueue {
class Worker {
public:
Worker();
explicit Worker(const char* script);
~Worker();
// Run the given script on this Worker. This function should only be called
// once, and should only be called by the thread that created the Worker.
void StartExecuteInThread(const char* script);
// Post a message to the worker's incoming message queue. The worker will
// take ownership of the SerializationData.
// This function should only be called by the thread that created the Worker.
@ -231,17 +229,20 @@ class Worker {
// This function can be called by any thread.
void WaitForThread();
// Start running the given worker in another thread.
static bool StartWorkerThread(std::shared_ptr<Worker> worker);
private:
class WorkerThread : public base::Thread {
public:
explicit WorkerThread(Worker* worker)
explicit WorkerThread(std::shared_ptr<Worker> worker)
: base::Thread(base::Thread::Options("WorkerThread")),
worker_(worker) {}
worker_(std::move(worker)) {}
void Run() override { worker_->ExecuteInThread(); }
void Run() override;
private:
Worker* worker_;
std::shared_ptr<Worker> worker_;
};
void ExecuteInThread();
@ -378,7 +379,6 @@ class Shell : public i::AllStatic {
Isolate* isolate, Local<Value> value, Local<Value> transfer);
static MaybeLocal<Value> DeserializeValue(
Isolate* isolate, std::unique_ptr<SerializationData> data);
static void CleanupWorkers();
static int* LookupCounter(const char* name);
static void* CreateHistogram(const char* name, int min, int max,
size_t buckets);
@ -493,6 +493,10 @@ class Shell : public i::AllStatic {
!options.test_shell;
}
static void WaitForRunningWorkers();
static void AddRunningWorker(std::shared_ptr<Worker> worker);
static void RemoveRunningWorker(const std::shared_ptr<Worker>& worker);
private:
static Global<Context> evaluation_context_;
static base::OnceType quit_once_;
@ -509,7 +513,7 @@ class Shell : public i::AllStatic {
static base::LazyMutex workers_mutex_; // Guards the following members.
static bool allow_new_workers_;
static std::vector<Worker*> workers_;
static std::unordered_set<std::shared_ptr<Worker>> running_workers_;
static std::vector<ExternalizedContents> externalized_contents_;
// Multiple isolates may update this flag concurrently.


@ -73,7 +73,7 @@ DefaultWorkerThreadsTaskRunner::WorkerThread::WorkerThread(
DefaultWorkerThreadsTaskRunner* runner)
: Thread(Options("V8 DefaultWorkerThreadsTaskRunner WorkerThread")),
runner_(runner) {
Start();
CHECK(Start());
}
DefaultWorkerThreadsTaskRunner::WorkerThread::~WorkerThread() { Join(); }


@ -12,7 +12,7 @@ namespace platform {
WorkerThread::WorkerThread(TaskQueue* queue)
: Thread(Options("V8 WorkerThread")), queue_(queue) {
Start();
CHECK(Start());
}
WorkerThread::~WorkerThread() {


@ -871,7 +871,7 @@ void Profiler::Engage() {
// Start thread processing the profiler buffer.
base::Relaxed_Store(&running_, 1);
Start();
CHECK(Start());
// Register to get ticks.
Logger* logger = isolate_->logger();


@ -12975,7 +12975,7 @@ void ApiTestFuzzer::SetUp(PartOfTest part) {
RegisterThreadedTest::nth(i)->fuzzer_ = new ApiTestFuzzer(i + start);
}
for (int i = 0; i < active_tests_; i++) {
RegisterThreadedTest::nth(i)->fuzzer_->Start();
CHECK(RegisterThreadedTest::nth(i)->fuzzer_->Start());
}
}
@ -18386,8 +18386,8 @@ TEST(MultipleIsolatesOnIndividualThreads) {
IsolateThread thread2(12);
// Compute some fibonacci numbers on 3 threads in 3 isolates.
thread1.Start();
thread2.Start();
CHECK(thread1.Start());
CHECK(thread2.Start());
int result1 = CalcFibonacci(CcTest::isolate(), 21);
int result2 = CalcFibonacci(CcTest::isolate(), 12);
@ -18481,7 +18481,7 @@ class InitDefaultIsolateThread : public v8::base::Thread {
static void InitializeTestHelper(InitDefaultIsolateThread::TestCase testCase) {
InitDefaultIsolateThread thread(testCase);
thread.Start();
CHECK(thread.Start());
thread.Join();
CHECK(thread.result());
}
@ -20746,7 +20746,7 @@ class ThreadInterruptTest {
void RunTest() {
InterruptThread i_thread(this);
i_thread.Start();
CHECK(i_thread.Start());
sem_.Wait();
CHECK_EQ(kExpectedValue, sem_value_);
@ -21009,7 +21009,7 @@ class RegExpInterruptTest {
v8::HandleScope handle_scope(isolate_);
i_thread.SetTestBody(test_body_fn);
i_thread.Start();
CHECK(i_thread.Start());
TestBody();
@ -21213,7 +21213,7 @@ class RequestInterruptTestBaseWithSimpleInterrupt
public:
RequestInterruptTestBaseWithSimpleInterrupt() : i_thread(this) { }
void StartInterruptThread() override { i_thread.Start(); }
void StartInterruptThread() override { CHECK(i_thread.Start()); }
private:
class InterruptThread : public v8::base::Thread {
@ -21444,7 +21444,7 @@ class RequestMultipleInterrupts : public RequestInterruptTestBase {
public:
RequestMultipleInterrupts() : i_thread(this), counter_(0) {}
void StartInterruptThread() override { i_thread.Start(); }
void StartInterruptThread() override { CHECK(i_thread.Start()); }
void TestBody() override {
Local<Function> func = Function::New(env_.local(), ShouldContinueCallback,
@ -24851,7 +24851,7 @@ TEST(FutexInterruption) {
FutexInterruptionThread timeout_thread(isolate);
v8::TryCatch try_catch(CcTest::isolate());
timeout_thread.Start();
CHECK(timeout_thread.Start());
CompileRun(
"var ab = new SharedArrayBuffer(4);"
@ -25268,7 +25268,7 @@ TEST(MemoryPressure) {
LocalContext env;
MemoryPressureThread memory_pressure_thread(
isolate, v8::MemoryPressureLevel::kCritical);
memory_pressure_thread.Start();
CHECK(memory_pressure_thread.Start());
memory_pressure_thread.Join();
// This should trigger GC.
CHECK_EQ(0, counter.NumberOfWeakCalls());
@ -26080,7 +26080,7 @@ void AtomicsWaitCallbackForTesting(
break;
case AtomicsWaitCallbackAction::StopFromThreadAndThrow:
info->stop_thread = v8::base::make_unique<StopAtomicsWaitThread>(info);
info->stop_thread->Start();
CHECK(info->stop_thread->Start());
break;
case AtomicsWaitCallbackAction::KeepWaiting:
break;


@ -148,7 +148,7 @@ TEST(SamplingCircularQueueMultithreading) {
ProducerThread producer3(&scq, kRecordsPerChunk, 20, &semaphore);
CHECK(!scq.Peek());
producer1.Start();
CHECK(producer1.Start());
semaphore.Wait();
for (Record i = 1; i < 1 + kRecordsPerChunk; ++i) {
Record* rec = reinterpret_cast<Record*>(scq.Peek());
@ -160,7 +160,7 @@ TEST(SamplingCircularQueueMultithreading) {
}
CHECK(!scq.Peek());
producer2.Start();
CHECK(producer2.Start());
semaphore.Wait();
for (Record i = 10; i < 10 + kRecordsPerChunk; ++i) {
Record* rec = reinterpret_cast<Record*>(scq.Peek());
@ -172,7 +172,7 @@ TEST(SamplingCircularQueueMultithreading) {
}
CHECK(!scq.Peek());
producer3.Start();
CHECK(producer3.Start());
semaphore.Wait();
for (Record i = 20; i < 20 + kRecordsPerChunk; ++i) {
Record* rec = reinterpret_cast<Record*>(scq.Peek());


@ -85,7 +85,7 @@ TEST(StartStop) {
new SamplingEventsProcessor(isolate, &generator, &code_observer,
v8::base::TimeDelta::FromMicroseconds(100),
true));
processor->Start();
CHECK(processor->Start());
processor->StopSynchronously();
}
@ -170,7 +170,7 @@ TEST(CodeEvents) {
ProfilerEventsProcessor* processor = new SamplingEventsProcessor(
isolate, generator, &code_observer,
v8::base::TimeDelta::FromMicroseconds(100), true);
processor->Start();
CHECK(processor->Start());
ProfilerListener profiler_listener(isolate, processor);
isolate->logger()->AddCodeEventListener(&profiler_listener);
@ -235,7 +235,7 @@ TEST(TickEvents) {
CpuProfiler profiler(isolate, kDebugNaming, kLazyLogging, profiles, generator,
processor);
profiles->StartProfiling("");
processor->Start();
CHECK(processor->Start());
ProfilerListener profiler_listener(isolate, processor);
isolate->logger()->AddCodeEventListener(&profiler_listener);
@ -307,7 +307,7 @@ TEST(Issue1398) {
CpuProfiler profiler(isolate, kDebugNaming, kLazyLogging, profiles, generator,
processor);
profiles->StartProfiling("");
processor->Start();
CHECK(processor->Start());
ProfilerListener profiler_listener(isolate, processor);
profiler_listener.CodeCreateEvent(i::Logger::BUILTIN_TAG, code, "bbb");
@ -1178,7 +1178,7 @@ static void TickLines(bool optimize) {
// This would normally happen automatically with CpuProfiler::StartProfiling
// but doesn't because it's constructed with a generator and a processor.
isolate->logger()->LogCompiledFunctions();
processor->Start();
CHECK(processor->Start());
ProfilerListener profiler_listener(isolate, processor);
// Enqueue code creation events.
@ -3055,8 +3055,8 @@ TEST(MultipleIsolates) {
IsolateThread thread1;
IsolateThread thread2;
thread1.Start();
thread2.Start();
CHECK(thread1.Start());
CHECK(thread2.Start());
thread1.Join();
thread2.Join();
@ -3089,7 +3089,7 @@ TEST(LowPrecisionSamplingStartStopInternal) {
new SamplingEventsProcessor(isolate, &generator, &code_observer,
v8::base::TimeDelta::FromMicroseconds(100),
false));
processor->Start();
CHECK(processor->Start());
processor->StopSynchronously();
}


@ -3854,7 +3854,7 @@ TEST(DebugBreakOffThreadTerminate) {
DebugBreakTriggerTerminate delegate;
v8::debug::SetDebugDelegate(isolate, &delegate);
TerminationThread terminator(isolate);
terminator.Start();
CHECK(terminator.Start());
v8::TryCatch try_catch(env->GetIsolate());
env->GetIsolate()->RequestInterrupt(BreakRightNow, nullptr);
CompileRun("while (true);");
@ -3950,7 +3950,7 @@ class ArchiveRestoreThread : public v8::base::Thread,
// on) so that the ThreadManager is forced to archive and restore
// the current thread.
ArchiveRestoreThread child(isolate_, spawn_count_ - 1);
child.Start();
CHECK(child.Start());
child.Join();
// The child thread sets itself as the debug delegate, so we need to

View File

@ -82,7 +82,7 @@ void UnlockForDeoptimization(const v8::FunctionCallbackInfo<v8::Value>& args) {
isolate->Exit();
v8::Unlocker unlocker(isolate);
// Starts the deoptimizing thread.
deoptimizer->Start();
CHECK(deoptimizer->Start());
// Waits for deoptimization to finish.
deoptimizer->Join();
}
@ -107,7 +107,7 @@ void UnlockForDeoptimizationIfReady(
isolate->Exit();
v8::Unlocker unlocker(isolate);
// Starts the thread that deoptimizes the function.
deoptimizer->Start();
CHECK(deoptimizer->Start());
// Waits for the deoptimizing thread to finish.
deoptimizer->Join();
}
@ -339,7 +339,7 @@ TEST(KangarooIsolates) {
CompileRun("function getValue() { return 30; }");
thread1.reset(new KangarooThread(isolate, context));
}
thread1->Start();
CHECK(thread1->Start());
thread1->Join();
}
@ -364,9 +364,7 @@ class JoinableThread {
virtual ~JoinableThread() = default;
void Start() {
thread_.Start();
}
void Start() { CHECK(thread_.Start()); }
void Join() {
semaphore_.Wait();


@ -352,7 +352,7 @@ TEST(simulator_invalidate_exclusive_access_threaded) {
TestData test_data(1);
MemoryAccessThread thread;
thread.Start();
CHECK(thread.Start());
MemoryAccess ldrex_w(Kind::LoadExcl, Size::Word, offsetof(TestData, w));
MemoryAccess strex_w(Kind::StoreExcl, Size::Word, offsetof(TestData, w), 7);


@ -359,7 +359,7 @@ TEST(simulator_invalidate_exclusive_access_threaded) {
TestData test_data(1);
MemoryAccessThread thread;
thread.Start();
CHECK(thread.Start());
MemoryAccess ldaxr_w(Kind::LoadExcl, Size::Word, offsetof(TestData, w));
MemoryAccess stlxr_w(Kind::StoreExcl, Size::Word, offsetof(TestData, w), 7);


@ -179,7 +179,7 @@ class TerminatorThread : public v8::base::Thread {
void TestTerminatingSlowOperation(const char* source) {
semaphore = new v8::base::Semaphore(0);
TerminatorThread thread(CcTest::i_isolate());
thread.Start();
CHECK(thread.Start());
v8::HandleScope scope(CcTest::isolate());
v8::Local<v8::ObjectTemplate> global =
@ -474,7 +474,7 @@ void MicrotaskLoopForever(const v8::FunctionCallbackInfo<v8::Value>& info) {
TEST(TerminateFromOtherThreadWhileMicrotaskRunning) {
semaphore = new v8::base::Semaphore(0);
TerminatorThread thread(CcTest::i_isolate());
thread.Start();
CHECK(thread.Start());
v8::Isolate* isolate = CcTest::isolate();
isolate->SetMicrotasksPolicy(v8::MicrotasksPolicy::kExplicit);
@ -878,7 +878,7 @@ TEST(TerminateRegExp) {
CHECK(!isolate->IsExecutionTerminating());
CHECK(!CompileRun("var re = /(x+)+y$/; re.test('x');").IsEmpty());
TerminatorSleeperThread terminator(isolate, 100);
terminator.Start();
CHECK(terminator.Start());
CHECK(CompileRun("re.test('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'); fail();")
.IsEmpty());
CHECK(try_catch.HasCaught());


@ -54,7 +54,7 @@ class ThreadIdValidationThread : public base::Thread {
}
refs_[thread_no_].store(thread_id, std::memory_order_relaxed);
if (thread_to_start_ != nullptr) {
thread_to_start_->Start();
CHECK(thread_to_start_->Start());
}
semaphore_->Signal();
}
@ -77,7 +77,7 @@ TEST(ThreadIdValidation) {
threads[i] =
base::make_unique<ThreadIdValidationThread>(prev, refs, i, &semaphore);
}
threads[0]->Start();
CHECK(threads[0]->Start());
for (int i = 0; i < kNThreads; i++) {
semaphore.Wait();
}


@ -246,8 +246,8 @@ TEST(JumpTablePatchingStress) {
}
JumpTablePatcher patcher(slot_start, slot, thunk1, thunk2);
global_stop_bit = 0; // Signal runners to keep going.
for (auto& runner : runners) runner.Start();
patcher.Start();
for (auto& runner : runners) CHECK(runner.Start());
CHECK(patcher.Start());
patcher.Join();
global_stop_bit = -1; // Signal runners to stop.
for (auto& runner : runners) runner.Join();


@ -583,7 +583,7 @@ TEST(TestInterruptLoop) {
int32_t* memory_array = reinterpret_cast<int32_t*>(memory->backing_store());
InterruptThread thread(isolate, memory_array);
thread.Start();
CHECK(thread.Start());
testing::RunWasmModuleForTesting(isolate, instance, 0, nullptr);
Address address = reinterpret_cast<Address>(
&memory_array[InterruptThread::interrupt_location_]);


@ -273,8 +273,8 @@ TEST(SharedEngineRunThreadedBuildingSync) {
Handle<WasmInstanceObject> instance = isolate.CompileAndInstantiate(buffer);
CHECK_EQ(42, isolate.Run(instance));
});
thread1.Start();
thread2.Start();
CHECK(thread1.Start());
CHECK(thread2.Start());
thread1.Join();
thread2.Join();
}
@ -295,8 +295,8 @@ TEST(SharedEngineRunThreadedBuildingAsync) {
CompileAndInstantiateAsync(isolate, buffer);
CHECK_EQ(42, isolate.Run(instance));
});
thread1.Start();
thread2.Start();
CHECK(thread1.Start());
CHECK(thread2.Start());
thread1.Join();
thread2.Join();
}
@ -321,8 +321,8 @@ TEST(SharedEngineRunThreadedExecution) {
Handle<WasmInstanceObject> instance = isolate.ImportInstance(module);
CHECK_EQ(23, isolate.Run(instance));
});
thread1.Start();
thread2.Start();
CHECK(thread1.Start());
CHECK(thread2.Start());
thread1.Join();
thread2.Join();
}
@ -358,7 +358,7 @@ TEST(SharedEngineRunThreadedTierUp) {
&module->module()->functions[0], ExecutionTier::kTurbofan);
CHECK_EQ(23, isolate.Run(instance));
});
for (auto& thread : threads) thread.Start();
for (auto& thread : threads) CHECK(thread.Start());
for (auto& thread : threads) thread.Join();
}


@ -43,7 +43,7 @@ TaskRunner::TaskRunner(IsolateData::SetupGlobalTasks setup_global_tasks,
process_queue_semaphore_(0),
nested_loop_count_(0),
is_terminated_(0) {
Start();
CHECK(Start());
}
TaskRunner::~TaskRunner() { Join(); }


@ -0,0 +1,42 @@
// Copyright 2019 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Flags: --stress-runs=1
const kBatchSize = 10;
const kNumBatches = 10;
function RunWorkerBatch(count) {
let script = `postMessage(42)`;
// Launch workers.
let workers = new Array(count);
for (let i = 0; i < count; i++) {
workers[i] = new Worker(script, {type : 'string'});
}
// Terminate half of the workers early.
for (let i = 0; i < workers.length; i++) {
if ((i & 1) == 1) workers[i].terminate();
}
// Get messages from some workers.
for (let i = 0; i < workers.length; i++) {
let msg = workers[i].getMessage();
assertTrue(msg === undefined || msg === 42);
// terminate all workers.
workers[i].terminate();
}
}
(function RunTest() {
print(`running ${kNumBatches} batches...`);
let time = performance.now();
for (let i = 0; i < kNumBatches; i++) {
let before = performance.now();
RunWorkerBatch(kBatchSize);
let time = performance.now() - before;
print(`batch ${i+1}, Δ = ${(time).toFixed(3)} ms`);
}
})();


@ -0,0 +1,56 @@
// Copyright 2019 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Flags: --expose-gc --stress-runs=1
const kBatchSize = 10;
const kNumBatches = 10;
function RunWorkerBatch(count) {
let script = `onmessage =
function(msg) {
if (msg.array) {
msg.array[0] = 99;
postMessage({array : msg.array});
}
}`;
// Launch workers.
let workers = new Array(count);
for (let i = 0; i < count; i++) {
workers[i] = new Worker(script, {type : 'string'});
}
// Send messages.
for (let i = 0; i < workers.length; i++) {
let array = new Int32Array([55, -77]);
workers[i].postMessage({array : array});
// terminate half of the workers early.
if ((i & 1) == 1) workers[i].terminate();
}
// Wait for replies.
for (let i = 0; i < workers.length; i++) {
let msg = workers[i].getMessage();
if (msg !== undefined && msg.array) {
assertInstanceof(msg.array, Int32Array);
assertEquals(99, msg.array[0]);
assertEquals(-77, msg.array[1]);
}
// terminate all workers.
workers[i].terminate();
}
}
(function RunTest() {
print(`running ${kNumBatches} batches...`);
let time = performance.now();
for (let i = 0; i < kNumBatches; i++) {
let before = performance.now();
RunWorkerBatch(kBatchSize);
gc();
let time = performance.now() - before;
print(`batch ${i+1}, Δ = ${(time).toFixed(3)} ms`);
}
})();


@ -0,0 +1,47 @@
// Copyright 2019 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Flags: --expose-gc --stress-runs=1
let script = `onmessage =
function(msg) {
if (msg.depth > 0) {
print("spawn");
let w = new Worker(msg.script, {type : "string"});
w.postMessage({script: msg.script, depth: msg.depth - 1});
let m = w.getMessage();
w.terminate();
postMessage(m);
} else {
postMessage(-99);
}
}`;
function RunWorker(depth) {
let w = new Worker(script, {type : "string"});
let array = new Int32Array([55, -77]);
w.postMessage({script: script, depth: depth});
let msg = w.getMessage();
print(msg);
w.terminate();
}
function RunTest(depth, iterations) {
let time = performance.now();
for (let i = 0; i < iterations; i++) {
let now = performance.now();
print(`iteration ${i}, Δ = ${(now - time).toFixed(3)} ms`);
RunWorker(depth);
gc();
time = now;
}
}
// TODO(9524): increase the workload of this test. Runs out of threads
// on too many platforms.
RunTest(1, 1);
RunTest(2, 2);
RunTest(5, 3);
RunTest(9, 2);


@ -0,0 +1,55 @@
// Copyright 2019 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Flags: --stress-runs=1
const kBatchSize = 10;
const kNumBatches = 10;
function RunWorkerBatch(count) {
let script = `onmessage =
function(msg) {
if (msg.array) {
msg.array[0] = 99;
postMessage({array : msg.array});
}
}`;
// Launch workers.
let workers = new Array(count);
for (let i = 0; i < count; i++) {
workers[i] = new Worker(script, {type : 'string'});
}
// Send messages.
for (let i = 0; i < workers.length; i++) {
let array = new Int32Array([55, -77]);
workers[i].postMessage({array : array});
// terminate half of the workers early.
if ((i & 1) == 1) workers[i].terminate();
}
// Wait for replies.
for (let i = 0; i < workers.length; i++) {
let msg = workers[i].getMessage();
if (msg !== undefined && msg.array) {
assertInstanceof(msg.array, Int32Array);
assertEquals(99, msg.array[0]);
assertEquals(-77, msg.array[1]);
}
// terminate all workers.
workers[i].terminate();
}
}
(function RunTest() {
print(`running ${kNumBatches} batches...`);
let time = performance.now();
for (let i = 0; i < kNumBatches; i++) {
let before = performance.now();
RunWorkerBatch(kBatchSize);
let time = performance.now() - before;
print(`batch ${i+1}, Δ = ${(time).toFixed(3)} ms`);
}
})();


@ -194,6 +194,9 @@
'wasm/compare-exchange-stress': [PASS, SLOW, NO_VARIANTS],
'wasm/compare-exchange64-stress': [PASS, SLOW, NO_VARIANTS],
# worker creation/shutdown is very slow in debug mode
'd8/d8-worker-shutdown*': [PASS, ['mode == debug', SLOW], ['no_snap', SKIP]],
# case-insensitive unicode regexp relies on case mapping provided by ICU.
'es6/unicode-regexp-ignore-case': [PASS, ['no_i18n == True', FAIL]],
'es6/unicode-regexp-ignore-case-noi18n': [FAIL, ['no_i18n == True', PASS]],


@ -105,7 +105,7 @@ TEST(AsAtomic8, CompareAndSwap_Concurrent) {
}
}
for (int i = 0; i < kThreadCount; i++) {
threads[i].Start();
CHECK(threads[i].Start());
}
for (int i = 0; i < kThreadCount; i++) {
@ -179,7 +179,7 @@ TEST(AsAtomicWord, SetBits_Concurrent) {
threads[i].Initialize(&word, i * 2);
}
for (int i = 0; i < kThreadCount; i++) {
threads[i].Start();
CHECK(threads[i].Start());
}
for (int i = 0; i < kThreadCount; i++) {
threads[i].Join();


@ -64,7 +64,7 @@ TEST(ConditionVariable, MultipleThreadsWithSeparateConditionVariables) {
MutexGuard lock_guard(&threads[n].mutex_);
EXPECT_FALSE(threads[n].running_);
EXPECT_FALSE(threads[n].finished_);
threads[n].Start();
CHECK(threads[n].Start());
// Wait for nth thread to start.
while (!threads[n].running_) {
threads[n].cv_.Wait(&threads[n].mutex_);
@ -153,7 +153,7 @@ TEST(ConditionVariable, MultipleThreadsWithSharedSeparateConditionVariables) {
for (int n = 0; n < kThreadCount; ++n) {
EXPECT_FALSE(threads[n].running_);
EXPECT_FALSE(threads[n].finished_);
threads[n].Start();
CHECK(threads[n].Start());
}
}
@ -281,7 +281,7 @@ TEST(ConditionVariable, LoopIncrement) {
// Start all threads.
for (int n = thread_count - 1; n >= 0; --n) {
threads[n]->Start();
CHECK(threads[n]->Start());
}
// Join and cleanup all threads.


@ -79,7 +79,7 @@ class ThreadLocalStorageTest : public Thread, public ::testing::Test {
TEST_F(ThreadLocalStorageTest, DoTest) {
Run();
Start();
CHECK(Start());
Join();
}


@ -94,8 +94,8 @@ TEST(Semaphore, ProducerConsumer) {
Semaphore used_space(0);
ProducerThread producer_thread(buffer, &free_space, &used_space);
ConsumerThread consumer_thread(buffer, &free_space, &used_space);
producer_thread.Start();
consumer_thread.Start();
CHECK(producer_thread.Start());
CHECK(consumer_thread.Start());
producer_thread.Join();
consumer_thread.Join();
}
@ -106,8 +106,8 @@ TEST(Semaphore, WaitAndSignal) {
WaitAndSignalThread t1(&semaphore);
WaitAndSignalThread t2(&semaphore);
t1.Start();
t2.Start();
CHECK(t1.Start());
CHECK(t2.Start());
// Make something available.
semaphore.Signal();


@ -69,12 +69,12 @@ TEST(DateCache, AdoptDefaultFirst) {
// We finish all the operation AdoptDefaultThread before
// running all other thread so it won't show the problem of
// AdoptDefault trashing newly create default.
t1.Start();
CHECK(t1.Start());
t1.Join();
t2.Start();
t3.Start();
t4.Start();
CHECK(t2.Start());
CHECK(t3.Start());
CHECK(t4.Start());
t2.Join();
t3.Join();
@ -92,10 +92,10 @@ TEST(DateCache, AdoptDefaultMixed) {
// it will cause crash in other thread because the TimeZone
// newly created by createDefault could be trashed by AdoptDefault
// while a deleted DEFAULT_ZONE got cloned.
t1.Start();
t2.Start();
t3.Start();
t4.Start();
CHECK(t1.Start());
CHECK(t2.Start());
CHECK(t3.Start());
CHECK(t4.Start());
t1.Join();
t2.Join();


@ -57,7 +57,7 @@ TEST(OneshotBarrier, DoneAfterWait_Concurrent) {
barrier.Start();
}
for (int i = 0; i < kThreadCount; i++) {
threads[i].Start();
CHECK(threads[i].Start());
}
for (int i = 0; i < kThreadCount; i++) {
threads[i].Join();
@ -80,7 +80,7 @@ TEST(OneshotBarrier, EarlyFinish_Concurrent) {
barrier.Start();
}
for (int i = 0; i < kThreadCount; i++) {
threads[i].Start();
CHECK(threads[i].Start());
}
for (int i = 0; i < kThreadCount; i++) {
threads[i].Join();
@ -133,7 +133,7 @@ TEST(OneshotBarrier, Processing_Concurrent) {
barrier.Start();
barrier.Start();
EXPECT_FALSE(barrier.DoneForTesting());
counting_thread.Start();
CHECK(counting_thread.Start());
for (size_t i = 0; i < kWorkCounter; i++) {
{


@ -460,8 +460,8 @@ TEST_F(GCTracerTest, MultithreadedBackgroundScope) {
ThreadWithBackgroundScope thread1(tracer);
ThreadWithBackgroundScope thread2(tracer);
tracer->ResetForTesting();
thread1.Start();
thread2.Start();
CHECK(thread1.Start());
CHECK(thread2.Start());
tracer->FetchBackgroundMarkCompactCounters();
thread1.Join();
thread2.Join();


@ -51,8 +51,8 @@ TEST(TaskQueueTest, TerminateMultipleReaders) {
TaskQueue queue;
TaskQueueThread thread1(&queue);
TaskQueueThread thread2(&queue);
thread1.Start();
thread2.Start();
CHECK(thread1.Start());
CHECK(thread2.Start());
queue.Terminate();
thread1.Join();
thread2.Join();


@ -160,8 +160,8 @@ TEST_F(CancelableTaskManagerTest, ThreadedMultipleTasksStarted) {
ResultType result2{0};
ThreadedRunner runner1(NewTask(&result1, TestTask::kWaitTillCancelTriggered));
ThreadedRunner runner2(NewTask(&result2, TestTask::kWaitTillCancelTriggered));
runner1.Start();
runner2.Start();
CHECK(runner1.Start());
CHECK(runner2.Start());
// Busy wait on result to make sure both tasks are done.
while (result1.load() == 0 || result2.load() == 0) {
}
@ -179,8 +179,8 @@ TEST_F(CancelableTaskManagerTest, ThreadedMultipleTasksNotRun) {
ThreadedRunner runner2(NewTask(&result2, TestTask::kCheckNotRun));
CancelAndWait();
// Tasks are canceled, hence the runner will bail out and not update result.
runner1.Start();
runner2.Start();
CHECK(runner1.Start());
CHECK(runner2.Start());
runner1.Join();
runner2.Join();
EXPECT_EQ(0u, result1);
@ -193,7 +193,7 @@ TEST_F(CancelableTaskManagerTest, RemoveBeforeCancelAndWait) {
CancelableTaskManager::Id id = runner1.task_id();
EXPECT_EQ(1u, id);
EXPECT_EQ(TryAbortResult::kTaskAborted, manager()->TryAbort(id));
runner1.Start();
CHECK(runner1.Start());
runner1.Join();
CancelAndWait();
EXPECT_EQ(0u, result1);
@ -204,7 +204,7 @@ TEST_F(CancelableTaskManagerTest, RemoveAfterCancelAndWait) {
ThreadedRunner runner1(NewTask(&result1));
CancelableTaskManager::Id id = runner1.task_id();
EXPECT_EQ(1u, id);
runner1.Start();
CHECK(runner1.Start());
runner1.Join();
CancelAndWait();
EXPECT_EQ(TryAbortResult::kTaskRemoved, manager()->TryAbort(id));
@ -231,8 +231,8 @@ TEST_F(CancelableTaskManagerTest, ThreadedMultipleTasksNotRunTryAbortAll) {
ThreadedRunner runner2(NewTask(&result2, TestTask::kCheckNotRun));
EXPECT_EQ(TryAbortResult::kTaskAborted, TryAbortAll());
// Tasks are canceled, hence the runner will bail out and not update result.
runner1.Start();
runner2.Start();
CHECK(runner1.Start());
CHECK(runner2.Start());
runner1.Join();
runner2.Join();
EXPECT_EQ(0u, result1);
@ -245,7 +245,7 @@ TEST_F(CancelableTaskManagerTest, ThreadedMultipleTasksStartedTryAbortAll) {
ResultType result2{0};
ThreadedRunner runner1(NewTask(&result1, TestTask::kWaitTillCancelTriggered));
ThreadedRunner runner2(NewTask(&result2, TestTask::kWaitTillCancelTriggered));
runner1.Start();
CHECK(runner1.Start());
// Busy wait on result to make sure task1 is done.
while (result1.load() == 0) {
}
@ -255,7 +255,7 @@ TEST_F(CancelableTaskManagerTest, ThreadedMultipleTasksStartedTryAbortAll) {
EXPECT_THAT(TryAbortAll(),
testing::AnyOf(testing::Eq(TryAbortResult::kTaskAborted),
testing::Eq(TryAbortResult::kTaskRunning)));
runner2.Start();
CHECK(runner2.Start());
runner1.Join();
runner2.Join();
EXPECT_EQ(1u, result1);


@ -465,7 +465,7 @@ TEST_P(TrapHandlerTest, TestCrashInOtherThread) {
CHECK(!GetThreadInWasmFlag());
// Set the thread-in-wasm flag manually in this thread.
*trap_handler::GetThreadInWasmThreadLocalAddress() = 1;
runner.Start();
CHECK(runner.Start());
runner.Join();
CHECK(GetThreadInWasmFlag());
// Reset the thread-in-wasm flag.