[test] Fix occasional hangs on pool termination

On termination of the worker pool in the main process, a SIGTERM is
sent from pool to worker. It was meant to terminate long-running
tests in the worker process. The signal handler on the worker side,
however, was only registered during test execution. During the
remaining logic (<1% of the time probably) the default system
behavior for SIGTERM would be used (which will likely just kill
the process). The ungracefully killed process might be killed while
writing to the results queue, which then remains with corrupted data.
Later when the main process cleans up the queue, it hangs.

We now register a default handler in the worker process that catches
the SIGTERM and also gracefully stops the processing loop. Like
that, the SIGTERM signal will always be handled in workers and never
fall back to SIGKILL.

However, a small time window exists when the SIGTERM was caught
right when starting a test process, but when the test-abort handler
was not registered yet. We keep fixing this as a TODO. Worst case,
the main process will block until the last test run is done.

Bug: v8:13113
Change-Id: Ib60f82c6a1569da042c9f44f7b516e2f40a46f93
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/3882972
Reviewed-by: Alexander Schulze <alexschulze@chromium.org>
Commit-Queue: Michael Achenbach <machenbach@chromium.org>
Cr-Commit-Position: refs/heads/main@{#83101}
This commit is contained in:
Michael Achenbach 2022-09-08 15:43:40 +02:00 committed by V8 LUCI CQ
parent 4420804037
commit cd1ee28be8
2 changed files with 32 additions and 4 deletions

View File

@ -43,11 +43,22 @@ def handle_sigterm(process, abort_fun, enabled):
"""
# Variable to communicate with the signal handler.
abort_occured = [False]
def handler(signum, frame):
abort_fun(process, abort_occured)
if enabled:
previous = signal.signal(signal.SIGTERM, handler)
# TODO(https://crbug.com/v8/13113): There is a race condition on
# signal handler registration. In rare cases, the SIGTERM for stopping
# a worker might be caught right after a long running process has been
# started (or logic that starts it isn't interrupted), but before the
# registration of the abort_fun. In this case, process.communicate will
# block until the process is done.
previous = signal.getsignal(signal.SIGTERM)
def handler(signum, frame):
abort_fun(process, abort_occured)
if previous and callable(previous):
# Call default signal handler. If this command is called from a worker
# process, its signal handler will gracefully stop processing.
previous(signum, frame)
signal.signal(signal.SIGTERM, handler)
try:
yield
finally:

View File

@ -66,15 +66,29 @@ def Worker(fn, work_queue, done_queue,
"""Worker to be run in a child process.
The worker stops when the poison pill "STOP" is reached.
"""
# Install a default signal handler for SIGTERM that stops the processing
# loop below on the next occasion. The job function "fn" is supposed to
# register their own handler to avoid blocking, but still chain to this
# handler on SIGTERM to terminate the loop quickly.
stop = [False]
def handler(signum, frame):
stop[0] = True
signal.signal(signal.SIGTERM, handler)
try:
kwargs = {}
if process_context_fn and process_context_args is not None:
kwargs.update(process_context=process_context_fn(*process_context_args))
for args in iter(work_queue.get, "STOP"):
if stop[0]:
# SIGINT, SIGTERM or internal hard timeout caught outside the execution
# of "fn".
break
try:
done_queue.put(NormalResult(fn(*args, **kwargs)))
except AbortException:
# SIGINT, SIGTERM or internal hard timeout.
# SIGINT, SIGTERM or internal hard timeout caught during execution of
# "fn".
break
except Exception as e:
logging.exception('Unhandled error during worker execution.')
@ -309,6 +323,9 @@ class DefaultExecutionPool(ContextPool):
# per worker to make them stop.
self.work_queue.put("STOP")
# Send a SIGTERM to all workers. They will gracefully terminate their
# processing loop and if the signal is caught during job execution they
# will try to terminate the ongoing test processes quickly.
if self.abort_now:
self._terminate_processes()