From ae2ef7d234851245f17daad206fcafb5a912c3b4 Mon Sep 17 00:00:00 2001 From: Michael Achenbach Date: Tue, 13 Sep 2022 22:03:13 +0200 Subject: [PATCH] [test] Drain queues asynchronously when terminating workers Joining a queue-using process can deadlock if the child process is about to write to the queue, but the parent process wants to join the child. To fix this, we now drain elements from a separate thread of the main process. Bug: v8:13113 Change-Id: Ic279e66ab84eb89a4034ff1f2c025eb850b65013 Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/3891116 Commit-Queue: Michael Achenbach Reviewed-by: Alexander Schulze Cr-Commit-Position: refs/heads/main@{#83177} --- tools/testrunner/local/pool.py | 64 +++++++++++++++-------------- tools/testrunner/local/pool_test.py | 16 +++++++- 2 files changed, 49 insertions(+), 31 deletions(-) diff --git a/tools/testrunner/local/pool.py b/tools/testrunner/local/pool.py index bf04e1f055..527070cac4 100644 --- a/tools/testrunner/local/pool.py +++ b/tools/testrunner/local/pool.py @@ -7,6 +7,7 @@ import collections import logging import os import signal +import threading import traceback from contextlib import contextmanager @@ -25,8 +26,6 @@ def setup_testing(): del Process from queue import Queue from threading import Thread as Process - # Monkeypatch threading Queue to look like multiprocessing Queue. - Queue.cancel_join_thread = lambda self: None # Monkeypatch os.kill and add fake pid property on Thread. os.kill = lambda *args: None Process.pid = property(lambda self: None) @@ -108,6 +107,36 @@ def without_sig(): signal.signal(signal.SIGTERM, term_handler) +@contextmanager +def drain_queue_async(queue): + """Drains a queue in a background thread until the wrapped code unblocks. + + This can be used to unblock joining a child process that might still write + to the queue. The join should be wrapped by this context manager. 
+ """ + keep_running = True + + def empty_queue(): + elem_count = 0 + while keep_running: + try: + while True: + queue.get(True, 0.1) + elem_count += 1 + if elem_count < 200: + logging.info('Drained an element from queue.') + except Empty: + pass + except: + logging.exception('Error draining queue.') + + emptier = threading.Thread(target=empty_queue) + emptier.start() + yield + keep_running = False + emptier.join() + + class ContextPool(): def __init__(self): @@ -325,35 +354,10 @@ class DefaultExecutionPool(ContextPool): self._terminate_processes() self.notify("Joining workers") - for p in self.processes: - p.join() + with drain_queue_async(self.done_queue): + for p in self.processes: + p.join() - # Drain the queues to prevent stderr chatter when queues are garbage - # collected. - self.notify("Draining queues") - # TODO(https://crbug.com/v8/13113): Remove extra logging after - # investigation. - elem_count = 0 - try: - while True: - self.work_queue.get(False) - elem_count += 1 - if elem_count < 200: - logging.info('Drained an element from work queue.') - except Empty: - pass - except: - logging.exception('Error draining work queue.') - try: - while True: - self.done_queue.get(False) - elem_count += 1 - if elem_count < 200: - logging.info('Drained an element from done queue.') - except Empty: - pass - except: - logging.exception('Error draining done queue.') self.notify("Pool terminated") def _get_result_from_queue(self): diff --git a/tools/testrunner/local/pool_test.py b/tools/testrunner/local/pool_test.py index acd597ee6c..e023ae188c 100755 --- a/tools/testrunner/local/pool_test.py +++ b/tools/testrunner/local/pool_test.py @@ -7,12 +7,14 @@ import os import sys import unittest +from queue import Empty, Full, Queue + # Needed because the test runner contains relative imports. 
TOOLS_PATH = os.path.dirname( os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(TOOLS_PATH) -from testrunner.local.pool import DefaultExecutionPool +from testrunner.local.pool import DefaultExecutionPool, drain_queue_async def Run(x): @@ -64,5 +66,17 @@ class PoolTest(unittest.TestCase): set(range(0, 10)) | set(range(20, 30)) | set(range(40, 50)), results) +class QueueTest(unittest.TestCase): + def testDrainQueueAsync(self): + queue = Queue(1) + queue.put('foo') + with self.assertRaises(Full): + queue.put('bar', timeout=0.01) + with drain_queue_async(queue): + queue.put('bar') + with self.assertRaises(Empty): + queue.get(False) + + if __name__ == '__main__': unittest.main()