diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index f85a282..b22800f 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -44,6 +44,10 @@ concurrency:
group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
+env:
+ PWG_VERSION: "0.3.6"
+ PWG_REQUIREMENT: "pipewire-gobject>=0.3.6,<0.4"
+
jobs:
changes:
runs-on: ubuntu-24.04
@@ -291,11 +295,11 @@ jobs:
run: |
python3 -m venv /tmp/mini-eq-pwg-build
/tmp/mini-eq-pwg-build/bin/python -m pip install --upgrade pip
- /tmp/mini-eq-pwg-build/bin/python -m pip wheel 'pipewire-gobject>=0.3.5,<0.4' -w /tmp/mini-eq-wheelhouse
+ /tmp/mini-eq-pwg-build/bin/python -m pip wheel "$PWG_REQUIREMENT" -w /tmp/mini-eq-wheelhouse
python3 -m venv --system-site-packages .venv
.venv/bin/python -m pip install --upgrade pip
- .venv/bin/python -m pip install --no-index --find-links /tmp/mini-eq-wheelhouse 'pipewire-gobject>=0.3.5,<0.4'
+ .venv/bin/python -m pip install --no-index --find-links /tmp/mini-eq-wheelhouse "$PWG_REQUIREMENT"
.venv/bin/python -m pip install -e '.[dev]'
- name: Lint
@@ -323,7 +327,7 @@ jobs:
run: |
python3 -m venv --system-site-packages /tmp/mini-eq-wheel-test
/tmp/mini-eq-wheel-test/bin/python -m pip install --upgrade pip
- /tmp/mini-eq-wheel-test/bin/python -m pip install --no-index --find-links /tmp/mini-eq-wheelhouse 'pipewire-gobject>=0.3.5,<0.4'
+ /tmp/mini-eq-wheel-test/bin/python -m pip install --no-index --find-links /tmp/mini-eq-wheelhouse "$PWG_REQUIREMENT"
/tmp/mini-eq-wheel-test/bin/python -m pip install dist/mini_eq-*.whl
/tmp/mini-eq-wheel-test/bin/mini-eq --help
@@ -369,7 +373,7 @@ jobs:
- name: Check pipewire-gobject GI compatibility
run: |
flatpak run --filesystem="$PWD":ro --command=python3 io.github.bhack.mini-eq \
- "$PWD/tools/check_pipewire_gobject.py" --expect-version 0.3.5
+ "$PWD/tools/check_pipewire_gobject.py" --expect-version "$PWG_VERSION"
- name: Smoke-test Flatpak runtime modules
run: |
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 97dc090..c7dbe99 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,15 @@
# Changelog
+## 0.7.3 - 2026-05-11
+
+- Require pipewire-gobject 0.3.6 and use its synchronous PipeWire registry,
+ metadata, node, and device APIs for startup and routing state.
+- Replace Mini EQ's PipeWire startup and routing sleep loops with bounded sync
+ or registry-event waits for route params, virtual sink creation, and stream
+ routing.
+- Avoid a headless startup hang when system-wide EQ fails synchronously before
+ the GLib main loop starts.
+
## 0.7.2 - 2026-05-10
- Finish GNOME Shell startup notification after Mini EQ's delayed first-window
diff --git a/data/io.github.bhack.mini-eq.metainfo.xml b/data/io.github.bhack.mini-eq.metainfo.xml
index 28369ba..63e4254 100644
--- a/data/io.github.bhack.mini-eq.metainfo.xml
+++ b/data/io.github.bhack.mini-eq.metainfo.xml
@@ -33,11 +33,11 @@
- https://raw.githubusercontent.com/bhack/mini-eq/v0.7.2/docs/screenshots/mini-eq.png
+ https://raw.githubusercontent.com/bhack/mini-eq/v0.7.3/docs/screenshots/mini-eq.png
Adjust sound output with equalizer controls
- https://raw.githubusercontent.com/bhack/mini-eq/v0.7.2/docs/screenshots/mini-eq-dark.png
+ https://raw.githubusercontent.com/bhack/mini-eq/v0.7.3/docs/screenshots/mini-eq-dark.png
Use the equalizer with dark style
@@ -45,6 +45,15 @@
https://github.com/bhack/mini-eq/issues
https://github.com/bhack/mini-eq
+
+
+
+ - Require pipewire-gobject 0.3.6 and use its synchronous PipeWire registry, metadata, node, and device APIs for startup and routing state.
+ - Replace Mini EQ's PipeWire startup and routing sleep loops with bounded sync or registry-event waits for route params, virtual sink creation, and stream routing.
+ - Avoid a headless startup hang when system-wide EQ fails synchronously before the GLib main loop starts.
+
+
+
diff --git a/docs/development.md b/docs/development.md
index 1424b7a..59d40e4 100644
--- a/docs/development.md
+++ b/docs/development.md
@@ -84,11 +84,11 @@ Install the Python package after the system packages are present:
```bash
python3 -m venv /tmp/mini-eq-pwg-build
/tmp/mini-eq-pwg-build/bin/python -m pip install --upgrade pip
-/tmp/mini-eq-pwg-build/bin/python -m pip wheel 'pipewire-gobject>=0.3.5,<0.4' -w /tmp/mini-eq-wheelhouse
+/tmp/mini-eq-pwg-build/bin/python -m pip wheel 'pipewire-gobject>=0.3.6,<0.4' -w /tmp/mini-eq-wheelhouse
python3 -m venv --system-site-packages ~/.local/share/mini-eq/venv
~/.local/share/mini-eq/venv/bin/python -m pip install --upgrade pip
-~/.local/share/mini-eq/venv/bin/python -m pip install --no-index --find-links /tmp/mini-eq-wheelhouse 'pipewire-gobject>=0.3.5,<0.4'
+~/.local/share/mini-eq/venv/bin/python -m pip install --no-index --find-links /tmp/mini-eq-wheelhouse 'pipewire-gobject>=0.3.6,<0.4'
~/.local/share/mini-eq/venv/bin/python -m pip install mini-eq
~/.local/share/mini-eq/venv/bin/mini-eq --check-deps
~/.local/share/mini-eq/venv/bin/mini-eq
diff --git a/io.github.bhack.mini-eq.yaml b/io.github.bhack.mini-eq.yaml
index 7076270..8bc527a 100644
--- a/io.github.bhack.mini-eq.yaml
+++ b/io.github.bhack.mini-eq.yaml
@@ -124,8 +124,8 @@ modules:
sources:
- type: git
url: https://github.com/bhack/pipewire-gobject.git
- tag: 0.3.5
- commit: b570bc4ff0a53223416fff9a4fc05d367273789d
+ tag: 0.3.6
+ commit: 7e20b29865c0a869695bc730bacb68e2f0853077
- python3-dependencies.yaml
diff --git a/pyproject.toml b/pyproject.toml
index 467dbb4..1c6916c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "mini-eq"
-version = "0.7.2"
+version = "0.7.3"
description = "Compact PipeWire system-wide parametric equalizer for Linux desktops."
readme = "README.md"
requires-python = ">=3.11"
@@ -40,7 +40,7 @@ classifiers = [
"Topic :: Multimedia :: Sound/Audio :: Analysis",
"Topic :: Multimedia :: Sound/Audio :: Mixers",
]
-dependencies = ["numpy>=1.26", "pipewire-gobject>=0.3.5,<0.4"]
+dependencies = ["numpy>=1.26", "pipewire-gobject>=0.3.6,<0.4"]
[project.urls]
Homepage = "https://github.com/bhack/mini-eq"
diff --git a/src/mini_eq/app.py b/src/mini_eq/app.py
index b233eee..e6578cb 100644
--- a/src/mini_eq/app.py
+++ b/src/mini_eq/app.py
@@ -41,6 +41,9 @@ def __init__(self, args: Namespace) -> None:
self.dbus_control: MiniEqDbusControl | None = None
self.signal_source_ids: list[int] = []
self.window_present_source_id = 0
+ self.window_starting = False
+ self.window_start_hold = False
+ self.pending_present_when_ready = False
self.background_mode = load_background_mode() or bool(getattr(args, "background", False))
self.start_at_login = load_start_at_login()
self.start_active_at_login = load_start_active_at_login() and self.start_at_login
@@ -94,13 +97,15 @@ def ensure_window(self, *, present: bool) -> None:
self.window.schedule_startup_ready()
return
+ if self.window_starting:
+ self.pending_present_when_ready = self.pending_present_when_ready or present
+ return
+
controller: SystemWideEqController | None = None
initial_curve_label: str | None = None
try:
controller = SystemWideEqController(self.args.output_sink)
- controller.start()
-
if self.args.import_apo:
controller.import_apo_preset(self.args.import_apo)
initial_curve_label = imported_apo_curve_label(self.args.import_apo)
@@ -110,14 +115,52 @@ def ensure_window(self, *, present: bool) -> None:
raise SystemExit(str(exc)) from exc
self.controller = controller
- self.window = MiniEqWindow(self, self.controller, self.args.auto_route, initial_curve_label=initial_curve_label)
- self.window.set_icon_name(APP_ICON_NAME)
- self.window.present_when_ready = present
- self.window.set_visible(False)
- self.window.schedule_startup_ready()
- if not present:
- self.update_background_status()
- self.emit_control_state_changed()
+ self.window_starting = True
+ self.window_start_hold = True
+ self.pending_present_when_ready = present
+ self.hold()
+
+ def release_start_hold() -> None:
+ if not self.window_start_hold:
+ return
+
+ self.window_start_hold = False
+ self.release()
+
+ def on_ready() -> None:
+ if self.controller is not controller:
+ return
+
+ try:
+ self.window_starting = False
+ present_when_ready = self.pending_present_when_ready
+ self.pending_present_when_ready = False
+ self.window = MiniEqWindow(
+ self, self.controller, self.args.auto_route, initial_curve_label=initial_curve_label
+ )
+ self.window.set_icon_name(APP_ICON_NAME)
+ self.window.present_when_ready = present_when_ready
+ self.window.set_visible(False)
+ self.window.schedule_startup_ready()
+ if not present_when_ready:
+ self.update_background_status()
+ self.emit_control_state_changed()
+ finally:
+ release_start_hold()
+
+ def on_error(exc: Exception) -> None:
+ self.window_starting = False
+ self.pending_present_when_ready = False
+ try:
+ controller.shutdown()
+ finally:
+ if self.controller is controller:
+ self.controller = None
+ print(str(exc), file=sys.stderr)
+ release_start_hold()
+ self.quit()
+
+ controller.start(on_ready=on_ready, on_error=on_error)
def present_main_window(self) -> None:
self.ensure_window(present=True)
@@ -220,34 +263,50 @@ def run_headless(args: Namespace) -> int:
duration_ms = 0
controller: SystemWideEqController | None = None
+ exit_code = 0
+ loop = GLib.MainLoop()
+ signal_source_ids = install_unix_signal_handlers(loop.quit)
try:
controller = SystemWideEqController(args.output_sink)
- controller.start()
if args.import_apo:
controller.import_apo_preset(args.import_apo)
- if args.auto_route:
- controller.route_system_audio(True)
-
- loop = GLib.MainLoop()
- signal_source_ids = install_unix_signal_handlers(loop.quit)
-
if duration_ms > 0:
GLib.timeout_add(duration_ms, lambda: (loop.quit(), False)[1])
+ def on_ready() -> None:
+ nonlocal exit_code
+
+ try:
+ if args.auto_route:
+ controller.route_system_audio(True)
+ except Exception as exc:
+ exit_code = 1
+ print(str(exc), file=sys.stderr)
+ loop.quit()
+
+ def on_error(exc: Exception) -> None:
+ nonlocal exit_code
+
+ exit_code = 1
+ print(str(exc), file=sys.stderr)
+ loop.quit()
+
+ controller.start(on_ready=on_ready, on_error=on_error)
+
try:
- loop.run()
+ if exit_code == 0:
+ loop.run()
except KeyboardInterrupt:
pass
- finally:
- signal_source_ids.clear()
finally:
+ signal_source_ids.clear()
if controller is not None:
controller.shutdown()
- return 0
+ return exit_code
def install_unix_signal_handlers(callback) -> list[int]:
diff --git a/src/mini_eq/deps.py b/src/mini_eq/deps.py
index 2b2b3c3..432f005 100644
--- a/src/mini_eq/deps.py
+++ b/src/mini_eq/deps.py
@@ -12,15 +12,21 @@
Status = Literal["ok", "missing", "warning"]
-PWG_REQUIRED_VERSION = "0.3.5"
-PWG_REQUIRED_VERSION_PARTS = (0, 3, 5)
+PWG_REQUIRED_VERSION = "0.3.6"
+PWG_REQUIRED_VERSION_PARTS = (0, 3, 6)
PWG_REQUIRED_SYMBOLS = (
"Core.set_pipewire_property",
+ "Core.sync",
"Device.enum_all_params",
"Device.enum_params",
+ "Device.enum_params_sync",
"Device.new",
"Device.subscribe_params",
+ "Device.sync",
+ "Metadata.sync",
+ "Node.sync",
"Param.new_props_controls",
+ "Registry.sync",
"RouteInfo.new_from_param",
"Stream.set_pipewire_property",
)
diff --git a/src/mini_eq/pipewire_backend.py b/src/mini_eq/pipewire_backend.py
index 365b43b..4555b8a 100644
--- a/src/mini_eq/pipewire_backend.py
+++ b/src/mini_eq/pipewire_backend.py
@@ -1,11 +1,11 @@
from __future__ import annotations
import json
-import time
+from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
-from .pipewire_routes import PipeWireRouteMixin
+from .pipewire_routes import PipeWireOutputRoute, PipeWireRouteMixin
DEFAULT_METADATA_NAME = "default"
DEFAULT_AUDIO_SINK_KEY = "default.audio.sink"
@@ -71,6 +71,19 @@ class PipeWireBackendError(RuntimeError):
pass
+class PipeWireNodeWatch:
+ def __init__(self, cancel_callback: Callable[[], None]) -> None:
+ self._cancel_callback = cancel_callback
+
+ def cancel(self) -> None:
+ cancel_callback = self._cancel_callback
+ if cancel_callback is None:
+ return
+
+ self._cancel_callback = None
+ cancel_callback()
+
+
def build_props_controls_param(Pwg, GLib, controls: dict[str, float]):
variant = GLib.Variant("a{sd}", {name: float(value) for name, value in controls.items()})
param = Pwg.Param.new_props_controls(variant)
@@ -147,6 +160,7 @@ def __init__(self, timeout_ms: int = 2000) -> None:
self._loaded_modules: list[Any] = []
self._cached_defaults = PipeWireDefaults(None, None)
self._device_route_refreshing_bound_ids: set[int] = set()
+ self._device_active_output_routes: dict[int, dict[int, PipeWireOutputRoute]] = {}
def __enter__(self) -> PipeWireBackend:
self.connect()
@@ -177,7 +191,7 @@ def connect(self) -> None:
if not self._metadata.start():
raise PipeWireBackendError("failed to start PipeWire default metadata discovery")
- self._wait_for_initial_state()
+ self._sync_initial_state()
self._connected = True
def close(self) -> None:
@@ -240,6 +254,7 @@ def close(self) -> None:
self._registry = None
self._metadata = None
self._cached_defaults = PipeWireDefaults(None, None)
+ self._device_active_output_routes.clear()
def list_nodes(self) -> list[PipeWireNode]:
self._ensure_connected()
@@ -262,6 +277,126 @@ def list_output_streams(self) -> list[PipeWireNode]:
def node_from_proxy(self, node) -> PipeWireNode:
return self._node_from_global(node)
+ def watch_for_node(
+ self,
+ predicate: Callable[[PipeWireNode], bool],
+ callback: Callable[[PipeWireNode | None], None],
+ timeout_ms: int | None = None,
+ ) -> PipeWireNodeWatch:
+ self._ensure_connected()
+
+ if self._GLib is None or self._GObject is None or self._registry is None:
+ raise PipeWireBackendError("PipeWire registry is not connected")
+
+ deadline_ms = max(int(self.timeout_ms if timeout_ms is None else timeout_ms), 1)
+ state = {
+ "done": False,
+ "scheduled": False,
+ "handler_id": 0,
+ "timeout_id": 0,
+ "idle_id": 0,
+ }
+
+ def maybe_match(global_) -> PipeWireNode | None:
+ try:
+ node = self._node_from_global(global_)
+ except UnicodeDecodeError:
+ return None
+ except Exception:
+ return None
+
+ return node if predicate(node) else None
+
+ def cleanup(*, remove_idle: bool) -> None:
+ handler_id = int(state["handler_id"])
+ state["handler_id"] = 0
+ if handler_id > 0:
+ try:
+ self._registry.disconnect(handler_id)
+ except Exception:
+ pass
+
+ timeout_id = int(state["timeout_id"])
+ state["timeout_id"] = 0
+ if timeout_id > 0:
+ try:
+ self._GLib.source_remove(timeout_id)
+ except Exception:
+ pass
+
+ idle_id = int(state["idle_id"])
+ if remove_idle and idle_id > 0:
+ state["idle_id"] = 0
+ try:
+ self._GLib.source_remove(idle_id)
+ except Exception:
+ pass
+
+ def complete(node: PipeWireNode | None) -> bool:
+ if state["done"] or state["scheduled"]:
+ return False
+
+ state["done"] = True
+ cleanup(remove_idle=False)
+ callback(node)
+ return False
+
+ def complete_from_idle(node: PipeWireNode) -> bool:
+ state["idle_id"] = 0
+ if state["done"]:
+ return False
+
+ state["done"] = True
+ callback(node)
+ return False
+
+ def schedule_complete(node: PipeWireNode) -> None:
+ if state["done"] or state["scheduled"]:
+ return
+
+ state["scheduled"] = True
+ cleanup(remove_idle=False)
+ state["idle_id"] = self._GLib.idle_add(lambda: complete_from_idle(node))
+
+ def on_global_added(_registry, global_) -> None:
+ node = maybe_match(global_)
+ if node is not None:
+ complete(node)
+
+ def on_timeout() -> bool:
+ state["timeout_id"] = 0
+ return complete(None)
+
+ def cancel() -> None:
+ if state["done"]:
+ return
+
+ state["done"] = True
+ cleanup(remove_idle=True)
+
+ state["handler_id"] = self._GObject.Object.connect(self._registry, "global-added", on_global_added)
+ watch = PipeWireNodeWatch(cancel)
+ for global_ in self._iterate_model(self._registry.dup_globals_by_interface(PIPEWIRE_NODE_INTERFACE)):
+ node = maybe_match(global_)
+ if node is not None:
+ schedule_complete(node)
+ return watch
+
+ state["timeout_id"] = self._GLib.timeout_add(deadline_ms, on_timeout)
+ return watch
+
+ def watch_for_audio_sink(
+ self,
+ sink_name: str,
+ callback: Callable[[PipeWireNode | None], None],
+ timeout_ms: int | None = None,
+ ) -> PipeWireNodeWatch:
+ return self.watch_for_node(
+ lambda node: node.is_audio_sink and node.node_name == sink_name,
+ callback,
+ timeout_ms=timeout_ms,
+ )
+
def connect_object_added(self, callback) -> int:
self._ensure_connected()
handler_id = self._GObject.Object.connect(self._registry, "global-added", callback)
@@ -329,7 +464,8 @@ def on_device_param(_device, param) -> None:
if param_id != route_param_id or int(device_bound_id) in self._device_route_refreshing_bound_ids:
return
- callback()
+ if self._remember_device_route_param(device_bound_id, param):
+ callback()
handler_id = self._GObject.Object.connect(device, "param", on_device_param)
self._device_signal_objects[handler_id] = device
@@ -363,7 +499,8 @@ def disconnect_device_handler(self, handler_id: int) -> None:
def sync(self) -> None:
self._ensure_connected()
- self._sync_core()
+ self._sync_registry()
+ self._sync_metadata()
def defaults(self) -> PipeWireDefaults:
if self._has_cached_defaults():
@@ -377,7 +514,7 @@ def refresh_defaults(self) -> PipeWireDefaults:
return self._cached_defaults
except UnicodeDecodeError:
try:
- self._sync_core()
+ self._sync_metadata()
self._cached_defaults = self._read_defaults()
return self._cached_defaults
except UnicodeDecodeError as retry_exc:
@@ -453,7 +590,7 @@ def set_stream_target(self, stream_bound_id: int, target_bound_id: int, target_s
for key, value in ((TARGET_NODE_KEY, str(target_bound_id)), (TARGET_OBJECT_KEY, target_serial)):
if not metadata.set(stream_bound_id, key, SPA_ID_TYPE, value):
raise PipeWireBackendError(f"failed to set stream target metadata: {stream_bound_id}")
- self._sync_core()
+ self._sync_metadata()
def restore_stream_target(self, stream_bound_id: int, target: PipeWireStreamTarget) -> None:
metadata = self._default_metadata()
@@ -463,7 +600,7 @@ def restore_stream_target(self, stream_bound_id: int, target: PipeWireStreamTarg
):
if not metadata.set(stream_bound_id, key, type_name, value):
raise PipeWireBackendError(f"failed to restore stream target metadata: {stream_bound_id}")
- self._sync_core()
+ self._sync_metadata()
def output_stream_by_bound_id(self, bound_id: int) -> PipeWireNode | None:
for stream in self.list_output_streams():
@@ -575,6 +712,7 @@ def _node_proxy_by_bound_id(self, bound_id: int):
return None
if not node.start():
raise PipeWireBackendError(f"failed to bind node: {bound_id}")
+ self._sync_proxy(node, "node")
self._node_proxies[int(bound_id)] = node
return node
@@ -593,6 +731,7 @@ def _device_proxy_by_bound_id(self, bound_id: int):
return None
if not device.start():
raise PipeWireBackendError(f"failed to bind device: {bound_id}")
+ self._sync_proxy(device, "device")
self._device_proxies[int(bound_id)] = device
return device
@@ -682,36 +821,58 @@ def _ensure_connected(self) -> None:
if not self._connected:
self.connect()
- def _wait_for_initial_state(self) -> None:
- deadline = time.monotonic() + (self.timeout_ms / 1000.0)
- context = self._GLib.MainContext.default()
-
- while time.monotonic() < deadline:
- while context.pending():
- context.iteration(False)
-
- registry_ready = self._registry.get_globals().get_n_items() > 0
- metadata_ready = self._metadata.get_bound()
- if registry_ready and metadata_ready:
- return
-
- time.sleep(0.01)
+ def _sync_initial_state(self) -> None:
+ self._sync_registry()
+ self._sync_metadata()
missing = []
if self._registry.get_globals().get_n_items() <= 0:
missing.append("registry")
if not self._metadata.get_bound():
missing.append("metadata")
- raise PipeWireBackendError(f"PipeWire initialization timed out waiting for: {', '.join(missing)}")
+ if not missing:
+ return
- def _sync_core(self) -> None:
- if self._GLib is None:
+ raise PipeWireBackendError(f"PipeWire initialization did not report: {', '.join(missing)}")
+
+ def _sync_registry(self) -> None:
+ if self._registry is None:
return
- deadline = time.monotonic() + min(self.timeout_ms / 1000.0, 0.05)
- context = self._GLib.MainContext.default()
- while time.monotonic() < deadline and context.pending():
- context.iteration(False)
+ try:
+ synced = self._registry.sync(max(int(self.timeout_ms), 1))
+ except Exception as exc:
+ raise PipeWireBackendError(f"PipeWire registry sync failed: {exc}") from exc
+ if synced is False:
+ raise PipeWireBackendError("PipeWire registry sync failed")
+
+ def _sync_metadata(self) -> None:
+ if self._metadata is None:
+ return
+
+ try:
+ synced = self._metadata.sync(max(int(self.timeout_ms), 1))
+ except Exception as exc:
+ raise PipeWireBackendError(f"PipeWire metadata sync failed: {exc}") from exc
+ if synced is False:
+ raise PipeWireBackendError("PipeWire metadata sync failed")
+
+ def _sync_proxy(self, proxy, label: str) -> None:
+ try:
+ synced = proxy.sync(max(int(self.timeout_ms), 1))
+ except Exception as exc:
+ raise PipeWireBackendError(f"PipeWire {label} sync failed: {exc}") from exc
+ if synced is False:
+ raise PipeWireBackendError(f"PipeWire {label} sync failed")
+
+ def _sync_core(self) -> None:
+ if self._core is not None:
+ try:
+ synced = self._core.sync(max(int(self.timeout_ms), 1))
+ except Exception as exc:
+ raise PipeWireBackendError(f"PipeWire core sync failed: {exc}") from exc
+ if synced is False:
+ raise PipeWireBackendError("PipeWire core sync failed")
@staticmethod
def _import_pipewire_gobject():
diff --git a/src/mini_eq/pipewire_routes.py b/src/mini_eq/pipewire_routes.py
index 0bec84d..fd363d8 100644
--- a/src/mini_eq/pipewire_routes.py
+++ b/src/mini_eq/pipewire_routes.py
@@ -1,6 +1,5 @@
from __future__ import annotations
-import time
from dataclasses import dataclass, field
from urllib.parse import quote
@@ -76,7 +75,14 @@ def output_preset_target_for_sink_name(self, sink_name: str | None) -> PipeWireO
return PipeWireOutputPresetTarget(node_name, route, tuple(dict.fromkeys(keys)))
def output_route_for_sink(self, sink) -> PipeWireOutputRoute | None:
- if sink is None or sink.device_id <= 0 or not self._has_device_route_api():
+ if sink is None or sink.device_id <= 0:
+ return None
+
+ route = self._cached_output_route_for_sink(sink)
+ if route is not None:
+ return route
+
+ if not self._has_device_route_api():
return None
device = self._device_proxy_by_bound_id(sink.device_id)
@@ -90,6 +96,18 @@ def output_route_for_sink(self, sink) -> PipeWireOutputRoute | None:
if str(route.direction or "").casefold() == OUTPUT_ROUTE_DIRECTION
and (route.availability or "unknown").casefold() != "no"
]
+ return self._select_output_route_for_sink(sink, output_routes)
+
+ def _cached_output_route_for_sink(self, sink) -> PipeWireOutputRoute | None:
+ try:
+ output_routes = tuple(self._device_active_output_routes.get(int(sink.device_id), {}).values())
+ except Exception:
+ output_routes = ()
+ return self._select_output_route_for_sink(sink, output_routes)
+
+ def _select_output_route_for_sink(
+ self, sink, output_routes: list[PipeWireOutputRoute] | tuple[PipeWireOutputRoute, ...]
+ ) -> PipeWireOutputRoute | None:
if not output_routes:
return None
@@ -107,13 +125,53 @@ def output_route_for_sink(self, sink) -> PipeWireOutputRoute | None:
return None
+ def _remember_device_route_param(self, device_bound_id: int, param) -> bool:
+ if not self._has_device_route_api():
+ return False
+
+ try:
+ route_info = self._Pwg.RouteInfo.new_from_param(param)
+ except Exception:
+ return False
+
+ if route_info is None:
+ return False
+
+ route = self._output_route_from_info(
+ route_info,
+ device_bound_id,
+ self._device_name_by_bound_id(device_bound_id),
+ )
+ if str(route.direction or "").casefold() != OUTPUT_ROUTE_DIRECTION:
+ return False
+
+ device_bound_id = int(device_bound_id)
+ cached_routes = self._device_active_output_routes.get(device_bound_id, {})
+ previous_routes = dict(cached_routes)
+ if (route.availability or "unknown").casefold() == "no":
+ if route.route_device not in cached_routes:
+ return False
+
+ cached_routes = dict(cached_routes)
+ cached_routes.pop(route.route_device, None)
+ if cached_routes:
+ self._device_active_output_routes[device_bound_id] = cached_routes
+ else:
+ self._device_active_output_routes.pop(device_bound_id, None)
+ return True
+
+ current_routes = {route.route_device: route}
+ self._device_active_output_routes[device_bound_id] = current_routes
+ return previous_routes != current_routes
+
def _has_device_route_api(self) -> bool:
return (
self._Pwg is not None
and hasattr(self._Pwg, "Device")
and hasattr(self._Pwg, "RouteInfo")
- and hasattr(self._Pwg.Device, "enum_params")
+ and hasattr(self._Pwg.Device, "enum_params_sync")
and hasattr(self._Pwg.Device, "new")
+ and hasattr(self._Pwg.Device, "sync")
and hasattr(self._Pwg.RouteInfo, "new_from_param")
)
@@ -166,14 +224,13 @@ def _enumerate_device_routes(self, device, device_bound_id: int) -> list[PipeWir
bound_id = int(device_bound_id)
self._device_route_refreshing_bound_ids.add(bound_id)
try:
- seq = int(device.enum_params(route_param_id, 0, 0))
- if seq < 0:
+ params = device.enum_params_sync(route_param_id, 0, 0, max(int(self.timeout_ms), 1))
+ if params is None:
return []
- self._wait_for_param_sequence(lambda: device.get_params(), seq)
device_name = self._device_name_by_bound_id(device_bound_id)
routes: list[PipeWireOutputRoute] = []
- for param in self._iterate_model(device.get_params()):
+ for param in self._iterate_model(params):
try:
param_name = param.dup_name()
except Exception:
@@ -200,7 +257,7 @@ def _enumerate_device_routes(self, device, device_bound_id: int) -> list[PipeWir
def _device_route_param_id(self, device) -> int | None:
route_param_id = self._device_param_id_by_name(device, DEVICE_ROUTE_PARAM_NAME)
if route_param_id is None:
- self._wait_for_device_param_info(device, DEVICE_ROUTE_PARAM_NAME)
+ self._sync_proxy(device, "device")
route_param_id = self._device_param_id_by_name(device, DEVICE_ROUTE_PARAM_NAME)
return route_param_id
@@ -217,21 +274,6 @@ def _device_param_id_by_name(self, device, name: str) -> int | None:
return None
- def _wait_for_device_param_info(self, device, name: str) -> None:
- if self._GLib is None:
- return
-
- deadline = time.monotonic() + min(self.timeout_ms / 1000.0, 0.2)
- context = self._GLib.MainContext.default()
- while time.monotonic() < deadline:
- while context.pending():
- context.iteration(False)
-
- if self._device_param_id_by_name(device, name) is not None:
- return
-
- time.sleep(0.005)
-
def _output_route_from_info(
self,
route_info,
@@ -252,31 +294,6 @@ def _output_route_from_info(
info=self._variant_to_string_dict(route_info.get_info()),
)
- def _wait_for_param_sequence(self, model_factory, seq: int) -> None:
- if self._GLib is None:
- return
-
- deadline = time.monotonic() + min(self.timeout_ms / 1000.0, 0.2)
- context = self._GLib.MainContext.default()
- while time.monotonic() < deadline:
- while context.pending():
- context.iteration(False)
-
- if self._model_has_param_sequence(model_factory(), seq):
- return
-
- time.sleep(0.005)
-
- def _model_has_param_sequence(self, model, seq: int) -> bool:
- for param in self._iterate_model(model):
- try:
- if int(param.get_seq()) == seq:
- return True
- except Exception:
- continue
-
- return False
-
@staticmethod
def _variant_to_string_dict(variant) -> dict[str, str]:
if variant is None:
diff --git a/src/mini_eq/routing.py b/src/mini_eq/routing.py
index b87724b..291cbe0 100644
--- a/src/mini_eq/routing.py
+++ b/src/mini_eq/routing.py
@@ -2,7 +2,6 @@
import json
import sys
-import time
from collections.abc import Callable
from dataclasses import replace
@@ -65,6 +64,8 @@ def __init__(self, output_sink: str | None) -> None:
self._output_preset_target: PipeWireOutputPresetTarget | None = None
self.filter_output_name = f"{self.virtual_sink_name}{FILTER_OUTPUT_SUFFIX}"
self.engine_module = None
+ self.engine_start_watch = None
+ self.engine_start_pending = False
self.filter_node_id: int | None = None
self.output_event_source_id = 0
self.output_object_added_handler_id = 0
@@ -99,8 +100,9 @@ def emit_status(self, message: str) -> None:
if getattr(self, "shutting_down", False):
return
- if self.status_callback is not None:
- self.status_callback(message)
+ status_callback = getattr(self, "status_callback", None)
+ if status_callback is not None:
+ status_callback(message)
print(message, file=sys.stderr)
@@ -239,7 +241,17 @@ def set_analyzer_enabled(self, enabled: bool) -> bool:
analyzer.set_enabled(False)
self.restore_engine_after_analyzer_failure()
return False
- self.start_engine()
+
+ def on_ready() -> None:
+ if self.routed and self.stream_router is not None:
+ self.stream_router.route_output_streams()
+
+ def on_error(exc: Exception) -> None:
+ analyzer.set_enabled(False)
+ self.emit_status(f"filter-chain restart after analyzer enable failed: {exc}")
+ self.restore_engine_after_analyzer_failure()
+
+ self.start_engine(on_ready=on_ready, on_error=on_error)
except Exception:
analyzer.set_enabled(False)
try:
@@ -248,9 +260,6 @@ def set_analyzer_enabled(self, enabled: bool) -> bool:
self.emit_status(f"filter-chain restore after analyzer failure failed: {restore_exc}")
raise
- if self.routed and self.stream_router is not None:
- self.stream_router.route_output_streams()
-
return started
return analyzer.set_enabled(enabled)
@@ -499,40 +508,6 @@ def route_system_audio(self, enabled: bool, announce: bool = True, *, refresh_ou
def build_default_bands(self) -> list[EqBand]:
return default_eq_bands()
- def wait_for_virtual_sink(self, timeout_seconds: float = 3.0) -> None:
- deadline = time.monotonic() + timeout_seconds
-
- while time.monotonic() < deadline:
- try:
- self.output_backend.sync()
- except Exception:
- pass
-
- if self.get_sink(self.virtual_sink_name) is not None:
- return
-
- time.sleep(0.05)
-
- raise RuntimeError(f"virtual sink did not appear: {self.virtual_sink_name}")
-
- def wait_for_filter_node(self, timeout_seconds: float = 3.0) -> None:
- deadline = time.monotonic() + timeout_seconds
-
- while time.monotonic() < deadline:
- node_id = self.find_filter_node_id()
-
- if node_id is not None:
- self.filter_node_id = node_id
- return
-
- time.sleep(0.05)
-
- raise RuntimeError(f"filter-chain did not create {self.virtual_sink_name}")
-
- def find_filter_node_id(self) -> int | None:
- sink = self.get_sink(self.virtual_sink_name)
- return sink.bound_id if sink is not None else None
-
def active_sample_rate(self) -> float:
for sink_name in (self.virtual_sink_name, self.output_sink):
rate = node_sample_rate(self.get_sink(sink_name))
@@ -551,25 +526,70 @@ def build_filter_chain_module_args(self) -> str:
output_sink=self.output_sink,
)
- def start_engine(self) -> None:
- if self.engine_module is not None:
+ def cancel_pending_engine_start(self) -> None:
+ watch = getattr(self, "engine_start_watch", None)
+ self.engine_start_watch = None
+ self.engine_start_pending = False
+ if watch is not None:
+ watch.cancel()
+
+ def start_engine(
+ self,
+ *,
+ on_ready: Callable[[], None] | None = None,
+ on_error: Callable[[Exception], None] | None = None,
+ ) -> None:
+ if self.running:
+ if on_ready is not None:
+ on_ready()
+ return
+
+ if getattr(self, "engine_start_pending", False):
return
self.engine_module = self.output_backend.load_filter_chain_module(self.build_filter_chain_module_args())
+ self.engine_start_pending = True
- try:
- self.wait_for_virtual_sink()
- self.wait_for_filter_node()
- self.running = True
- self.emit_status(f"filter-chain PipeWire EQ ready: {self.virtual_sink_name} -> {self.output_sink}")
- except Exception:
+ def fail(exc: Exception) -> None:
+ self.engine_start_watch = None
+ self.engine_start_pending = False
self.engine_module = None
self.filter_node_id = None
try:
self.output_backend.sync()
except Exception:
pass
- raise
+ if on_error is not None:
+ on_error(exc)
+ else:
+ self.emit_status(str(exc))
+
+ def on_sink_ready(sink: PipeWireNode | None) -> None:
+ self.engine_start_watch = None
+ self.engine_start_pending = False
+
+ if getattr(self, "shutting_down", False) or self.engine_module is None:
+ return
+
+ if sink is None:
+ fail(RuntimeError(f"filter-chain did not create {self.virtual_sink_name}"))
+ return
+
+ self.filter_node_id = sink.bound_id
+ self.running = True
+ self.emit_status(f"filter-chain PipeWire EQ ready: {self.virtual_sink_name} -> {self.output_sink}")
+ self.apply_state_to_engine()
+ if on_ready is not None:
+ on_ready()
+
+ try:
+ self.engine_start_watch = self.output_backend.watch_for_audio_sink(
+ self.virtual_sink_name,
+ on_sink_ready,
+ timeout_ms=3000,
+ )
+ except Exception as exc:
+ fail(exc)
def retarget_filter_output(self) -> bool:
if not self.running or self.filter_node_id is None:
@@ -585,15 +605,18 @@ def retarget_filter_output(self) -> bool:
return False
def restore_engine_after_analyzer_failure(self) -> None:
- if self.running or self.engine_module is not None:
+ if self.running or getattr(self, "engine_module", None) is not None:
return
- self.start_engine()
- if self.routed and self.stream_router is not None:
- self.stream_router.route_output_streams()
+ def on_ready() -> None:
+ if self.routed and self.stream_router is not None:
+ self.stream_router.route_output_streams()
+
+ self.start_engine(on_ready=on_ready, on_error=lambda exc: self.emit_status(str(exc)))
def stop_engine(self, announce: bool = True) -> None:
- module = self.engine_module
+ self.cancel_pending_engine_start()
+ module = getattr(self, "engine_module", None)
if module is None:
self.filter_node_id = None
self.running = False
@@ -630,10 +653,18 @@ def restart_engine(self) -> None:
stream_router.emit_warning(exc)
self.stop_engine(announce=False)
- self.start_engine()
- if stream_router is not None:
- stream_router.start_monitoring(require_initial_route=True)
+ def on_ready() -> None:
+ if stream_router is not None:
+ stream_router.start_monitoring(require_initial_route=True)
+
+ def on_error(exc: Exception) -> None:
+ if stream_router is not None:
+ stream_router.emit_warning(exc)
+ else:
+ self.emit_status(str(exc))
+
+ self.start_engine(on_ready=on_ready, on_error=on_error)
def set_filter_controls(self, controls: dict[str, float]) -> None:
if self.filter_node_id is None or not self.running:
@@ -666,19 +697,57 @@ def apply_state_to_engine(self) -> None:
controls = builtin_biquad_control_values(self.bands, self.preamp_db, self.eq_enabled, self.active_sample_rate())
self.set_filter_controls(controls)
- def start(self) -> None:
+ def start(
+ self,
+ *,
+ on_ready: Callable[[], None] | None = None,
+ on_error: Callable[[Exception], None] | None = None,
+ ) -> None:
try:
self.refresh_followed_output_sink()
self.prepare_output_analyzer()
- self.start_engine()
- self.start_output_event_monitoring()
- except Exception:
+ except Exception as exc:
if self.stream_router is not None:
self.stream_router.stop_monitoring()
self.stop_engine()
self.stop_output_event_monitoring()
+ if on_error is not None:
+ on_error(exc)
+ return
raise
+ def on_engine_ready() -> None:
+ try:
+ self.start_output_event_monitoring()
+ except Exception as exc:
+ if self.stream_router is not None:
+ self.stream_router.stop_monitoring()
+ self.stop_engine()
+ self.stop_output_event_monitoring()
+ if on_error is not None:
+ on_error(exc)
+ return
+ raise
+ if on_ready is not None:
+ on_ready()
+
+ def on_engine_error(exc: Exception) -> None:
+ if self.stream_router is not None:
+ self.stream_router.stop_monitoring()
+ self.stop_engine()
+ self.stop_output_event_monitoring()
+ if on_error is not None:
+ on_error(exc)
+ else:
+ self.emit_status(str(exc))
+
+ try:
+ self.start_engine(on_ready=on_engine_ready, on_error=on_engine_error)
+ except Exception as exc:
+ on_engine_error(exc)
+ if on_error is None:
+ raise
+
def shutdown(self) -> None:
self.shutting_down = True
self.status_callback = None
diff --git a/tests/test_mini_eq_app.py b/tests/test_mini_eq_app.py
index 7166426..17692aa 100644
--- a/tests/test_mini_eq_app.py
+++ b/tests/test_mini_eq_app.py
@@ -141,6 +141,38 @@ def test_finish_startup_notification_ignores_missing_gdk_startup_id(monkeypatch)
app.MiniEqApplication.finish_startup_notification(SimpleNamespace())
+def test_run_headless_skips_loop_after_synchronous_start_error(monkeypatch, capsys) -> None:
+ calls: list[str] = []
+
+ class FakeLoop:
+ def run(self) -> None:
+ calls.append("run")
+ raise AssertionError("run should not be called after a synchronous startup error")
+
+ def quit(self) -> None:
+ calls.append("quit")
+
+ class FakeController:
+ def __init__(self, output_sink: str | None) -> None:
+ calls.append(f"controller:{output_sink}")
+
+ def start(self, *, on_ready=None, on_error=None) -> None:
+ calls.append("start")
+ on_error(RuntimeError("startup failed"))
+
+ def shutdown(self) -> None:
+ calls.append("shutdown")
+
+ monkeypatch.setattr(app.GLib, "MainLoop", FakeLoop)
+ monkeypatch.setattr(app, "install_unix_signal_handlers", lambda _callback: [])
+ monkeypatch.setattr(app, "SystemWideEqController", FakeController)
+ args = SimpleNamespace(duration=None, output_sink="speakers", import_apo=None, auto_route=False)
+
+ assert app.run_headless(args) == 1
+ assert calls == ["controller:speakers", "start", "quit", "shutdown"]
+ assert "startup failed" in capsys.readouterr().err
+
+
def test_close_action_closes_active_window() -> None:
window = FakeWindow(ui_shutting_down=False)
application = FakeApplication(window=window)
diff --git a/tests/test_mini_eq_deps.py b/tests/test_mini_eq_deps.py
index 9ce30c5..7b0074a 100644
--- a/tests/test_mini_eq_deps.py
+++ b/tests/test_mini_eq_deps.py
@@ -93,15 +93,20 @@ def fake_check(namespace: str, version: str, label: str, required: bool, hint: s
def test_pipewire_gobject_check_requires_current_library_version(monkeypatch) -> None:
fake_pwg = SimpleNamespace(
- get_library_version=lambda: "0.3.4",
- Core=SimpleNamespace(set_pipewire_property=object()),
+ get_library_version=lambda: "0.3.5",
+ Core=SimpleNamespace(set_pipewire_property=object(), sync=object()),
Device=SimpleNamespace(
enum_all_params=object(),
enum_params=object(),
+ enum_params_sync=object(),
new=object(),
subscribe_params=object(),
+ sync=object(),
),
+ Metadata=SimpleNamespace(sync=object()),
+ Node=SimpleNamespace(sync=object()),
Param=SimpleNamespace(new_props_controls=object()),
+ Registry=SimpleNamespace(sync=object()),
RouteInfo=SimpleNamespace(new_from_param=object()),
Stream=SimpleNamespace(set_pipewire_property=object()),
)
@@ -121,20 +126,25 @@ def test_pipewire_gobject_check_requires_current_library_version(monkeypatch) ->
check = deps.check_pipewire_gobject()
assert not check.ok
- assert "older than required 0.3.5" in check.detail
+ assert "older than required 0.3.6" in check.detail
def test_pipewire_gobject_check_requires_property_override_symbols(monkeypatch) -> None:
fake_pwg = SimpleNamespace(
- get_library_version=lambda: "0.3.5",
+ get_library_version=lambda: "0.3.6",
Core=SimpleNamespace(),
Device=SimpleNamespace(
enum_all_params=object(),
enum_params=object(),
+ enum_params_sync=object(),
new=object(),
subscribe_params=object(),
+ sync=object(),
),
+ Metadata=SimpleNamespace(sync=object()),
+ Node=SimpleNamespace(sync=object()),
Param=SimpleNamespace(new_props_controls=object()),
+ Registry=SimpleNamespace(sync=object()),
RouteInfo=SimpleNamespace(new_from_param=object()),
Stream=SimpleNamespace(set_pipewire_property=object()),
)
diff --git a/tests/test_mini_eq_pipewire_backend.py b/tests/test_mini_eq_pipewire_backend.py
index f3082b5..b1d8124 100644
--- a/tests/test_mini_eq_pipewire_backend.py
+++ b/tests/test_mini_eq_pipewire_backend.py
@@ -94,6 +94,100 @@ def get_global_properties(self) -> FakeGlobalProperties:
return self.properties
+def make_node_global(
+ bound_id: int,
+ name: str | None,
+ media_class: str = pw_backend.AUDIO_SINK,
+) -> FakePropertyProxy:
+ properties = [
+ FakePropertyItem("object.serial", str(bound_id + 1000)),
+ FakePropertyItem("media.class", media_class),
+ ]
+ if name is not None:
+ properties.append(FakePropertyItem("node.name", name))
+
+ class FakeGlobal(FakePropertyProxy):
+ def get_id(self) -> int:
+ return bound_id
+
+ return FakeGlobal(FakeGlobalProperties(properties))
+
+
+class FakeModel:
+ def __init__(self, items: list[object]) -> None:
+ self.items = items
+
+ def get_n_items(self) -> int:
+ return len(self.items)
+
+ def get_item(self, index: int) -> object:
+ return self.items[index]
+
+
+class FakeWaitRegistry:
+ def __init__(self, globals_: list[object]) -> None:
+ self.globals = globals_
+ self.callbacks = {}
+ self.disconnected: list[int] = []
+ self.next_handler_id = 1
+
+ def dup_globals_by_interface(self, interface_type: str) -> FakeModel:
+ assert interface_type == pw_backend.PIPEWIRE_NODE_INTERFACE
+ return FakeModel(self.globals)
+
+ def connect(self, signal_name: str, callback) -> int:
+ assert signal_name == "global-added"
+ handler_id = self.next_handler_id
+ self.next_handler_id += 1
+ self.callbacks[handler_id] = callback
+ return handler_id
+
+ def disconnect(self, handler_id: int) -> None:
+ self.disconnected.append(handler_id)
+ self.callbacks.pop(handler_id, None)
+
+ def emit_global_added(self, global_) -> None:
+ for callback in list(self.callbacks.values()):
+ callback(self, global_)
+
+
+class FakeWaitObject:
+ @staticmethod
+ def connect(obj, signal_name: str, callback) -> int:
+ return obj.connect(signal_name, callback)
+
+
+class FakeWaitGObject:
+ Object = FakeWaitObject
+
+
+class FakeWaitGLib:
+ def __init__(self) -> None:
+ self.idle_callback = None
+ self.removed_sources: list[int] = []
+ self.timeout_callback = None
+ self.timeout_ms: int | None = None
+
+ def idle_add(self, callback) -> int:
+ self.idle_callback = callback
+ return 88
+
+ def timeout_add(self, timeout_ms: int, callback) -> int:
+ self.timeout_ms = timeout_ms
+ self.timeout_callback = callback
+ return 77
+
+ def source_remove(self, source_id: int) -> bool:
+ self.removed_sources.append(source_id)
+ return True
+
+ def run_idle(self) -> None:
+ assert self.idle_callback is not None
+ callback = self.idle_callback
+ self.idle_callback = None
+ callback()
+
+
class FakeSource:
def __init__(self) -> None:
self.destroyed = False
@@ -104,19 +198,12 @@ def destroy(self) -> None:
class FakeSyncCore:
def __init__(self) -> None:
- self.callback = None
-
- def sync(self, _cancellable, callback, _user_data) -> bool:
- self.callback = callback
- return True
+ self.sync_calls: list[int] = []
- def sync_finish(self, _result) -> bool:
+ def sync(self, timeout_ms: int) -> bool:
+ self.sync_calls.append(timeout_ms)
return True
- def complete_sync(self) -> None:
- assert self.callback is not None
- self.callback(self, object(), None)
-
class FakeMainContext:
def __init__(self, source: FakeSource) -> None:
@@ -138,28 +225,12 @@ def find_source_by_id(self, source_id: int) -> FakeSource | None:
return self.source if source_id == 77 else None
-class FakeSyncLoop:
- def __init__(self, core: FakeSyncCore) -> None:
- self.core = core
- self.quit_count = 0
-
- def run(self) -> None:
- self.core.complete_sync()
-
- def quit(self) -> None:
- self.quit_count += 1
-
-
class FakeSyncGLib:
def __init__(self, core: FakeSyncCore) -> None:
- self.core = core
self.source = FakeSource()
self.MainContext = FakeMainContext(self.source)
self.timeout_callback = None
- def MainLoop(self) -> FakeSyncLoop:
- return FakeSyncLoop(self.core)
-
def timeout_add(self, _timeout_ms: int, callback) -> int:
self.timeout_callback = callback
return 77
@@ -190,7 +261,7 @@ class FakePwg:
class FakeDeviceApi:
@staticmethod
- def enum_params():
+ def enum_params_sync():
return None
@staticmethod
@@ -201,6 +272,10 @@ def new():
def subscribe_params():
return None
+ @staticmethod
+ def sync():
+ return None
+
class FakeRouteInfoApi:
@staticmethod
@@ -378,16 +453,73 @@ def test_new_core_uses_pipewire_gobject_core_constructor() -> None:
}
-def test_sync_core_drains_pending_main_context_events() -> None:
+def test_sync_core_uses_roundtrip() -> None:
core = FakeSyncCore()
- glib = FakeSyncGLib(core)
backend = pw_backend.PipeWireBackend()
backend._core = core
- backend._GLib = glib
backend._sync_core()
- assert glib.MainContext.iterations == 1
+ assert core.sync_calls == [2000]
+
+
+def test_watch_for_audio_sink_reports_existing_registry_node_on_idle() -> None:
+ registry = FakeWaitRegistry([make_node_global(42, "mini_eq_sink")])
+ glib = FakeWaitGLib()
+ backend = pw_backend.PipeWireBackend()
+ backend._ensure_connected = lambda: None
+ backend._registry = registry
+ backend._GLib = glib
+ backend._GObject = FakeWaitGObject
+ nodes: list[pw_backend.PipeWireNode | None] = []
+
+ backend.watch_for_audio_sink("mini_eq_sink", nodes.append, timeout_ms=1234)
+
+ assert nodes == []
+ glib.run_idle()
+ assert nodes[0] is not None
+ assert nodes[0].bound_id == 42
+ assert glib.timeout_ms is None
+ assert registry.disconnected == [1]
+
+
+def test_watch_for_audio_sink_resolves_from_global_added_signal() -> None:
+ registry = FakeWaitRegistry([make_node_global(1, "speakers")])
+ glib = FakeWaitGLib()
+ backend = pw_backend.PipeWireBackend()
+ backend._ensure_connected = lambda: None
+ backend._registry = registry
+ backend._GLib = glib
+ backend._GObject = FakeWaitGObject
+ nodes: list[pw_backend.PipeWireNode | None] = []
+
+ backend.watch_for_audio_sink("mini_eq_sink", nodes.append, timeout_ms=1234)
+ registry.emit_global_added(make_node_global(42, "mini_eq_sink"))
+
+ assert nodes[0] is not None
+ assert nodes[0].bound_id == 42
+ assert glib.timeout_ms == 1234
+ assert glib.removed_sources == [77]
+ assert registry.disconnected == [1]
+
+
+def test_watch_for_audio_sink_reports_none_on_timeout() -> None:
+ registry = FakeWaitRegistry([make_node_global(1, "speakers")])
+ glib = FakeWaitGLib()
+ backend = pw_backend.PipeWireBackend()
+ backend._ensure_connected = lambda: None
+ backend._registry = registry
+ backend._GLib = glib
+ backend._GObject = FakeWaitGObject
+ nodes: list[pw_backend.PipeWireNode | None] = []
+
+ backend.watch_for_audio_sink("mini_eq_sink", nodes.append, timeout_ms=1234)
+ glib.timeout_callback()
+
+ assert nodes == [None]
+ assert glib.timeout_ms == 1234
+ assert glib.removed_sources == []
+ assert registry.disconnected == [1]
def test_move_stream_to_target_sets_stream_target_without_metadata_readback() -> None:
@@ -579,11 +711,16 @@ def test_output_preset_keys_fall_back_to_sink_name_without_route_api(monkeypatch
def test_enumerate_device_routes_ignores_enum_route_params(monkeypatch: pytest.MonkeyPatch) -> None:
class FakeParam:
- def __init__(self, name: str) -> None:
+ def __init__(self, name: str, *, seq: int = 12, next_index: int = 0) -> None:
self.name = name
+ self.seq = seq
+ self.next_index = next_index
def get_seq(self) -> int:
- return 12
+ return self.seq
+
+ def get_next(self) -> int:
+ return self.next_index
def dup_name(self) -> str:
return self.name
@@ -611,9 +748,9 @@ def __init__(self) -> None:
self.param_infos = FakeModel([FakeParamInfo()])
self.enum_calls: list[tuple[int, int, int]] = []
- def enum_params(self, param_id: int, start: int, num: int) -> int:
+ def enum_params_sync(self, param_id: int, start: int, num: int, _timeout_ms: int) -> FakeModel:
self.enum_calls.append((param_id, start, num))
- return 12
+ return self.params
def get_params(self) -> FakeModel:
return self.params
@@ -668,10 +805,111 @@ def new_from_param(param: FakeParam) -> FakeRouteInfo:
assert backend._device_route_refreshing_bound_ids == set()
+def test_enumerate_device_routes_uses_request_scoped_params(monkeypatch: pytest.MonkeyPatch) -> None:
+ class FakeParam:
+ def __init__(self, name: str, *, seq: int, route_name: str) -> None:
+ self.name = name
+ self.seq = seq
+ self.route_name = route_name
+
+ def get_seq(self) -> int:
+ return self.seq
+
+ def get_next(self) -> int:
+ return 0
+
+ def dup_name(self) -> str:
+ return self.name
+
+ class FakeParamInfo:
+ def get_id(self) -> int:
+ return 13
+
+ def dup_name(self) -> str:
+ return "Route"
+
+ class FakeModel:
+ def __init__(self, items: list[object]) -> None:
+ self.items = items
+
+ def get_n_items(self) -> int:
+ return len(self.items)
+
+ def get_item(self, index: int) -> object:
+ return self.items[index]
+
+ class FakeDevice:
+ def __init__(self) -> None:
+ self.params = FakeModel(
+ [
+ FakeParam("Route", seq=12, route_name="analog-output-headphones"),
+ ]
+ )
+ self.param_infos = FakeModel([FakeParamInfo()])
+
+ def enum_params_sync(self, _param_id: int, _start: int, _num: int, _timeout_ms: int) -> FakeModel:
+ return self.params
+
+ def get_params(self) -> FakeModel:
+ return self.params
+
+ def get_param_infos(self) -> FakeModel:
+ return self.param_infos
+
+ class FakeRouteInfo:
+ def __init__(self, param: FakeParam) -> None:
+ self.param = param
+
+ def get_index(self) -> int:
+ return 1
+
+ def get_device(self) -> int:
+ return 6
+
+ def get_profile(self) -> int:
+ return 0
+
+ def get_priority(self) -> int:
+ return 200
+
+ def dup_direction(self) -> str:
+ return "output"
+
+ def dup_name(self) -> str:
+ return self.param.route_name
+
+ def dup_description(self) -> str:
+ return self.param.route_name
+
+ def dup_availability(self) -> str:
+ return "yes"
+
+ def get_info(self) -> dict[str, str]:
+ return {}
+
+ backend = pw_backend.PipeWireBackend()
+ backend._Pwg = SimpleNamespace(RouteInfo=SimpleNamespace(new_from_param=FakeRouteInfo))
+ monkeypatch.setattr(backend, "_device_name_by_bound_id", lambda _bound_id: "alsa_card.test")
+
+ routes = backend._enumerate_device_routes(FakeDevice(), 72)
+
+ assert [route.name for route in routes] == ["analog-output-headphones"]
+
+
def test_connect_device_route_changed_subscribes_to_route_param(monkeypatch: pytest.MonkeyPatch) -> None:
class FakeParam:
- def __init__(self, param_id: int) -> None:
+ def __init__(
+ self,
+ param_id: int,
+ *,
+ route_device: int = 6,
+ direction: str = "output",
+ route_name: str = "analog-output-headphones",
+ ) -> None:
self.param_id = param_id
+ self.route_device = route_device
+ self.direction = direction
+ self.route_name = route_name
def get_id(self) -> int:
return self.param_id
@@ -709,9 +947,40 @@ def subscribe_params(self, ids: FakeVariant) -> None:
def disconnect(self, handler_id: int) -> None:
self.disconnected.append(handler_id)
- def emit_param(self, param_id: int) -> None:
+ def emit_param(self, param_id: int, **kwargs) -> None:
assert self.param_callback is not None
- self.param_callback(self, FakeParam(param_id))
+ self.param_callback(self, FakeParam(param_id, **kwargs))
+
+ class FakeRouteInfo:
+ def __init__(self, param: FakeParam) -> None:
+ self.param = param
+
+ def get_index(self) -> int:
+ return 1
+
+ def get_device(self) -> int:
+ return self.param.route_device
+
+ def get_profile(self) -> int:
+ return 0
+
+ def get_priority(self) -> int:
+ return 200
+
+ def dup_direction(self) -> str:
+ return self.param.direction
+
+ def dup_name(self) -> str:
+ return self.param.route_name
+
+ def dup_description(self) -> str:
+ return self.param.route_name
+
+ def dup_availability(self) -> str:
+ return "yes"
+
+ def get_info(self) -> dict[str, str]:
+ return {}
class FakeGObjectObject:
@staticmethod
@@ -722,11 +991,12 @@ def connect(device: FakeDevice, signal_name: str, callback) -> int:
backend = pw_backend.PipeWireBackend()
device = FakeDevice()
- backend._Pwg = SimpleNamespace(Device=FakeDeviceApi, RouteInfo=FakeRouteInfoApi)
+ backend._Pwg = SimpleNamespace(Device=FakeDeviceApi, RouteInfo=SimpleNamespace(new_from_param=FakeRouteInfo))
backend._GLib = FakeGLib
backend._GObject = SimpleNamespace(Object=FakeGObjectObject)
backend._ensure_connected = lambda: None
monkeypatch.setattr(backend, "_device_proxy_by_bound_id", lambda _bound_id: device)
+ monkeypatch.setattr(backend, "_device_name_by_bound_id", lambda _bound_id: "alsa_card.test")
calls: list[str] = []
handler_id = backend.connect_device_route_changed(72, lambda: calls.append("route"))
@@ -737,18 +1007,64 @@ def connect(device: FakeDevice, signal_name: str, callback) -> int:
device.emit_param(12)
assert calls == []
- device.emit_param(13)
+ device.emit_param(13, direction="input")
+ assert calls == []
+
+ device.emit_param(13, route_name="analog-output-headphones")
assert calls == ["route"]
+ assert backend._device_active_output_routes[72][6].name == "analog-output-headphones"
+
+ device.emit_param(13, route_device=7, route_name="analog-output-speaker")
+ assert calls == ["route", "route"]
+ assert tuple(backend._device_active_output_routes[72]) == (7,)
+ assert backend._device_active_output_routes[72][7].name == "analog-output-speaker"
backend._device_route_refreshing_bound_ids.add(72)
- device.emit_param(13)
- assert calls == ["route"]
+ device.emit_param(13, route_device=6, route_name="analog-output-headphones")
+ assert calls == ["route", "route"]
+ assert tuple(backend._device_active_output_routes[72]) == (7,)
+ assert backend._device_active_output_routes[72][7].name == "analog-output-speaker"
backend.disconnect_device_handler(handler_id)
assert [(variant.signature, variant.value) for variant in device.subscriptions] == [("au", [13]), ("au", [])]
assert device.disconnected == [77]
+def test_output_preset_keys_use_subscribed_active_route_cache(monkeypatch: pytest.MonkeyPatch) -> None:
+ backend = pw_backend.PipeWireBackend()
+ route = pw_routes.PipeWireOutputRoute(
+ device_bound_id=72,
+ device_name="alsa_card.test",
+ index=1,
+ route_device=7,
+ profile=0,
+ priority=200,
+ direction="output",
+ name="analog-output-speaker",
+ description="Speakers",
+ availability="yes",
+ )
+ sink = pw_backend.PipeWireNode(
+ bound_id=39,
+ object_serial="67",
+ media_class=pw_backend.AUDIO_SINK,
+ node_name="alsa_output.test",
+ node_description="Test Sink",
+ application_name=None,
+ node_dont_move=False,
+ device_id=72,
+ card_profile_device=6,
+ )
+ backend._device_active_output_routes = {72: {7: route}}
+ backend._Pwg = SimpleNamespace()
+ monkeypatch.setattr(backend, "audio_sink_by_name", lambda _name: sink)
+
+ assert backend.output_preset_keys_for_sink_name("alsa_output.test") == (
+ "pipewire-route:v1:device=alsa_card.test;route=analog-output-speaker;route-device=7",
+ "alsa_output.test",
+ )
+
+
def test_move_named_output_stream_to_target_uses_matching_stream() -> None:
backend = pw_backend.PipeWireBackend()
stream = pw_backend.PipeWireNode(
@@ -792,7 +1108,7 @@ def set(self, subject: int, key: str, type_name: str, value: str) -> bool:
metadata = FakeMetadata()
syncs: list[str] = []
backend._default_metadata = lambda: metadata
- backend._sync_core = lambda: syncs.append("sync")
+ backend._sync_metadata = lambda: syncs.append("sync")
backend.set_stream_target(126, 39, "67")
@@ -842,7 +1158,7 @@ def set(self, subject: int, key: str, type_name: str | None, value: str | None)
metadata = FakeMetadata()
syncs: list[str] = []
backend._default_metadata = lambda: metadata
- backend._sync_core = lambda: syncs.append("sync")
+ backend._sync_metadata = lambda: syncs.append("sync")
backend.restore_stream_target(
126,
@@ -948,7 +1264,7 @@ def raise_decode_error():
raise UnicodeDecodeError("utf-8", b"\xb1", 0, 1, "invalid start byte")
monkeypatch.setattr(backend, "_read_defaults", raise_decode_error)
- monkeypatch.setattr(backend, "_sync_core", lambda: syncs.append(True))
+ monkeypatch.setattr(backend, "_sync_metadata", lambda: syncs.append(True))
assert backend.refresh_defaults().default_audio_sink == "cached.default"
assert syncs == [True]
diff --git a/tests/test_mini_eq_routing.py b/tests/test_mini_eq_routing.py
index a6aa69a..633094a 100644
--- a/tests/test_mini_eq_routing.py
+++ b/tests/test_mini_eq_routing.py
@@ -506,9 +506,11 @@ def stop_engine(*, announce: bool = True) -> None:
calls.append(("stop", announce))
controller.running = False
- def start_engine() -> None:
+ def start_engine(*, on_ready=None, on_error=None) -> None:
calls.append("start")
controller.running = True
+ if on_ready is not None:
+ on_ready()
controller.output_backend = FakeBackend([make_node(1, "speakers"), make_node(2, "hdmi")])
controller.output_sink = "speakers"
@@ -561,9 +563,11 @@ def stop_engine(*, announce: bool = True) -> None:
calls.append(f"stop:{announce}")
controller.running = False
- def start_engine() -> None:
+ def start_engine(*, on_ready=None, on_error=None) -> None:
calls.append("start")
controller.running = True
+ if on_ready is not None:
+ on_ready()
controller.stop_engine = stop_engine
controller.start_engine = start_engine
@@ -689,13 +693,15 @@ def stop_engine(*, announce: bool = True) -> None:
start_attempts = 0
- def start_engine() -> None:
+ def start_engine(*, on_ready=None, on_error=None) -> None:
nonlocal start_attempts
start_attempts += 1
calls.append("start")
if start_attempts == 1:
raise RuntimeError("virtual sink did not appear")
controller.running = True
+ if on_ready is not None:
+ on_ready()
controller.stop_engine = stop_engine
controller.start_engine = start_engine
@@ -735,9 +741,11 @@ def stop_engine(*, announce: bool = True) -> None:
calls.append(f"stop:{announce}")
controller.running = False
- def start_engine() -> None:
+ def start_engine(*, on_ready=None, on_error=None) -> None:
calls.append("start")
controller.running = True
+ if on_ready is not None:
+ on_ready()
controller.stop_engine = stop_engine
controller.start_engine = start_engine
@@ -849,7 +857,7 @@ def test_start_prepares_analyzer_before_filter_chain_engine() -> None:
controller.refresh_followed_output_sink = lambda: calls.append("refresh")
controller.prepare_output_analyzer = lambda: calls.append("prepare") or True
- controller.start_engine = lambda: calls.append("engine")
+ controller.start_engine = lambda *, on_ready=None, on_error=None: (calls.append("engine"), on_ready and on_ready())
controller.start_output_event_monitoring = lambda: calls.append("monitor")
controller.stream_router = None
controller.stop_engine = lambda: calls.append("stop-engine")
@@ -1093,6 +1101,104 @@ def disable(self, announce: bool = True) -> None:
assert controller.routed is False
+def test_start_engine_waits_for_filter_chain_node_from_registry() -> None:
+ controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController)
+ module = object()
+ calls: list[str] = []
+
+ class FakeWatch:
+ def cancel(self) -> None:
+ calls.append("cancel")
+
+ class FakeBackend:
+ def __init__(self) -> None:
+ self.callback = None
+
+ def load_filter_chain_module(self, arguments: str):
+ calls.append(f"load:{arguments}")
+ return module
+
+ def watch_for_audio_sink(self, sink_name: str, callback, *, timeout_ms: int):
+ calls.append(f"wait:{sink_name}:{timeout_ms}")
+ self.callback = callback
+ return FakeWatch()
+
+ backend = FakeBackend()
+ controller.engine_module = None
+ controller.engine_start_pending = False
+ controller.engine_start_watch = None
+ controller.filter_node_id = None
+ controller.running = False
+ controller.output_backend = backend
+ controller.virtual_sink_name = "mini_eq_sink"
+ controller.output_sink = "speakers"
+ controller.build_filter_chain_module_args = lambda: "module args"
+ controller.emit_status = lambda message: calls.append(f"status:{message}")
+ controller.apply_state_to_engine = lambda: calls.append("apply")
+
+ routing.SystemWideEqController.start_engine(controller)
+ assert backend.callback is not None
+ backend.callback(make_node(42, "mini_eq_sink"))
+
+ assert calls == [
+ "load:module args",
+ "wait:mini_eq_sink:3000",
+ "status:filter-chain PipeWire EQ ready: mini_eq_sink -> speakers",
+ "apply",
+ ]
+ assert controller.engine_module is module
+ assert controller.filter_node_id == 42
+ assert controller.running is True
+
+
+def test_start_engine_clears_module_when_filter_chain_node_times_out() -> None:
+ controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController)
+ module = object()
+ calls: list[str] = []
+
+ class FakeWatch:
+ def cancel(self) -> None:
+ calls.append("cancel")
+
+ class FakeBackend:
+ def __init__(self) -> None:
+ self.callback = None
+
+ def load_filter_chain_module(self, arguments: str):
+ calls.append(f"load:{arguments}")
+ return module
+
+ def watch_for_audio_sink(self, sink_name: str, callback, *, timeout_ms: int):
+ calls.append(f"wait:{sink_name}:{timeout_ms}")
+ self.callback = callback
+ return FakeWatch()
+
+ def sync(self) -> None:
+ calls.append("sync")
+
+ backend = FakeBackend()
+ controller.engine_module = None
+ controller.engine_start_pending = False
+ controller.engine_start_watch = None
+ controller.filter_node_id = None
+ controller.running = False
+ controller.output_backend = backend
+ controller.virtual_sink_name = "mini_eq_sink"
+ controller.output_sink = "speakers"
+ controller.build_filter_chain_module_args = lambda: "module args"
+ errors: list[str] = []
+
+ routing.SystemWideEqController.start_engine(controller, on_error=lambda exc: errors.append(str(exc)))
+ assert backend.callback is not None
+ backend.callback(None)
+
+ assert calls == ["load:module args", "wait:mini_eq_sink:3000", "sync"]
+ assert errors == ["filter-chain did not create mini_eq_sink"]
+ assert controller.engine_module is None
+ assert controller.filter_node_id is None
+ assert controller.running is False
+
+
def test_stop_engine_unloads_filter_chain_module_before_clearing_state() -> None:
controller = routing.SystemWideEqController.__new__(routing.SystemWideEqController)
module = object()
@@ -1141,7 +1247,10 @@ def start_monitoring(self, *, require_initial_route: bool = False) -> None:
controller.stream_router = FakeRouter()
controller.stop_engine = lambda *, announce=True: calls.append(f"stop-engine:{announce}")
- controller.start_engine = lambda: calls.append("start-engine")
+ controller.start_engine = lambda *, on_ready=None, on_error=None: (
+ calls.append("start-engine"),
+ on_ready and on_ready(),
+ )
routing.SystemWideEqController.restart_engine(controller)
@@ -1176,7 +1285,10 @@ def start_monitoring(self, *, require_initial_route: bool = False) -> None:
controller.stream_router = FakeRouter()
controller.stop_engine = lambda *, announce=True: calls.append(f"stop-engine:{announce}")
- controller.start_engine = lambda: calls.append("start-engine")
+ controller.start_engine = lambda *, on_ready=None, on_error=None: (
+ calls.append("start-engine"),
+ on_ready and on_ready(),
+ )
routing.SystemWideEqController.restart_engine(controller)
diff --git a/tools/check_live_ui_runtime.py b/tools/check_live_ui_runtime.py
index 63dfc90..66eef35 100755
--- a/tools/check_live_ui_runtime.py
+++ b/tools/check_live_ui_runtime.py
@@ -1024,15 +1024,15 @@ def run_ui_flow(
timeout_seconds,
)
compare_switch = driver.wait_for_accessible(
- "Compare switch",
- lambda: driver.find(frame, name="Compare", role="switch", showing=True),
+ "A/B compare switch",
+ lambda: driver.find(frame, name="A/B", role="switch", showing=True),
timeout_seconds,
)
if not driver.sensitive(route_switch):
raise AssertionError("System-wide EQ switch is not sensitive")
if not driver.sensitive(compare_switch):
- raise AssertionError("Compare switch should become sensitive when routing is active")
+ raise AssertionError("A/B compare switch should become sensitive when routing is active")
verify_dropdown_exposes_options(
driver,
diff --git a/tools/check_pipewire_gobject.py b/tools/check_pipewire_gobject.py
index 0b95b22..54e0a9a 100644
--- a/tools/check_pipewire_gobject.py
+++ b/tools/check_pipewire_gobject.py
@@ -9,18 +9,24 @@
"Core.new",
"Core.load_module",
"Core.set_pipewire_property",
+ "Core.sync",
"Device.enum_all_params",
"Device.enum_params",
+ "Device.enum_params_sync",
"Device.new",
+ "Device.sync",
"Global.dup_property",
"get_library_version",
"Metadata.new",
"Metadata.set",
+ "Metadata.sync",
"Node.new",
"Node.set_param",
+ "Node.sync",
"Param.new_props_controls",
"Registry.new",
"Registry.dup_globals_by_interface",
+ "Registry.sync",
"RouteInfo.new_from_param",
"Stream.new_audio_capture",
"Stream.set_deliver_audio_blocks",