v8/tools/testrunner/local/execution.py
machenbach 780077f6d1 [test-runner] Move test case processing beyond the multi-process boundary.
This will allow moving the test outcome check beyond the
multi-process boundary in a follow up. It'll allow wrapping
more complex test jobs like predicable mode on the multi-
process side, which will make the code easier to maintain.

BUG=

Review URL: https://codereview.chromium.org/1469833002

Cr-Commit-Position: refs/heads/master@{#32373}
2015-11-27 12:52:25 +00:00

374 lines
13 KiB
Python

# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import collections
import os
import shutil
import sys
import time
from pool import Pool
from . import commands
from . import perfdata
from . import statusfile
from . import testsuite
from . import utils
# Base dir of the v8 checkout.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)))))
TEST_DIR = os.path.join(BASE_DIR, "test")
class Instructions(object):
def __init__(self, command, dep_command, test_id, timeout, verbose):
self.command = command
self.dep_command = dep_command
self.id = test_id
self.timeout = timeout
self.verbose = verbose
# Structure that keeps global information per worker process.
ProcessContext = collections.namedtuple(
"process_context", ["suites", "context"])
def MakeProcessContext(context):
"""Generate a process-local context.
This reloads all suites per process and stores the global context.
Args:
context: The global context from the test runner.
"""
suite_paths = utils.GetSuitePaths(TEST_DIR)
suites = {}
for root in suite_paths:
# Don't reinitialize global state as this is concurrently called from
# different processes.
suite = testsuite.TestSuite.LoadTestSuite(
os.path.join(TEST_DIR, root), global_init=False)
if suite:
suites[suite.name] = suite
return ProcessContext(suites, context)
def GetCommand(test, context):
d8testflag = []
shell = test.suite.shell()
if shell == "d8":
d8testflag = ["--test"]
if utils.IsWindows():
shell += ".exe"
if context.random_seed:
d8testflag += ["--random-seed=%s" % context.random_seed]
cmd = (context.command_prefix +
[os.path.abspath(os.path.join(context.shell_dir, shell))] +
d8testflag +
test.suite.GetFlagsForTestCase(test, context) +
context.extra_flags)
return cmd
def _GetInstructions(test, context):
command = GetCommand(test, context)
timeout = context.timeout
if ("--stress-opt" in test.flags or
"--stress-opt" in context.mode_flags or
"--stress-opt" in context.extra_flags):
timeout *= 4
if "--noenable-vfp3" in context.extra_flags:
timeout *= 2
# FIXME(machenbach): Make this more OO. Don't expose default outcomes or
# the like.
if statusfile.IsSlow(test.outcomes or [statusfile.PASS]):
timeout *= 2
if test.dependency is not None:
dep_command = [ c.replace(test.path, test.dependency) for c in command ]
else:
dep_command = None
return Instructions(
command, dep_command, test.id, timeout, context.verbose)
class Job(object):
"""Stores data to be sent over the multi-process boundary.
All contained fields will be pickled/unpickled.
"""
def Run(self, process_context):
"""Executes the job.
Args:
process_context: Process-local information that is initialized by the
executing worker.
"""
raise NotImplementedError()
class TestJob(Job):
def __init__(self, test):
self.test = test
def Run(self, process_context):
# Retrieve a new suite object on the worker-process side. The original
# suite object isn't pickled.
self.test.SetSuiteObject(process_context.suites)
instr = _GetInstructions(self.test, process_context.context)
start_time = time.time()
if instr.dep_command is not None:
dep_output = commands.Execute(
instr.dep_command, instr.verbose, instr.timeout)
# TODO(jkummerow): We approximate the test suite specific function
# IsFailureOutput() by just checking the exit code here. Currently
# only cctests define dependencies, for which this simplification is
# correct.
if dep_output.exit_code != 0:
return (instr.id, dep_output, time.time() - start_time)
output = commands.Execute(instr.command, instr.verbose, instr.timeout)
return (instr.id, output, time.time() - start_time)
def RunTest(job, process_context):
return job.Run(process_context)
class Runner(object):
def __init__(self, suites, progress_indicator, context):
self.datapath = os.path.join("out", "testrunner_data")
self.perf_data_manager = perfdata.GetPerfDataManager(
context, self.datapath)
self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode)
self.perf_failures = False
self.printed_allocations = False
self.tests = [ t for s in suites for t in s.tests ]
if not context.no_sorting:
for t in self.tests:
t.duration = self.perfdata.FetchPerfData(t) or 1.0
slow_key = lambda t: statusfile.IsSlow(t.outcomes)
self.tests.sort(key=slow_key, reverse=True)
self.tests.sort(key=lambda t: t.duration, reverse=True)
self._CommonInit(suites, progress_indicator, context)
def _CommonInit(self, suites, progress_indicator, context):
self.total = 0
for s in suites:
for t in s.tests:
t.id = self.total
self.total += 1
self.indicator = progress_indicator
progress_indicator.SetRunner(self)
self.context = context
self.succeeded = 0
self.remaining = self.total
self.failed = []
self.crashed = 0
self.reran_tests = 0
def _RunPerfSafe(self, fun):
try:
fun()
except Exception, e:
print("PerfData exception: %s" % e)
self.perf_failures = True
def _MaybeRerun(self, pool, test):
if test.run <= self.context.rerun_failures_count:
# Possibly rerun this test if its run count is below the maximum per
# test. <= as the flag controls reruns not including the first run.
if test.run == 1:
# Count the overall number of reran tests on the first rerun.
if self.reran_tests < self.context.rerun_failures_max:
self.reran_tests += 1
else:
# Don't rerun this if the overall number of rerun tests has been
# reached.
return
if test.run >= 2 and test.duration > self.context.timeout / 20.0:
# Rerun slow tests at most once.
return
# Rerun this test.
test.duration = None
test.output = None
test.run += 1
pool.add([TestJob(test)])
self.remaining += 1
self.total += 1
def _ProcessTestNormal(self, test, result, pool):
self.indicator.AboutToRun(test)
test.output = result[1]
test.duration = result[2]
has_unexpected_output = test.suite.HasUnexpectedOutput(test)
if has_unexpected_output:
self.failed.append(test)
if test.output.HasCrashed():
self.crashed += 1
else:
self.succeeded += 1
self.remaining -= 1
# For the indicator, everything that happens after the first run is treated
# as unexpected even if it flakily passes in order to include it in the
# output.
self.indicator.HasRun(test, has_unexpected_output or test.run > 1)
if has_unexpected_output:
# Rerun test failures after the indicator has processed the results.
self._VerbosePrint("Attempting to rerun test after failure.")
self._MaybeRerun(pool, test)
# Update the perf database if the test succeeded.
return not has_unexpected_output
def _ProcessTestPredictable(self, test, result, pool):
def HasDifferentAllocations(output1, output2):
def AllocationStr(stdout):
for line in reversed((stdout or "").splitlines()):
if line.startswith("### Allocations = "):
self.printed_allocations = True
return line
return ""
return (AllocationStr(output1.stdout) != AllocationStr(output2.stdout))
# Always pass the test duration for the database update.
test.duration = result[2]
if test.run == 1 and result[1].HasTimedOut():
# If we get a timeout in the first run, we are already in an
# unpredictable state. Just report it as a failure and don't rerun.
self.indicator.AboutToRun(test)
test.output = result[1]
self.remaining -= 1
self.failed.append(test)
self.indicator.HasRun(test, True)
if test.run > 1 and HasDifferentAllocations(test.output, result[1]):
# From the second run on, check for different allocations. If a
# difference is found, call the indicator twice to report both tests.
# All runs of each test are counted as one for the statistic.
self.indicator.AboutToRun(test)
self.remaining -= 1
self.failed.append(test)
self.indicator.HasRun(test, True)
self.indicator.AboutToRun(test)
test.output = result[1]
self.indicator.HasRun(test, True)
elif test.run >= 3:
# No difference on the third run -> report a success.
self.indicator.AboutToRun(test)
self.remaining -= 1
self.succeeded += 1
test.output = result[1]
self.indicator.HasRun(test, False)
else:
# No difference yet and less than three runs -> add another run and
# remember the output for comparison.
test.run += 1
test.output = result[1]
pool.add([TestJob(test)])
# Always update the perf database.
return True
def Run(self, jobs):
self.indicator.Starting()
self._RunInternal(jobs)
self.indicator.Done()
if self.failed:
return 1
elif self.remaining:
return 2
return 0
def _RunInternal(self, jobs):
pool = Pool(jobs)
test_map = {}
queued_exception = [None]
def gen_tests():
for test in self.tests:
assert test.id >= 0
test_map[test.id] = test
try:
yield [TestJob(test)]
except Exception, e:
# If this failed, save the exception and re-raise it later (after
# all other tests have had a chance to run).
queued_exception[0] = e
continue
try:
it = pool.imap_unordered(
fn=RunTest,
gen=gen_tests(),
process_context_fn=MakeProcessContext,
process_context_args=[self.context],
)
for result in it:
if result.heartbeat:
self.indicator.Heartbeat()
continue
test = test_map[result.value[0]]
if self.context.predictable:
update_perf = self._ProcessTestPredictable(test, result.value, pool)
else:
update_perf = self._ProcessTestNormal(test, result.value, pool)
if update_perf:
self._RunPerfSafe(lambda: self.perfdata.UpdatePerfData(test))
finally:
self._VerbosePrint("Closing process pool.")
pool.terminate()
self._VerbosePrint("Closing database connection.")
self._RunPerfSafe(lambda: self.perf_data_manager.close())
if self.perf_failures:
# Nuke perf data in case of failures. This might not work on windows as
# some files might still be open.
print "Deleting perf test data due to db corruption."
shutil.rmtree(self.datapath)
if queued_exception[0]:
raise queued_exception[0]
# Make sure that any allocations were printed in predictable mode (if we
# ran any tests).
assert (
not self.total or
not self.context.predictable or
self.printed_allocations
)
def _VerbosePrint(self, text):
if self.context.verbose:
print text
sys.stdout.flush()
class BreakNowException(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)