[test] Simplify passing results between test processors.

Bug: v8:6917
Change-Id: Id73e4892a0d1b3b9c5bdd70ccc136e7bd2edf360
Cq-Include-Trybots: luci.v8.try:v8_linux64_fyi_rel_ng
Reviewed-on: https://chromium-review.googlesource.com/863603
Commit-Queue: Michał Majewski <majeski@google.com>
Reviewed-by: Michael Achenbach <machenbach@chromium.org>
Cr-Commit-Position: refs/heads/master@{#50565}
This commit is contained in:
Michal Majewski 2018-01-15 09:03:48 +01:00 committed by Commit Bot
parent d557e7d412
commit 60c17bf983
9 changed files with 128 additions and 83 deletions

View File

@ -4,6 +4,7 @@
from ..local import statusfile
from ..outproc import base as outproc_base
from ..testproc.result import Result
# Only check the exit code of the predictable_wrapper in
@ -31,7 +32,7 @@ class OutProc(outproc_base.BaseOutProc):
self._outproc = _outproc
def process(self, output):
return outproc_base.Result(self.has_unexpected_output(output), output)
return Result(self.has_unexpected_output(output), output)
def has_unexpected_output(self, output):
return output.exit_code != 0

View File

@ -6,13 +6,12 @@ import collections
import itertools
from ..local import statusfile
from ..testproc.result import Result
OUTCOMES_PASS = [statusfile.PASS]
OUTCOMES_FAIL = [statusfile.FAIL]
Result = collections.namedtuple('Result', ['has_unexpected_output', 'output'])
class BaseOutProc(object):
def process(self, output):

View File

@ -2,6 +2,8 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from .result import SKIPPED
"""
Pipeline
@ -12,6 +14,12 @@ calling previous/next processor in the chain.
Proc1 Proc2 Proc3
<---result_for()---- <---result_for()----
For every next_test there is exactly one result_for call.
If processor ignores the test it has to return SkippedResult.
If it created multiple subtests for one test and wants to pass all of them to
the previous processor it can enclose them in GroupedResult.
Subtests
When test processor needs to modify the test or create some variants of the
@ -38,21 +46,13 @@ class TestProc(object):
"""
Method called by previous processor whenever it produces new test.
This method shouldn't be called by anyone except previous processor.
Returns: bool whether test will be processed.
"""
raise NotImplementedError()
def result_for(self, test, result, is_last):
def result_for(self, test, result):
"""
Method called by next processor whenever it has result for some test.
This method shouldn't be called by anyone except next processor.
Args:
test: test for which the `result` is
result: result of calling test's outproc on the output
is_last: for each test we've passed next processor may create subtests
and pass results for all of them. `is_last` is set when it
won't send any more results for subtests based on the `test`.
"""
raise NotImplementedError()
@ -61,13 +61,14 @@ class TestProc(object):
self._prev_proc.heartbeat()
### Communication
def _send_test(self, test):
"""Helper method for sending test to the next processor."""
return self._next_proc.next_test(test)
self._next_proc.next_test(test)
def _send_result(self, test, result, is_last=True):
def _send_result(self, test, result):
"""Helper method for sending result to the previous processor."""
self._prev_proc.result_for(test, result, is_last=is_last)
self._prev_proc.result_for(test, result)
@ -76,11 +77,11 @@ class TestProcObserver(TestProc):
def next_test(self, test):
self._on_next_test(test)
return self._send_test(test)
self._send_test(test)
def result_for(self, test, result, is_last):
self._on_result_for(test, result, is_last)
self._send_result(test, result, is_last)
def result_for(self, test, result):
self._on_result_for(test, result)
self._send_result(test, result)
def heartbeat(self):
self._on_heartbeat()
@ -91,7 +92,7 @@ class TestProcObserver(TestProc):
sending it to the next one."""
pass
def _on_result_for(self, test, result, is_last):
def _on_result_for(self, test, result):
"""Method called after receiving result from next processor but before
sending it to the previous one."""
pass
@ -108,24 +109,23 @@ class TestProcProducer(TestProc):
self._name = name
def next_test(self, test):
return self._next_test(test)
self._next_test(test)
def result_for(self, subtest, result, is_last):
test = self._get_subtest_origin(subtest)
self._result_for(test, subtest, result, is_last)
def result_for(self, subtest, result):
self._result_for(subtest.origin, subtest, result)
### Implementation
def _next_test(self, test):
raise NotImplementedError()
def _result_for(self, test, subtest, result, is_last):
def _result_for(self, test, subtest, result):
"""
result_for method extended with `subtest` parameter.
Args
test: test used by current processor to create the subtest.
subtest: test for which the `result` is.
other arguments are the same as for TestProc.result_for()
result: subtest execution result created by the output processor.
"""
raise NotImplementedError()
@ -135,23 +135,18 @@ class TestProcProducer(TestProc):
return test.create_subtest(self, '%s-%s' % (self._name, subtest_id),
**kwargs)
def _get_subtest_origin(self, subtest):
"""Returns parent test that current processor used to create the subtest.
None if there is no parent created by the current processor.
"""
while subtest.processor and subtest.processor is not self:
subtest = subtest.origin
return subtest.origin
class TestProcFilter(TestProc):
"""Processor for filtering tests."""
def next_test(self, test):
return not self._filter(test) and self._send_test(test)
if self._filter(test):
self._send_result(test, SKIPPED)
else:
self._send_test(test)
def result_for(self, test, result, is_last):
self._send_result(test, result, is_last)
def result_for(self, test, result):
self._send_result(test, result)
def _filter(self, test):
"""Returns whether test should be filtered out."""

View File

@ -82,7 +82,5 @@ class ExecutionProc(base.TestProc):
outproc = test.output_proc
self._pool.add([Job(test_id, test.cmd, outproc)])
return True
def result_for(self, test, result, is_last):
def result_for(self, test, result):
assert False, 'ExecutionProc cannot receive results'

View File

@ -22,6 +22,6 @@ class LoadProc(base.TestProc):
def next_test(self, test):
assert False, 'Nothing can be connected to the LoadProc'
def result_for(self, test, result, is_last):
def result_for(self, test, result):
# Ignore all results.
pass

View File

@ -34,10 +34,9 @@ class ResultsTracker(base.TestProcObserver):
self.total += 1
self.remaining += 1
def _on_result_for(self, test, result, is_last):
if not is_last and not self.count_subtests:
return
def _on_result_for(self, test, result):
# TODO(majeski): Count grouped results when count_subtests is set.
# TODO(majeski): Support for dummy/grouped results
self.remaining -= 1
if result.has_unexpected_output:
self.failed += 1
@ -61,7 +60,8 @@ class SimpleProgressIndicator(ProgressIndicator):
def _on_next_test(self, test):
self._total += 1
def _on_result_for(self, test, result, is_last):
def _on_result_for(self, test, result):
# TODO(majeski): Support for dummy/grouped results
if result.has_unexpected_output:
self._failed.append((test, result.output))
@ -100,8 +100,9 @@ class SimpleProgressIndicator(ProgressIndicator):
class VerboseProgressIndicator(SimpleProgressIndicator):
def _on_result_for(self, test, result, is_last):
super(VerboseProgressIndicator, self)._on_result_for(test, result, is_last)
def _on_result_for(self, test, result):
super(VerboseProgressIndicator, self)._on_result_for(test, result)
# TODO(majeski): Support for dummy/grouped results
if result.has_unexpected_output:
if result.output.HasCrashed():
outcome = 'CRASH'
@ -122,7 +123,8 @@ class DotsProgressIndicator(SimpleProgressIndicator):
super(DotsProgressIndicator, self).__init__()
self._count = 0
def _on_result_for(self, test, result, is_last):
def _on_result_for(self, test, result):
# TODO(majeski): Support for dummy/grouped results
self._count += 1
if self._count > 1 and self._count % 50 == 1:
sys.stdout.write('\n')
@ -155,11 +157,8 @@ class CompactProgressIndicator(ProgressIndicator):
def _on_next_test(self, test):
self._total += 1
def _on_result_for(self, test, result, is_last):
if not is_last:
# Some processor further in the chain created several subtests of one
# test, so lets add them to the total amount.
self._total += 1
def _on_result_for(self, test, result):
# TODO(majeski): Support for dummy/grouped results
if result.has_unexpected_output:
self._failed += 1
else:
@ -257,7 +256,8 @@ class JUnitTestProgressIndicator(ProgressIndicator):
else:
self.outfile = sys.stdout
def _on_result_for(self, test, result, is_last):
def _on_result_for(self, test, result):
# TODO(majeski): Support for dummy/grouped results
fail_text = ""
output = result.output
if result.has_unexpected_output:
@ -294,7 +294,8 @@ class JsonTestProgressIndicator(ProgressIndicator):
self.results = []
self.tests = []
def _on_result_for(self, test, result, is_last):
def _on_result_for(self, test, result):
# TODO(majeski): Support for dummy/grouped results
output = result.output
# Buffer all tests for sorting the durations in the end.
self.tests.append((test, output.duration))

View File

@ -15,15 +15,8 @@ class RerunProc(base.TestProcProducer):
def _next_test(self, test):
self._init_test(test)
self._send_next_subtest(test)
return True
def _result_for(self, test, subtest, result, is_last):
# Rerun processor cannot be placed before any processor that produces more
# than one subtest per test.
# TODO(majeski): Introduce constraints and check them during pipeline
# creation to avoid asserts like that.
assert is_last
def _result_for(self, test, subtest, result):
if self._needs_rerun(test, result):
self._rerun[test.procid] += 1
if self._rerun_total_left is not None:

View File

@ -0,0 +1,60 @@
# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
class ResultBase(object):
@property
def is_skipped(self):
return False
@property
def is_grouped(self):
return False
class Result(ResultBase):
"""Result created by the output processor."""
def __init__(self, has_unexpected_output, output):
self.has_unexpected_output = has_unexpected_output
self.output = output
class GroupedResult(ResultBase):
"""Result consisting of multiple results. It can be used by processors that
create multiple subtests for each test and want to pass all results back.
"""
@staticmethod
def create(results):
"""Create grouped result from the list of results. It filters out skipped
results. If all results are skipped results it returns skipped result.
Args:
results: list of pairs (test, result)
"""
results = [(t, r) for (t, r) in results if not r.is_skipped]
if not results:
return SKIPPED
return GroupedResult(results)
def __init__(self, results):
self.results = results
@property
def is_grouped(self):
return True
class SkippedResult(ResultBase):
"""Result without any meaningful value. Used primarily to inform the test
processor that it's test wasn't executed.
"""
@property
def is_skipped(self):
return True
SKIPPED = SkippedResult()

View File

@ -3,8 +3,8 @@
# found in the LICENSE file.
from . import base
from ..local.variants import ALL_VARIANTS, ALL_VARIANT_FLAGS, FAST_VARIANT_FLAGS
from .result import GroupedResult
FAST_VARIANTS = set(["default", "turbofan"])
@ -26,33 +26,31 @@ class VariantProc(base.TestProcProducer):
def __init__(self, variants):
super(VariantProc, self).__init__('VariantProc')
self._next_test_iter = {}
self._test_data = {} # procid: (generator, results)
self._variant_gens = {}
self._variants = variants
def _next_test(self, test):
self._next_test_iter[test.procid] = iter(self._variants_gen(test))
return self._try_send_new_subtest(test)
test_data = gen, results = self._variants_gen(test), []
self._test_data[test.procid] = test_data
self._try_send_new_subtest(test, gen, results)
def _result_for(self, test, subtest, result, is_last):
if not is_last:
self._send_result(subtest, result, is_last=False)
return
def _result_for(self, test, subtest, result):
gen, results = self._test_data[test.procid]
results.append((subtest, result))
self._try_send_new_subtest(test, gen, results)
has_sent = self._try_send_new_subtest(test)
self._send_result(subtest, result, is_last=not has_sent)
def _try_send_new_subtest(self, test):
# Keep trying until variant is not ignored by the next processors or there
# are no more variants to generate.
for variant, flags, suffix in self._next_test_iter[test.procid]:
def _try_send_new_subtest(self, test, variants_gen, results):
for variant, flags, suffix in variants_gen:
subtest = self._create_subtest(test, '%s-%s' % (variant, suffix),
variant=variant, flags=flags)
if self._send_test(subtest):
return True
self._send_test(subtest)
return
del self._next_test_iter[test.procid]
return False
del self._test_data[test.procid]
# TODO(majeski): Don't group tests if previous processors don't need them.
result = GroupedResult.create(results)
self._send_result(test, result)
def _variants_gen(self, test):
"""Generator producing (variant, flags, procid suffix) tuples."""