v8/tools/testrunner/local/pool.py
machenbach@chromium.org fc437f4007 Introduce a dynamic process pool for the local test driver
The new process pool allows adding jobs after testing has been started. It will also allow to restructure building the job queue (in a follow up CL), so that testing can start instantly while the queue is being built.

Also attempts to clean up the keyboard-interrupt logic. Idea: Only catch keyboard interrupt once per process at the outermost level. Use proper "finally" clauses to clean up everywhere where a keyboard interrupt might occur. Never turn named exceptions into non-exceptions using an anonymous "raise".

TEST=python -m unittest pool_unittest
R=jkummerow@chromium.org

Review URL: https://codereview.chromium.org/275093002

git-svn-id: https://v8.googlecode.com/svn/branches/bleeding_edge@21310 ce2b1a6d-e550-0410-aec6-3dcde31c8c00
2014-05-14 13:30:57 +00:00

137 lines
4.2 KiB
Python

#!/usr/bin/env python
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from multiprocessing import Event, Process, Queue
class NormalResult():
  """Wraps the return value of a successfully completed worker call.

  Attributes:
    result: the value returned by the worker function.
    exception: always False for this result kind.
    break_now: always False for this result kind.
  """
  def __init__(self, result):
    # Flag fields first, payload last; readers dispatch on the flags.
    self.exception = False
    self.break_now = False
    self.result = result
class ExceptionResult():
  """Marker result: the worker function raised an unexpected exception.

  Attributes:
    exception: always True for this result kind.
    break_now: always False for this result kind.
  """
  def __init__(self):
    self.break_now = False
    self.exception = True
class BreakResult():
  """Marker result: a KeyboardInterrupt happened in a worker process.

  Attributes:
    exception: always False for this result kind.
    break_now: always True for this result kind.
  """
  def __init__(self):
    self.break_now = True
    self.exception = False
def Worker(fn, work_queue, done_queue, done):
  """Worker to be run in a child process.

  Pulls argument lists from "work_queue", applies "fn" to each and puts a
  result object (NormalResult / ExceptionResult / BreakResult) on
  "done_queue" per item.

  The worker stops on two conditions. 1. When the poison pill "STOP" is
  reached or 2. when the event "done" is set.

  Args:
    fn: callable applied as fn(*args) to each work item.
    work_queue: multiprocessing.Queue of argument lists; "STOP" terminates.
    done_queue: multiprocessing.Queue receiving one result per work item.
    done: multiprocessing.Event; when set, the worker exits after its
        current get() returns, without processing the item.
  """
  try:
    # iter(callable, sentinel) calls work_queue.get() until it yields "STOP".
    for args in iter(work_queue.get, "STOP"):
      if done.is_set():
        break
      try:
        done_queue.put(NormalResult(fn(*args)))
      # "except E as e" instead of the Python-2-only "except E, e" form;
      # works on Python 2.6+ and Python 3.
      except Exception as e:
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult())
  except KeyboardInterrupt:
    # Tell the parent that this worker was interrupted; the parent re-raises.
    done_queue.put(BreakResult())
class Pool():
  """Distributes tasks to a number of worker processes.

  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator.
  """

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
  BUFFER_FACTOR = 4

  def __init__(self, num_workers):
    """Creates an idle pool; workers are spawned by imap_unordered().

    Args:
      num_workers: number of child processes to spawn.
    """
    self.num_workers = num_workers
    self.processes = []
    self.terminated = False

    # Invariant: count >= #work_queue + #done_queue. It is greater when a
    # worker takes an item from the work_queue and before the result is
    # submitted to the done_queue. It is equal when no worker is working,
    # e.g. when all workers have finished, and when no results are processed.
    # Count is only accessed by the parent process. Only the parent process is
    # allowed to remove items from the done_queue and to add items to the
    # work_queue.
    self.count = 0
    self.work_queue = Queue()
    self.done_queue = Queue()
    self.done = Event()

  def imap_unordered(self, fn, gen):
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
    the function. Returns a results iterator.

    Raises:
      KeyboardInterrupt: re-raised in the parent when a worker reported one.
    """
    try:
      gen = iter(gen)
      self.advance = self._advance_more

      # "range" instead of the Python-2-only "xrange"; the worker count is
      # small, so materializing the sequence on Python 2 is harmless.
      for _ in range(self.num_workers):
        p = Process(target=Worker, args=(fn,
                                         self.work_queue,
                                         self.done_queue,
                                         self.done))
        self.processes.append(p)
        p.start()

      self.advance(gen)
      while self.count > 0:
        result = self.done_queue.get()
        self.count -= 1
        # Refill the work queue for every drained result, including ones
        # carrying exceptions. (Previously the refill was skipped for
        # exception results, so a run of failing items could starve the
        # work queue before "gen" was exhausted.)
        self.advance(gen)
        if result.exception:
          # Ignore items with unexpected exceptions.
          continue
        elif result.break_now:
          # A keyboard interrupt happened in one of the worker processes.
          raise KeyboardInterrupt
        else:
          yield result.result
    finally:
      self.terminate()

  def _advance_more(self, gen):
    """Tops up the work queue from "gen" up to the buffer limit."""
    while self.count < self.num_workers * self.BUFFER_FACTOR:
      try:
        # next(gen) works on Python 2.6+ and 3; gen.next() is Python-2-only.
        self.work_queue.put(next(gen))
        self.count += 1
      except StopIteration:
        # Generator exhausted; make further advance() calls no-ops.
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    """No-op advance used once "gen" is exhausted."""
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    self.work_queue.put(args)
    self.count += 1

  def terminate(self):
    """Stops all workers and joins them. Idempotent."""
    if self.terminated:
      return
    self.terminated = True

    # For exceptional tear down set the "done" event to stop the workers before
    # they empty the queue buffer.
    self.done.set()

    # During normal tear down the workers block on get(). Feed a poison pill
    # per worker to make them stop.
    for _ in self.processes:
      self.work_queue.put("STOP")

    # NOTE(review): results left in done_queue are not drained before the
    # join; BUFFER_FACTOR is assumed to keep the pipe small enough — confirm.
    for p in self.processes:
      p.join()