Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 13 additions & 22 deletions fluster/decoders/gstreamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,30 @@


import os
import shlex
import subprocess
from functools import lru_cache
from typing import Any, Dict, List, Optional

from fluster.codec import Codec, OutputFormat
from fluster.decoder import Decoder, register_decoder
from fluster.gstreamer import run_pipeline
from fluster.utils import (
file_checksum,
normalize_binary_cmd,
run_command,
run_command_with_output,
)

PIPELINE_TPL = "{} --no-fault filesrc location={} ! {} ! {} ! {} ! {} {}"
PIPELINE_TPL = "filesrc location={} ! {} ! {} ! {} ! {} {}"


@lru_cache(maxsize=None)
def _videocodectestsink_supports_format(gst_fmt: str) -> bool:
"""Check whether videocodectestsink supports a given raw format by probing a minimal pipeline."""
cmd = normalize_binary_cmd("gst-launch-1.0")
pipeline = (
"videotestsrc num-buffers=1 ! video/x-raw,width=1,height=1 "
f"! videoconvert ! video/x-raw,format={gst_fmt} ! videocodectestsink"
)
try:
run_command(
shlex.split(
f"{cmd} --no-fault videotestsrc num-buffers=1 ! video/x-raw,width=1,height=1"
f" ! videoconvert ! video/x-raw,format={gst_fmt} ! videocodectestsink"
),
verbose=False,
)
run_pipeline(pipeline, verbose=False)
return True
except Exception:
return False
Expand Down Expand Up @@ -114,7 +109,6 @@ def __init__(self) -> None:
if not self.name:
self.name = f"{self.provider}-{self.codec.value}-{self.api}"
self.description = f"{self.provider} {self.codec.value} {self.api} decoder for GStreamer"
self.cmd = normalize_binary_cmd(self.cmd)

if not gst_element_exists(self.sink):
self.sink = "filesink"
Expand All @@ -129,7 +123,6 @@ def gen_pipeline(
"""Generate the GStreamer pipeline used to decode the test vector"""
output = f"location={output_filepath}" if output_filepath else ""
return PIPELINE_TPL.format(
self.cmd,
input_filepath,
self.parser,
self.decoder_bin,
Expand Down Expand Up @@ -177,24 +170,23 @@ def decode(
if self._get_sink_for_format(output_format) == "videocodectestsink":
output_param = output_filepath if keep_files else None
pipeline = self.gen_pipeline(input_filepath, output_param, output_format, optional_params)
command = shlex.split(pipeline)
command.append("-m")
data = run_command_with_output(command, timeout=timeout, verbose=verbose).splitlines()
result = run_pipeline(pipeline, timeout=timeout, verbose=verbose, print_messages=True)
data = result.stdout.splitlines()
return self.parse_videocodectestsink_md5sum(data)

pipeline = self.gen_pipeline(input_filepath, output_filepath, output_format, optional_params)
run_command(shlex.split(pipeline), timeout=timeout, verbose=verbose)
result = run_pipeline(pipeline, timeout=timeout, verbose=verbose)
return file_checksum(output_filepath)

@lru_cache(maxsize=128)
def check(self, verbose: bool) -> bool:
"""Check if GStreamer decoder is valid (better than gst-inspect)"""
try:
pipeline = f"{self.cmd} --no-fault appsrc num-buffers=0 ! {self.decoder_bin} ! fakesink"
run_command(shlex.split(pipeline), verbose=verbose)
pipeline = f"appsrc num-buffers=0 ! {self.decoder_bin} ! fakesink"
run_pipeline(pipeline, verbose=verbose)
return True
except Exception:
return False
return True


class GStreamerVideo(GStreamer):
Expand Down Expand Up @@ -237,7 +229,6 @@ def gen_pipeline(
caps = f"{self.caps} ! videoconvert dither=none ! {raw_caps}"
output = f"location={output_filepath}" if output_filepath else ""
return PIPELINE_TPL.format(
self.cmd,
input_filepath,
self.parser,
self.decoder_bin,
Expand Down
105 changes: 105 additions & 0 deletions fluster/gstreamer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Fluster - testing framework for decoders conformance
# Copyright (C) 2025, Fluendo, S.A.
# Author: Andoni Morales Alastruey <amorales@fluendo.com>, Fluendo, S.A.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <https://www.gnu.org/licenses/>.

"""
GStreamer utilities for Fluster.

This package provides ctypes bindings for GStreamer and a pipeline runner
that can be used to run GStreamer pipelines without depending on the
GStreamer Python bindings (gi.repository.Gst).
"""

from __future__ import annotations

import os
import subprocess
import sys
from typing import Optional

from fluster.decoder import NotSupportedError
from fluster.gstreamer.gst_ctypes import GStreamerInstallation
from fluster.gstreamer.runner import ExitCode as ExitCode


def run_pipeline(
pipeline: str,
timeout: Optional[int] = None,
verbose: bool = False,
quiet: bool = False,
print_messages: bool = False,
no_fault: bool = True,
) -> subprocess.CompletedProcess[str]:
"""
Run a GStreamer pipeline in a subprocess with proper environment setup.

This is a convenience function that handles environment configuration and
spawns the GStreamer runner as a subprocess. It's the recommended way to
run GStreamer pipelines from fluster.

Args:
pipeline: The GStreamer pipeline description string (gst-launch format).
timeout: Timeout in seconds for the pipeline to complete. None for no timeout.
verbose: Enable verbose output from the runner.
quiet: Suppress output except errors.
print_messages: Print all bus messages (like gst-launch -m).
no_fault: Disable fault handling in the runner.

Returns:
subprocess.CompletedProcess with returncode, stdout, and stderr.

Raises:
subprocess.TimeoutExpired: When a timeout occurs.
subprocess.CalledProcessError: For other non-zero exit codes.

Exit codes (see ExitCode enum):
SUCCESS (0) - Pipeline completed successfully (EOS)
ERROR (1) - Pipeline error occurred
INIT_ERROR (2) - Invalid arguments or initialization error
TIMEOUT (3) - Timeout occurred
"""
cmd = [sys.executable, "-m", "fluster.gstreamer.runner"]
if verbose:
cmd.append("--verbose")
if quiet:
cmd.append("--quiet")
if print_messages:
cmd.append("--messages")
if no_fault:
cmd.append("--no-fault")
if timeout is not None:
cmd.extend(["--timeout", str(timeout)])
cmd.append(pipeline)
env = os.environ.copy()
env.update(GStreamerInstallation().get_environment())
result = subprocess.run(cmd, env=env, capture_output=True, text=True, check=False)
if result.returncode == ExitCode.SUCCESS:
return result
elif result.returncode == ExitCode.NOT_SUPPORTED:
raise NotSupportedError(f"GStreamer runner not supported error: {result.stderr.strip()}")
elif result.returncode == ExitCode.TIMEOUT:
raise subprocess.TimeoutExpired(
result.args,
timeout if timeout is not None else 0,
output=result.stdout,
stderr=result.stderr,
)
raise subprocess.CalledProcessError(
result.returncode,
result.args,
output=result.stdout,
stderr=result.stderr,
)
Loading
Loading