Implement new features in qt-wasmtestrunner

The script is now able to:
- kill a test that is not responding after certain timeout
- multicast output to files/streams (tee-like, but not only on posix)
- forward requested format (xml/txt/etc.) to the test executable
- run a batched test from a batch test package
- interop with the js batched test runner

Change-Id: Ia189d78a078e11b9efd25865c5a0ddc6a62d9b85
Reviewed-by: David Skoland <david.skoland@qt.io>
This commit is contained in:
Mikolaj Boc 2022-08-25 15:35:34 +02:00
parent 11c19ac6e2
commit 3c9a12934e

View File

@ -2,20 +2,82 @@
# Copyright (C) 2021 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
import argparse
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.ui import WebDriverWait
from selenium import webdriver
from pathlib import Path
import typing
import http.server
import subprocess
import threading
import psutil
import re
import os
import sys
import time
import atexit
import threading
import subprocess
import http.server
from pathlib import Path
from signal import SIGINT
from selenium import webdriver
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.chrome.service import Service
import argparse
import sys
class StdoutOutputSink(object):
def __init__(self):
pass
def write(self, data: str):
print(data)
def __enter__(self):
pass
def __exit__(self, _, __, ___):
pass
class FileOutputSink(object):
def __init__(self, filename: str):
self.__filename = filename
self.__file = None
def write(self, data: str):
self.__file.write(data)
def __enter__(self):
self.__file = open(self.__filename, 'w')
def __exit__(self, _, __, ___):
self.__file.close()
class OutputMulticast(object):
def __init__(self, destinations: typing.List[str]):
self.__sinks: typing.List[typing.Union[StdoutOutputSink, FileOutputSink]] = [
]
self.__destinations = [
'stdout'] if destinations is None else destinations
number_of_stdout_sinks = sum(
[1 if destination == 'stdout' else 0 for destination in self.__destinations])
if number_of_stdout_sinks > 1:
raise Exception('Maximum allowed number of stdout sinks is 1')
def write(self, data: str):
for sink in self.__sinks:
sink.write(data)
def _makeSink(self, destination: str):
return StdoutOutputSink() if 'stdout' == destination else FileOutputSink(destination)
def __enter__(self):
for destination in self.__destinations:
sink = self._makeSink(destination)
sink.__enter__()
self.__sinks.append(sink)
return self
def __exit__(self, _, __, ___):
for sink in reversed(self.__sinks):
sink.__exit__(_, __, ___)
class WasmTestRunner:
@ -27,7 +89,6 @@ class WasmTestRunner:
self.host = 'localhost'
self.webserver = None
self.webthread = None
self.exit_code = 0
paths = ['html_path', 'browser_path', 'chromedriver_path', 'tmp_dir']
@ -47,20 +108,18 @@ class WasmTestRunner:
if hasattr(self, 'browser_path') and not self.browser_path.exists():
raise FileNotFoundError(self.browser_path)
atexit.register(self.cleanup)
def run(self):
# self.run_webserver()
self.run_threaded_webserver()
if self.use_browser:
self.run_wasm_browser()
else:
self.run_wasm_webdriver()
self.shutdown_threaded_webserver()
return self.exit_code
with OutputMulticast(
self.output if hasattr(self, 'output') else ['stdout']) as output_multicast:
try:
if self.use_browser:
return self.run_wasm_browser()
else:
return self.run_wasm_webdriver(output_multicast)
finally:
self.cleanup()
def run_webserver(self):
webroot = self.html_path.parent.resolve()
@ -85,35 +144,40 @@ class WasmTestRunner:
if self.webthread is not None:
self.webthread.join()
def run_wasm_webdriver(self):
def run_wasm_webdriver(self, output_multicast: OutputMulticast):
url = f'http://localhost:{self.port}/{self.html_path.name}'
if (self.batched_test is not None):
url = f'{url}?qtestname={self.batched_test}&qtestoutputformat={self.format}'
d = DesiredCapabilities.CHROME
d['goog:loggingPrefs'] = {'browser': 'ALL'}
ser = Service(executable_path=self.chromedriver_path)
driver = webdriver.Chrome(desired_capabilities=d, service=ser)
driver.get(url)
driver.execute_script(
""" const status = qtTestRunner.status;
const onFinished = status => {
if (status === 'Completed' || status === 'Error')
document.title = 'qtFinished';
};
onFinished(status);
qtTestRunner.onStatusChanged.addEventListener(onFinished);
""")
app_state = ''
WebDriverWait(driver, self.timeout).until(
expected_conditions.title_is('qtFinished'))
while app_state != 'Exited':
# HACK: Optimally, we would want the program to report back to us
# when it changes state and prints logs
# Unfortunately, that's rather difficult, so we resort to polling it
# at a given interval instead, which is adjustable
time.sleep(1)
app_state = self.get_loader_variable(driver, 'status')
for entry in driver.get_log('browser'):
regex = re.compile(r'[^"]*"(.*)".*')
match = regex.match(entry['message'])
if match is not None:
console_line = match.group(1)
print(console_line)
if self.get_loader_variable(driver, 'crashed'):
self.exit_code = 1
runner_status = driver.execute_script(f"return qtTestRunner.status")
if runner_status == 'Error':
output_multicast.write(driver.execute_script(
"return qtTestRunner.errorDetails"))
return -1
else:
assert runner_status == 'Completed'
output_multicast.write(driver.execute_script(
f"return qtTestRunner.results.get('{self.batched_test}').textOutput"))
return driver.execute_script(
f"return qtTestRunner.results.get('{self.batched_test}').exitCode")
def run_wasm_browser(self):
if not hasattr(self, 'browser_path'):
@ -182,8 +246,10 @@ class WasmTestRunner:
# See https://web.dev/cross-origin-isolation-guide/
def end_headers(self):
self.send_header("Cross-Origin-Opener-Policy", "same-origin")
self.send_header("Cross-Origin-Embedder-Policy", "require-corp")
self.send_header("Cross-Origin-Resource-Policy", "cross-origin")
self.send_header(
"Cross-Origin-Embedder-Policy", "require-corp")
self.send_header(
"Cross-Origin-Resource-Policy", "cross-origin")
http.server.SimpleHTTPRequestHandler.end_headers(self)
# We usually don't care that much about what the webserver is logging
@ -200,21 +266,65 @@ class WasmTestRunner:
self.shutdown_threaded_webserver()
class BackendProcess:
def __init__(self) -> None:
self.__process = subprocess.Popen(
[sys.executable, *sys.argv, '--backend'], shell=False, stdout=subprocess.PIPE)
def abort(self):
current_process = psutil.Process(self.__process.pid)
children = current_process.children(recursive=True)
for child in [*children, current_process]:
os.kill(child.pid, SIGINT)
def communicate(self, timeout):
return self.__process.communicate(timeout)[0].decode('utf-8')
def returncode(self):
return self.__process.returncode
def main():
parser = argparse.ArgumentParser(description='WASM testrunner')
parser.add_argument('html_path', help='Path to the HTML file to request')
parser.add_argument('--port', help='Port to run the webserver on', default='8000')
parser.add_argument(
'--batched_test', help='Specifies a batched test to run')
parser.add_argument('--timeout', help='Test timeout',
type=int, default=120)
parser.add_argument(
'--port', help='Port to run the webserver on', default='8000')
parser.add_argument('--use_browser', action='store_true')
parser.add_argument('--browser_path', help='Path to the browser to use')
parser.add_argument('--chromedriver_path', help='Absolute path to chromedriver',
default='chromedriver')
parser.add_argument('--tmp_dir', help='Path to the tmpdir to use when using a browser',
default='/tmp/wasm-testrunner')
parser.add_argument(
'-o', help='filename. Filename may be "stdout" to write to stdout.',
action='append', dest='output')
parser.add_argument(
'--format', help='Output format', choices=['txt', 'xml', 'lightxml', 'junitxml', 'tap'],
default='txt')
parser.add_argument(
'--backend', help='Run as a backend process. There are two types of test runner processes - '
'the main monitoring process and the backend processes launched by it. The tests are '
'run on the backend to avoid any undesired behavior, like deadlocks in browser main process, '
'spilling over across test cases.',
action='store_true')
args = vars(parser.parse_args())
args = parser.parse_args()
if not args.backend:
backend_process = BackendProcess()
try:
stdout = backend_process.communicate(args.timeout)
print(stdout)
return backend_process.returncode()
except Exception as e:
print(f"Exception while executing test {e}")
backend_process.abort()
return -1
test_runner = WasmTestRunner(args)
return test_runner.run()
return WasmTestRunner(vars(args)).run()
if __name__ == '__main__':