From 2be701bf5cdc24d0b4c7515eb673829eaf3fb15f Mon Sep 17 00:00:00 2001 From: bhack Date: Mon, 11 May 2026 01:19:32 +0200 Subject: [PATCH] Prepare Mini EQ 0.7.3 release --- .github/workflows/ci.yml | 12 +- CHANGELOG.md | 10 + data/io.github.bhack.mini-eq.metainfo.xml | 13 +- docs/development.md | 4 +- io.github.bhack.mini-eq.yaml | 4 +- pyproject.toml | 4 +- src/mini_eq/app.py | 101 ++++-- src/mini_eq/deps.py | 10 +- src/mini_eq/pipewire_backend.py | 219 ++++++++++-- src/mini_eq/pipewire_routes.py | 113 +++--- src/mini_eq/routing.py | 193 ++++++---- tests/test_mini_eq_app.py | 32 ++ tests/test_mini_eq_deps.py | 18 +- tests/test_mini_eq_pipewire_backend.py | 406 +++++++++++++++++++--- tests/test_mini_eq_routing.py | 126 ++++++- tools/check_live_ui_runtime.py | 6 +- tools/check_pipewire_gobject.py | 6 + 17 files changed, 1044 insertions(+), 233 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f85a282..b22800f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,6 +44,10 @@ concurrency: group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.pull_request.number || github.ref }} cancel-in-progress: true +env: + PWG_VERSION: "0.3.6" + PWG_REQUIREMENT: "pipewire-gobject>=0.3.6,<0.4" + jobs: changes: runs-on: ubuntu-24.04 @@ -291,11 +295,11 @@ jobs: run: | python3 -m venv /tmp/mini-eq-pwg-build /tmp/mini-eq-pwg-build/bin/python -m pip install --upgrade pip - /tmp/mini-eq-pwg-build/bin/python -m pip wheel 'pipewire-gobject>=0.3.5,<0.4' -w /tmp/mini-eq-wheelhouse + /tmp/mini-eq-pwg-build/bin/python -m pip wheel "$PWG_REQUIREMENT" -w /tmp/mini-eq-wheelhouse python3 -m venv --system-site-packages .venv .venv/bin/python -m pip install --upgrade pip - .venv/bin/python -m pip install --no-index --find-links /tmp/mini-eq-wheelhouse 'pipewire-gobject>=0.3.5,<0.4' + .venv/bin/python -m pip install --no-index --find-links /tmp/mini-eq-wheelhouse "$PWG_REQUIREMENT" .venv/bin/python -m pip install -e '.[dev]' - name: Lint @@ -323,7 +327,7 @@ jobs: run: | python3 -m venv --system-site-packages /tmp/mini-eq-wheel-test /tmp/mini-eq-wheel-test/bin/python -m pip install --upgrade pip - /tmp/mini-eq-wheel-test/bin/python -m pip install --no-index --find-links /tmp/mini-eq-wheelhouse 'pipewire-gobject>=0.3.5,<0.4' + /tmp/mini-eq-wheel-test/bin/python -m pip install --no-index --find-links /tmp/mini-eq-wheelhouse "$PWG_REQUIREMENT" /tmp/mini-eq-wheel-test/bin/python -m pip install dist/mini_eq-*.whl /tmp/mini-eq-wheel-test/bin/mini-eq --help @@ -369,7 +373,7 @@ jobs: - name: Check pipewire-gobject GI compatibility run: | flatpak run --filesystem="$PWD":ro --command=python3 io.github.bhack.mini-eq \ - "$PWD/tools/check_pipewire_gobject.py" --expect-version 0.3.5 + "$PWD/tools/check_pipewire_gobject.py" --expect-version "$PWG_VERSION" - name: Smoke-test Flatpak runtime modules run: | diff --git a/CHANGELOG.md b/CHANGELOG.md index 97dc090..c7dbe99 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## 0.7.3 - 2026-05-11 + +- Require pipewire-gobject 0.3.6 and use its synchronous PipeWire registry, + metadata, node, and device APIs for startup and routing state. +- Replace Mini EQ's PipeWire startup and routing sleep loops with bounded sync + or registry-event waits for route params, virtual sink creation, and stream + routing. +- Avoid a headless startup hang when system-wide EQ fails synchronously before + the GLib main loop starts. + ## 0.7.2 - 2026-05-10 - Finish GNOME Shell startup notification after Mini EQ's delayed first-window diff --git a/data/io.github.bhack.mini-eq.metainfo.xml b/data/io.github.bhack.mini-eq.metainfo.xml index 28369ba..63e4254 100644 --- a/data/io.github.bhack.mini-eq.metainfo.xml +++ b/data/io.github.bhack.mini-eq.metainfo.xml @@ -33,11 +33,11 @@ - https://raw.githubusercontent.com/bhack/mini-eq/v0.7.2/docs/screenshots/mini-eq.png + https://raw.githubusercontent.com/bhack/mini-eq/v0.7.3/docs/screenshots/mini-eq.png Adjust sound output with equalizer controls - https://raw.githubusercontent.com/bhack/mini-eq/v0.7.2/docs/screenshots/mini-eq-dark.png + https://raw.githubusercontent.com/bhack/mini-eq/v0.7.3/docs/screenshots/mini-eq-dark.png Use the equalizer with dark style @@ -45,6 +45,15 @@ https://github.com/bhack/mini-eq/issues https://github.com/bhack/mini-eq + + +
    +
  • Require pipewire-gobject 0.3.6 and use its synchronous PipeWire registry, metadata, node, and device APIs for startup and routing state.
  • +
  • Replace Mini EQ's PipeWire startup and routing sleep loops with bounded sync or registry-event waits for route params, virtual sink creation, and stream routing.
  • +
  • Avoid a headless startup hang when system-wide EQ fails synchronously before the GLib main loop starts.
  • +
+
+
    diff --git a/docs/development.md b/docs/development.md index 1424b7a..59d40e4 100644 --- a/docs/development.md +++ b/docs/development.md @@ -84,11 +84,11 @@ Install the Python package after the system packages are present: ```bash python3 -m venv /tmp/mini-eq-pwg-build /tmp/mini-eq-pwg-build/bin/python -m pip install --upgrade pip -/tmp/mini-eq-pwg-build/bin/python -m pip wheel 'pipewire-gobject>=0.3.5,<0.4' -w /tmp/mini-eq-wheelhouse +/tmp/mini-eq-pwg-build/bin/python -m pip wheel 'pipewire-gobject>=0.3.6,<0.4' -w /tmp/mini-eq-wheelhouse python3 -m venv --system-site-packages ~/.local/share/mini-eq/venv ~/.local/share/mini-eq/venv/bin/python -m pip install --upgrade pip -~/.local/share/mini-eq/venv/bin/python -m pip install --no-index --find-links /tmp/mini-eq-wheelhouse 'pipewire-gobject>=0.3.5,<0.4' +~/.local/share/mini-eq/venv/bin/python -m pip install --no-index --find-links /tmp/mini-eq-wheelhouse 'pipewire-gobject>=0.3.6,<0.4' ~/.local/share/mini-eq/venv/bin/python -m pip install mini-eq ~/.local/share/mini-eq/venv/bin/mini-eq --check-deps ~/.local/share/mini-eq/venv/bin/mini-eq diff --git a/io.github.bhack.mini-eq.yaml b/io.github.bhack.mini-eq.yaml index 7076270..8bc527a 100644 --- a/io.github.bhack.mini-eq.yaml +++ b/io.github.bhack.mini-eq.yaml @@ -124,8 +124,8 @@ modules: sources: - type: git url: https://github.com/bhack/pipewire-gobject.git - tag: 0.3.5 - commit: b570bc4ff0a53223416fff9a4fc05d367273789d + tag: 0.3.6 + commit: 7e20b29865c0a869695bc730bacb68e2f0853077 - python3-dependencies.yaml diff --git a/pyproject.toml b/pyproject.toml index 467dbb4..1c6916c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mini-eq" -version = "0.7.2" +version = "0.7.3" description = "Compact PipeWire system-wide parametric equalizer for Linux desktops." readme = "README.md" requires-python = ">=3.11" @@ -40,7 +40,7 @@ classifiers = [ "Topic :: Multimedia :: Sound/Audio :: Analysis", "Topic :: Multimedia :: Sound/Audio :: Mixers", ] -dependencies = ["numpy>=1.26", "pipewire-gobject>=0.3.5,<0.4"] +dependencies = ["numpy>=1.26", "pipewire-gobject>=0.3.6,<0.4"] [project.urls] Homepage = "https://github.com/bhack/mini-eq" diff --git a/src/mini_eq/app.py b/src/mini_eq/app.py index b233eee..e6578cb 100644 --- a/src/mini_eq/app.py +++ b/src/mini_eq/app.py @@ -41,6 +41,9 @@ def __init__(self, args: Namespace) -> None: self.dbus_control: MiniEqDbusControl | None = None self.signal_source_ids: list[int] = [] self.window_present_source_id = 0 + self.window_starting = False + self.window_start_hold = False + self.pending_present_when_ready = False self.background_mode = load_background_mode() or bool(getattr(args, "background", False)) self.start_at_login = load_start_at_login() self.start_active_at_login = load_start_active_at_login() and self.start_at_login @@ -94,13 +97,15 @@ def ensure_window(self, *, present: bool) -> None: self.window.schedule_startup_ready() return + if self.window_starting: + self.pending_present_when_ready = self.pending_present_when_ready or present + return + controller: SystemWideEqController | None = None initial_curve_label: str | None = None try: controller = SystemWideEqController(self.args.output_sink) - controller.start() - if self.args.import_apo: controller.import_apo_preset(self.args.import_apo) initial_curve_label = imported_apo_curve_label(self.args.import_apo) @@ -110,14 +115,52 @@ def ensure_window(self, *, present: bool) -> None: raise SystemExit(str(exc)) from exc self.controller = controller - self.window = MiniEqWindow(self, self.controller, self.args.auto_route, initial_curve_label=initial_curve_label) - self.window.set_icon_name(APP_ICON_NAME) - self.window.present_when_ready = present - self.window.set_visible(False) - self.window.schedule_startup_ready() - if not present: - self.update_background_status() - self.emit_control_state_changed() + self.window_starting = True + self.window_start_hold = True + self.pending_present_when_ready = present + self.hold() + + def release_start_hold() -> None: + if not self.window_start_hold: + return + + self.window_start_hold = False + self.release() + + def on_ready() -> None: + if self.controller is not controller: + return + + try: + self.window_starting = False + present_when_ready = self.pending_present_when_ready + self.pending_present_when_ready = False + self.window = MiniEqWindow( + self, self.controller, self.args.auto_route, initial_curve_label=initial_curve_label + ) + self.window.set_icon_name(APP_ICON_NAME) + self.window.present_when_ready = present_when_ready + self.window.set_visible(False) + self.window.schedule_startup_ready() + if not present_when_ready: + self.update_background_status() + self.emit_control_state_changed() + finally: + release_start_hold() + + def on_error(exc: Exception) -> None: + self.window_starting = False + self.pending_present_when_ready = False + try: + controller.shutdown() + finally: + if self.controller is controller: + self.controller = None + print(str(exc), file=sys.stderr) + release_start_hold() + self.quit() + + controller.start(on_ready=on_ready, on_error=on_error) def present_main_window(self) -> None: self.ensure_window(present=True) @@ -220,34 +263,50 @@ def run_headless(args: Namespace) -> int: duration_ms = 0 controller: SystemWideEqController | None = None + exit_code = 0 + loop = GLib.MainLoop() + signal_source_ids = install_unix_signal_handlers(loop.quit) try: controller = SystemWideEqController(args.output_sink) - controller.start() if args.import_apo: controller.import_apo_preset(args.import_apo) - if args.auto_route: - controller.route_system_audio(True) - - loop = GLib.MainLoop() - signal_source_ids = install_unix_signal_handlers(loop.quit) - if duration_ms > 0: GLib.timeout_add(duration_ms, lambda: (loop.quit(), False)[1]) + def on_ready() -> None: + nonlocal exit_code + + try: + if args.auto_route: + controller.route_system_audio(True) + except Exception as exc: + exit_code = 1 + print(str(exc), file=sys.stderr) + loop.quit() + + def on_error(exc: Exception) -> None: + nonlocal exit_code + + exit_code = 1 + print(str(exc), file=sys.stderr) + loop.quit() + + controller.start(on_ready=on_ready, on_error=on_error) + try: - loop.run() + if exit_code == 0: + loop.run() except KeyboardInterrupt: pass - finally: - signal_source_ids.clear() finally: + signal_source_ids.clear() if controller is not None: controller.shutdown() - return 0 + return exit_code def install_unix_signal_handlers(callback) -> list[int]: diff --git a/src/mini_eq/deps.py b/src/mini_eq/deps.py index 2b2b3c3..432f005 100644 --- a/src/mini_eq/deps.py +++ b/src/mini_eq/deps.py @@ -12,15 +12,21 @@ Status = Literal["ok", "missing", "warning"] -PWG_REQUIRED_VERSION = "0.3.5" -PWG_REQUIRED_VERSION_PARTS = (0, 3, 5) +PWG_REQUIRED_VERSION = "0.3.6" +PWG_REQUIRED_VERSION_PARTS = (0, 3, 6) PWG_REQUIRED_SYMBOLS = ( "Core.set_pipewire_property", + "Core.sync", "Device.enum_all_params", "Device.enum_params", + "Device.enum_params_sync", "Device.new", "Device.subscribe_params", + "Device.sync", + "Metadata.sync", + "Node.sync", "Param.new_props_controls", + "Registry.sync", "RouteInfo.new_from_param", "Stream.set_pipewire_property", ) diff --git a/src/mini_eq/pipewire_backend.py b/src/mini_eq/pipewire_backend.py index 365b43b..4555b8a 100644 --- a/src/mini_eq/pipewire_backend.py +++ b/src/mini_eq/pipewire_backend.py @@ -1,11 +1,11 @@ from __future__ import annotations import json -import time +from collections.abc import Callable from dataclasses import dataclass, field from typing import Any -from .pipewire_routes import PipeWireRouteMixin +from .pipewire_routes import PipeWireOutputRoute, PipeWireRouteMixin DEFAULT_METADATA_NAME = "default" DEFAULT_AUDIO_SINK_KEY = "default.audio.sink" @@ -71,6 +71,19 @@ class PipeWireBackendError(RuntimeError): pass +class PipeWireNodeWatch: + def __init__(self, cancel_callback: Callable[[], None]) -> None: + self._cancel_callback = cancel_callback + + def cancel(self) -> None: + cancel_callback = self._cancel_callback + if cancel_callback is None: + return + + self._cancel_callback = None + cancel_callback() + + def build_props_controls_param(Pwg, GLib, controls: dict[str, float]): variant = GLib.Variant("a{sd}", {name: float(value) for name, value in controls.items()}) param = Pwg.Param.new_props_controls(variant) @@ -147,6 +160,7 @@ def __init__(self, timeout_ms: int = 2000) -> None: self._loaded_modules: list[Any] = [] self._cached_defaults = PipeWireDefaults(None, None) self._device_route_refreshing_bound_ids: set[int] = set() + self._device_active_output_routes: dict[int, dict[int, PipeWireOutputRoute]] = {} def __enter__(self) -> PipeWireBackend: self.connect() @@ -177,7 +191,7 @@ def connect(self) -> None: if not self._metadata.start(): raise PipeWireBackendError("failed to start PipeWire default metadata discovery") - self._wait_for_initial_state() + self._sync_initial_state() self._connected = True def close(self) -> None: @@ -240,6 +254,7 @@ def close(self) -> None: self._registry = None self._metadata = None self._cached_defaults = PipeWireDefaults(None, None) + self._device_active_output_routes.clear() def list_nodes(self) -> list[PipeWireNode]: self._ensure_connected() @@ -262,6 +277,126 @@ def list_output_streams(self) -> list[PipeWireNode]: def node_from_proxy(self, node) -> PipeWireNode: return self._node_from_global(node) + def watch_for_node( + self, + predicate: Callable[[PipeWireNode], bool], + callback: Callable[[PipeWireNode | None], None], + timeout_ms: int | None = None, + ) -> PipeWireNodeWatch: + self._ensure_connected() + + if self._GLib is None or self._GObject is None or self._registry is None: + raise PipeWireBackendError("PipeWire registry is not connected") + + deadline_ms = max(int(self.timeout_ms if timeout_ms is None else timeout_ms), 1) + state = { + "done": False, + "scheduled": False, + "handler_id": 0, + "timeout_id": 0, + "idle_id": 0, + } + + def maybe_match(global_) -> PipeWireNode | None: + try: + node = self._node_from_global(global_) + except UnicodeDecodeError: + return None + except Exception: + return None + + return node if predicate(node) else None + + def cleanup(*, remove_idle: bool) -> None: + handler_id = int(state["handler_id"]) + state["handler_id"] = 0 + if handler_id > 0: + try: + self._registry.disconnect(handler_id) + except Exception: + pass + + timeout_id = int(state["timeout_id"]) + state["timeout_id"] = 0 + if timeout_id > 0: + try: + self._GLib.source_remove(timeout_id) + except Exception: + pass + + idle_id = int(state["idle_id"]) + if remove_idle and idle_id > 0: + state["idle_id"] = 0 + try: + self._GLib.source_remove(idle_id) + except Exception: + pass + + def complete(node: PipeWireNode | None) -> bool: + if state["done"] or state["scheduled"]: + return False + + state["done"] = True + cleanup(remove_idle=False) + callback(node) + return False + + def complete_from_idle(node: PipeWireNode) -> bool: + state["idle_id"] = 0 + if state["done"]: + return False + + state["done"] = True + callback(node) + return False + + def schedule_complete(node: PipeWireNode) -> None: + if state["done"] or state["scheduled"]: + return + + state["scheduled"] = True + cleanup(remove_idle=False) + state["idle_id"] = self._GLib.idle_add(lambda: complete_from_idle(node)) + + def on_global_added(_registry, global_) -> None: + node = maybe_match(global_) + if node is not None: + complete(node) + + def on_timeout() -> bool: + state["timeout_id"] = 0 + return complete(None) + + def cancel() -> None: + if state["done"]: + return + + state["done"] = True + cleanup(remove_idle=True) + + state["handler_id"] = self._GObject.Object.connect(self._registry, "global-added", on_global_added) + watch = PipeWireNodeWatch(cancel) + for global_ in self._iterate_model(self._registry.dup_globals_by_interface(PIPEWIRE_NODE_INTERFACE)): + node = maybe_match(global_) + if node is not None: + schedule_complete(node) + return watch + + state["timeout_id"] = self._GLib.timeout_add(deadline_ms, on_timeout) + return watch + + def watch_for_audio_sink( + self, + sink_name: str, + callback: Callable[[PipeWireNode | None], None], + timeout_ms: int | None = None, + ) -> PipeWireNodeWatch: + return self.watch_for_node( + lambda node: node.is_audio_sink and node.node_name == sink_name, + callback, + timeout_ms=timeout_ms, + ) + def connect_object_added(self, callback) -> int: self._ensure_connected() handler_id = self._GObject.Object.connect(self._registry, "global-added", callback) @@ -329,7 +464,8 @@ def on_device_param(_device, param) -> None: if param_id != route_param_id or int(device_bound_id) in self._device_route_refreshing_bound_ids: return - callback() + if self._remember_device_route_param(device_bound_id, param): + callback() handler_id = self._GObject.Object.connect(device, "param", on_device_param) self._device_signal_objects[handler_id] = device @@ -363,7 +499,8 @@ def disconnect_device_handler(self, handler_id: int) -> None: def sync(self) -> None: self._ensure_connected() - self._sync_core() + self._sync_registry() + self._sync_metadata() def defaults(self) -> PipeWireDefaults: if self._has_cached_defaults(): @@ -377,7 +514,7 @@ def refresh_defaults(self) -> PipeWireDefaults: return self._cached_defaults except UnicodeDecodeError: try: - self._sync_core() + self._sync_metadata() self._cached_defaults = self._read_defaults() return self._cached_defaults except UnicodeDecodeError as retry_exc: @@ -453,7 +590,7 @@ def set_stream_target(self, stream_bound_id: int, target_bound_id: int, target_s for key, value in ((TARGET_NODE_KEY, str(target_bound_id)), (TARGET_OBJECT_KEY, target_serial)): if not metadata.set(stream_bound_id, key, SPA_ID_TYPE, value): raise PipeWireBackendError(f"failed to set stream target metadata: {stream_bound_id}") - self._sync_core() + self._sync_metadata() def restore_stream_target(self, stream_bound_id: int, target: PipeWireStreamTarget) -> None: metadata = self._default_metadata() @@ -463,7 +600,7 @@ def restore_stream_target(self, stream_bound_id: int, target: PipeWireStreamTarg ): if not metadata.set(stream_bound_id, key, type_name, value): raise PipeWireBackendError(f"failed to restore stream target metadata: {stream_bound_id}") - self._sync_core() + self._sync_metadata() def output_stream_by_bound_id(self, bound_id: int) -> PipeWireNode | None: for stream in self.list_output_streams(): @@ -575,6 +712,7 @@ def _node_proxy_by_bound_id(self, bound_id: int): return None if not node.start(): raise PipeWireBackendError(f"failed to bind node: {bound_id}") + self._sync_proxy(node, "node") self._node_proxies[int(bound_id)] = node return node @@ -593,6 +731,7 @@ def _device_proxy_by_bound_id(self, bound_id: int): return None if not device.start(): raise PipeWireBackendError(f"failed to bind device: {bound_id}") + self._sync_proxy(device, "device") self._device_proxies[int(bound_id)] = device return device @@ -682,36 +821,58 @@ def _ensure_connected(self) -> None: if not self._connected: self.connect() - def _wait_for_initial_state(self) -> None: - deadline = time.monotonic() + (self.timeout_ms / 1000.0) - context = self._GLib.MainContext.default() - - while time.monotonic() < deadline: - while context.pending(): - context.iteration(False) - - registry_ready = self._registry.get_globals().get_n_items() > 0 - metadata_ready = self._metadata.get_bound() - if registry_ready and metadata_ready: - return - - time.sleep(0.01) + def _sync_initial_state(self) -> None: + self._sync_registry() + self._sync_metadata() missing = [] if self._registry.get_globals().get_n_items() <= 0: missing.append("registry") if not self._metadata.get_bound(): missing.append("metadata") - raise PipeWireBackendError(f"PipeWire initialization timed out waiting for: {', '.join(missing)}") + if not missing: + return - def _sync_core(self) -> None: - if self._GLib is None: + raise PipeWireBackendError(f"PipeWire initialization did not report: {', '.join(missing)}") + + def _sync_registry(self) -> None: + if self._registry is None: return - deadline = time.monotonic() + min(self.timeout_ms / 1000.0, 0.05) - context = self._GLib.MainContext.default() - while time.monotonic() < deadline and context.pending(): - context.iteration(False) + try: + synced = self._registry.sync(max(int(self.timeout_ms), 1)) + except Exception as exc: + raise PipeWireBackendError(f"PipeWire registry sync failed: {exc}") from exc + if synced is False: + raise PipeWireBackendError("PipeWire registry sync failed") + + def _sync_metadata(self) -> None: + if self._metadata is None: + return + + try: + synced = self._metadata.sync(max(int(self.timeout_ms), 1)) + except Exception as exc: + raise PipeWireBackendError(f"PipeWire metadata sync failed: {exc}") from exc + if synced is False: + raise PipeWireBackendError("PipeWire metadata sync failed") + + def _sync_proxy(self, proxy, label: str) -> None: + try: + synced = proxy.sync(max(int(self.timeout_ms), 1)) + except Exception as exc: + raise PipeWireBackendError(f"PipeWire {label} sync failed: {exc}") from exc + if synced is False: + raise PipeWireBackendError(f"PipeWire {label} sync failed") + + def _sync_core(self) -> None: + if self._core is not None: + try: + synced = self._core.sync(max(int(self.timeout_ms), 1)) + except Exception as exc: + raise PipeWireBackendError(f"PipeWire core sync failed: {exc}") from exc + if synced is False: + raise PipeWireBackendError("PipeWire core sync failed") @staticmethod def _import_pipewire_gobject(): diff --git a/src/mini_eq/pipewire_routes.py b/src/mini_eq/pipewire_routes.py index 0bec84d..fd363d8 100644 --- a/src/mini_eq/pipewire_routes.py +++ b/src/mini_eq/pipewire_routes.py @@ -1,6 +1,5 @@ from __future__ import annotations -import time from dataclasses import dataclass, field from urllib.parse import quote @@ -76,7 +75,14 @@ def output_preset_target_for_sink_name(self, sink_name: str | None) -> PipeWireO return PipeWireOutputPresetTarget(node_name, route, tuple(dict.fromkeys(keys))) def output_route_for_sink(self, sink) -> PipeWireOutputRoute | None: - if sink is None or sink.device_id <= 0 or not self._has_device_route_api(): + if sink is None or sink.device_id <= 0: + return None + + route = self._cached_output_route_for_sink(sink) + if route is not None: + return route + + if not self._has_device_route_api(): return None device = self._device_proxy_by_bound_id(sink.device_id) @@ -90,6 +96,18 @@ def output_route_for_sink(self, sink) -> PipeWireOutputRoute | None: if str(route.direction or "").casefold() == OUTPUT_ROUTE_DIRECTION and (route.availability or "unknown").casefold() != "no" ] + return self._select_output_route_for_sink(sink, output_routes) + + def _cached_output_route_for_sink(self, sink) -> PipeWireOutputRoute | None: + try: + output_routes = tuple(self._device_active_output_routes.get(int(sink.device_id), {}).values()) + except Exception: + output_routes = () + return self._select_output_route_for_sink(sink, output_routes) + + def _select_output_route_for_sink( + self, sink, output_routes: list[PipeWireOutputRoute] | tuple[PipeWireOutputRoute, ...] + ) -> PipeWireOutputRoute | None: if not output_routes: return None @@ -107,13 +125,53 @@ def output_route_for_sink(self, sink) -> PipeWireOutputRoute | None: return None + def _remember_device_route_param(self, device_bound_id: int, param) -> bool: + if not self._has_device_route_api(): + return False + + try: + route_info = self._Pwg.RouteInfo.new_from_param(param) + except Exception: + return False + + if route_info is None: + return False + + route = self._output_route_from_info( + route_info, + device_bound_id, + self._device_name_by_bound_id(device_bound_id), + ) + if str(route.direction or "").casefold() != OUTPUT_ROUTE_DIRECTION: + return False + + device_bound_id = int(device_bound_id) + cached_routes = self._device_active_output_routes.get(device_bound_id, {}) + previous_routes = dict(cached_routes) + if (route.availability or "unknown").casefold() == "no": + if route.route_device not in cached_routes: + return False + + cached_routes = dict(cached_routes) + cached_routes.pop(route.route_device, None) + if cached_routes: + self._device_active_output_routes[device_bound_id] = cached_routes + else: + self._device_active_output_routes.pop(device_bound_id, None) + return True + + current_routes = {route.route_device: route} + self._device_active_output_routes[device_bound_id] = current_routes + return previous_routes != current_routes + def _has_device_route_api(self) -> bool: return ( self._Pwg is not None and hasattr(self._Pwg, "Device") and hasattr(self._Pwg, "RouteInfo") - and hasattr(self._Pwg.Device, "enum_params") + and hasattr(self._Pwg.Device, "enum_params_sync") and hasattr(self._Pwg.Device, "new") + and hasattr(self._Pwg.Device, "sync") and hasattr(self._Pwg.RouteInfo, "new_from_param") ) @@ -166,14 +224,13 @@ def _enumerate_device_routes(self, device, device_bound_id: int) -> list[PipeWir bound_id = int(device_bound_id) self._device_route_refreshing_bound_ids.add(bound_id) try: - seq = int(device.enum_params(route_param_id, 0, 0)) - if seq < 0: + params = device.enum_params_sync(route_param_id, 0, 0, max(int(self.timeout_ms), 1)) + if params is None: return [] - self._wait_for_param_sequence(lambda: device.get_params(), seq) device_name = self._device_name_by_bound_id(device_bound_id) routes: list[PipeWireOutputRoute] = [] - for param in self._iterate_model(device.get_params()): + for param in self._iterate_model(params): try: param_name = param.dup_name() except Exception: @@ -200,7 +257,7 @@ def _enumerate_device_routes(self, device, device_bound_id: int) -> list[PipeWir def _device_route_param_id(self, device) -> int | None: route_param_id = self._device_param_id_by_name(device, DEVICE_ROUTE_PARAM_NAME) if route_param_id is None: - self._wait_for_device_param_info(device, DEVICE_ROUTE_PARAM_NAME) + self._sync_proxy(device, "device") route_param_id = self._device_param_id_by_name(device, DEVICE_ROUTE_PARAM_NAME) return route_param_id @@ -217,21 +274,6 @@ def _device_param_id_by_name(self, device, name: str) -> int | None: return None - def _wait_for_device_param_info(self, device, name: str) -> None: - if self._GLib is None: - return - - deadline = time.monotonic() + min(self.timeout_ms / 1000.0, 0.2) - context = self._GLib.MainContext.default() - while time.monotonic() < deadline: - while context.pending(): - context.iteration(False) - - if self._device_param_id_by_name(device, name) is not None: - return - - time.sleep(0.005) - def _output_route_from_info( self, route_info, @@ -252,31 +294,6 @@ def _output_route_from_info( info=self._variant_to_string_dict(route_info.get_info()), ) - def _wait_for_param_sequence(self, model_factory, seq: int) -> None: - if self._GLib is None: - return - - deadline = time.monotonic() + min(self.timeout_ms / 1000.0, 0.2) - context = self._GLib.MainContext.default() - while time.monotonic() < deadline: - while context.pending(): - context.iteration(False) - - if self._model_has_param_sequence(model_factory(), seq): - return - - time.sleep(0.005) - - def _model_has_param_sequence(self, model, seq: int) -> bool: - for param in self._iterate_model(model): - try: - if int(param.get_seq()) == seq: - return True - except Exception: - continue - - return False - @staticmethod def _variant_to_string_dict(variant) -> dict[str, str]: if variant is None: diff --git a/src/mini_eq/routing.py b/src/mini_eq/routing.py index b87724b..291cbe0 100644 --- a/src/mini_eq/routing.py +++ b/src/mini_eq/routing.py @@ -2,7 +2,6 @@ import json import sys -import time from collections.abc import Callable from dataclasses import replace @@ -65,6 +64,8 @@ def __init__(self, output_sink: str | None) -> None: self._output_preset_target: PipeWireOutputPresetTarget | None = None self.filter_output_name = f"{self.virtual_sink_name}{FILTER_OUTPUT_SUFFIX}" self.engine_module = None + self.engine_start_watch = None + self.engine_start_pending = False self.filter_node_id: int | None = None self.output_event_source_id = 0 self.output_object_added_handler_id = 0 @@ -99,8 +100,9 @@ def emit_status(self, message: str) -> None: if getattr(self, "shutting_down", False): return - if self.status_callback is not None: - self.status_callback(message) + status_callback = getattr(self, "status_callback", None) + if status_callback is not None: + status_callback(message) print(message, file=sys.stderr) @@ -239,7 +241,17 @@ def set_analyzer_enabled(self, enabled: bool) -> bool: analyzer.set_enabled(False) self.restore_engine_after_analyzer_failure() return False - self.start_engine() + + def on_ready() -> None: + if self.routed and self.stream_router is not None: + self.stream_router.route_output_streams() + + def on_error(exc: Exception) -> None: + analyzer.set_enabled(False) + self.emit_status(f"filter-chain restart after analyzer enable failed: {exc}") + self.restore_engine_after_analyzer_failure() + + self.start_engine(on_ready=on_ready, on_error=on_error) except Exception: analyzer.set_enabled(False) try: @@ -248,9 +260,6 @@ def set_analyzer_enabled(self, enabled: bool) -> bool: self.emit_status(f"filter-chain restore after analyzer failure failed: {restore_exc}") raise - if self.routed and self.stream_router is not None: - self.stream_router.route_output_streams() - return started return analyzer.set_enabled(enabled) @@ -499,40 +508,6 @@ def route_system_audio(self, enabled: bool, announce: bool = True, *, refresh_ou def build_default_bands(self) -> list[EqBand]: return default_eq_bands() - def wait_for_virtual_sink(self, timeout_seconds: float = 3.0) -> None: - deadline = time.monotonic() + timeout_seconds - - while time.monotonic() < deadline: - try: - self.output_backend.sync() - except Exception: - pass - - if self.get_sink(self.virtual_sink_name) is not None: - return - - time.sleep(0.05) - - raise RuntimeError(f"virtual sink did not appear: {self.virtual_sink_name}") - - def wait_for_filter_node(self, timeout_seconds: float = 3.0) -> None: - deadline = time.monotonic() + timeout_seconds - - while time.monotonic() < deadline: - node_id = self.find_filter_node_id() - - if node_id is not None: - self.filter_node_id = node_id - return - - time.sleep(0.05) - - raise RuntimeError(f"filter-chain did not create {self.virtual_sink_name}") - - def find_filter_node_id(self) -> int | None: - sink = self.get_sink(self.virtual_sink_name) - return sink.bound_id if sink is not None else None - def active_sample_rate(self) -> float: for sink_name in (self.virtual_sink_name, self.output_sink): rate = node_sample_rate(self.get_sink(sink_name)) @@ -551,25 +526,70 @@ def build_filter_chain_module_args(self) -> str: output_sink=self.output_sink, ) - def start_engine(self) -> None: - if self.engine_module is not None: + def cancel_pending_engine_start(self) -> None: + watch = getattr(self, "engine_start_watch", None) + self.engine_start_watch = None + self.engine_start_pending = False + if watch is not None: + watch.cancel() + + def start_engine( + self, + *, + on_ready: Callable[[], None] | None = None, + on_error: Callable[[Exception], None] | None = None, + ) -> None: + if self.running: + if on_ready is not None: + on_ready() + return + + if getattr(self, "engine_start_pending", False): return self.engine_module = self.output_backend.load_filter_chain_module(self.build_filter_chain_module_args()) + self.engine_start_pending = True - try: - self.wait_for_virtual_sink() - self.wait_for_filter_node() - self.running = True - self.emit_status(f"filter-chain PipeWire EQ ready: {self.virtual_sink_name} -> {self.output_sink}") - except Exception: + def fail(exc: Exception) -> None: + self.engine_start_watch = None + self.engine_start_pending = False self.engine_module = None self.filter_node_id = None try: self.output_backend.sync() except Exception: pass - raise + if on_error is not None: + on_error(exc) + else: + self.emit_status(str(exc)) + + def on_sink_ready(sink: PipeWireNode | None) -> None: + self.engine_start_watch = None + self.engine_start_pending = False + + if getattr(self, "shutting_down", False) or self.engine_module is None: + return + + if sink is None: + fail(RuntimeError(f"filter-chain did not create {self.virtual_sink_name}")) + return + + self.filter_node_id = sink.bound_id + self.running = True + self.emit_status(f"filter-chain PipeWire EQ ready: {self.virtual_sink_name} -> {self.output_sink}") + self.apply_state_to_engine() + if on_ready is not None: + on_ready() + + try: + self.engine_start_watch = self.output_backend.watch_for_audio_sink( + self.virtual_sink_name, + on_sink_ready, + timeout_ms=3000, + ) + except Exception as exc: + fail(exc) def retarget_filter_output(self) -> bool: if not self.running or self.filter_node_id is None: @@ -585,15 +605,18 @@ def retarget_filter_output(self) -> bool: return False def restore_engine_after_analyzer_failure(self) -> None: - if self.running or self.engine_module is not None: + if self.running or getattr(self, "engine_module", None) is not None: return - self.start_engine() - if self.routed and self.stream_router is not None: - self.stream_router.route_output_streams() + def on_ready() -> None: + if self.routed and self.stream_router is not None: + self.stream_router.route_output_streams() + + self.start_engine(on_ready=on_ready, on_error=lambda exc: self.emit_status(str(exc))) def stop_engine(self, announce: bool = True) -> None: - module = self.engine_module + self.cancel_pending_engine_start() + module = getattr(self, "engine_module", None) if module is None: self.filter_node_id = None self.running = False @@ -630,10 +653,18 @@ def restart_engine(self) -> None: stream_router.emit_warning(exc) self.stop_engine(announce=False) - self.start_engine() - if stream_router is not None: - stream_router.start_monitoring(require_initial_route=True) + def on_ready() -> None: + if stream_router is not None: + stream_router.start_monitoring(require_initial_route=True) + + def on_error(exc: Exception) -> None: + if stream_router is not None: + stream_router.emit_warning(exc) + else: + self.emit_status(str(exc)) + + self.start_engine(on_ready=on_ready, on_error=on_error) def set_filter_controls(self, controls: dict[str, float]) -> None: if self.filter_node_id is None or not self.running: @@ -666,19 +697,57 @@ def apply_state_to_engine(self) -> None: controls = builtin_biquad_control_values(self.bands, self.preamp_db, self.eq_enabled, self.active_sample_rate()) self.set_filter_controls(controls) - def start(self) -> None: + def start( + self, + *, + on_ready: Callable[[], None] | None = None, + on_error: Callable[[Exception], None] | None = None, + ) -> None: try: self.refresh_followed_output_sink() self.prepare_output_analyzer() - self.start_engine() - self.start_output_event_monitoring() - except Exception: + except Exception as exc: if self.stream_router is not None: self.stream_router.stop_monitoring() self.stop_engine() self.stop_output_event_monitoring() + if on_error is not None: + on_error(exc) + return raise + def on_engine_ready() -> None: + try: + self.start_output_event_monitoring() + except Exception as exc: + if self.stream_router is not None: + self.stream_router.stop_monitoring() + self.stop_engine() + self.stop_output_event_monitoring() + if on_error is not None: + on_error(exc) + return + raise + if on_ready is not None: + on_ready() + + def on_engine_error(exc: Exception) -> None: + if self.stream_router is not None: + self.stream_router.stop_monitoring() + self.stop_engine() + self.stop_output_event_monitoring() + if on_error is not None: + on_error(exc) + else: + self.emit_status(str(exc)) + + try: + self.start_engine(on_ready=on_engine_ready, on_error=on_engine_error) + except Exception as exc: + on_engine_error(exc) + if on_error is None: + raise + def shutdown(self) -> None: self.shutting_down = True self.status_callback = None diff --git a/tests/test_mini_eq_app.py b/tests/test_mini_eq_app.py index 7166426..17692aa 100644 --- a/tests/test_mini_eq_app.py +++ b/tests/test_mini_eq_app.py @@ -141,6 +141,38 @@ def test_finish_startup_notification_ignores_missing_gdk_startup_id(monkeypatch) app.MiniEqApplication.finish_startup_notification(SimpleNamespace()) +def test_run_headless_skips_loop_after_synchronous_start_error(monkeypatch, capsys) -> None: + calls: list[str] = [] + + class FakeLoop: + def run(self) -> None: + calls.append("run") + raise AssertionError("run should not be called after a synchronous startup error") + + def quit(self) -> None: + calls.append("quit") + + class FakeController: + def __init__(self, output_sink: str | None) -> None: + calls.append(f"controller:{output_sink}") + + def start(self, *, on_ready=None, on_error=None) -> None: + calls.append("start") + on_error(RuntimeError("startup failed")) + + def shutdown(self) -> None: + calls.append("shutdown") + + monkeypatch.setattr(app.GLib, "MainLoop", FakeLoop) + monkeypatch.setattr(app, "install_unix_signal_handlers", lambda _callback: []) + monkeypatch.setattr(app, "SystemWideEqController", FakeController) + args = SimpleNamespace(duration=None, output_sink="speakers", import_apo=None, auto_route=False) + + assert app.run_headless(args) == 1 + assert calls == ["controller:speakers", "start", "quit", "shutdown"] + assert "startup failed" in capsys.readouterr().err + + def test_close_action_closes_active_window() -> None: window = FakeWindow(ui_shutting_down=False) application = FakeApplication(window=window) diff --git a/tests/test_mini_eq_deps.py b/tests/test_mini_eq_deps.py index 9ce30c5..7b0074a 100644 --- a/tests/test_mini_eq_deps.py +++ b/tests/test_mini_eq_deps.py @@ -93,15 +93,20 @@ def fake_check(namespace: str, version: str, label: str, required: bool, hint: s def test_pipewire_gobject_check_requires_current_library_version(monkeypatch) -> None: fake_pwg = SimpleNamespace( - get_library_version=lambda: "0.3.4", - Core=SimpleNamespace(set_pipewire_property=object()), + get_library_version=lambda: "0.3.5", + Core=SimpleNamespace(set_pipewire_property=object(), sync=object()), Device=SimpleNamespace( enum_all_params=object(), enum_params=object(), + enum_params_sync=object(), new=object(), subscribe_params=object(), + sync=object(), ), + Metadata=SimpleNamespace(sync=object()), + Node=SimpleNamespace(sync=object()), Param=SimpleNamespace(new_props_controls=object()), + Registry=SimpleNamespace(sync=object()), RouteInfo=SimpleNamespace(new_from_param=object()), Stream=SimpleNamespace(set_pipewire_property=object()), ) @@ -121,20 +126,25 @@ def test_pipewire_gobject_check_requires_current_library_version(monkeypatch) -> check = deps.check_pipewire_gobject() assert not check.ok - assert "older than required 0.3.5" in check.detail + assert "older than required 0.3.6" in check.detail def test_pipewire_gobject_check_requires_property_override_symbols(monkeypatch) -> None: fake_pwg = SimpleNamespace( - get_library_version=lambda: "0.3.5", + get_library_version=lambda: "0.3.6", Core=SimpleNamespace(), Device=SimpleNamespace( enum_all_params=object(), enum_params=object(), + enum_params_sync=object(), new=object(), subscribe_params=object(), + sync=object(), ), + Metadata=SimpleNamespace(sync=object()), + Node=SimpleNamespace(sync=object()), Param=SimpleNamespace(new_props_controls=object()), + Registry=SimpleNamespace(sync=object()), RouteInfo=SimpleNamespace(new_from_param=object()), Stream=SimpleNamespace(set_pipewire_property=object()), ) diff --git a/tests/test_mini_eq_pipewire_backend.py b/tests/test_mini_eq_pipewire_backend.py index f3082b5..b1d8124 100644 --- a/tests/test_mini_eq_pipewire_backend.py +++ b/tests/test_mini_eq_pipewire_backend.py @@ -94,6 +94,100 @@ def get_global_properties(self) -> FakeGlobalProperties: return self.properties +def make_node_global( + bound_id: int, + name: str | None, + media_class: str = pw_backend.AUDIO_SINK, +) -> FakePropertyProxy: + properties = [ + FakePropertyItem("object.serial", str(bound_id + 1000)), + FakePropertyItem("media.class", media_class), + ] + if name is not None: + properties.append(FakePropertyItem("node.name", name)) + + class FakeGlobal(FakePropertyProxy): + def get_id(self) -> int: + return bound_id + + return FakeGlobal(FakeGlobalProperties(properties)) + + +class FakeModel: + def __init__(self, items: list[object]) -> None: + self.items = items + + def get_n_items(self) -> int: + return len(self.items) + + def get_item(self, index: int) -> object: + return self.items[index] + + +class FakeWaitRegistry: + def __init__(self, globals_: list[object]) -> None: + self.globals = globals_ + self.callbacks = {} + self.disconnected: list[int] = [] + self.next_handler_id = 1 + + def dup_globals_by_interface(self, interface_type: str) -> FakeModel: + assert interface_type == pw_backend.PIPEWIRE_NODE_INTERFACE + return FakeModel(self.globals) + + def connect(self, signal_name: str, callback) -> int: + assert signal_name == "global-added" + handler_id = self.next_handler_id + self.next_handler_id += 1 + self.callbacks[handler_id] = callback + return handler_id + + def disconnect(self, handler_id: int) -> None: + self.disconnected.append(handler_id) + self.callbacks.pop(handler_id, None) + + def emit_global_added(self, global_) -> None: + for callback in list(self.callbacks.values()): + callback(self, global_) + + +class FakeWaitObject: + @staticmethod + def connect(obj, signal_name: str, callback) -> int: + return obj.connect(signal_name, callback) + + +class FakeWaitGObject: + Object = FakeWaitObject + + +class FakeWaitGLib: + def __init__(self) -> None: + self.idle_callback = None + self.removed_sources: list[int] = [] + self.timeout_callback = None + self.timeout_ms: int | None = None + + def idle_add(self, callback) -> int: + self.idle_callback = callback + return 88 + + def timeout_add(self, timeout_ms: int, callback) -> int: + self.timeout_ms = timeout_ms + self.timeout_callback = callback + return 77 + + def source_remove(self, source_id: int) -> bool: + self.removed_sources.append(source_id) + return True + + def run_idle(self) -> None: + assert self.idle_callback is not None + callback = self.idle_callback + self.idle_callback = None + callback() + + class FakeSource: def __init__(self) -> None: self.destroyed = False @@ -104,19 +198,12 @@ def destroy(self) -> None: class FakeSyncCore: def __init__(self) -> None: - self.callback = None - - def sync(self, _cancellable, callback, _user_data) -> bool: - self.callback = callback - return True + self.sync_calls: list[int] = [] - def sync_finish(self, _result) -> bool: + def sync(self, timeout_ms: int) -> bool: + self.sync_calls.append(timeout_ms) return True - def complete_sync(self) -> None: - assert self.callback is not None - self.callback(self, object(), None) - class FakeMainContext: def __init__(self, source: FakeSource) -> None: @@ -138,28 +225,12 @@ def find_source_by_id(self, source_id: int) -> FakeSource | None: return self.source if source_id == 77 else None -class FakeSyncLoop: - def __init__(self, core: FakeSyncCore) -> None: - self.core = core - self.quit_count = 0 - - def run(self) -> None: - self.core.complete_sync() - - def quit(self) -> None: - self.quit_count += 1 - - class FakeSyncGLib: def __init__(self, core: FakeSyncCore) -> None: - self.core = core self.source = FakeSource() self.MainContext = FakeMainContext(self.source) self.timeout_callback = None - def MainLoop(self) -> FakeSyncLoop: - return FakeSyncLoop(self.core) - def timeout_add(self, _timeout_ms: int, callback) -> int: self.timeout_callback = callback return 77 @@ -190,7 +261,7 @@ class FakePwg: class FakeDeviceApi: @staticmethod - def enum_params(): + def enum_params_sync(): return None @staticmethod @@ -201,6 +272,10 @@ def new(): def subscribe_params(): return None + @staticmethod + def sync(): + return None + class FakeRouteInfoApi: @staticmethod @@ -378,16 +453,73 @@ def test_new_core_uses_pipewire_gobject_core_constructor() -> None: } -def test_sync_core_drains_pending_main_context_events() -> None: +def test_sync_core_uses_roundtrip() -> None: core = FakeSyncCore() - glib = FakeSyncGLib(core) backend = pw_backend.PipeWireBackend() backend._core = core - backend._GLib = glib backend._sync_core() - assert glib.MainContext.iterations == 1 + assert core.sync_calls == [2000] + + +def test_watch_for_audio_sink_reports_existing_registry_node_on_idle() -> None: + registry = FakeWaitRegistry([make_node_global(42, "mini_eq_sink")]) + glib = FakeWaitGLib() + backend = pw_backend.PipeWireBackend() + backend._ensure_connected = lambda: None + backend._registry = registry + backend._GLib = glib + backend._GObject = FakeWaitGObject + nodes: list[pw_backend.PipeWireNode | None] = [] + + backend.watch_for_audio_sink("mini_eq_sink", nodes.append, timeout_ms=1234) + + assert nodes == [] + glib.run_idle() + assert nodes[0] is not None + assert nodes[0].bound_id == 42 + assert glib.timeout_ms is None + assert registry.disconnected == [1] + + +def test_watch_for_audio_sink_resolves_from_global_added_signal() -> None: + registry = FakeWaitRegistry([make_node_global(1, "speakers")]) + glib = FakeWaitGLib() + backend = pw_backend.PipeWireBackend() + backend._ensure_connected = lambda: None + backend._registry = registry + backend._GLib = glib + backend._GObject = FakeWaitGObject + nodes: list[pw_backend.PipeWireNode | None] = [] + + backend.watch_for_audio_sink("mini_eq_sink", nodes.append, timeout_ms=1234) + registry.emit_global_added(make_node_global(42, "mini_eq_sink")) + + assert nodes[0] is not None + assert nodes[0].bound_id == 42 + assert glib.timeout_ms == 1234 + assert glib.removed_sources == [77] + assert registry.disconnected == [1] + + +def test_watch_for_audio_sink_reports_none_on_timeout() -> None: + registry = FakeWaitRegistry([make_node_global(1, "speakers")]) + glib = FakeWaitGLib() + backend = pw_backend.PipeWireBackend() + backend._ensure_connected = lambda: None + backend._registry = registry + backend._GLib = glib + backend._GObject = FakeWaitGObject + nodes: list[pw_backend.PipeWireNode | None] = [] + + backend.watch_for_audio_sink("mini_eq_sink", nodes.append, timeout_ms=1234) + glib.timeout_callback() + + assert nodes == [None] + assert glib.timeout_ms == 1234 + assert glib.removed_sources == [] + assert registry.disconnected == [1] def test_move_stream_to_target_sets_stream_target_without_metadata_readback() -> None: @@ -579,11 +711,16 @@ def test_output_preset_keys_fall_back_to_sink_name_without_route_api(monkeypatch def test_enumerate_device_routes_ignores_enum_route_params(monkeypatch: pytest.MonkeyPatch) -> None: class FakeParam: - def __init__(self, name: str) -> None: + def __init__(self, name: str, *, seq: int = 12, next_index: int = 0) -> None: self.name = name + self.seq = seq + self.next_index = next_index def get_seq(self) -> int: - return 12 + return self.seq + + def get_next(self) -> int: + return self.next_index def dup_name(self) -> str: return self.name @@ -611,9 +748,9 @@ def __init__(self) -> None: self.param_infos = FakeModel([FakeParamInfo()]) self.enum_calls: list[tuple[int, int, int]] = [] - def enum_params(self, param_id: int, start: int, num: int) -> int: + def enum_params_sync(self, param_id: int, start: int, num: int, _timeout_ms: int) -> FakeModel: self.enum_calls.append((param_id, start, num)) - return 12 + return self.params def get_params(self) -> FakeModel: return self.params @@ -668,10 +805,111 @@ def new_from_param(param: FakeParam) -> FakeRouteInfo: assert backend._device_route_refreshing_bound_ids == set() +def test_enumerate_device_routes_uses_request_scoped_params(monkeypatch: pytest.MonkeyPatch) -> None: + class FakeParam: + def __init__(self, name: str, *, seq: int, route_name: str) -> None: + self.name = name + self.seq = seq + self.route_name = route_name + + def get_seq(self) -> int: + return self.seq + + def get_next(self) -> int: + return 0 + + def dup_name(self) -> str: + return self.name + + class FakeParamInfo: + def get_id(self) -> int: + return 13 + + def dup_name(self) -> str: + return "Route" + + class FakeModel: + def __init__(self, items: list[object]) -> None: + self.items = items + + def get_n_items(self) -> int: + return len(self.items) + + def get_item(self, index: int) -> object: + return self.items[index] + + class FakeDevice: + def __init__(self) -> None: + self.params = FakeModel( + [ + FakeParam("Route", seq=12, route_name="analog-output-headphones"), + ] + ) + self.param_infos = FakeModel([FakeParamInfo()]) + + def enum_params_sync(self, _param_id: int, _start: int, _num: int, _timeout_ms: int) -> FakeModel: + return self.params + + def get_params(self) -> FakeModel: + return self.params + + def get_param_infos(self) -> FakeModel: + return self.param_infos + + class FakeRouteInfo: + def __init__(self, param: FakeParam) -> None: + self.param = param + + def get_index(self) -> int: + return 1 + + def get_device(self) -> int: + return 6 + + def get_profile(self) -> int: + return 0 + + def get_priority(self) -> int: + return 200 + + def dup_direction(self) -> str: + return "output" + + def dup_name(self) -> str: + return self.param.route_name + + def dup_description(self) -> str: + return self.param.route_name + + def dup_availability(self) -> str: + return "yes" + + def get_info(self) -> dict[str, str]: + return {} + + backend = pw_backend.PipeWireBackend() + backend._Pwg = SimpleNamespace(RouteInfo=SimpleNamespace(new_from_param=FakeRouteInfo)) + monkeypatch.setattr(backend, "_device_name_by_bound_id", lambda _bound_id: "alsa_card.test") + + routes = backend._enumerate_device_routes(FakeDevice(), 72) + + assert [route.name for route in routes] == ["analog-output-headphones"] + + def test_connect_device_route_changed_subscribes_to_route_param(monkeypatch: pytest.MonkeyPatch) -> None: class FakeParam: - def __init__(self, param_id: int) -> None: + def __init__( + self, + param_id: int, + *, + route_device: int = 6, + direction: str = "output", + route_name: str = "analog-output-headphones", + ) -> None: self.param_id = param_id + self.route_device = route_device + self.direction = direction + self.route_name = route_name def get_id(self) -> int: return self.param_id @@ -709,9 +947,40 @@ def subscribe_params(self, ids: FakeVariant) -> None: def disconnect(self, handler_id: int) -> None: self.disconnected.append(handler_id) - def emit_param(self, param_id: int) -> None: + def emit_param(self, param_id: int, **kwargs) -> None: assert self.param_callback is not None - self.param_callback(self, FakeParam(param_id)) + self.param_callback(self, FakeParam(param_id, **kwargs)) + + class FakeRouteInfo: + def __init__(self, param: FakeParam) -> None: + self.param = param + + def get_index(self) -> int: + return 1 + + def get_device(self) -> int: + return self.param.route_device + + def get_profile(self) -> int: + return 0 + + def get_priority(self) -> int: + return 200 + + def dup_direction(self) -> str: + return self.param.direction + + def dup_name(self) -> str: + return self.param.route_name + + def dup_description(self) -> str: + return self.param.route_name + + def dup_availability(self) -> str: + return "yes" + + def get_info(self) -> dict[str, str]: + return {} class FakeGObjectObject: @staticmethod @@ -722,11 +991,12 @@ def connect(device: FakeDevice, signal_name: str, callback) -> int: backend = pw_backend.PipeWireBackend() device = FakeDevice() - backend._Pwg = SimpleNamespace(Device=FakeDeviceApi, RouteInfo=FakeRouteInfoApi) + backend._Pwg = SimpleNamespace(Device=FakeDeviceApi, RouteInfo=SimpleNamespace(new_from_param=FakeRouteInfo)) backend._GLib = FakeGLib backend._GObject = SimpleNamespace(Object=FakeGObjectObject) backend._ensure_connected = lambda: None monkeypatch.setattr(backend, "_device_proxy_by_bound_id", lambda _bound_id: device) + monkeypatch.setattr(backend, "_device_name_by_bound_id", lambda _bound_id: "alsa_card.test") calls: list[str] = [] handler_id = backend.connect_device_route_changed(72, lambda: calls.append("route")) @@ -737,18 +1007,64 @@ def connect(device: FakeDevice, signal_name: str, callback) -> int: device.emit_param(12) assert calls == [] - device.emit_param(13) + device.emit_param(13, direction="input") + assert calls == [] + + device.emit_param(13, route_name="analog-output-headphones") assert calls == ["route"] + assert backend._device_active_output_routes[72][6].name == "analog-output-headphones" + + device.emit_param(13, route_device=7, route_name="analog-output-speaker") + assert calls == ["route", "route"] + assert tuple(backend._device_active_output_routes[72]) == (7,) + assert backend._device_active_output_routes[72][7].name == "analog-output-speaker" backend._device_route_refreshing_bound_ids.add(72) - device.emit_param(13) - assert calls == ["route"] + device.emit_param(13, route_device=6, route_name="analog-output-headphones") + assert calls == ["route", "route"] + assert tuple(backend._device_active_output_routes[72]) == (7,) + assert backend._device_active_output_routes[72][7].name == "analog-output-speaker" backend.disconnect_device_handler(handler_id) assert [(variant.signature, variant.value) for variant in device.subscriptions] == [("au", [13]), ("au", [])] assert device.disconnected == [77] +def test_output_preset_keys_use_subscribed_active_route_cache(monkeypatch: pytest.MonkeyPatch) -> None: + backend = pw_backend.PipeWireBackend() + route = pw_routes.PipeWireOutputRoute( + device_bound_id=72, + device_name="alsa_card.test", + index=1, + route_device=7, + profile=0, + priority=200, + direction="output", + name="analog-output-speaker", + description="Speakers", + availability="yes", + ) + sink = pw_backend.PipeWireNode( + bound_id=39, + object_serial="67", + media_class=pw_backend.AUDIO_SINK, + node_name="alsa_output.test", + node_description="Test Sink", + application_name=None, + node_dont_move=False, + device_id=72, + card_profile_device=6, + ) + backend._device_active_output_routes = {72: {7: route}} + backend._Pwg = SimpleNamespace() + monkeypatch.setattr(backend, "audio_sink_by_name", lambda _name: sink) + + assert backend.output_preset_keys_for_sink_name("alsa_output.test") == ( + "pipewire-route:v1:device=alsa_card.test;route=analog-output-speaker;route-device=7", + "alsa_output.test", + ) + + def test_move_named_output_stream_to_target_uses_matching_stream() -> None: backend = pw_backend.PipeWireBackend() stream = pw_backend.PipeWireNode( @@ -792,7 +1108,7 @@ def set(self, subject: int, key: str, type_name: str, value: str) -> bool: metadata = FakeMetadata() syncs: list[str] = [] backend._default_metadata = lambda: metadata - backend._sync_core = lambda: syncs.append("sync") + backend._sync_metadata = lambda: syncs.append("sync") backend.set_stream_target(126, 39, "67") @@ -842,7 +1158,7 @@ def set(self, subject: int, key: str, type_name: str | None, value: str | None) metadata = FakeMetadata() syncs: list[str] = [] backend._default_metadata = lambda: metadata - backend._sync_core = lambda: syncs.append("sync") + backend._sync_metadata = lambda: syncs.append("sync") backend.restore_stream_target( 126, @@ -948,7 +1264,7 @@ def raise_decode_error(): raise UnicodeDecodeError("utf-8", b"\xb1", 0, 1, "invalid start byte") monkeypatch.setattr(backend, "_read_defaults", raise_decode_error) - monkeypatch.setattr(backend, "_sync_core", lambda: syncs.append(True)) + monkeypatch.setattr(backend, "_sync_metadata", lambda: syncs.append(True)) assert backend.refresh_defaults().default_audio_sink == "cached.default" assert syncs == [True] diff --git a/tests/test_mini_eq_routing.py b/tests/test_mini_eq_routing.py index a6aa69a..633094a 100644 --- a/tests/test_mini_eq_routing.py +++ b/tests/test_mini_eq_routing.py @@ -506,9 +506,11 @@ def stop_engine(*, announce: bool = True) -> None: calls.append(("stop", announce)) controller.running = False - def start_engine() -> None: + def start_engine(*, on_ready=None, on_error=None) -> None: calls.append("start") controller.running = True + if on_ready is not None: + on_ready() controller.output_backend = FakeBackend([make_node(1, "speakers"), make_node(2, "hdmi")]) controller.output_sink = "speakers" @@ -561,9 +563,11 @@ def stop_engine(*, announce: bool = True) -> None: calls.append(f"stop:{announce}") controller.running = False - def start_engine() -> None: + def start_engine(*, on_ready=None, on_error=None) -> None: calls.append("start") controller.running = True + if on_ready is not None: + on_ready() controller.stop_engine = stop_engine controller.start_engine = start_engine @@ -689,13 +693,15 @@ def stop_engine(*, announce: bool = True) -> None: start_attempts = 0 - def start_engine() -> None: + def start_engine(*, on_ready=None, on_error=None) -> None: nonlocal start_attempts start_attempts += 1 calls.append("start") if start_attempts == 1: raise RuntimeError("virtual sink did not appear") controller.running = True + if on_ready is not None: + on_ready() controller.stop_engine = stop_engine controller.start_engine = start_engine @@ -735,9 +741,11 @@ def stop_engine(*, announce: bool = True) -> None: calls.append(f"stop:{announce}") controller.running = False - def start_engine() -> None: + def start_engine(*, on_ready=None, on_error=None) -> None: calls.append("start") controller.running = True + if on_ready is not None: + on_ready() controller.stop_engine = stop_engine controller.start_engine = start_engine @@ -849,7 +857,7 @@ def test_start_prepares_analyzer_before_filter_chain_engine() -> None: controller.refresh_followed_output_sink = lambda: calls.append("refresh") controller.prepare_output_analyzer = lambda: calls.append("prepare") or True - controller.start_engine = lambda: calls.append("engine") + controller.start_engine = lambda *, on_ready=None, on_error=None: (calls.append("engine"), on_ready and on_ready()) controller.start_output_event_monitoring = lambda: calls.append("monitor") controller.stream_router = None controller.stop_engine = lambda: calls.append("stop-engine") @@ -1093,6 +1101,104 @@ def disable(self, announce: bool = True) -> None: assert controller.routed is False +def test_start_engine_waits_for_filter_chain_node_from_registry() -> None: + controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController) + module = object() + calls: list[str] = [] + + class FakeWatch: + def cancel(self) -> None: + calls.append("cancel") + + class FakeBackend: + def __init__(self) -> None: + self.callback = None + + def load_filter_chain_module(self, arguments: str): + calls.append(f"load:{arguments}") + return module + + def watch_for_audio_sink(self, sink_name: str, callback, *, timeout_ms: int): + calls.append(f"wait:{sink_name}:{timeout_ms}") + self.callback = callback + return FakeWatch() + + backend = FakeBackend() + controller.engine_module = None + controller.engine_start_pending = False + controller.engine_start_watch = None + controller.filter_node_id = None + controller.running = False + controller.output_backend = backend + controller.virtual_sink_name = "mini_eq_sink" + controller.output_sink = "speakers" + controller.build_filter_chain_module_args = lambda: "module args" + controller.emit_status = lambda message: calls.append(f"status:{message}") + controller.apply_state_to_engine = lambda: calls.append("apply") + + routing.SystemWideEqController.start_engine(controller) + assert backend.callback is not None + backend.callback(make_node(42, "mini_eq_sink")) + + assert calls == [ + "load:module args", + "wait:mini_eq_sink:3000", + "status:filter-chain PipeWire EQ ready: mini_eq_sink -> speakers", + "apply", + ] + assert controller.engine_module is module + assert controller.filter_node_id == 42 + assert controller.running is True + + +def test_start_engine_clears_module_when_filter_chain_node_times_out() -> None: + controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController) + module = object() + calls: list[str] = [] + + class FakeWatch: + def cancel(self) -> None: + calls.append("cancel") + + class FakeBackend: + def __init__(self) -> None: + self.callback = None + + def load_filter_chain_module(self, arguments: str): + calls.append(f"load:{arguments}") + return module + + def watch_for_audio_sink(self, sink_name: str, callback, *, timeout_ms: int): + calls.append(f"wait:{sink_name}:{timeout_ms}") + self.callback = callback + return FakeWatch() + + def sync(self) -> None: + calls.append("sync") + + backend = FakeBackend() + controller.engine_module = None + controller.engine_start_pending = False + controller.engine_start_watch = None + controller.filter_node_id = None + controller.running = False + controller.output_backend = backend + controller.virtual_sink_name = "mini_eq_sink" + controller.output_sink = "speakers" + controller.build_filter_chain_module_args = lambda: "module args" + errors: list[str] = [] + + routing.SystemWideEqController.start_engine(controller, on_error=lambda exc: errors.append(str(exc))) + assert backend.callback is not None + backend.callback(None) + + assert calls == ["load:module args", "wait:mini_eq_sink:3000", "sync"] + assert errors == ["filter-chain did not create mini_eq_sink"] + assert controller.engine_module is None + assert controller.filter_node_id is None + assert controller.running is False + + def test_stop_engine_unloads_filter_chain_module_before_clearing_state() -> None: controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController) module = object() @@ -1141,7 +1247,10 @@ def start_monitoring(self, *, require_initial_route: bool = False) -> None: controller.stream_router = FakeRouter() controller.stop_engine = lambda *, announce=True: calls.append(f"stop-engine:{announce}") - controller.start_engine = lambda: calls.append("start-engine") + controller.start_engine = lambda *, on_ready=None, on_error=None: ( + calls.append("start-engine"), + on_ready and on_ready(), + ) routing.SystemWideEqController.restart_engine(controller) @@ -1176,7 +1285,10 @@ def start_monitoring(self, *, require_initial_route: bool = False) -> None: controller.stream_router = FakeRouter() controller.stop_engine = lambda *, announce=True: calls.append(f"stop-engine:{announce}") - controller.start_engine = lambda: calls.append("start-engine") + controller.start_engine = lambda *, on_ready=None, on_error=None: ( + calls.append("start-engine"), + on_ready and on_ready(), + ) routing.SystemWideEqController.restart_engine(controller) diff --git a/tools/check_live_ui_runtime.py b/tools/check_live_ui_runtime.py index 63dfc90..66eef35 100755 --- a/tools/check_live_ui_runtime.py +++ b/tools/check_live_ui_runtime.py @@ -1024,15 +1024,15 @@ def run_ui_flow( timeout_seconds, ) compare_switch = driver.wait_for_accessible( - "Compare switch", - lambda: driver.find(frame, name="Compare", role="switch", showing=True), + "A/B compare switch", + lambda: driver.find(frame, name="A/B", role="switch", showing=True), timeout_seconds, ) if not driver.sensitive(route_switch): raise AssertionError("System-wide EQ switch is not sensitive") if not driver.sensitive(compare_switch): - raise AssertionError("Compare switch should become sensitive when routing is active") + raise AssertionError("A/B compare switch should become sensitive when routing is active") verify_dropdown_exposes_options( driver, diff --git a/tools/check_pipewire_gobject.py b/tools/check_pipewire_gobject.py index 0b95b22..54e0a9a 100644 --- a/tools/check_pipewire_gobject.py +++ b/tools/check_pipewire_gobject.py @@ -9,18 +9,24 @@ "Core.new", "Core.load_module", "Core.set_pipewire_property", + "Core.sync", "Device.enum_all_params", "Device.enum_params", + "Device.enum_params_sync", "Device.new", + "Device.sync", "Global.dup_property", "get_library_version", "Metadata.new", "Metadata.set", + "Metadata.sync", "Node.new", "Node.set_param", + "Node.sync", "Param.new_props_controls", "Registry.new", "Registry.dup_globals_by_interface", + "Registry.sync", "RouteInfo.new_from_param", "Stream.new_audio_capture", "Stream.set_deliver_audio_blocks",