Use headless mutter for monitor tests

Add some monitor tests that are using headless
mutter, and python with our in-tree gir files.

So far, we test that we get expected signals
when monitors are added and removed.
This commit is contained in:
Matthias Clasen 2021-03-20 16:23:36 -04:00
parent 3c218687e3
commit 771df47820
2 changed files with 213 additions and 0 deletions

View File

@ -0,0 +1,187 @@
import sys
import subprocess
import gi
gi.require_version('Gdk', '4.0')
from gi.repository import GLib, Gdk
from pydbus import SessionBus
verbose = True
screen_cast = None
monitors = {}
waiting = False
done = False
monitor_model = None
def terminate():
for key in monitors:
monitor = monitors[key];
pipeline = monitor['pipeline'];
pipeline.terminate()
sys.exit(1)
def stream_added_closure(name):
def stream_added(node_id):
monitor = monitors[name];
freq = monitor['freq'];
width = monitor['width'];
height = monitor['height'];
# FIXME scale = monitor['scale'];
# Use gstreamer out-of-process, since the gst gl support gets
# itself into a twist with its wayland connection when monitors
# disappear
pipeline_desc = f'gst-launch-1.0 pipewiresrc path={node_id} ! video/x-raw,max-framerate={freq}/1,width={width},height={height} ! videoconvert ! glimagesink'
if verbose:
print(f'launching {pipeline_desc}')
monitor['pipeline'] = subprocess.Popen([pipeline_desc], shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return stream_added
def add_monitor(name, width, height, scale, freq):
if verbose:
print(f'add monitor {name}: {width}x{height}, scale {scale}, frequency {freq}')
session_path = screen_cast.CreateSession({})
session = bus.get('org.gnome.Mutter.ScreenCast', session_path)
monitors[name] = {
"session": session,
"width": width,
"height": height,
"scale": scale,
"freq": freq
}
stream_path = session.RecordVirtual({})
stream = bus.get('org.gnome.Mutter.ScreenCast', stream_path)
stream.onPipeWireStreamAdded = stream_added_closure(name)
session.Start()
def remove_monitor(name):
if verbose:
print(f'remove monitor {name}')
try:
monitor = monitors[name];
pipeline = monitor['pipeline']
pipeline.kill()
session = monitor['session']
session.Stop()
except KeyError:
print("failed to remove monitor")
monitors[name] = None
expected_change = None
loop = None
def quit_cb(loop):
loop.quit()
def wait(millis):
global loop
loop = GLib.MainLoop()
GLib.timeout_add(millis, quit_cb, loop)
loop.run()
def monitors_changed(monitors, position, removed, added):
global expected_change
assert expected_change != None, "No change expected"
assert position == expected_change['position'], "Unexpected position in monitors-changed"
assert removed == expected_change['removed'], "Unexpected removed in monitors-changed"
assert added == expected_change['added'], "Unexpected added in monitors-changed"
if verbose:
print('got expected change')
expected_change = None
loop.quit()
def launch_observer():
global monitor_model
if verbose:
print('launch observer')
Gdk.set_allowed_backends('wayland')
display = Gdk.Display.open('gtk-test')
monitor_model = display.get_monitors()
assert monitor_model.get_n_items() == 0, "Unexpected initial monitors"
monitor_model.connect('items-changed', monitors_changed)
def expect_monitors_changed(position, removed, added, timeout):
global expected_change
expected_change = {
'position' : position,
'removed' : removed,
'added' : added
}
wait(timeout)
assert expected_change == None, "Expected change did not happen"
def got_connector(monitor, pspec):
loop.quit()
def expect_monitor(position, width, height, scale, freq):
assert monitor_model.get_n_items() > position, f'Monitor {position} not present'
monitor = monitor_model.get_item(position)
if monitor.get_connector() == None:
handler = monitor.connect('notify::connector', got_connector)
wait(500)
monitor.disconnect(handler)
assert monitor.is_valid(), "Monitor is not valid"
geometry = monitor.get_geometry()
assert geometry.width == width, "Unexpected monitor width"
assert geometry.height == height, "Unexpected monitor height"
assert monitor.get_scale_factor() == scale, "Unexpected scale factor"
assert monitor.get_refresh_rate() == freq, "Unexpected monitor frequency"
if verbose:
print(f'monitor {position}: {geometry.width}x{geometry.height} frequency {monitor.get_refresh_rate()} scale {monitor.get_scale_factor()} model \'{monitor.get_model()}\' connector \'{monitor.get_connector()}\'')
def run_commands():
try:
launch_observer()
add_monitor("0", width=100, height=100, scale=1, freq=60)
expect_monitors_changed(0, 0, 1, 1000)
expect_monitor (position=0, width=100, height=100, scale=1, freq=60000)
add_monitor("1", width=1024, height=768, scale=1, freq=144)
expect_monitors_changed(1, 0, 1, 1000)
expect_monitor (position=1, width=1024, height=768, scale=1, freq=144000)
remove_monitor("0")
expect_monitors_changed(0, 1, 0, 11000) # mutter takes 10 seconds to remove it
remove_monitor("1")
expect_monitors_changed(0, 1, 0, 11000)
except AssertionError as e:
print("Error: {0}".format(e))
terminate()
def mutter_appeared(name):
global screen_cast
global done
if verbose:
print("mutter appeared on the bus")
screen_cast = bus.get('org.gnome.Mutter.ScreenCast',
'/org/gnome/Mutter/ScreenCast')
run_commands()
done = True
def mutter_vanished():
global done
if screen_cast != None:
if verbose:
print("mutter left the bus")
done = True
bus = SessionBus()
bus.watch_name('org.gnome.Mutter.ScreenCast', 0, mutter_appeared, mutter_vanished)
try:
while not done:
GLib.MainContext.default().iteration(True)
except KeyboardInterrupt:
print('Interrupted')

View File

@ -0,0 +1,26 @@
#! /bin/sh
builddir=$(pwd)/build
dbus-run-session sh <<EOF
# echo DBUS_SESSION_BUS_ADDRESS=\$DBUS_SESSION_BUS_ADDRESS
# echo WAYLAND_DISPLAY=gtk-test
mutter --headless --no-x11 --wayland-display gtk-test >&mutter.log &
pid=\$!
export WAYLAND_DISPLAY=gtk-test
export GDK_BACKEND=wayland
export GI_TYPELIB_PATH=$builddir/gtk:/usr/lib64/girepository-1.0
export LD_PRELOAD=$builddir/gtk/libgtk-4.so
python tests/headless-monitor-tests.py
status=\$?
kill \$pid
exit \$status
EOF