From bb5faf2fcee6681cec30f4074737b7641446b490 Mon Sep 17 00:00:00 2001 From: Eugene Liu Date: Thu, 21 May 2026 16:11:50 +0100 Subject: [PATCH 1/2] feat(runtime): implement Real-Time Chunking (RTC) execution strategy and related components --- examples/runtime/rtc_inference.py | 155 +++++++++ .../inference/callbacks/__init__.py | 2 + .../inference/callbacks/rtc_latency.py | 110 ++++++ src/physicalai/runtime/__init__.py | 4 + src/physicalai/runtime/_rtc_action_queue.py | 156 +++++++++ src/physicalai/runtime/rtc_execution.py | 323 ++++++++++++++++++ src/physicalai/runtime/runtime.py | 3 +- 7 files changed, 752 insertions(+), 1 deletion(-) create mode 100644 examples/runtime/rtc_inference.py create mode 100644 src/physicalai/inference/callbacks/rtc_latency.py create mode 100644 src/physicalai/runtime/_rtc_action_queue.py create mode 100644 src/physicalai/runtime/rtc_execution.py diff --git a/examples/runtime/rtc_inference.py b/examples/runtime/rtc_inference.py new file mode 100644 index 0000000..ed6b8da --- /dev/null +++ b/examples/runtime/rtc_inference.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +# Copyright (C) 2026 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""Real-Time Chunking (RTC) inference with PolicyRuntime. + +Demonstrates how to run a Pi0.5 model with RTC denoising baked into +the graph. The model produces 50-action chunks; the RTCExecution +strategy handles async inference, dual-track queue management, and +latency-aware delay compensation. + +Usage: + python examples/runtime/rtc_inference.py \ + --model ./exports/pi05_rtc_openvino \ + --device GPU.0 \ + --port /dev/ttyACM0 \ + --calibration /path/to/calibration.json \ + --overhead-camera /dev/v4l/by-id/usb-... \ + --arm-camera 353322271391 \ + --fps 30 \ + --duration-s 60 +""" + +from __future__ import annotations + +import argparse + +from physicalai.capture.transport import SharedCamera +from physicalai.inference import InferenceModel +from physicalai.inference.callbacks import RTCLatencyTracker +from physicalai.robot import SO101 +from physicalai.runtime import ( + PolicyRuntime, + RTCActionQueue, + RTCExecution, +) +from physicalai.inference.component_factory import instantiate_component, resolve_artifact +from physicalai.inference.manifest import Manifest +import openvino_tokenizers # noqa: F401 — registers OV tokenizer ops + + +def main() -> None: + parser = argparse.ArgumentParser(description="Run Pi0.5 RTC policy with PolicyRuntime") + parser.add_argument("--model", required=True, help="Exported RTC model directory") + parser.add_argument("--device", default="GPU.0", help="OpenVINO device") + parser.add_argument("--port", default="/dev/ttyACM0", help="Robot serial port") + parser.add_argument("--calibration", required=True, help="Robot calibration file") + parser.add_argument("--overhead-camera", required=True, help="Overhead camera device path (e.g. /dev/v4l/by-id/usb-...)") + parser.add_argument("--arm-camera", required=True, help="Arm camera device path (e.g. /dev/v4l/by-id/usb-...)") + parser.add_argument("--width", type=int, default=640) + parser.add_argument("--height", type=int, default=480) + parser.add_argument("--fps", type=float, default=30.0) + parser.add_argument("--duration-s", type=float, default=60.0) + # RTC parameters + parser.add_argument("--chunk-size", type=int, default=50) + parser.add_argument("--execution-horizon", type=int, default=10) + parser.add_argument("--max-action-dim", type=int, default=32) + parser.add_argument("--max-guidance-weight", type=float, default=10.0) + parser.add_argument("--queue-threshold", type=int, default=30) + args = parser.parse_args() + + # --- Latency tracker callback (lives on the model) --- + latency_tracker = RTCLatencyTracker(window_size=100) + + # --- Load model WITHOUT postprocessors (RTCExecution owns denorm) --- + model = InferenceModel.load( + args.model, + device=args.device, + postprocessors=[], # skip manifest denorm — execution handles it + callbacks=[latency_tracker], + ) + + # --- Load denormalization postprocessors from manifest --- + # RTCExecution applies these in its background thread to produce + # the processed (denormalized) track for the robot. + + manifest = Manifest.load(f"{args.model}/manifest.json") + rtc_postprocessors = [ + instantiate_component(resolve_artifact(spec, args.model)) + for spec in manifest.model.postprocessors + ] + + # --- RTC queue (shared between execution and runtime) --- + rtc_queue = RTCActionQueue() + + # --- RTC execution strategy --- + execution = RTCExecution( + chunk_size=args.chunk_size, + execution_horizon=args.execution_horizon, + fps=args.fps, + max_action_dim=args.max_action_dim, + max_guidance_weight=args.max_guidance_weight, + queue_threshold=args.queue_threshold, + latency_tracker=latency_tracker, + postprocessors=rtc_postprocessors, + ) + + # --- Hardware --- + robot = SO101(port=args.port, calibration=args.calibration, role="follower") + cameras = { + "overhead": SharedCamera( + "uvc", device=args.overhead_camera, + width=args.width, height=args.height, fps=int(args.fps), + ), + "arm": SharedCamera( + "uvc", device=args.arm_camera, + width=args.width, height=args.height, fps=int(args.fps), + ), + } + + # --- PolicyRuntime --- + runtime = PolicyRuntime( + robot=robot, + model=model, + execution=execution, + action_queue=rtc_queue, + cameras=cameras, + fps=args.fps, + ) + + try: + runtime.connect() + except Exception as e: + print(f"Failed to connect: {e}") + from physicalai.capture import discover_all + print("Available cameras:") + for driver, devices in discover_all().items(): + for dev in devices: + print(f" Driver: {driver}, Device: {dev.device_id}, Info: {dev.name}") + return + + for name, cam in cameras.items(): + print(f"Camera '{name}': {cam.actual_width}x{cam.actual_height} @ {cam.actual_fps}fps") + + print( + f"Starting RTC runtime — chunk={args.chunk_size}, " + f"horizon={args.execution_horizon}, fps={args.fps}" + ) + try: + stats = runtime.run(duration_s=args.duration_s) + print( + f"\nDone — {stats.steps} steps, {stats.inference_count} inferences, " + f"{stats.total_holds} holds" + ) + print( + f"Latency — max={latency_tracker.max_latency_s:.3f}s, " + f"p95={latency_tracker.percentile_s(95):.3f}s" + ) + finally: + runtime.disconnect() + print("Disconnected") + + +if __name__ == "__main__": + main() diff --git a/src/physicalai/inference/callbacks/__init__.py b/src/physicalai/inference/callbacks/__init__.py index 5cf2108..901046c 100644 --- a/src/physicalai/inference/callbacks/__init__.py +++ b/src/physicalai/inference/callbacks/__init__.py @@ -10,10 +10,12 @@ from physicalai.inference.callbacks.base import Callback from physicalai.inference.callbacks.latency import LatencyMonitor +from physicalai.inference.callbacks.rtc_latency import RTCLatencyTracker from physicalai.inference.callbacks.throughput import ThroughputMonitor __all__ = [ "Callback", "LatencyMonitor", + "RTCLatencyTracker", "ThroughputMonitor", ] diff --git a/src/physicalai/inference/callbacks/rtc_latency.py b/src/physicalai/inference/callbacks/rtc_latency.py new file mode 100644 index 0000000..60be70a --- /dev/null +++ b/src/physicalai/inference/callbacks/rtc_latency.py @@ -0,0 +1,110 @@ +# Copyright (C) 2026 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""Latency tracker callback for RTC inference delay estimation. + +Measures wall-clock inference latency over a sliding window and +exposes the worst-case value for computing ``inference_delay``. +""" + +from __future__ import annotations + +import math +import time +from collections import deque +from typing import Any, override + +import numpy as np + +from physicalai.inference.callbacks.base import Callback + + +class RTCLatencyTracker(Callback): + """Track inference latency for RTC delay computation. + + Measures the duration of each ``model(inputs)`` call via + ``on_predict_start`` / ``on_predict_end`` hooks. Exposes + ``max_latency_s`` and ``compute_delay(fps)`` for the + ``RTCExecution`` to read. + + Args: + window_size: Number of recent measurements to retain. + + Examples: + >>> tracker = RTCLatencyTracker() + >>> model = InferenceModel.load("./exports/pi05_rtc", callbacks=[tracker]) + >>> # After some predictions: + >>> delay = tracker.compute_delay(fps=30, chunk_size=50, execution_horizon=10) + """ + + def __init__(self, window_size: int = 100) -> None: + self._window: deque[float] = deque(maxlen=window_size) + self._start_time: float = 0.0 + + @override + def on_predict_start(self, inputs: dict[str, Any]) -> None: + """Record prediction start time.""" + self._start_time = time.perf_counter() + + @override + def on_predict_end(self, outputs: dict[str, Any]) -> None: + """Record prediction duration.""" + elapsed = time.perf_counter() - self._start_time + self._window.append(elapsed) + + @override + def on_reset(self) -> None: + """Clear recorded latencies.""" + self._window.clear() + self._start_time = 0.0 + + @property + def max_latency_s(self) -> float: + """Worst-case latency in seconds over the sliding window. + + Returns 0.0 if no measurements recorded yet. + """ + return max(self._window) if self._window else 0.0 + + @property + def latest_latency_s(self) -> float: + """Most recent inference latency in seconds.""" + return self._window[-1] if self._window else 0.0 + + def percentile_s(self, q: float) -> float: + """Compute a percentile of recorded latencies. + + Args: + q: Percentile in [0, 100]. + + Returns: + The q-th percentile in seconds, or 0.0 if empty. + """ + if not self._window: + return 0.0 + return float(np.percentile(np.array(self._window), q)) + + def compute_delay(self, fps: float) -> int: + """Compute integer delay for the RTC model input. + + ``delay = ceil(max_latency * fps)`` + + Args: + fps: Robot control frequency in Hz. + + Returns: + Integer delay (number of action steps consumed during inference). + """ + if self.max_latency_s <= 0: + return 0 + return int(math.ceil(self.max_latency_s * fps)) + + @override + def __repr__(self) -> str: + """Return string representation.""" + return ( + f"RTCLatencyTracker(" + f"max={self.max_latency_s:.3f}s, " + f"latest={self.latest_latency_s:.3f}s, " + f"samples={len(self._window)})" + ) diff --git a/src/physicalai/runtime/__init__.py b/src/physicalai/runtime/__init__.py index 4a4ec70..e4259d3 100644 --- a/src/physicalai/runtime/__init__.py +++ b/src/physicalai/runtime/__init__.py @@ -12,12 +12,14 @@ """ from physicalai.runtime._action_queue import ActionQueue # noqa: PLC2701 +from physicalai.runtime._rtc_action_queue import RTCActionQueue # noqa: PLC2701 from physicalai.runtime.execution import ( AsyncExecution, Execution, SyncExecution, WorkerDiedError, ) +from physicalai.runtime.rtc_execution import RTCExecution from physicalai.runtime.runtime import ( PolicyRuntime, RunStats, @@ -32,6 +34,8 @@ "Execution", "LerpSmoother", "PolicyRuntime", + "RTCActionQueue", + "RTCExecution", "ReplaceSmoother", "RunStats", "RuntimeCallback", diff --git a/src/physicalai/runtime/_rtc_action_queue.py b/src/physicalai/runtime/_rtc_action_queue.py new file mode 100644 index 0000000..61ced16 --- /dev/null +++ b/src/physicalai/runtime/_rtc_action_queue.py @@ -0,0 +1,156 @@ +# Copyright (C) 2026 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""Dual-track action queue for Real-Time Chunking (RTC). + +Stores both raw (normalized) actions for ``prev_chunk_left_over`` +feedback and postprocessed (denormalized) actions for robot execution. +""" + +from __future__ import annotations + +import logging +import threading + +import numpy as np + +logger = logging.getLogger(__name__) + + +class RTCActionQueue: + """Thread-safe dual-track action queue for RTC inference. + + Maintains two parallel tracks: + - **raw**: normalized model output, used as ``prev_chunk_left_over`` + for the next inference call. + - **processed**: denormalized actions sent to the robot. + + A cursor tracks consumption. ``pop()`` returns one processed action. + ``get_left_over()`` returns the unconsumed raw tail. + ``merge()`` replaces both tracks, trimming stale prefix. + + All public methods are thread-safe. + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._raw: np.ndarray | None = None + self._processed: np.ndarray | None = None + self._cursor: int = 0 + self._total_pops: int = 0 + self._total_holds: int = 0 + self._consecutive_holds: int = 0 + + @property + def total_pops(self) -> int: + """Total number of actions popped.""" + return self._total_pops + + @property + def total_holds(self) -> int: + """Total number of hold events (pop on empty queue).""" + return self._total_holds + + @property + def consecutive_holds(self) -> int: + """Number of consecutive holds (resets on successful pop).""" + return self._consecutive_holds + + @property + def remaining(self) -> int: + """Number of unconsumed actions in the queue.""" + with self._lock: + if self._processed is None: + return 0 + return max(0, len(self._processed) - self._cursor) + + def get_action_index(self) -> int: + """Current consumption cursor (snapshot for delay cross-check).""" + with self._lock: + return self._cursor + + def pop(self) -> np.ndarray | None: + """Pop one processed (denormalized) action. + + Returns: + Action array of shape ``(action_dim,)``, or ``None`` if empty. + """ + with self._lock: + if self._processed is None or self._cursor >= len(self._processed): + self._consecutive_holds += 1 + self._total_holds += 1 + return None + action = self._processed[self._cursor].copy() + self._cursor += 1 + self._consecutive_holds = 0 + self._total_pops += 1 + return action + + def get_left_over(self) -> np.ndarray | None: + """Return unconsumed raw (normalized) actions. + + Called from the RTC background thread to build + ``prev_chunk_left_over`` for the next inference call. + + Returns: + Array of shape ``(remaining, action_dim)`` or ``None``. + """ + with self._lock: + if self._raw is None or self._cursor >= len(self._raw): + return None + return self._raw[self._cursor:].copy() + + def merge( + self, + raw: np.ndarray, + processed: np.ndarray, + real_delay: int, + action_index_before_inference: int | None = None, + ) -> None: + """Replace queue contents, trimming the first ``real_delay`` actions. + + Args: + raw: Raw model output, shape ``(chunk_size, action_dim)``. + processed: Postprocessed actions, shape ``(chunk_size, robot_dof)``. + real_delay: Number of leading actions to discard (already + consumed during inference latency). + action_index_before_inference: Cursor snapshot taken before + inference started. Used to compute additional actions + consumed while inference was running. + """ + with self._lock: + # Validate: warn if actual consumed differs from real_delay + if action_index_before_inference is not None: + indexes_diff = max(0, self._cursor - action_index_before_inference) + if indexes_diff != real_delay: + logger.warning( + "Indexes diff != real delay: indexes_diff=%d, real_delay=%d", + indexes_diff, real_delay, + ) + + # Trim stale prefix (actions consumed during inference latency) + trim = max(0, min(real_delay, len(raw), len(processed))) + + self._raw = raw[trim:] + self._processed = processed[trim:] + self._cursor = 0 + + logger.debug( + "RTCActionQueue.merge: trim=%d, remaining=%d", + trim, len(self._processed), + ) + + def below_threshold(self, threshold: int) -> bool: + """Check if remaining actions are below threshold.""" + with self._lock: + if self._processed is None: + return True + return (len(self._processed) - self._cursor) < threshold + + def clear(self) -> None: + """Clear all state.""" + with self._lock: + self._raw = None + self._processed = None + self._cursor = 0 + self._consecutive_holds = 0 diff --git a/src/physicalai/runtime/rtc_execution.py b/src/physicalai/runtime/rtc_execution.py new file mode 100644 index 0000000..5a87da1 --- /dev/null +++ b/src/physicalai/runtime/rtc_execution.py @@ -0,0 +1,323 @@ +# Copyright (C) 2026 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""Real-Time Chunking (RTC) execution strategy. + +Runs inference in a background daemon thread, injecting RTC-specific +inputs (noise, prev_chunk_left_over, inference_delay, etc.) and +managing a dual-track action queue for continuous robot control. +""" + +from __future__ import annotations + +import logging +import math +import threading +import time +from copy import deepcopy +from typing import TYPE_CHECKING, Any + +import numpy as np + +from physicalai.runtime._rtc_action_queue import RTCActionQueue +from physicalai.runtime.execution import Execution, WorkerDiedError + +if TYPE_CHECKING: + from physicalai.inference.callbacks.rtc_latency import RTCLatencyTracker + from physicalai.inference.model import InferenceModel + from physicalai.inference.postprocessors.base import Postprocessor + from physicalai.runtime._action_queue import ActionQueue + +logger = logging.getLogger(__name__) + +_NOT_STARTED = "start() must be called before this method" +_IDLE_SLEEP_S: float = 0.005 +_ERROR_RETRY_DELAY_S: float = 0.5 +_MAX_CONSECUTIVE_ERRORS: int = 10 +_JOIN_TIMEOUT_S: float = 5.0 + + +class RTCExecution(Execution): + """Async RTC execution strategy with background inference thread. + + The background thread continuously predicts action chunks and + merges them into an :class:`RTCActionQueue`. The main thread pops + one action per tick — never blocking on inference. + + RTC-specific inputs injected before each inference call: + - ``noise``: random noise for denoising (shape: 1×chunk×action_dim) + - ``prev_chunk_left_over``: unconsumed tail (shape: 1×chunk×action_dim) + - ``inference_delay``: integer derived from measured latency + - ``max_guidance_weight``: classifier-free guidance weight + - ``execution_horizon``: number of fresh actions per chunk + + Args: + chunk_size: Number of actions per model output chunk. + execution_horizon: Fresh actions to execute per chunk. + fps: Robot control frequency in Hz. + max_action_dim: Model's internal action dimension (for noise/padding). + max_guidance_weight: Classifier-free guidance weight. + queue_threshold: Re-infer when queue drops below this level. + latency_tracker: Callback that measures inference latency. + If None, delay defaults to 0. + postprocessors: Denormalization pipeline applied to raw actions. + These run in the background thread to produce the processed + track stored in the queue. + """ + + def __init__( + self, + chunk_size: int = 50, + execution_horizon: int = 10, + fps: float = 30.0, + max_action_dim: int = 32, + max_guidance_weight: float = 10.0, + queue_threshold: int | None = None, + latency_tracker: RTCLatencyTracker | None = None, + postprocessors: list[Postprocessor] | None = None, + ) -> None: + self._chunk_size = chunk_size + self._execution_horizon = execution_horizon + self._fps = fps + self._max_action_dim = max_action_dim + self._max_guidance_weight = max_guidance_weight + self._queue_threshold = queue_threshold if queue_threshold is not None else execution_horizon * 3 + self._latency_tracker = latency_tracker + self._postprocessors: list[Postprocessor] = postprocessors or [] + + self._rtc_queue: RTCActionQueue | None = None + self._model: InferenceModel | None = None + self._chunk_size_discovered: int = 0 + + # Thread state + self._obs_lock = threading.Lock() + self._obs_slot: dict[str, np.ndarray] | None = None + self._stop_event = threading.Event() + self._first_chunk_ready = threading.Event() + self._thread: threading.Thread | None = None + self._death_cause: BaseException | None = None + self._inference_count: int = 0 + + @property + def chunk_size(self) -> int: + """Discovered chunk size (from warmup or config).""" + return self._chunk_size_discovered or self._chunk_size + + @property + def inference_count(self) -> int: + """Number of completed inference calls.""" + return self._inference_count + + def start(self, model: InferenceModel, action_queue: ActionQueue) -> None: + """Bind model and queue, spawn background thread. + + The ``action_queue`` parameter is accepted for interface + compatibility but ignored — RTCExecution uses its own + :class:`RTCActionQueue` (passed via constructor or created + internally). The ``PolicyRuntime`` should be given the same + ``RTCActionQueue`` instance. + """ + self._model = model + # Accept RTCActionQueue from PolicyRuntime + if isinstance(action_queue, RTCActionQueue): + self._rtc_queue = action_queue + elif self._rtc_queue is None: + self._rtc_queue = RTCActionQueue() + + self._stop_event.clear() + self._first_chunk_ready.clear() + self._thread = threading.Thread( + target=self._rtc_loop, + name="rtc-inference", + daemon=True, + ) + self._thread.start() + logger.info( + "RTCExecution started (fps=%.1f, chunk=%d, horizon=%d, threshold=%d)", + self._fps, self._chunk_size, self._execution_horizon, self._queue_threshold, + ) + + def warmup(self, sample_observation: dict[str, np.ndarray]) -> None: + """Run one inference to seed the queue and discover chunk size. + + Blocks until the first chunk is produced by the background + thread (or timeout). + + Raises: + RuntimeError: If start() not called or thread dies during warmup. + """ + if self._model is None or self._rtc_queue is None: + raise RuntimeError(_NOT_STARTED) + + # Publish the sample observation for the background thread + with self._obs_lock: + self._obs_slot = deepcopy(sample_observation) + + # Wait for the first chunk with a generous timeout + if not self._first_chunk_ready.wait(timeout=30.0): + if self._death_cause is not None: + msg = f"RTC thread died during warmup: {self._death_cause}" + raise WorkerDiedError(msg) from self._death_cause + msg = "RTCExecution warmup timed out waiting for first chunk" + raise RuntimeError(msg) + + self._chunk_size_discovered = self._chunk_size + logger.info("RTCExecution warmup complete — chunk_size=%d", self._chunk_size_discovered) + + def maybe_request(self, observation: dict[str, np.ndarray]) -> None: + """Publish latest observation for the background thread. + + The background thread decides when to re-infer based on + queue threshold. This just updates the observation slot. + + Raises: + WorkerDiedError: If the inference thread has died. + """ + if self._thread is not None and not self._thread.is_alive() and self._death_cause is not None: + msg = f"RTC inference thread died: {self._death_cause}" + raise WorkerDiedError(msg) from self._death_cause + + with self._obs_lock: + self._obs_slot = { + k: v.copy() if isinstance(v, np.ndarray) else v + for k, v in observation.items() + } + + def stop(self) -> None: + """Signal shutdown and join the background thread.""" + if self._thread is not None: + self._stop_event.set() + self._thread.join(timeout=_JOIN_TIMEOUT_S) + if self._thread.is_alive(): + logger.warning("RTC thread did not join within %.1fs", _JOIN_TIMEOUT_S) + self._thread = None + logger.info("RTCExecution stopped (%d inferences)", self._inference_count) + + def _rtc_loop(self) -> None: + """Background loop: infer chunks and merge into queue.""" + assert self._model is not None # noqa: S101 + assert self._rtc_queue is not None # noqa: S101 + consecutive_errors = 0 + + while not self._stop_event.is_set(): + # Only re-infer when queue is running low + if not self._rtc_queue.below_threshold(self._queue_threshold): + time.sleep(_IDLE_SLEEP_S) + continue + + # Snapshot observation + with self._obs_lock: + if self._obs_slot is None: + time.sleep(_IDLE_SLEEP_S) + continue + inputs = deepcopy(self._obs_slot) + + # Build RTC-specific inputs + inputs = self._inject_rtc_inputs(inputs) + + # Snapshot cursor before inference + action_index_before = self._rtc_queue.get_action_index() + + # Run inference (callbacks fire inside model.__call__) + try: + t0 = time.perf_counter() + outputs = self._model(inputs) + elapsed = time.perf_counter() - t0 + consecutive_errors = 0 + except Exception: + consecutive_errors += 1 + logger.exception( + "RTC inference error (%d/%d)", + consecutive_errors, _MAX_CONSECUTIVE_ERRORS, + ) + if consecutive_errors >= _MAX_CONSECUTIVE_ERRORS: + self._death_cause = RuntimeError("Too many consecutive RTC errors") + logger.error("RTC thread shutting down after %d consecutive errors", consecutive_errors) + return + time.sleep(_ERROR_RETRY_DELAY_S) + continue + + self._inference_count += 1 + + # Extract raw actions: (1, chunk_size, action_dim) → (chunk_size, action_dim) + raw_actions = outputs["action"] + if raw_actions.ndim == 3: # noqa: PLR2004 + raw_actions = raw_actions[0] + + # Postprocess (denormalize) for robot + processed_actions = self._postprocess(raw_actions) + + # Compute real delay from actual elapsed time + real_delay = int(math.ceil(elapsed * self._fps)) + + # Merge into dual-track queue (queue clamps trim internally) + self._rtc_queue.merge( + raw_actions, + processed_actions, + real_delay, + action_index_before_inference=action_index_before, + ) + self._first_chunk_ready.set() + + logger.debug( + "RTC chunk: latency=%.3fs delay=%d remaining=%d", + elapsed, real_delay, self._rtc_queue.remaining, + ) + + def _inject_rtc_inputs(self, inputs: dict[str, np.ndarray]) -> dict[str, np.ndarray]: + """Add RTC-specific model inputs.""" + assert self._rtc_queue is not None # noqa: S101 + + # prev_chunk_left_over from queue + prev_chunk = self._rtc_queue.get_left_over() + if prev_chunk is None: + prev_chunk_padded = np.zeros( + (1, self._chunk_size, self._max_action_dim), + dtype=np.float32, + ) + else: + remaining = prev_chunk.shape[0] + out_dim = prev_chunk.shape[-1] + + # Pad action dim to model's max_action_dim if needed + if out_dim < self._max_action_dim: + prev_chunk = np.pad( + prev_chunk, + ((0, 0), (0, self._max_action_dim - out_dim)), + ) + + # Reshape to (1, remaining, max_action_dim) and pad time to chunk_size + prev_chunk_padded = prev_chunk.reshape(1, remaining, self._max_action_dim) + pad_len = self._chunk_size - remaining + if pad_len > 0: + prev_chunk_padded = np.pad(prev_chunk_padded, ((0, 0), (0, pad_len), (0, 0))) + + # Compute delay from latency tracker + if self._latency_tracker is not None: + delay = self._latency_tracker.compute_delay(self._fps) + else: + delay = 0 + + inputs["prev_chunk_left_over"] = prev_chunk_padded + inputs["inference_delay"] = np.int64(delay) + inputs["max_guidance_weight"] = np.float32(self._max_guidance_weight) + inputs["execution_horizon"] = np.int64(self._execution_horizon) + + return inputs + + def _postprocess(self, actions: np.ndarray) -> np.ndarray: + """Apply postprocessors (denormalization) to raw actions. + + Args: + actions: Shape ``(chunk_size, action_dim)``. + + Returns: + Postprocessed actions, same shape. + """ + if not self._postprocessors: + return actions.copy() + + outputs: dict[str, Any] = {"action": actions} + for pp in self._postprocessors: + outputs = pp(outputs) + return outputs["action"] diff --git a/src/physicalai/runtime/runtime.py b/src/physicalai/runtime/runtime.py index 801daa9..4e99828 100644 --- a/src/physicalai/runtime/runtime.py +++ b/src/physicalai/runtime/runtime.py @@ -13,6 +13,7 @@ import numpy as np from physicalai.runtime._action_queue import ActionQueue # noqa: PLC2701 +from physicalai.runtime._rtc_action_queue import RTCActionQueue # noqa: PLC2701 from physicalai.runtime.execution import Execution, WorkerDiedError from physicalai.runtime.smoothers import LerpSmoother @@ -74,7 +75,7 @@ def __init__( # noqa: D107 execution: Execution, fps: float, cameras: Mapping[str, Camera] | None = None, - action_queue: ActionQueue | None = None, + action_queue: ActionQueue | RTCActionQueue | None = None, callbacks: Sequence[RuntimeCallback] = (), ) -> None: if fps <= 0: From c57cbc313bd6aad585f45dec765ad8862b1889b4 Mon Sep 17 00:00:00 2001 From: Eugene Liu Date: Fri, 22 May 2026 13:27:41 +0100 Subject: [PATCH 2/2] feat(runtime): enhance RTC execution with warmup inferences and improved action trimming --- examples/runtime/rtc_inference.py | 13 +++++++- .../capture/transport/_shared_camera.py | 2 ++ src/physicalai/runtime/_rtc_action_queue.py | 31 +++++++++-------- src/physicalai/runtime/rtc_execution.py | 33 +++++++++++++------ 4 files changed, 52 insertions(+), 27 deletions(-) diff --git a/examples/runtime/rtc_inference.py b/examples/runtime/rtc_inference.py index ed6b8da..d4392c1 100644 --- a/examples/runtime/rtc_inference.py +++ b/examples/runtime/rtc_inference.py @@ -50,7 +50,7 @@ def main() -> None: parser.add_argument("--width", type=int, default=640) parser.add_argument("--height", type=int, default=480) parser.add_argument("--fps", type=float, default=30.0) - parser.add_argument("--duration-s", type=float, default=60.0) + parser.add_argument("--duration-s", type=float, default=None, help="Run duration in seconds (default: run indefinitely)") # RTC parameters parser.add_argument("--chunk-size", type=int, default=50) parser.add_argument("--execution-horizon", type=int, default=10) @@ -147,6 +147,17 @@ def main() -> None: f"p95={latency_tracker.percentile_s(95):.3f}s" ) finally: + print("\nRobot is holding position (torque ON).") + try: + resp = input("Disable torque? The arm will drop under gravity. [y/N]: ").strip().lower() + if resp == "y": + robot.set_torque(enabled=False) + robot.torque_on_disconnect = False + print("Torque disabled — arm is free.") + else: + print("Torque remains enabled — arm holds position.") + except (KeyboardInterrupt, EOFError): + print("\nTorque remains enabled.") runtime.disconnect() print("Disconnected") diff --git a/src/physicalai/capture/transport/_shared_camera.py b/src/physicalai/capture/transport/_shared_camera.py index 6af534d..68a0719 100644 --- a/src/physicalai/capture/transport/_shared_camera.py +++ b/src/physicalai/capture/transport/_shared_camera.py @@ -270,6 +270,7 @@ def read(self, timeout: float = 2.0) -> Frame: if sample is None: break newest_sample = sample + sample = None if newest_sample is not None: header, frame = self._decode_sample(newest_sample) @@ -297,6 +298,7 @@ def read_latest(self) -> Frame: if sample is None: break newest_sample = sample + sample = None if newest_sample is not None: header, frame = self._decode_sample(newest_sample) diff --git a/src/physicalai/runtime/_rtc_action_queue.py b/src/physicalai/runtime/_rtc_action_queue.py index 61ced16..329ad03 100644 --- a/src/physicalai/runtime/_rtc_action_queue.py +++ b/src/physicalai/runtime/_rtc_action_queue.py @@ -104,32 +104,31 @@ def merge( self, raw: np.ndarray, processed: np.ndarray, - real_delay: int, action_index_before_inference: int | None = None, ) -> None: - """Replace queue contents, trimming the first ``real_delay`` actions. + """Replace queue contents, trimming actions consumed during inference. + + Trim is derived from actual cursor movement (actions the robot + consumed from the previous chunk while inference was running). + For the first chunk, cursor hasn't moved so trim=0 and the + full chunk is kept. Args: raw: Raw model output, shape ``(chunk_size, action_dim)``. processed: Postprocessed actions, shape ``(chunk_size, robot_dof)``. - real_delay: Number of leading actions to discard (already - consumed during inference latency). action_index_before_inference: Cursor snapshot taken before - inference started. Used to compute additional actions - consumed while inference was running. + inference started. Used to compute how many actions were + consumed during inference. """ with self._lock: - # Validate: warn if actual consumed differs from real_delay + # Trim = actual actions consumed during inference if action_index_before_inference is not None: - indexes_diff = max(0, self._cursor - action_index_before_inference) - if indexes_diff != real_delay: - logger.warning( - "Indexes diff != real delay: indexes_diff=%d, real_delay=%d", - indexes_diff, real_delay, - ) - - # Trim stale prefix (actions consumed during inference latency) - trim = max(0, min(real_delay, len(raw), len(processed))) + trim = max(0, self._cursor - action_index_before_inference) + else: + trim = 0 + + # Clamp to array length + trim = min(trim, len(raw), len(processed)) self._raw = raw[trim:] self._processed = processed[trim:] diff --git a/src/physicalai/runtime/rtc_execution.py b/src/physicalai/runtime/rtc_execution.py index 5a87da1..3b0e0fc 100644 --- a/src/physicalai/runtime/rtc_execution.py +++ b/src/physicalai/runtime/rtc_execution.py @@ -60,6 +60,9 @@ class RTCExecution(Execution): queue_threshold: Re-infer when queue drops below this level. latency_tracker: Callback that measures inference latency. If None, delay defaults to 0. + warmup_inferences: Number of initial inferences treated as warmup. + The latency tracker is reset after these to discard + compilation/kernel-build overhead (e.g. OpenVINO first-run). postprocessors: Denormalization pipeline applied to raw actions. These run in the background thread to produce the processed track stored in the queue. @@ -74,6 +77,7 @@ def __init__( max_guidance_weight: float = 10.0, queue_threshold: int | None = None, latency_tracker: RTCLatencyTracker | None = None, + warmup_inferences: int = 2, postprocessors: list[Postprocessor] | None = None, ) -> None: self._chunk_size = chunk_size @@ -83,6 +87,7 @@ def __init__( self._max_guidance_weight = max_guidance_weight self._queue_threshold = queue_threshold if queue_threshold is not None else execution_horizon * 3 self._latency_tracker = latency_tracker + self._warmup_inferences = max(1, warmup_inferences) self._postprocessors: list[Postprocessor] = postprocessors or [] self._rtc_queue: RTCActionQueue | None = None @@ -154,7 +159,7 @@ def warmup(self, sample_observation: dict[str, np.ndarray]) -> None: self._obs_slot = deepcopy(sample_observation) # Wait for the first chunk with a generous timeout - if not self._first_chunk_ready.wait(timeout=30.0): + if not self._first_chunk_ready.wait(timeout=120.0): if self._death_cause is not None: msg = f"RTC thread died during warmup: {self._death_cause}" raise WorkerDiedError(msg) from self._death_cause @@ -239,6 +244,15 @@ def _rtc_loop(self) -> None: self._inference_count += 1 + # Reset latency tracker after warmup inferences to discard + # compilation overhead (e.g. OpenVINO first-run latency). + if self._inference_count <= self._warmup_inferences and self._latency_tracker is not None: + self._latency_tracker.on_reset() + logger.info( + "Warmup inference %d/%d complete (%.2fs) — latency tracker reset", + self._inference_count, self._warmup_inferences, elapsed, + ) + # Extract raw actions: (1, chunk_size, action_dim) → (chunk_size, action_dim) raw_actions = outputs["action"] if raw_actions.ndim == 3: # noqa: PLR2004 @@ -247,21 +261,20 @@ def _rtc_loop(self) -> None: # Postprocess (denormalize) for robot processed_actions = self._postprocess(raw_actions) - # Compute real delay from actual elapsed time - real_delay = int(math.ceil(elapsed * self._fps)) - - # Merge into dual-track queue (queue clamps trim internally) + # Merge into dual-track queue — trim is based on actual + # actions consumed (cursor movement) during inference, NOT + # wall-clock time. For the first chunk nothing was consumed + # so trim=0 and the full chunk is kept. self._rtc_queue.merge( - raw_actions, - processed_actions, - real_delay, + raw_actions, + processed_actions, action_index_before_inference=action_index_before, ) self._first_chunk_ready.set() logger.debug( - "RTC chunk: latency=%.3fs delay=%d remaining=%d", - elapsed, real_delay, self._rtc_queue.remaining, + "RTC chunk: latency=%.3fs remaining=%d", + elapsed, self._rtc_queue.remaining, ) def _inject_rtc_inputs(self, inputs: dict[str, np.ndarray]) -> dict[str, np.ndarray]: