v8/tools/testrunner/local/pool.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

217 lines
6.7 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from Queue import Empty
from multiprocessing import Event, Process, Queue
import traceback
def setup_testing():
"""For testing only: Use threading under the hood instead of multiprocessing
to make coverage work.
"""
global Queue
global Event
global Process
del Queue
del Event
del Process
from Queue import Queue
from threading import Event
from threading import Thread as Process
class NormalResult():
def __init__(self, result):
self.result = result
self.exception = False
self.break_now = False
class ExceptionResult():
def __init__(self):
self.exception = True
self.break_now = False
class BreakResult():
def __init__(self):
self.exception = False
self.break_now = True
class MaybeResult():
def __init__(self, heartbeat, value):
self.heartbeat = heartbeat
self.value = value
@staticmethod
def create_heartbeat():
return MaybeResult(True, None)
@staticmethod
def create_result(value):
return MaybeResult(False, value)
def Worker(fn, work_queue, done_queue, done,
process_context_fn=None, process_context_args=None):
"""Worker to be run in a child process.
The worker stops on two conditions. 1. When the poison pill "STOP" is
reached or 2. when the event "done" is set."""
try:
kwargs = {}
if process_context_fn and process_context_args is not None:
kwargs.update(process_context=process_context_fn(*process_context_args))
for args in iter(work_queue.get, "STOP"):
if done.is_set():
break
try:
done_queue.put(NormalResult(fn(*args, **kwargs)))
except Exception, e:
traceback.print_exc()
print(">>> EXCEPTION: %s" % e)
done_queue.put(ExceptionResult())
except KeyboardInterrupt:
done_queue.put(BreakResult())
class Pool():
"""Distributes tasks to a number of worker processes.
New tasks can be added dynamically even after the workers have been started.
Requirement: Tasks can only be added from the parent process, e.g. while
consuming the results generator."""
# Factor to calculate the maximum number of items in the work/done queue.
# Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
BUFFER_FACTOR = 4
def __init__(self, num_workers, heartbeat_timeout=30):
self.num_workers = num_workers
self.processes = []
self.terminated = False
# Invariant: count >= #work_queue + #done_queue. It is greater when a
# worker takes an item from the work_queue and before the result is
# submitted to the done_queue. It is equal when no worker is working,
# e.g. when all workers have finished, and when no results are processed.
# Count is only accessed by the parent process. Only the parent process is
# allowed to remove items from the done_queue and to add items to the
# work_queue.
self.count = 0
self.work_queue = Queue()
self.done_queue = Queue()
self.done = Event()
self.heartbeat_timeout = heartbeat_timeout
def imap_unordered(self, fn, gen,
process_context_fn=None, process_context_args=None):
"""Maps function "fn" to items in generator "gen" on the worker processes
in an arbitrary order. The items are expected to be lists of arguments to
the function. Returns a results iterator. A result value of type
MaybeResult either indicates a heartbeat of the runner, i.e. indicating
that the runner is still waiting for the result to be computed, or it wraps
the real result.
Args:
process_context_fn: Function executed once by each worker. Expected to
return a process-context object. If present, this object is passed
as additional argument to each call to fn.
process_context_args: List of arguments for the invocation of
process_context_fn. All arguments will be pickled and sent beyond the
process boundary.
"""
try:
internal_error = False
gen = iter(gen)
self.advance = self._advance_more
for w in xrange(self.num_workers):
p = Process(target=Worker, args=(fn,
self.work_queue,
self.done_queue,
self.done,
process_context_fn,
process_context_args))
Revert "Reland "[test] Creating command before execution phase."" This reverts commit 3b065110528562a53533bef516237bf466e39096. Reason for revert: Broke fuzzers: https://build.chromium.org/p/client.v8.clusterfuzz/builders/V8%20Deopt%20Fuzzer/builds/384 Original change's description: > Reland "[test] Creating command before execution phase." > > This is a reland of 98cc9e862f9e8c0a8cde95e7b42beb808b88dab6 > Original change's description: > > [test] Creating command before execution phase. > > > > Immutable command class with shell, flags and > > environment. > > > > Command creation moved from worker to the main > > process. Because of that there is no need to send > > test cases beyond process boundaries and load test > > suites in worker processes. > > > > Bug: v8:6917 > > Change-Id: Ib6a44278095b4f7141eb9b96802fe3e8117678a6 > > Reviewed-on: https://chromium-review.googlesource.com/791710 > > Commit-Queue: Michał Majewski <majeski@google.com> > > Reviewed-by: Michael Achenbach <machenbach@chromium.org> > > Cr-Commit-Position: refs/heads/master@{#49746} > > Bug: v8:6917 > Change-Id: I49c29a8db813c47909f2cc45070ac7721a447c7a > Reviewed-on: https://chromium-review.googlesource.com/800370 > Reviewed-by: Michael Achenbach <machenbach@chromium.org> > Commit-Queue: Michał Majewski <majeski@google.com> > Cr-Commit-Position: refs/heads/master@{#49756} TBR=machenbach@chromium.org,sergiyb@chromium.org,majeski@google.com # Not skipping CQ checks because original CL landed > 1 day ago. Bug: v8:6917 Change-Id: I4938642c4396366be1e13daf6998c4b8538b688b Reviewed-on: https://chromium-review.googlesource.com/804254 Reviewed-by: Michael Achenbach <machenbach@chromium.org> Commit-Queue: Michael Achenbach <machenbach@chromium.org> Cr-Commit-Position: refs/heads/master@{#49805}
2017-12-01 18:46:54 +00:00
p.start()
Reland "Reland "[test] Creating command before execution phase."" This is a reland of 3b065110528562a53533bef516237bf466e39096 Original change's description: > Reland "[test] Creating command before execution phase." > > This is a reland of 98cc9e862f9e8c0a8cde95e7b42beb808b88dab6 > Original change's description: > > [test] Creating command before execution phase. > > > > Immutable command class with shell, flags and > > environment. > > > > Command creation moved from worker to the main > > process. Because of that there is no need to send > > test cases beyond process boundaries and load test > > suites in worker processes. > > > > Bug: v8:6917 > > Change-Id: Ib6a44278095b4f7141eb9b96802fe3e8117678a6 > > Reviewed-on: https://chromium-review.googlesource.com/791710 > > Commit-Queue: Michał Majewski <majeski@google.com> > > Reviewed-by: Michael Achenbach <machenbach@chromium.org> > > Cr-Commit-Position: refs/heads/master@{#49746} > > Bug: v8:6917 > Change-Id: I49c29a8db813c47909f2cc45070ac7721a447c7a > Reviewed-on: https://chromium-review.googlesource.com/800370 > Reviewed-by: Michael Achenbach <machenbach@chromium.org> > Commit-Queue: Michał Majewski <majeski@google.com> > Cr-Commit-Position: refs/heads/master@{#49756} Bug: v8:6917 Change-Id: Ia39010a0a0f63537ad12490dfab17897d70d4930 Reviewed-on: https://chromium-review.googlesource.com/806034 Reviewed-by: Michael Achenbach <machenbach@chromium.org> Commit-Queue: Michał Majewski <majeski@google.com> Cr-Commit-Position: refs/heads/master@{#49830}
2017-11-30 12:57:45 +00:00
self.processes.append(p)
self.advance(gen)
while self.count > 0:
while True:
try:
result = self.done_queue.get(timeout=self.heartbeat_timeout)
break
except Empty:
# Indicate a heartbeat. The iterator will continue fetching the
# next result.
yield MaybeResult.create_heartbeat()
self.count -= 1
if result.exception:
# TODO(machenbach): Handle a few known types of internal errors
# gracefully, e.g. missing test files.
internal_error = True
continue
elif result.break_now:
# A keyboard interrupt happened in one of the worker processes.
raise KeyboardInterrupt
else:
yield MaybeResult.create_result(result.result)
self.advance(gen)
Reland "Reland "[test] Creating command before execution phase."" This is a reland of 3b065110528562a53533bef516237bf466e39096 Original change's description: > Reland "[test] Creating command before execution phase." > > This is a reland of 98cc9e862f9e8c0a8cde95e7b42beb808b88dab6 > Original change's description: > > [test] Creating command before execution phase. > > > > Immutable command class with shell, flags and > > environment. > > > > Command creation moved from worker to the main > > process. Because of that there is no need to send > > test cases beyond process boundaries and load test > > suites in worker processes. > > > > Bug: v8:6917 > > Change-Id: Ib6a44278095b4f7141eb9b96802fe3e8117678a6 > > Reviewed-on: https://chromium-review.googlesource.com/791710 > > Commit-Queue: Michał Majewski <majeski@google.com> > > Reviewed-by: Michael Achenbach <machenbach@chromium.org> > > Cr-Commit-Position: refs/heads/master@{#49746} > > Bug: v8:6917 > Change-Id: I49c29a8db813c47909f2cc45070ac7721a447c7a > Reviewed-on: https://chromium-review.googlesource.com/800370 > Reviewed-by: Michael Achenbach <machenbach@chromium.org> > Commit-Queue: Michał Majewski <majeski@google.com> > Cr-Commit-Position: refs/heads/master@{#49756} Bug: v8:6917 Change-Id: Ia39010a0a0f63537ad12490dfab17897d70d4930 Reviewed-on: https://chromium-review.googlesource.com/806034 Reviewed-by: Michael Achenbach <machenbach@chromium.org> Commit-Queue: Michał Majewski <majeski@google.com> Cr-Commit-Position: refs/heads/master@{#49830}
2017-11-30 12:57:45 +00:00
except KeyboardInterrupt:
raise
except Exception as e:
traceback.print_exc()
print(">>> EXCEPTION: %s" % e)
finally:
self.terminate()
if internal_error:
raise Exception("Internal error in a worker process.")
def _advance_more(self, gen):
while self.count < self.num_workers * self.BUFFER_FACTOR:
try:
self.work_queue.put(gen.next())
self.count += 1
except StopIteration:
self.advance = self._advance_empty
break
def _advance_empty(self, gen):
pass
def add(self, args):
"""Adds an item to the work queue. Can be called dynamically while
processing the results from imap_unordered."""
self.work_queue.put(args)
self.count += 1
def terminate(self):
if self.terminated:
return
self.terminated = True
# For exceptional tear down set the "done" event to stop the workers before
# they empty the queue buffer.
self.done.set()
for p in self.processes:
# During normal tear down the workers block on get(). Feed a poison pill
# per worker to make them stop.
self.work_queue.put("STOP")
for p in self.processes:
p.join()
# Drain the queues to prevent failures when queues are garbage collected.
try:
while True: self.work_queue.get(False)
except:
pass
try:
while True: self.done_queue.get(False)
except:
pass