Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions examples/runtime/rtc_inference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
#!/usr/bin/env python3
# Copyright (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""Real-Time Chunking (RTC) inference with PolicyRuntime.

Demonstrates how to run a Pi0.5 model with RTC denoising baked into
the graph. The model produces 50-action chunks; the RTCExecution
strategy handles async inference, dual-track queue management, and
latency-aware delay compensation.

Usage:
python examples/runtime/rtc_inference.py \
--model ./exports/pi05_rtc_openvino \
--device GPU.0 \
--port /dev/ttyACM0 \
--calibration /path/to/calibration.json \
--overhead-camera /dev/v4l/by-id/usb-... \
--arm-camera 353322271391 \
--fps 30 \
--duration-s 60
"""

from __future__ import annotations

import argparse

from physicalai.capture.transport import SharedCamera
from physicalai.inference import InferenceModel
from physicalai.inference.callbacks import RTCLatencyTracker
from physicalai.robot import SO101
from physicalai.runtime import (
PolicyRuntime,
RTCActionQueue,
RTCExecution,
)
from physicalai.inference.component_factory import instantiate_component, resolve_artifact
from physicalai.inference.manifest import Manifest
import openvino_tokenizers # noqa: F401 — registers OV tokenizer ops


def main() -> None:
parser = argparse.ArgumentParser(description="Run Pi0.5 RTC policy with PolicyRuntime")
parser.add_argument("--model", required=True, help="Exported RTC model directory")
parser.add_argument("--device", default="GPU.0", help="OpenVINO device")
parser.add_argument("--port", default="/dev/ttyACM0", help="Robot serial port")
parser.add_argument("--calibration", required=True, help="Robot calibration file")
parser.add_argument("--overhead-camera", required=True, help="Overhead camera device path (e.g. /dev/v4l/by-id/usb-...)")
parser.add_argument("--arm-camera", required=True, help="Arm camera device path (e.g. /dev/v4l/by-id/usb-...)")
parser.add_argument("--width", type=int, default=640)
parser.add_argument("--height", type=int, default=480)
parser.add_argument("--fps", type=float, default=30.0)
parser.add_argument("--duration-s", type=float, default=None, help="Run duration in seconds (default: run indefinitely)")
# RTC parameters
parser.add_argument("--chunk-size", type=int, default=50)
parser.add_argument("--execution-horizon", type=int, default=10)
parser.add_argument("--max-action-dim", type=int, default=32)
parser.add_argument("--max-guidance-weight", type=float, default=10.0)
parser.add_argument("--queue-threshold", type=int, default=30)
args = parser.parse_args()

# --- Latency tracker callback (lives on the model) ---
latency_tracker = RTCLatencyTracker(window_size=100)

# --- Load model WITHOUT postprocessors (RTCExecution owns denorm) ---
model = InferenceModel.load(
args.model,
device=args.device,
postprocessors=[], # skip manifest denorm — execution handles it
callbacks=[latency_tracker],
)

# --- Load denormalization postprocessors from manifest ---
# RTCExecution applies these in its background thread to produce
# the processed (denormalized) track for the robot.

manifest = Manifest.load(f"{args.model}/manifest.json")
rtc_postprocessors = [
instantiate_component(resolve_artifact(spec, args.model))
for spec in manifest.model.postprocessors
]

# --- RTC queue (shared between execution and runtime) ---
rtc_queue = RTCActionQueue()

# --- RTC execution strategy ---
execution = RTCExecution(
chunk_size=args.chunk_size,
execution_horizon=args.execution_horizon,
fps=args.fps,
max_action_dim=args.max_action_dim,
max_guidance_weight=args.max_guidance_weight,
queue_threshold=args.queue_threshold,
latency_tracker=latency_tracker,
postprocessors=rtc_postprocessors,
)

# --- Hardware ---
robot = SO101(port=args.port, calibration=args.calibration, role="follower")
cameras = {
"overhead": SharedCamera(
"uvc", device=args.overhead_camera,
width=args.width, height=args.height, fps=int(args.fps),
),
"arm": SharedCamera(
"uvc", device=args.arm_camera,
width=args.width, height=args.height, fps=int(args.fps),
),
}

# --- PolicyRuntime ---
runtime = PolicyRuntime(
robot=robot,
model=model,
execution=execution,
action_queue=rtc_queue,
cameras=cameras,
fps=args.fps,
)

try:
runtime.connect()
except Exception as e:
print(f"Failed to connect: {e}")
from physicalai.capture import discover_all
print("Available cameras:")
for driver, devices in discover_all().items():
for dev in devices:
print(f" Driver: {driver}, Device: {dev.device_id}, Info: {dev.name}")
return

for name, cam in cameras.items():
print(f"Camera '{name}': {cam.actual_width}x{cam.actual_height} @ {cam.actual_fps}fps")

print(
f"Starting RTC runtime — chunk={args.chunk_size}, "
f"horizon={args.execution_horizon}, fps={args.fps}"
)
try:
stats = runtime.run(duration_s=args.duration_s)
print(
f"\nDone — {stats.steps} steps, {stats.inference_count} inferences, "
f"{stats.total_holds} holds"
)
print(
f"Latency — max={latency_tracker.max_latency_s:.3f}s, "
f"p95={latency_tracker.percentile_s(95):.3f}s"
)
finally:
print("\nRobot is holding position (torque ON).")
try:
resp = input("Disable torque? The arm will drop under gravity. [y/N]: ").strip().lower()
if resp == "y":
robot.set_torque(enabled=False)
robot.torque_on_disconnect = False
print("Torque disabled — arm is free.")
else:
print("Torque remains enabled — arm holds position.")
except (KeyboardInterrupt, EOFError):
print("\nTorque remains enabled.")
runtime.disconnect()
print("Disconnected")


if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions src/physicalai/capture/transport/_shared_camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ def read(self, timeout: float = 2.0) -> Frame:
if sample is None:
break
newest_sample = sample
sample = None

if newest_sample is not None:
header, frame = self._decode_sample(newest_sample)
Expand Down Expand Up @@ -297,6 +298,7 @@ def read_latest(self) -> Frame:
if sample is None:
break
newest_sample = sample
sample = None

if newest_sample is not None:
header, frame = self._decode_sample(newest_sample)
Expand Down
2 changes: 2 additions & 0 deletions src/physicalai/inference/callbacks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@

from physicalai.inference.callbacks.base import Callback
from physicalai.inference.callbacks.latency import LatencyMonitor
from physicalai.inference.callbacks.rtc_latency import RTCLatencyTracker
from physicalai.inference.callbacks.throughput import ThroughputMonitor

__all__ = [
"Callback",
"LatencyMonitor",
"RTCLatencyTracker",
"ThroughputMonitor",
]
110 changes: 110 additions & 0 deletions src/physicalai/inference/callbacks/rtc_latency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Copyright (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""Latency tracker callback for RTC inference delay estimation.

Measures wall-clock inference latency over a sliding window and
exposes the worst-case value for computing ``inference_delay``.
"""

from __future__ import annotations

import math
import time
from collections import deque
from typing import Any, override

import numpy as np

from physicalai.inference.callbacks.base import Callback


class RTCLatencyTracker(Callback):
"""Track inference latency for RTC delay computation.

Measures the duration of each ``model(inputs)`` call via
``on_predict_start`` / ``on_predict_end`` hooks. Exposes
``max_latency_s`` and ``compute_delay(fps)`` for the
``RTCExecution`` to read.

Args:
window_size: Number of recent measurements to retain.

Examples:
>>> tracker = RTCLatencyTracker()
>>> model = InferenceModel.load("./exports/pi05_rtc", callbacks=[tracker])
>>> # After some predictions:
>>> delay = tracker.compute_delay(fps=30, chunk_size=50, execution_horizon=10)
"""

def __init__(self, window_size: int = 100) -> None:
self._window: deque[float] = deque(maxlen=window_size)
self._start_time: float = 0.0

@override
def on_predict_start(self, inputs: dict[str, Any]) -> None:
"""Record prediction start time."""
self._start_time = time.perf_counter()

@override
def on_predict_end(self, outputs: dict[str, Any]) -> None:
"""Record prediction duration."""
elapsed = time.perf_counter() - self._start_time
self._window.append(elapsed)

@override
def on_reset(self) -> None:
"""Clear recorded latencies."""
self._window.clear()
self._start_time = 0.0

@property
def max_latency_s(self) -> float:
"""Worst-case latency in seconds over the sliding window.

Returns 0.0 if no measurements recorded yet.
"""
return max(self._window) if self._window else 0.0

@property
def latest_latency_s(self) -> float:
"""Most recent inference latency in seconds."""
return self._window[-1] if self._window else 0.0

def percentile_s(self, q: float) -> float:
"""Compute a percentile of recorded latencies.

Args:
q: Percentile in [0, 100].

Returns:
The q-th percentile in seconds, or 0.0 if empty.
"""
if not self._window:
return 0.0
return float(np.percentile(np.array(self._window), q))

def compute_delay(self, fps: float) -> int:
"""Compute integer delay for the RTC model input.

``delay = ceil(max_latency * fps)``

Args:
fps: Robot control frequency in Hz.

Returns:
Integer delay (number of action steps consumed during inference).
"""
if self.max_latency_s <= 0:
return 0
return int(math.ceil(self.max_latency_s * fps))

@override
def __repr__(self) -> str:
"""Return string representation."""
return (
f"RTCLatencyTracker("
f"max={self.max_latency_s:.3f}s, "
f"latest={self.latest_latency_s:.3f}s, "
f"samples={len(self._window)})"
)
4 changes: 4 additions & 0 deletions src/physicalai/runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
"""

from physicalai.runtime._action_queue import ActionQueue # noqa: PLC2701
from physicalai.runtime._rtc_action_queue import RTCActionQueue # noqa: PLC2701
from physicalai.runtime.execution import (
AsyncExecution,
Execution,
SyncExecution,
WorkerDiedError,
)
from physicalai.runtime.rtc_execution import RTCExecution
from physicalai.runtime.runtime import (
PolicyRuntime,
RunStats,
Expand All @@ -32,6 +34,8 @@
"Execution",
"LerpSmoother",
"PolicyRuntime",
"RTCActionQueue",
"RTCExecution",
"ReplaceSmoother",
"RunStats",
"RuntimeCallback",
Expand Down
Loading
Loading