[test] Drain queues asynchronously when terminating workers

Joining a queue-using child process can deadlock if the child is about to
write to the queue while the parent tries to join it. To fix this, we now
drain queue elements on a separate thread of the main process while joining
(see the illustrative sketch after the commit metadata below).

Bug: v8:13113
Change-Id: Ic279e66ab84eb89a4034ff1f2c025eb850b65013
Reviewed-on: https://chromium-review.googlesource.com/c/v8/v8/+/3891116
Commit-Queue: Michael Achenbach <machenbach@chromium.org>
Reviewed-by: Alexander Schulze <alexschulze@chromium.org>
Cr-Commit-Position: refs/heads/main@{#83177}
Michael Achenbach 2022-09-13 22:03:13 +02:00 committed by V8 LUCI CQ
parent 2124146565
commit ae2ef7d234
2 changed files with 49 additions and 31 deletions
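
For background, this is the standard "joining processes that use queues" pitfall from Python's multiprocessing documentation: a child that has put items on a multiprocessing.Queue does not exit until its feeder thread has flushed them into the underlying pipe, so if the parent stops consuming and simply calls join(), both processes can block forever. The sketch below is not part of this change; worker and join_with_drain are illustrative names only, used to show the drain-while-joining pattern that pool.py now applies to done_queue.

import multiprocessing
import queue
import threading


def worker(q):
  # Illustrative producer: it writes more data than the queue's pipe buffer
  # can hold, so the child cannot exit until someone reads from the queue.
  for i in range(100000):
    q.put('result %d' % i)


def join_with_drain(process, q):
  # Keep the queue empty on a helper thread while joining, so the child's
  # pending writes complete and the join cannot deadlock.
  keep_running = True

  def drain():
    while keep_running:
      try:
        q.get(True, 0.1)
      except queue.Empty:
        pass

  drainer = threading.Thread(target=drain)
  drainer.start()
  process.join()
  keep_running = False
  drainer.join()


if __name__ == '__main__':
  q = multiprocessing.Queue()
  p = multiprocessing.Process(target=worker, args=(q,))
  p.start()
  join_with_drain(p, q)  # a bare p.join() here could hang forever

Elements drained this way are simply discarded, which is what the pool wants during termination anyway.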


@@ -7,6 +7,7 @@ import collections
import logging
import os
import signal
import threading
import traceback
from contextlib import contextmanager
@@ -25,8 +26,6 @@ def setup_testing():
  del Process
  from queue import Queue
  from threading import Thread as Process
  # Monkeypatch threading Queue to look like multiprocessing Queue.
  Queue.cancel_join_thread = lambda self: None
  # Monkeypatch os.kill and add fake pid property on Thread.
  os.kill = lambda *args: None
  Process.pid = property(lambda self: None)
@@ -108,6 +107,36 @@ def without_sig():
    signal.signal(signal.SIGTERM, term_handler)


@contextmanager
def drain_queue_async(queue):
  """Drains a queue in a background thread until the wrapped code unblocks.

  This can be used to unblock joining a child process that might still write
  to the queue. The join should be wrapped by this context manager.
  """
  keep_running = True
  def empty_queue():
    elem_count = 0
    while keep_running:
      try:
        while True:
          queue.get(True, 0.1)
          elem_count += 1
          if elem_count < 200:
            logging.info('Drained an element from queue.')
      except Empty:
        pass
      except:
        logging.exception('Error draining queue.')

  emptier = threading.Thread(target=empty_queue)
  emptier.start()
  yield
  keep_running = False
  emptier.join()


class ContextPool():

  def __init__(self):
@@ -325,35 +354,10 @@ class DefaultExecutionPool(ContextPool):
    self._terminate_processes()

    self.notify("Joining workers")
    for p in self.processes:
      p.join()
    with drain_queue_async(self.done_queue):
      for p in self.processes:
        p.join()

    # Drain the queues to prevent stderr chatter when queues are garbage
    # collected.
    self.notify("Draining queues")
    # TODO(https://crbug.com/v8/13113): Remove extra logging after
    # investigation.
    elem_count = 0
    try:
      while True:
        self.work_queue.get(False)
        elem_count += 1
        if elem_count < 200:
          logging.info('Drained an element from work queue.')
    except Empty:
      pass
    except:
      logging.exception('Error draining work queue.')
    try:
      while True:
        self.done_queue.get(False)
        elem_count += 1
        if elem_count < 200:
          logging.info('Drained an element from done queue.')
    except Empty:
      pass
    except:
      logging.exception('Error draining done queue.')

    self.notify("Pool terminated")

  def _get_result_from_queue(self):


@@ -7,12 +7,14 @@ import os
import sys
import unittest
from queue import Empty, Full, Queue
# Needed because the test runner contains relative imports.
TOOLS_PATH = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(TOOLS_PATH)
from testrunner.local.pool import DefaultExecutionPool
from testrunner.local.pool import DefaultExecutionPool, drain_queue_async
def Run(x):
@@ -64,5 +66,17 @@ class PoolTest(unittest.TestCase):
        set(range(0, 10)) | set(range(20, 30)) | set(range(40, 50)), results)


class QueueTest(unittest.TestCase):

  def testDrainQueueAsync(self):
    queue = Queue(1)
    queue.put('foo')
    with self.assertRaises(Full):
      queue.put('bar', timeout=0.01)
    with drain_queue_async(queue):
      queue.put('bar')
    with self.assertRaises(Empty):
      queue.get(False)


if __name__ == '__main__':
  unittest.main()