d852ae6f2b
Procs return the result by increasing recursion through result_for. This CL eliminates that mechanism from the Processor interface and uses boolen return values for sending tests to signal success or the failure to load the test into the execution queue. R=machenbach@chromium.org CC=yangguo@chromium.org,sergiyb@chromium.org Bug: v8:8174,v8:8731 Change-Id: I073a86ca84bcf88da11132b90013d4c8455bc61e Reviewed-on: https://chromium-review.googlesource.com/c/1439239 Commit-Queue: Tamer Tas <tmrts@chromium.org> Reviewed-by: Michael Achenbach <machenbach@chromium.org> Reviewed-by: Sergiy Belozorov <sergiyb@chromium.org> Cr-Commit-Position: refs/heads/master@{#59201}
96 lines
2.6 KiB
Python
96 lines
2.6 KiB
Python
# Copyright 2018 the V8 project authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
import collections
|
|
import traceback
|
|
|
|
from . import base
|
|
from ..local import pool
|
|
|
|
|
|
# Global function for multiprocessing, because pickling a static method doesn't
|
|
# work on Windows.
|
|
def run_job(job, process_context):
|
|
return job.run(process_context)
|
|
|
|
|
|
def create_process_context(result_reduction):
|
|
return ProcessContext(result_reduction)
|
|
|
|
|
|
JobResult = collections.namedtuple('JobResult', ['id', 'result'])
|
|
ProcessContext = collections.namedtuple('ProcessContext', ['result_reduction'])
|
|
|
|
|
|
class Job(object):
|
|
def __init__(self, test_id, cmd, outproc, keep_output):
|
|
self.test_id = test_id
|
|
self.cmd = cmd
|
|
self.outproc = outproc
|
|
self.keep_output = keep_output
|
|
|
|
def run(self, process_ctx):
|
|
output = self.cmd.execute()
|
|
reduction = process_ctx.result_reduction if not self.keep_output else None
|
|
result = self.outproc.process(output, reduction)
|
|
return JobResult(self.test_id, result)
|
|
|
|
|
|
class ExecutionProc(base.TestProc):
|
|
"""Last processor in the chain. Instead of passing tests further it creates
|
|
commands and output processors, executes them in multiple worker processes and
|
|
sends results to the previous processor.
|
|
"""
|
|
|
|
def __init__(self, jobs, outproc_factory=None):
|
|
super(ExecutionProc, self).__init__()
|
|
self._pool = pool.Pool(jobs)
|
|
self._outproc_factory = outproc_factory or (lambda t: t.output_proc)
|
|
self._tests = {}
|
|
|
|
def connect_to(self, next_proc):
|
|
assert False, 'ExecutionProc cannot be connected to anything'
|
|
|
|
def run(self):
|
|
it = self._pool.imap_unordered(
|
|
fn=run_job,
|
|
gen=[],
|
|
process_context_fn=create_process_context,
|
|
process_context_args=[self._prev_requirement],
|
|
)
|
|
for pool_result in it:
|
|
self._unpack_result(pool_result)
|
|
|
|
def next_test(self, test):
|
|
if self.is_stopped:
|
|
return False
|
|
|
|
test_id = test.procid
|
|
cmd = test.get_command()
|
|
self._tests[test_id] = test, cmd
|
|
|
|
outproc = self._outproc_factory(test)
|
|
self._pool.add([Job(test_id, cmd, outproc, test.keep_output)])
|
|
|
|
return True
|
|
|
|
def result_for(self, test, result):
|
|
assert False, 'ExecutionProc cannot receive results'
|
|
|
|
def stop(self):
|
|
super(ExecutionProc, self).stop()
|
|
self._pool.abort()
|
|
|
|
def _unpack_result(self, pool_result):
|
|
if pool_result.heartbeat:
|
|
self.heartbeat()
|
|
return
|
|
|
|
job_result = pool_result.value
|
|
test_id, result = job_result
|
|
|
|
test, result.cmd = self._tests[test_id]
|
|
del self._tests[test_id]
|
|
self._send_result(test, result)
|