[test] Added test processor documentation.

Bug: v8:6917
Change-Id: I6ecfab76e6d2ee0b4ab86380e8cfbb68df07599e
Reviewed-on: https://chromium-review.googlesource.com/852295
Commit-Queue: Michał Majewski <majeski@google.com>
Reviewed-by: Michael Achenbach <machenbach@chromium.org>
Reviewed-by: Sergiy Byelozyorov <sergiyb@chromium.org>
Cr-Commit-Position: refs/heads/master@{#50457}
This commit is contained in:
Michal Majewski 2018-01-09 14:56:18 +01:00 committed by Commit Bot
parent 1d9035ab3b
commit cfd43ee713
3 changed files with 70 additions and 0 deletions

View File

@ -3,19 +3,55 @@
# found in the LICENSE file.
"""
Pipeline
Test processors are chained together and communicate with each other by
calling previous/next processor in the chain.
----next_test()----> ----next_test()---->
Proc1 Proc2 Proc3
<---result_for()---- <---result_for()----
Subtests
When test processor needs to modify the test or create some variants of the
test it creates subtests and sends them to the next processor.
Each subtest has:
- procid - globally unique id that should contain id of the parent test and
some suffix given by test processor, e.g. its name + subtest type.
- processor - which created it
- origin - pointer to the parent (sub)test
"""
class TestProc(object):
def __init__(self):
self._prev_proc = None
self._next_proc = None
def connect_to(self, next_proc):
"""Puts `next_proc` after itself in the chain."""
next_proc._prev_proc = self
self._next_proc = next_proc
def next_test(self, test):
"""
Method called by previous processor whenever it produces new test.
This method shouldn't be called by anyone except previous processor.
"""
raise NotImplementedError()
def result_for(self, test, result, is_last):
"""
Method called by next processor whenever it has result for some test.
This method shouldn't be called by anyone except next processor.
Args:
test: test for which the `result` is
result: result of calling test's outproc on the output
is_last: for each test we've passed next processor may create subtests
and pass results for all of them. `is_last` is set when it
won't send any more results for subtests based on the `test`.
"""
raise NotImplementedError()
def heartbeat(self):
@ -24,14 +60,18 @@ class TestProc(object):
### Communication
def _send_test(self, test):
"""Helper method for sending test to the next processor."""
return self._next_proc.next_test(test)
def _send_result(self, test, result, is_last=True):
"""Helper method for sending result to the previous processor."""
return self._prev_proc.result_for(test, result, is_last=is_last)
class TestProcObserver(TestProc):
"""Processor used for observing the data."""
def next_test(self, test):
self._on_next_test(test)
self._send_test(test)
@ -45,9 +85,13 @@ class TestProcObserver(TestProc):
super(TestProcObserver, self).heartbeat()
def _on_next_test(self, test):
"""Method called after receiving test from previous processor but before
sending it to the next one."""
pass
def _on_result_for(self, test, result, is_last):
"""Method called after receiving result from next processor but before
sending it to the previous one."""
pass
def _on_heartbeat(self):
@ -55,6 +99,8 @@ class TestProcObserver(TestProc):
class TestProcProducer(TestProc):
"""Processor for creating subtests."""
def __init__(self, name):
super(TestProcProducer, self).__init__()
self._name = name
@ -71,13 +117,25 @@ class TestProcProducer(TestProc):
raise NotImplementedError()
def _result_for(self, test, subtest, result, is_last):
"""
result_for method extended with `subtest` parameter.
Args
test: test used by current processor to create the subtest.
subtest: test for which the `result` is.
other arguments are the same as for TestProc.result_for()
"""
raise NotImplementedError()
### Managing subtests
def _create_subtest(self, test, subtest_id):
"""Creates subtest with subtest id <processor name>-`subtest_id`."""
return test.create_subtest(self, '%s-%s' % (self._name, subtest_id))
def _get_subtest_origin(self, subtest):
"""Returns parent test that current processor used to create the subtest.
None if there is no parent created by the current processor.
"""
while subtest.processor and subtest.processor is not self:
subtest = subtest.origin
return subtest.origin

View File

@ -30,6 +30,11 @@ class Job(object):
class ExecutionProc(base.TestProc):
"""Last processor in the chain. Instead of passing tests further it creates
commands and output processors, executes them in multiple worker processes and
sends results to the previous processor.
"""
def __init__(self, jobs, context):
super(ExecutionProc, self).__init__()
self._pool = pool.Pool(jobs)

View File

@ -6,6 +6,9 @@ from . import base
class LoadProc(base.TestProc):
"""First processor in the chain that passes all tests to the next processor.
"""
def load_tests(self, tests):
loaded = set()
for test in tests:
@ -16,5 +19,9 @@ class LoadProc(base.TestProc):
loaded.add(test.procid)
self._send_test(test)
def next_test(self, test):
assert False, 'Nothing can be connected to the LoadProc'
def result_for(self, test, result, is_last):
# Ignore all results.
pass