diff --git a/examples/runtime/async_inference.py b/examples/runtime/async_inference.py new file mode 100644 index 0000000..6b07efa --- /dev/null +++ b/examples/runtime/async_inference.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +# Copyright (C) 2026 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""Async inference with PolicyRuntime. + +python examples/runtime/async_inference.py \ + --model ./exports/pi05_cans_openvino \ + --device GPU.0 \ + --port /dev/ttyACM0 \ + --calibration /home/max/.cache/physicalai/robots/a8d8d997-a59e-4423-9006-5d991d223887/calibrations/0b2f185a-8ab2-4956-91c2-3a2ac2dbd8c1.json \ + --overhead-camera /dev/v4l/by-id/usb-UGREEN_Camera_2K_UGREEN_Camera_2K_SN0001-video-index0 \ + --arm-camera 353322271391 \ + --front-camera /dev/v4l/by-id/usb-Innomaker_Innomaker-U20CAM-1080p-S1_SN0001-video-index0 \ + --width 640 \ + --height 480 \ + --fps 30 \ + --duration-s 60 +""" + +from __future__ import annotations + +import argparse + +import openvino as ov +import numpy as np + +from physicalai.capture import discover_all +from physicalai.capture.transport import SharedCamera +from physicalai.inference import InferenceModel +from physicalai.robot import SO101 +from physicalai.runtime import ( + ActionQueue, + AsyncExecution, + LerpSmoother, + PolicyRuntime, +) + + +def main(): + parser = argparse.ArgumentParser(description="Run policy with PolicyRuntime") + parser.add_argument("--model", required=True, help="Exported model directory") + parser.add_argument("--device", default="GPU.0", help="OpenVINO device") + parser.add_argument("--port", default="/dev/ttyACM0", help="Robot serial port") + parser.add_argument("--calibration", required=True, help="Robot calibration file") + parser.add_argument("--overhead-camera", required=True, help="Overhead camera device path") + parser.add_argument("--arm-camera", required=True, help="Arm camera serial number") + parser.add_argument("--front-camera", required=True, help="Front camera device path") + 
parser.add_argument("--width", type=int, default=640) + parser.add_argument("--height", type=int, default=480) + parser.add_argument("--duration-s", type=float, default=60.0) + parser.add_argument("--fps", type=float, default=30.0) + args = parser.parse_args() + + import openvino_tokenizers # noqa: F401 — registers OV tokenizer ops + + print(f"Available devices:") + core = ov.Core() + devices = core.available_devices + for dev in devices: + print(f" {dev}: {core.get_property(dev, 'FULL_DEVICE_NAME')}") + print(f"Selected device: {args.device}") + + model = InferenceModel.load(args.model, device=args.device) + robot = SO101(port=args.port, calibration=args.calibration, role="follower") + cameras = { + "overhead": SharedCamera("uvc", device=args.overhead_camera, width=args.width, height=args.height, fps=int(args.fps)), + "front": SharedCamera("uvc", device=args.front_camera, width=args.width, height=args.height, fps=int(args.fps)), + "arm": SharedCamera("realsense", serial_number=args.arm_camera, width=args.width, height=args.height, fps=int(args.fps)), + } + + runtime = PolicyRuntime( + robot=robot, + model=model, + execution=AsyncExecution(threshold=0.3, fps=int(args.fps)), + action_queue=ActionQueue(smoother=LerpSmoother(duration_frames=5)), + cameras=cameras, + fps=args.fps, + ) + + try: + runtime.connect() + except Exception as e: + print(f"Failed to connect: {e}") + print("Available cameras:") + for driver, devices in discover_all().items(): + for dev in devices: + print(f" Driver: {driver}, Device: {dev.device_id}, Info: {dev.name}") + return + + for name, cam in cameras.items(): + print(f"Camera '{name}' connected: {cam.actual_width}x{cam.actual_height} @ {cam.actual_fps}fps") + + print("Starting policy runtime...") + try: + stats = runtime.run(duration_s=args.duration_s) + print(f"\nDone — {stats.steps} steps, {stats.inference_count} inferences, {stats.total_holds} holds") + finally: + runtime.disconnect() + print("Disconnected") + + +if __name__ == 
"__main__": + main() diff --git a/src/physicalai/inference/__init__.py b/src/physicalai/inference/__init__.py index aacf56f..186b175 100644 --- a/src/physicalai/inference/__init__.py +++ b/src/physicalai/inference/__init__.py @@ -1,4 +1,4 @@ -# Copyright (C) 2025-2026 Intel Corporation +# Copyright (C) 2026 Intel Corporation # SPDX-License-Identifier: Apache-2.0 """Production inference module for exported policies. diff --git a/src/physicalai/inference/adapters/__init__.py b/src/physicalai/inference/adapters/__init__.py index f3124c7..debc1d8 100644 --- a/src/physicalai/inference/adapters/__init__.py +++ b/src/physicalai/inference/adapters/__init__.py @@ -1,4 +1,4 @@ -# Copyright (C) 2025-2026 Intel Corporation +# Copyright (C) 2026 Intel Corporation # SPDX-License-Identifier: Apache-2.0 """Inference adapters for different backend runtimes. diff --git a/src/physicalai/inference/adapters/_discovery.py b/src/physicalai/inference/adapters/_discovery.py index ed00a77..154c203 100644 --- a/src/physicalai/inference/adapters/_discovery.py +++ b/src/physicalai/inference/adapters/_discovery.py @@ -1,4 +1,4 @@ -# Copyright (C) 2025-2026 Intel Corporation +# Copyright (C) 2026 Intel Corporation # SPDX-License-Identifier: Apache-2.0 """Adapter discovery and factory helpers.""" diff --git a/src/physicalai/inference/adapters/openvino.py b/src/physicalai/inference/adapters/openvino.py index 08a3ff4..81b5d2e 100644 --- a/src/physicalai/inference/adapters/openvino.py +++ b/src/physicalai/inference/adapters/openvino.py @@ -1,4 +1,4 @@ -# Copyright (C) 2025-2026 Intel Corporation +# Copyright (C) 2026 Intel Corporation # SPDX-License-Identifier: Apache-2.0 """OpenVINO adapter for inference.""" diff --git a/src/physicalai/inference/model.py b/src/physicalai/inference/model.py index 39f4fc4..4964969 100644 --- a/src/physicalai/inference/model.py +++ b/src/physicalai/inference/model.py @@ -1,4 +1,4 @@ -# Copyright (C) 2025-2026 Intel Corporation +# Copyright (C) 2026 Intel Corporation # 
SPDX-License-Identifier: Apache-2.0 """Production-ready inference model with unified API.""" @@ -7,9 +7,11 @@ import json import warnings +from collections import deque from pathlib import Path from typing import TYPE_CHECKING, Any, Self +import numpy as np import yaml from physicalai.inference.adapters import adapter_registry, get_adapter @@ -17,11 +19,8 @@ from physicalai.inference.constants import ACTION from physicalai.inference.manifest import ComponentSpec, Manifest from physicalai.inference.runners import get_runner -from physicalai.inference.utils import ActionCursor if TYPE_CHECKING: - import numpy as np - from physicalai.inference.adapters.base import RuntimeAdapter from physicalai.inference.callbacks.base import Callback from physicalai.inference.postprocessors.base import Postprocessor @@ -125,7 +124,7 @@ def __init__( for callback in self.callbacks: callback.on_load(self) - self.cursor = ActionCursor() + self._action_buffer: deque[np.ndarray] = deque() @property def chunk_size(self) -> int: @@ -207,31 +206,43 @@ def select_action(self, observation: dict[str, np.ndarray]) -> np.ndarray: observation: Observation dict mapping names to numpy arrays. Returns: - Action array to execute. + 1-D action vector with shape ``(action_dim,)``. Examples: >>> obs = env.reset() >>> action = policy.select_action(obs) >>> next_obs, reward, done = env.step(action) """ - if self.cursor.empty: - self.cursor.push_chunk(self(observation)[ACTION]) - - return self.cursor.pop() + if not self._action_buffer: + self._action_buffer.extend(self.predict_action_chunk(observation)) + return self._action_buffer.popleft() def predict_action_chunk(self, observation: dict[str, np.ndarray]) -> np.ndarray: """Predict a chunk of actions for the given observation. - Delegates to ``__call__`` and extracts the ``"action_chunk"`` key. + Delegates to ``__call__`` and extracts the ``"action"`` key. Args: observation: Observation dict mapping names to numpy arrays. 
Returns: - Chunk of actions to execute. + 2-D action chunk with shape ``(chunk_size, action_dim)``. + + Raises: + ValueError: If the output has a batch dimension greater than 1. """ outputs = self(observation) - return outputs[ACTION] + actions = outputs[ACTION] + # Strip the batch dimension; reject actual batches (batch > 1). + if actions.ndim == 3: # noqa: PLR2004 + if actions.shape[0] != 1: + msg = ( + f"Batched inference is not supported by predict_action_chunk: " + f"expected batch dimension of 1, got shape {actions.shape}" + ) + raise ValueError(msg) + actions = actions[0] + return np.atleast_2d(actions) def reset(self) -> None: """Reset policy state for new episode. @@ -250,7 +261,7 @@ def reset(self) -> None: ... obs, reward, done = env.step(action) """ self.runner.reset() - self.cursor.reset() + self._action_buffer.clear() for callback in self.callbacks: callback.on_reset() diff --git a/src/physicalai/inference/preprocessors/pi05.py b/src/physicalai/inference/preprocessors/pi05.py index 0d5c68e..c3d631f 100644 --- a/src/physicalai/inference/preprocessors/pi05.py +++ b/src/physicalai/inference/preprocessors/pi05.py @@ -135,7 +135,9 @@ def _preprocess_images( if img.ndim == max_image_dim: img = img[:, -1, :, :, :] - if img.dtype != np.float32: + if img.dtype == np.uint8: + img = img.astype(np.float32) / 255.0 + elif img.dtype != np.float32: img = img.astype(np.float32) # Detect layout: assume channels-first when dim-1 == 3 @@ -152,8 +154,8 @@ def _preprocess_images( # [0, 1] -> [-1, 1] img = img * 2.0 - 1.0 - if channels_first: - img = np.transpose(img, (0, 3, 1, 2)) # -> (B, C, H, W) + # Output is always (B, C, H, W) — img is HWC at this point + img = np.transpose(img, (0, 3, 1, 2)) bsize = img.shape[0] mask = np.ones(bsize, dtype=np.bool_) diff --git a/src/physicalai/inference/utils/__init__.py b/src/physicalai/inference/utils/__init__.py deleted file mode 100644 index c576e49..0000000 --- a/src/physicalai/inference/utils/__init__.py +++ /dev/null @@ -1,8 
+0,0 @@ -# Copyright (C) 2026 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -"""Inference utilities.""" - -from physicalai.inference.utils.action_cursor import ActionCursor - -__all__ = ["ActionCursor"] diff --git a/src/physicalai/inference/utils/action_cursor.py b/src/physicalai/inference/utils/action_cursor.py deleted file mode 100644 index a219124..0000000 --- a/src/physicalai/inference/utils/action_cursor.py +++ /dev/null @@ -1,90 +0,0 @@ -# Copyright (C) 2026 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -"""ActionCursor: action-chunk buffer for temporal action dispensing.""" - -from __future__ import annotations - -from collections import deque - -import numpy as np - - -class ActionCursor: - """Buffer that queues an action chunk and dispenses one timestep per call. - - Call :meth:`push_chunk` with the full action output from the runner - (shape ``(batch, T, action_dim)``), then call :meth:`pop` repeatedly - to retrieve individual timestep actions (shape ``(batch, action_dim)``). - When the buffer is exhausted, :attr:`empty` is ``True`` and a new - chunk should be pushed. - - Examples: - >>> cursor = ActionCursor() - >>> cursor.empty - True - >>> chunk = np.random.randn(1, 10, 7) # batch=1, T=10, action_dim=7 - >>> cursor.push_chunk(chunk) - >>> cursor.empty - False - >>> action = cursor.pop() # shape (1, 7) - >>> cursor.reset() - >>> cursor.empty - True - """ - - def __init__(self) -> None: - """Initialize an empty ActionCursor with no buffered actions.""" - self._queue: deque[np.ndarray] = deque() - - @property - def empty(self) -> bool: - """True when there are no buffered actions remaining.""" - return len(self._queue) == 0 - - def push_chunk(self, chunk: np.ndarray) -> None: - """Queue all timestep slices from an action chunk. - - Args: - chunk: Action array with shape ``(batch, T, action_dim)`` or - ``(T, action_dim)``. Each of the ``T`` timestep slices is - enqueued individually. 
- - Raises: - ValueError: If ``chunk.ndim`` is not 2 or 3. - """ - min_batched_action_dim = 2 - batched_temporal_dim = 3 - if chunk.ndim not in {min_batched_action_dim, batched_temporal_dim}: - msg = f"Chunk must be a 2-D or 3-D array, got ndim={chunk.ndim}." - raise ValueError(msg) - - if chunk.ndim == min_batched_action_dim: - # (T, action_dim) - no batch dimension - self._queue.extend(chunk) - else: - # (batch, T, action_dim) - transpose to (T, batch, action_dim) - self._queue.extend(np.transpose(chunk, (1, 0, 2))) - - def pop(self) -> np.ndarray: - """Return and remove the next buffered action. - - Returns: - Action array for the current timestep, shape ``(batch, action_dim)`` - or ``(action_dim,)`` depending on what was pushed. - - Raises: - IndexError: If the queue is empty. - """ - if self.empty: - msg = "ActionCursor is empty; call push_chunk before pop." - raise IndexError(msg) - return self._queue.popleft() - - def reset(self) -> None: - """Clear all buffered actions.""" - self._queue.clear() - - def __repr__(self) -> str: - """Return string representation.""" - return f"ActionCursor(buffered={len(self._queue)})" diff --git a/src/physicalai/robot/__init__.py b/src/physicalai/robot/__init__.py index 1465750..2c55631 100644 --- a/src/physicalai/robot/__init__.py +++ b/src/physicalai/robot/__init__.py @@ -13,12 +13,20 @@ from __future__ import annotations +from typing import TYPE_CHECKING + from physicalai.robot.connect import connect -from physicalai.robot.interface import Robot +from physicalai.robot.interface import Robot, RobotObservation from physicalai.robot.verify import verify_robot +if TYPE_CHECKING: + from physicalai.robot.so101 import SO101 as SO101 + from physicalai.robot.trossen import BimanualWidowXAI as BimanualWidowXAI + from physicalai.robot.trossen import WidowXAI as WidowXAI + __all__ = [ "Robot", + "RobotObservation", "connect", "verify_robot", ] diff --git a/src/physicalai/runtime/__init__.py b/src/physicalai/runtime/__init__.py new file 
mode 100644 index 0000000..4a4ec70 --- /dev/null +++ b/src/physicalai/runtime/__init__.py @@ -0,0 +1,40 @@ +# Copyright (C) 2026 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""Runtime system for running trained policies on robot hardware. + +Public API:: + + from physicalai.runtime import PolicyRuntime, RunStats, RuntimeCallback + from physicalai.runtime import SyncExecution, AsyncExecution, Execution, WorkerDiedError + from physicalai.runtime import ActionQueue + from physicalai.runtime import ChunkSmoother, LerpSmoother, ReplaceSmoother +""" + +from physicalai.runtime._action_queue import ActionQueue # noqa: PLC2701 +from physicalai.runtime.execution import ( + AsyncExecution, + Execution, + SyncExecution, + WorkerDiedError, +) +from physicalai.runtime.runtime import ( + PolicyRuntime, + RunStats, + RuntimeCallback, +) +from physicalai.runtime.smoothers import ChunkSmoother, LerpSmoother, ReplaceSmoother + +__all__ = [ + "ActionQueue", + "AsyncExecution", + "ChunkSmoother", + "Execution", + "LerpSmoother", + "PolicyRuntime", + "ReplaceSmoother", + "RunStats", + "RuntimeCallback", + "SyncExecution", + "WorkerDiedError", +] diff --git a/src/physicalai/runtime/_action_queue.py b/src/physicalai/runtime/_action_queue.py new file mode 100644 index 0000000..c195edf --- /dev/null +++ b/src/physicalai/runtime/_action_queue.py @@ -0,0 +1,72 @@ +# Copyright (C) 2026 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import threading +from collections import deque + +import numpy as np + +from physicalai.runtime.smoothers import ChunkSmoother, ReplaceSmoother + + +class ActionQueue: + """Thread-safe action queue with chunk smoothing.""" + + def __init__(self, smoother: ChunkSmoother | None = None) -> None: + self._smoother = smoother or ReplaceSmoother() + self._deque: deque[np.ndarray] = deque() + self._lock = threading.Lock() + self._consecutive_holds = 0 + self._total_holds = 0 + self._total_pops = 0 + + def 
push_chunk(self, chunk: np.ndarray, offset: int = 0) -> None: + """Push an action chunk, blending with remaining actions via the smoother.""" + with self._lock: + remaining = np.stack(list(self._deque)) if self._deque else np.empty((0, chunk.shape[1]), dtype=chunk.dtype) + merged = self._smoother.merge(remaining, chunk, offset) + self._deque.clear() + self._deque.extend(merged) + + def pop(self) -> np.ndarray | None: + """Pop the next action. + + Returns: + Single action vector, or None if empty. + """ + with self._lock: + if not self._deque: + self._consecutive_holds += 1 + self._total_holds += 1 + return None + self._consecutive_holds = 0 + self._total_pops += 1 + return self._deque.popleft() + + @property + def remaining(self) -> int: + with self._lock: + return len(self._deque) + + @property + def consecutive_holds(self) -> int: + return self._consecutive_holds + + @property + def total_holds(self) -> int: + return self._total_holds + + @property + def total_pops(self) -> int: + return self._total_pops + + def below_threshold(self, threshold: int) -> bool: + with self._lock: + return len(self._deque) < threshold + + def clear(self) -> None: + with self._lock: + self._deque.clear() + self._consecutive_holds = 0 diff --git a/src/physicalai/runtime/execution.py b/src/physicalai/runtime/execution.py new file mode 100644 index 0000000..bb2c854 --- /dev/null +++ b/src/physicalai/runtime/execution.py @@ -0,0 +1,250 @@ +# Copyright (C) 2026 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""Execution strategies for scheduling policy inference.""" + +from __future__ import annotations + +import logging +import threading +import time +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +import numpy as np + +if TYPE_CHECKING: + from physicalai.inference.model import InferenceModel + from physicalai.runtime._action_queue import ActionQueue + +logger = logging.getLogger(__name__) + +_NOT_STARTED = "start() must be called before this method" 
+ + +class WorkerDiedError(RuntimeError): + """Raised when the inference worker thread dies unexpectedly.""" + + +class Execution(ABC): + """Decides when and where inference runs. Pushes results into ActionQueue.""" + + @abstractmethod + def start(self, model: InferenceModel, action_queue: ActionQueue) -> None: + """Bind to model and queue. Called once before the loop.""" + ... + + @abstractmethod + def maybe_request(self, observation: dict[str, np.ndarray]) -> None: + """Check if new inference is needed. If so, run or schedule it.""" + ... + + @abstractmethod + def warmup(self, sample_observation: dict[str, np.ndarray]) -> None: + """Run one inference to discover chunk_size and seed the queue.""" + ... + + @abstractmethod + def stop(self) -> None: + """Stop scheduling.""" + ... + + @property + @abstractmethod + def chunk_size(self) -> int: + """Discovered after warmup().""" + ... + + +class SyncExecution(Execution): + """Synchronous inference in the control thread.""" + + def __init__(self) -> None: # noqa: D107 + self._model: InferenceModel | None = None + self._queue: ActionQueue | None = None + self._chunk_size: int = 0 + + def start(self, model: InferenceModel, action_queue: ActionQueue) -> None: + """Bind model and queue.""" + self._model = model + self._queue = action_queue + + def warmup(self, sample_observation: dict[str, np.ndarray]) -> None: + """Run one inference, seed queue, discover chunk_size. + + Raises: + RuntimeError: If start() has not been called. + """ + if self._model is None or self._queue is None: + raise RuntimeError(_NOT_STARTED) + actions = self._model.predict_action_chunk(sample_observation) + self._chunk_size = actions.shape[0] + self._queue.push_chunk(actions, offset=0) + + def maybe_request(self, observation: dict[str, np.ndarray]) -> None: + """Refill queue synchronously when empty. + + Raises: + RuntimeError: If start() has not been called. 
+ """ + if self._model is None or self._queue is None: + raise RuntimeError(_NOT_STARTED) + if self._queue.below_threshold(1): + actions = self._model.predict_action_chunk(observation) + self._queue.push_chunk(actions, offset=0) + + def stop(self) -> None: + """No-op for synchronous execution.""" + + @property + def chunk_size(self) -> int: + """Return discovered chunk size.""" + return self._chunk_size + + +class AsyncExecution(Execution): + """Async inference in a background thread with health monitoring.""" + + def __init__( # noqa: D107 + self, + threshold: float = 0.5, + fps: int = 30, + watchdog_timeout_s: float = 30.0, + max_consecutive_holds: int | None = None, + ) -> None: + self._threshold_frac = threshold + self._fps = fps + self._watchdog_timeout_s = watchdog_timeout_s + self._max_consecutive_holds = max_consecutive_holds or 3 * fps + + self._model: InferenceModel | None = None + self._queue: ActionQueue | None = None + self._chunk_size: int = 0 + self._threshold_count: int = 0 + + self._lock = threading.Lock() + self._obs_slot: dict[str, np.ndarray] | None = None + self._obs_ready = threading.Event() + self._running_inference = False + self._request_time: float = 0.0 + self._stop_event = threading.Event() + self._thread: threading.Thread | None = None + self._death_cause: BaseException | None = None + self._inference_count: int = 0 + + def start(self, model: InferenceModel, action_queue: ActionQueue) -> None: + """Bind model/queue and spawn inference thread.""" + self._model = model + self._queue = action_queue + self._thread = threading.Thread(target=self._run, name="InferenceThread", daemon=True) + self._thread.start() + + def warmup(self, sample_observation: dict[str, np.ndarray]) -> None: + """Run one inference in main thread, seed queue, discover chunk_size. + + Raises: + RuntimeError: If start() has not been called. 
+ """ + if self._model is None or self._queue is None: + raise RuntimeError(_NOT_STARTED) + actions = self._model.predict_action_chunk(sample_observation) + self._chunk_size = actions.shape[0] + self._threshold_count = int(self._chunk_size * self._threshold_frac) + self._queue.push_chunk(actions, offset=0) + + def maybe_request(self, observation: dict[str, np.ndarray]) -> None: + """Submit observation for background inference if queue is low and worker idle. + + Raises: + RuntimeError: If start() has not been called. + WorkerDiedError: If the inference thread has died. + """ + if self._queue is None: + raise RuntimeError(_NOT_STARTED) + if self._thread is not None and not self._thread.is_alive() and self._death_cause is not None: + msg = f"Inference thread died: {self._death_cause}" + raise WorkerDiedError(msg) from self._death_cause + + if self._busy_duration > self._watchdog_timeout_s: + logger.warning("Inference stuck for %.0fs — force resetting", self._busy_duration) + self._force_reset() + + if self._queue.below_threshold(self._threshold_count) and not self._busy: + snapshot = {k: v.copy() if isinstance(v, np.ndarray) else v for k, v in observation.items()} + with self._lock: + self._obs_slot = snapshot + self._request_time = time.perf_counter() + self._obs_ready.set() + + def stop(self) -> None: + """Signal thread and join with timeout.""" + if self._thread is not None: + self._stop_event.set() + self._obs_ready.set() + self._thread.join(timeout=10.0) + + @property + def chunk_size(self) -> int: + """Return discovered chunk size.""" + return self._chunk_size + + @property + def alive(self) -> bool: + """Whether the inference thread is alive.""" + return self._thread is not None and self._thread.is_alive() + + @property + def inference_count(self) -> int: + """Number of completed inference calls.""" + return self._inference_count + + @property + def _busy(self) -> bool: + with self._lock: + return self._obs_slot is not None or self._running_inference + + 
@property + def _busy_duration(self) -> float: + with self._lock: + if not (self._obs_slot is not None or self._running_inference): + return 0.0 + return time.perf_counter() - self._request_time + + def _force_reset(self) -> None: + with self._lock: + self._obs_slot = None + self._running_inference = False + logger.warning("Force reset — cleared stuck inference state") + + def _run(self) -> None: + try: + while not self._stop_event.is_set(): + self._obs_ready.wait() + self._obs_ready.clear() + + if self._stop_event.is_set(): + return + + with self._lock: + obs = self._obs_slot + self._obs_slot = None + if obs is None: + continue + self._running_inference = True + + if self._model is None or self._queue is None: + raise RuntimeError(_NOT_STARTED) # noqa: TRY301 + t0 = time.perf_counter() + actions = self._model.predict_action_chunk(obs) + latency = time.perf_counter() - t0 + + offset = int(latency * self._fps) + self._queue.push_chunk(actions, offset=offset) + self._inference_count += 1 + + with self._lock: + self._running_inference = False + + except Exception as e: + self._death_cause = e + logger.exception("Inference thread died") diff --git a/src/physicalai/runtime/runtime.py b/src/physicalai/runtime/runtime.py new file mode 100644 index 0000000..801daa9 --- /dev/null +++ b/src/physicalai/runtime/runtime.py @@ -0,0 +1,292 @@ +# Copyright (C) 2026 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""PolicyRuntime — runs a trained policy on robot hardware.""" + +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Protocol, Self + +import numpy as np + +from physicalai.runtime._action_queue import ActionQueue # noqa: PLC2701 +from physicalai.runtime.execution import Execution, WorkerDiedError +from physicalai.runtime.smoothers import LerpSmoother + +if TYPE_CHECKING: + from collections.abc import Mapping, Sequence + + from physicalai.capture.camera import Camera + 
from physicalai.inference.model import InferenceModel + from physicalai.robot.interface import Robot + +logger = logging.getLogger(__name__) + +_DEFAULT_LERP_FRAMES = 5 + + +class RuntimeCallback(Protocol): + """Optional hook points in the PolicyRuntime control loop.""" + + def before_send_action(self, *, action: np.ndarray, step: int) -> np.ndarray | None: + """Called before sending action. Return modified action or None.""" + ... + + def on_action_sent(self, *, action: np.ndarray, step: int) -> None: + """Called after action is sent to robot.""" + ... + + def on_hold(self, *, step: int, holds: int) -> None: + """Called when action queue is empty and robot holds last position.""" + ... + + +@dataclass(frozen=True) +class RunStats: + """Statistics from a PolicyRuntime.run() session.""" + + steps: int + total_pops: int + total_holds: int + inference_count: int + transient_errors: int = 0 + stale_obs_ticks: int = 0 + + +class PolicyRuntime: + """Runs a policy on robot hardware. + + Loop: observe → maybe_request → pop → send → sleep. + + Supports context manager for safe lifecycle management:: + + with PolicyRuntime(robot=robot, model=model, ...) as runtime: + stats = runtime.run(duration_s=60.0) + """ + + def __init__( # noqa: D107 + self, + robot: Robot, + model: InferenceModel, + execution: Execution, + fps: float, + cameras: Mapping[str, Camera] | None = None, + action_queue: ActionQueue | None = None, + callbacks: Sequence[RuntimeCallback] = (), + ) -> None: + if fps <= 0: + msg = f"fps must be positive, got {fps}" + raise ValueError(msg) + self._robot = robot + self._model = model + self._execution = execution + self._fps = fps + self._cameras: Mapping[str, Camera] = cameras or {} + self._action_queue = action_queue or ActionQueue(smoother=LerpSmoother(duration_frames=_DEFAULT_LERP_FRAMES)) + self._callbacks = list(callbacks) + self._goal_time = (1.0 / fps) * 3 + self._connected = False + + def connect(self) -> None: + """Connect robot and cameras. 
+ + Connects robot first, then cameras in dict order. On failure, + disconnects everything already connected and re-raises. + + Idempotent — calling on an already-connected runtime is a no-op. + """ + if self._connected: + logger.debug("connect() called but already connected — no-op") + return + + self._robot.connect() + connected_cameras: list[str] = [] + try: + for name, cam in self._cameras.items(): + cam.connect() + connected_cameras.append(name) + except Exception: + for cam_name in connected_cameras: + try: + self._cameras[cam_name].disconnect() + except Exception: + logger.warning("Failed to disconnect camera '%s' during rollback", cam_name, exc_info=True) + try: + self._robot.disconnect() + except Exception: + logger.warning("Failed to disconnect robot during rollback", exc_info=True) + raise + + self._connected = True + + def disconnect(self) -> None: + """Disconnect cameras then robot. Never raises. + + Idempotent — calling on an already-disconnected runtime is a no-op. + """ + if not self._connected: + return + + for name, cam in self._cameras.items(): + try: + cam.disconnect() + except Exception: + logger.warning("Failed to disconnect camera '%s'", name, exc_info=True) + try: + self._robot.disconnect() + except Exception: + logger.warning("Failed to disconnect robot", exc_info=True) + + self._connected = False + + def __enter__(self) -> Self: # noqa: D105 + self.connect() + return self + + def __exit__(self, *exc_info: object) -> None: # noqa: D105 + self.disconnect() + + def run(self, *, duration_s: float | None = None) -> RunStats: + """Run the control loop. + + Args: + duration_s: Maximum duration in seconds. None runs indefinitely. + + Returns: + Statistics from the run session. + + Raises: + RuntimeError: If called before connect(). + WorkerDiedError: If the inference worker thread dies. + """ + if not self._connected: + msg = "PolicyRuntime.run() called before connect(). Use 'with runtime:' or call runtime.connect() first." 
+ raise RuntimeError(msg) + self._execution.start(self._model, self._action_queue) + sample_obs = self._build_model_input() + self._execution.warmup(sample_obs) + + goal_time = 1.0 / self._fps + step = 0 + last_action: np.ndarray | None = None + + try: + while True: + if duration_s is not None and step * goal_time >= duration_s: + break + + loop_start = time.perf_counter() + + obs = self._build_model_input() + self._execution.maybe_request(obs) + + action = self._action_queue.pop() + if action is not None: + last_action = action + else: + action = last_action + self._handle_hold(step=step) + + if action is None: + logger.error("No action available (warmup may have failed)") + self._tick_sleep(loop_start, goal_time) + step += 1 + continue + + modified = self._invoke_callback("before_send_action", action=action, step=step) + if modified is not None: + action = modified + + self._robot.send_action(action, goal_time=self._goal_time) + self._invoke_callback("on_action_sent", action=action, step=step) + self._tick_sleep(loop_start, goal_time) + + step += 1 + + except KeyboardInterrupt: + logger.info("Interrupted by user") + except WorkerDiedError: + logger.exception("Worker died during runtime") + raise + finally: + self._shutdown(step) + + return RunStats( + steps=step, + total_pops=self._action_queue.total_pops, + total_holds=self._action_queue.total_holds, + inference_count=getattr(self._execution, "inference_count", 0), + ) + + def _handle_hold(self, *, step: int) -> None: + holds = self._action_queue.consecutive_holds + if holds == 1: + logger.warning("Queue empty — holding position") + elif self._fps > 0: + warning_interval = max(int(self._fps), 1) + if holds % warning_interval == 0: + logger.warning( + "Queue starvation: %d consecutive holds (%.1fs)", + holds, + holds / self._fps, + ) + self._invoke_callback("on_hold", step=step, holds=holds) + + @staticmethod + def _tick_sleep(loop_start: float, goal_time: float) -> None: + elapsed = time.perf_counter() - 
loop_start + sleep_time = goal_time - elapsed + if sleep_time > 0: + time.sleep(sleep_time) + + def _build_model_input(self) -> dict[str, Any]: + robot_obs = self._robot.get_observation() + model_input: dict[str, Any] = {} + + if robot_obs.joint_positions is not None: + model_input["state"] = np.array([robot_obs.joint_positions], dtype=np.float32) + + # Merge robot-embedded images and external cameras + if robot_obs.images: + for name, frame in robot_obs.images.items(): + model_input[f"images.{name}"] = frame.data[np.newaxis] + for name, cam in self._cameras.items(): + model_input[f"images.{name}"] = cam.read_latest().data[np.newaxis] + + return model_input + + def _shutdown(self, step: int) -> None: + self._execution.stop() + + remaining = self._action_queue.remaining + drain_limit = min(remaining, int(self._fps)) + for _ in range(drain_limit): + action = self._action_queue.pop() + if action is not None: + self._robot.send_action(action) + time.sleep(1.0 / self._fps) + + logger.info( + "Shutdown complete — %d steps, %d pops, %d holds", + step, + self._action_queue.total_pops, + self._action_queue.total_holds, + ) + + def _invoke_callback(self, method: str, **kwargs: Any) -> Any: # noqa: ANN401 + result = None + for cb in self._callbacks: + fn = getattr(cb, method, None) + if fn is not None: + try: + callback_result = fn(**kwargs) + if callback_result is not None: + result = callback_result + if method == "before_send_action": + kwargs["action"] = callback_result + except Exception: + logger.exception("Callback %s.%s raised", type(cb).__name__, method) + return result diff --git a/src/physicalai/runtime/smoothers.py b/src/physicalai/runtime/smoothers.py new file mode 100644 index 0000000..58b1d14 --- /dev/null +++ b/src/physicalai/runtime/smoothers.py @@ -0,0 +1,71 @@ +# Copyright (C) 2026 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""Chunk smoothers for runtime action queues.""" + +from __future__ import annotations + +from abc import ABC, 
abstractmethod +from typing import override + +import numpy as np + +_NDIM_2 = 2 +_ERR_2D = "remaining and incoming must be 2D arrays" +_ERR_ACTION_DIM = "remaining and incoming must have the same action_dim" + + +class ChunkSmoother(ABC): + """Merges a new action chunk into remaining actions from the previous chunk.""" + + @abstractmethod + def merge(self, remaining: np.ndarray, incoming: np.ndarray, offset: int = 0) -> np.ndarray: + """Merge a previous remainder with a new incoming chunk.""" + raise NotImplementedError + + +class ReplaceSmoother(ChunkSmoother): + """Replace remaining actions with the incoming tail.""" + + @override + def merge(self, remaining: np.ndarray, incoming: np.ndarray, offset: int = 0) -> np.ndarray: + """Return the incoming chunk after skipping the offset.""" + _validate_inputs(remaining, incoming) + return incoming[offset:] + + +class LerpSmoother(ChunkSmoother): + """Blend overlapping actions and append the incoming tail.""" + + def __init__(self, duration_frames: int = 5) -> None: + """Create a smoother with a fallback lerp window.""" + self.duration_frames = duration_frames + + @override + def merge(self, remaining: np.ndarray, incoming: np.ndarray, offset: int = 0) -> np.ndarray: + """Merge chunks using queue-mixer-style linear interpolation. + + Returns: + The blended action chunk. 
+ """ + _validate_inputs(remaining, incoming) + + lerp_dur = max(offset, 1) if offset > 0 else self.duration_frames + incoming = incoming[offset:] + n_remain = len(remaining) + lerp_dur = min(n_remain, lerp_dur) + + weights = np.maximum(1.0 - np.arange(n_remain) / max(lerp_dur, 1), 0.0) + weights = weights[:, np.newaxis] + + n_blend = min(n_remain, len(incoming)) + blended = weights[:n_blend] * remaining[:n_blend] + (1.0 - weights[:n_blend]) * incoming[:n_blend] + + return np.concatenate([blended, incoming[n_blend:]], axis=0).astype(np.float32) + + +def _validate_inputs(remaining: np.ndarray, incoming: np.ndarray) -> None: + if remaining.ndim != _NDIM_2 or incoming.ndim != _NDIM_2: + raise ValueError(_ERR_2D) + if remaining.shape[1] != incoming.shape[1]: + raise ValueError(_ERR_ACTION_DIM) diff --git a/tests/unit/inference/test_action_cursor.py b/tests/unit/inference/test_action_cursor.py deleted file mode 100644 index 3a82b87..0000000 --- a/tests/unit/inference/test_action_cursor.py +++ /dev/null @@ -1,107 +0,0 @@ -# Copyright (C) 2026 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from __future__ import annotations - -import numpy as np -import pytest - -from physicalai.inference.utils.action_cursor import ActionCursor - - -class TestActionCursor: - def test_init_starts_empty_and_repr(self) -> None: - cursor = ActionCursor() - assert cursor.empty is True - assert repr(cursor) == "ActionCursor(buffered=0)" - - def test_push_chunk_2d_pops_in_row_order(self) -> None: - cursor = ActionCursor() - chunk = np.array( - [ - [1.0, 2.0], - [3.0, 4.0], - [5.0, 6.0], - ], - dtype=np.float32, - ) - - cursor.push_chunk(chunk) - - np.testing.assert_array_equal(cursor.pop(), np.array([1.0, 2.0], dtype=np.float32)) - np.testing.assert_array_equal(cursor.pop(), np.array([3.0, 4.0], dtype=np.float32)) - np.testing.assert_array_equal(cursor.pop(), np.array([5.0, 6.0], dtype=np.float32)) - assert cursor.empty is True - - def test_push_chunk_3d_pops_time_slices(self) -> 
None: - cursor = ActionCursor() - chunk = np.array( - [ - [[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]], - [[4.0, 40.0], [5.0, 50.0], [6.0, 60.0]], - ], - dtype=np.float32, - ) - - cursor.push_chunk(chunk) - - np.testing.assert_array_equal( - cursor.pop(), - np.array([[1.0, 10.0], [4.0, 40.0]], dtype=np.float32), - ) - np.testing.assert_array_equal( - cursor.pop(), - np.array([[2.0, 20.0], [5.0, 50.0]], dtype=np.float32), - ) - np.testing.assert_array_equal( - cursor.pop(), - np.array([[3.0, 30.0], [6.0, 60.0]], dtype=np.float32), - ) - assert cursor.empty is True - - def test_pop_empty_raises_index_error(self) -> None: - cursor = ActionCursor() - with pytest.raises(IndexError, match="ActionCursor is empty; call push_chunk before pop"): - cursor.pop() - - def test_reset_clears_buffered_actions(self) -> None: - cursor = ActionCursor() - cursor.push_chunk(np.array([[1.0], [2.0]], dtype=np.float32)) - - cursor.reset() - - assert cursor.empty is True - with pytest.raises(IndexError): - cursor.pop() - - def test_multiple_pushes_append_in_fifo_order(self) -> None: - cursor = ActionCursor() - first = np.array([[1.0, 1.0], [2.0, 2.0]], dtype=np.float32) - second = np.array( - [ - [[3.0, 3.0], [4.0, 4.0]], - ], - dtype=np.float32, - ) - - cursor.push_chunk(first) - cursor.push_chunk(second) - - np.testing.assert_array_equal(cursor.pop(), np.array([1.0, 1.0], dtype=np.float32)) - np.testing.assert_array_equal(cursor.pop(), np.array([2.0, 2.0], dtype=np.float32)) - np.testing.assert_array_equal(cursor.pop(), np.array([[3.0, 3.0]], dtype=np.float32)) - np.testing.assert_array_equal(cursor.pop(), np.array([[4.0, 4.0]], dtype=np.float32)) - assert cursor.empty is True - - @pytest.mark.parametrize( - "chunk", - [ - np.array([1.0, 2.0], dtype=np.float32), - np.zeros((1, 2, 3, 4), dtype=np.float32), - ], - ) - def test_push_chunk_invalid_ndim_raises_value_error(self, chunk: np.ndarray) -> None: - cursor = ActionCursor() - - with pytest.raises(ValueError, match=r"Chunk must be a 2-D or 
class TestActionQueue:
    """Single-threaded behaviour of ``ActionQueue``: ordering, counters, smoothing."""

    def test_push_pop_roundtrip(self) -> None:
        """Rows of a pushed chunk are popped back in the same order."""
        pushed = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=np.float32)
        q = ActionQueue()
        q.push_chunk(pushed)

        popped = [q.pop() for _ in range(3)]
        assert all(item is not None for item in popped)
        for row, got in enumerate(popped):
            np.testing.assert_array_equal(got, pushed[row])

    def test_pop_empty_returns_none(self) -> None:
        """Popping a fresh queue yields ``None`` instead of raising."""
        assert ActionQueue().pop() is None

    def test_consecutive_holds_increment_and_reset(self) -> None:
        """Empty pops grow the hold streak; a successful pop resets it."""
        q = ActionQueue()
        for _ in range(2):
            q.pop()
        assert q.consecutive_holds == 2

        q.push_chunk(np.array([[1.0, 2.0]], dtype=np.float32))
        q.pop()
        assert q.consecutive_holds == 0

    def test_total_counters(self) -> None:
        """Lifetime hold and pop totals accumulate across a refill."""
        q = ActionQueue()
        for _ in range(2):  # two holds on the empty queue
            q.pop()
        q.push_chunk(np.array([[1.0], [2.0], [3.0]], dtype=np.float32))
        for _ in range(4):  # three real pops, then one more hold
            q.pop()

        assert q.total_holds == 3
        assert q.total_pops == 3

    def test_remaining_property(self) -> None:
        """``remaining`` tracks the number of queued actions."""
        q = ActionQueue()
        assert q.remaining == 0

        q.push_chunk(np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float32))
        assert q.remaining == 2

        q.pop()
        assert q.remaining == 1

    def test_below_threshold(self) -> None:
        """``below_threshold(n)`` is true only while fewer than ``n`` actions remain."""
        q = ActionQueue()
        assert q.below_threshold(1) is True

        q.push_chunk(np.array([[1.0], [2.0], [3.0]], dtype=np.float32))
        assert q.below_threshold(4) is True
        assert q.below_threshold(3) is False
        assert q.below_threshold(2) is False

    def test_clear(self) -> None:
        """``clear`` empties the queue and the streak but keeps lifetime totals."""
        q = ActionQueue()
        q.push_chunk(np.array([[1.0], [2.0]], dtype=np.float32))
        for _ in range(3):  # two real pops followed by one hold
            q.pop()
        assert q.consecutive_holds == 1

        q.push_chunk(np.array([[3.0], [4.0]], dtype=np.float32))
        q.clear()

        assert q.remaining == 0
        assert q.consecutive_holds == 0
        assert q.total_holds == 1
        assert q.total_pops == 2

    def test_push_with_offset(self) -> None:
        """Pushing with ``offset`` skips that many leading rows of the chunk."""
        q = ActionQueue()
        q.push_chunk(
            np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=np.float32),
            offset=2,
        )

        assert q.remaining == 1
        head = q.pop()
        assert head is not None
        np.testing.assert_array_equal(head, [5.0, 6.0])

    def test_default_smoother_is_replace(self) -> None:
        """A queue built without arguments uses ``ReplaceSmoother``."""
        assert isinstance(ActionQueue()._smoother, ReplaceSmoother)

    def test_smoother_integration_lerp(self) -> None:
        """With ``LerpSmoother`` the first action after a refill is blended."""
        q = ActionQueue(smoother=LerpSmoother(duration_frames=5))
        q.push_chunk(np.array([[10.0, 10.0], [20.0, 20.0], [30.0, 30.0]], dtype=np.float32))

        refill = np.array(
            [[100.0, 100.0], [110.0, 110.0], [120.0, 120.0], [130.0, 130.0]],
            dtype=np.float32,
        )
        q.push_chunk(refill)

        head = q.pop()
        assert head is not None
        assert not np.array_equal(head, refill[0]), "LerpSmoother should blend, not replace"

    def test_smoother_integration_replace(self) -> None:
        """With ``ReplaceSmoother`` a refill fully replaces the queued actions."""
        q = ActionQueue(smoother=ReplaceSmoother())
        q.push_chunk(np.array([[1.0], [2.0], [3.0]], dtype=np.float32))
        q.push_chunk(np.array([[10.0], [20.0]], dtype=np.float32))

        assert q.remaining == 2
        head = q.pop()
        assert head is not None
        np.testing.assert_array_equal(head, [10.0])


class TestActionQueueThreadSafety:
    """Concurrent producer/consumer smoke test for ``ActionQueue``."""

    def test_concurrent_push_pop(self) -> None:
        """One pusher and one popper run together without errors or deadlock."""
        q = ActionQueue()
        failures: list[Exception] = []
        action_dim = 4
        n_pushes = 100
        chunk_size = 10
        halt = threading.Event()

        def produce() -> None:
            try:
                for value in range(n_pushes):
                    q.push_chunk(np.full((chunk_size, action_dim), float(value), dtype=np.float32))
            except Exception as exc:
                failures.append(exc)

        def consume(stop_event: threading.Event) -> None:
            try:
                while not stop_event.is_set():
                    q.pop()
            except Exception as exc:
                failures.append(exc)

        producer = threading.Thread(target=produce)
        consumer = threading.Thread(target=consume, args=(halt,))

        producer.start()
        consumer.start()

        # The consumer spins until told to stop, so signal it once the
        # producer has finished (or timed out).
        producer.join(timeout=5.0)
        halt.set()
        consumer.join(timeout=5.0)

        assert not producer.is_alive(), "Push thread deadlocked"
        assert not consumer.is_alive(), "Pop thread deadlocked"
        assert not failures, f"Thread errors: {failures}"
/dev/null +++ b/tests/unit/runtime/test_execution.py @@ -0,0 +1,235 @@ +from __future__ import annotations + +import threading +import time +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest + +from physicalai.runtime._action_queue import ActionQueue +from physicalai.runtime.execution import AsyncExecution, SyncExecution, WorkerDiedError + + +def _make_mock_model(chunk: np.ndarray | None = None) -> MagicMock: + model = MagicMock() + if chunk is None: + chunk = np.random.randn(6, 4).astype(np.float32) + model.predict_action_chunk.return_value = chunk + return model + + +class TestSyncExecution: + def test_warmup_seeds_queue_and_discovers_chunk_size(self) -> None: + chunk = np.random.randn(8, 3).astype(np.float32) + model = _make_mock_model(chunk) + queue = ActionQueue() + ex = SyncExecution() + obs = {"state": np.zeros(3)} + + ex.start(model, queue) + ex.warmup(obs) + + assert ex.chunk_size == 8 + assert queue.remaining == 8 + model.predict_action_chunk.assert_called_once_with(obs) + + def test_maybe_request_refills_when_empty(self) -> None: + chunk = np.random.randn(4, 2).astype(np.float32) + model = _make_mock_model(chunk) + queue = ActionQueue() + ex = SyncExecution() + obs = {"state": np.zeros(2)} + + ex.start(model, queue) + ex.warmup(obs) + + for _ in range(4): + queue.pop() + assert queue.remaining == 0 + + model.predict_action_chunk.reset_mock() + model.predict_action_chunk.return_value = chunk + ex.maybe_request(obs) + + assert queue.remaining == 4 + model.predict_action_chunk.assert_called_once() + + def test_maybe_request_does_not_refill_when_nonempty(self) -> None: + chunk = np.random.randn(4, 2).astype(np.float32) + model = _make_mock_model(chunk) + queue = ActionQueue() + ex = SyncExecution() + obs = {"state": np.zeros(2)} + + ex.start(model, queue) + ex.warmup(obs) + queue.pop() + + model.predict_action_chunk.reset_mock() + ex.maybe_request(obs) + model.predict_action_chunk.assert_not_called() + + def 
class TestAsyncExecution:
    """Behavioural tests for the background-worker ``AsyncExecution`` strategy.

    Every test stops the execution in a ``finally`` block so that a failing
    assertion cannot leak a live worker thread into later tests.
    """

    def test_start_spawns_thread(self) -> None:
        """``start`` leaves the execution reporting ``alive``."""
        model = _make_mock_model()
        queue = ActionQueue()
        ex = AsyncExecution(fps=10)

        ex.start(model, queue)
        try:
            assert ex.alive is True
        finally:
            ex.stop()

    def test_warmup_seeds_queue(self) -> None:
        """``warmup`` records the chunk size and fills the queue with one chunk."""
        chunk = np.random.randn(6, 4).astype(np.float32)
        model = _make_mock_model(chunk)
        queue = ActionQueue()
        ex = AsyncExecution(fps=10)

        ex.start(model, queue)
        try:
            obs = {"state": np.zeros(4)}
            ex.warmup(obs)

            assert ex.chunk_size == 6
            assert queue.remaining == 6
        finally:
            ex.stop()

    def test_maybe_request_submits_when_below_threshold(self) -> None:
        """A drained queue triggers a background refill."""
        chunk = np.random.randn(10, 2).astype(np.float32)
        model = _make_mock_model(chunk)
        queue = ActionQueue()
        ex = AsyncExecution(threshold=0.5, fps=10)

        ex.start(model, queue)
        try:
            obs = {"state": np.zeros(2)}
            ex.warmup(obs)

            for _ in range(10):
                queue.pop()

            model.predict_action_chunk.reset_mock()
            model.predict_action_chunk.return_value = chunk
            ex.maybe_request(obs)

            # Give the worker time to run the inference and push the chunk.
            time.sleep(0.3)
            assert queue.remaining > 0
        finally:
            ex.stop()

    def test_defensive_copy_of_observation(self) -> None:
        """Mutating the caller's observation after submit must not reach the model."""
        chunk = np.random.randn(4, 2).astype(np.float32)
        model = _make_mock_model(chunk)
        queue = ActionQueue()
        ex = AsyncExecution(threshold=0.5, fps=10)

        ex.start(model, queue)
        try:
            obs = {"state": np.zeros(2)}
            ex.warmup(obs)
            for _ in range(4):
                queue.pop()

            model.predict_action_chunk.reset_mock()
            original_state = np.array([1.0, 2.0])
            obs_to_submit = {"state": original_state.copy()}
            ex.maybe_request(obs_to_submit)
            obs_to_submit["state"][:] = 99.0

            time.sleep(0.3)
            # Require the inference to have happened; guarding the check with
            # ``if called`` would let this test pass without verifying anything.
            assert model.predict_action_chunk.called, "worker never ran the submitted request"
            submitted = model.predict_action_chunk.call_args[0][0]["state"]
            np.testing.assert_array_equal(submitted, original_state)
        finally:
            ex.stop()

    def test_worker_death_raises_error(self) -> None:
        """An exception inside the worker surfaces as ``WorkerDiedError``."""
        model = _make_mock_model()
        # First call (warmup) succeeds, the next one raises.
        model.predict_action_chunk.side_effect = [
            np.random.randn(4, 2).astype(np.float32),
            ValueError("model exploded"),
        ]
        queue = ActionQueue()
        ex = AsyncExecution(threshold=0.5, fps=10)

        ex.start(model, queue)
        try:
            obs = {"state": np.zeros(2)}
            ex.warmup(obs)

            for _ in range(4):
                queue.pop()

            ex.maybe_request(obs)
            time.sleep(0.5)

            with pytest.raises(WorkerDiedError, match="model exploded"):
                ex.maybe_request(obs)
        finally:
            ex.stop()

    def test_stop_signals_and_joins(self) -> None:
        """After ``stop`` the worker thread object exists and is no longer alive."""
        model = _make_mock_model()
        queue = ActionQueue()
        ex = AsyncExecution(fps=10)

        ex.start(model, queue)
        try:
            assert ex.alive is True
        finally:
            ex.stop()

        assert ex._thread is not None
        assert not ex._thread.is_alive()

    def test_health_properties(self) -> None:
        """``inference_count`` is zero after warmup and grows after a refill."""
        chunk = np.random.randn(4, 2).astype(np.float32)
        model = _make_mock_model(chunk)
        queue = ActionQueue()
        ex = AsyncExecution(fps=10)

        ex.start(model, queue)
        try:
            obs = {"state": np.zeros(2)}
            ex.warmup(obs)

            assert ex.inference_count == 0

            for _ in range(4):
                queue.pop()

            model.predict_action_chunk.reset_mock()
            model.predict_action_chunk.return_value = chunk
            ex.maybe_request(obs)
            time.sleep(0.3)

            assert ex.inference_count >= 1
        finally:
            ex.stop()

    def test_watchdog_triggers_force_reset(self) -> None:
        """A stalled inference does not make a later ``maybe_request`` fail.

        The second call to the model blocks on an event instead of sleeping
        for a fixed 100 s, so the stalled call can be released during cleanup
        and cannot hold up ``stop`` or interpreter shutdown.
        """
        chunk = np.random.randn(4, 2).astype(np.float32)
        model = _make_mock_model(chunk)

        release = threading.Event()
        call_count = 0

        def slow_predict(obs: dict) -> np.ndarray:
            nonlocal call_count
            call_count += 1
            if call_count == 2:
                # Stall well past the watchdog timeout; released in ``finally``.
                release.wait(timeout=100)
            return chunk

        model.predict_action_chunk.side_effect = slow_predict
        queue = ActionQueue()
        ex = AsyncExecution(threshold=0.5, fps=10, watchdog_timeout_s=0.1)

        ex.start(model, queue)
        try:
            obs = {"state": np.zeros(2)}
            ex.warmup(obs)

            for _ in range(4):
                queue.pop()
            ex.maybe_request(obs)

            time.sleep(0.3)
            # Called after the watchdog timeout has elapsed; must not raise.
            ex.maybe_request(obs)

            # Without this the test would pass even if the stalling call
            # had never been reached.
            assert call_count >= 2, "stalled inference was never started"
        finally:
            release.set()
            ex.stop()
@dataclass
class FakeRobotObservation:
    """Plain stand-in for the observation object returned by a robot."""

    # Joint positions reported by the (fake) robot.
    joint_positions: np.ndarray
    # Time at which the observation was created.
    timestamp: float
    # Optional extra sensor arrays keyed by name.
    sensor_data: dict[str, np.ndarray] | None
    # Optional robot-embedded camera frames keyed by name.
    images: dict | None


def _make_mock_robot(joint_positions: np.ndarray | None = None) -> MagicMock:
    """Build a ``MagicMock`` robot whose ``get_observation`` returns a fixed observation.

    Without an argument the joint positions default to ``[0.1, 0.2, 0.3]``
    (float32); the observation carries no sensor data and no images.
    """
    pose = np.array([0.1, 0.2, 0.3], dtype=np.float32) if joint_positions is None else joint_positions
    observation = FakeRobotObservation(
        joint_positions=pose,
        timestamp=time.monotonic(),
        sensor_data=None,
        images=None,
    )
    robot = MagicMock()
    robot.get_observation.return_value = observation
    return robot


def _make_mock_model(chunk_size: int = 4, action_dim: int = 3) -> MagicMock:
    """Build a ``MagicMock`` model returning one random ``(chunk_size, action_dim)`` chunk."""
    actions = np.random.randn(chunk_size, action_dim).astype(np.float32)
    model = MagicMock()
    model.predict_action_chunk.return_value = actions
    return model


def _make_runtime(**kwargs: Any) -> PolicyRuntime:
    """Construct a ``PolicyRuntime`` and mark it connected so ``run`` can be called."""
    rt = PolicyRuntime(**kwargs)
    rt._connected = True  # noqa: SLF001
    return rt


def _exhaustible_side_effect(
    initial_chunks: list[np.ndarray],
    action_dim: int = 2,
) -> Callable[[Any], np.ndarray]:
    """Return a side effect that yields *initial_chunks* once, then empty chunks.

    After the given chunks are used up, every further call returns the same
    empty ``(0, action_dim)`` float32 array, so a test never hits
    ``StopIteration`` when the execution refills more often than expected.
    """
    pending = iter(initial_chunks)
    filler = np.empty((0, action_dim), dtype=np.float32)

    def _next_chunk(_obs: Any) -> np.ndarray:
        return next(pending, filler)

    return _next_chunk
class TestRuntimeCallback:
    """Callback hooks fired by ``PolicyRuntime.run`` under a patched clock."""

    @staticmethod
    def _run_frozen(runtime: PolicyRuntime, duration_s: float) -> RunStats:
        """Run *runtime* with the runtime module's ``time`` patched out.

        ``perf_counter`` is pinned to 0.0 and ``sleep`` is replaced by a mock,
        so the run does not actually wait.
        """
        with patch("physicalai.runtime.runtime.time") as clock:
            clock.perf_counter.return_value = 0.0
            clock.sleep = MagicMock()
            return runtime.run(duration_s=duration_s)

    def test_before_send_action_called(self) -> None:
        """``before_send_action`` fires once per executed step."""
        hook = MagicMock()
        hook.before_send_action.return_value = None
        runtime = _make_runtime(
            robot=_make_mock_robot(),
            model=_make_mock_model(chunk_size=10),
            execution=SyncExecution(),
            fps=10.0,
            callbacks=[hook],
        )

        self._run_frozen(runtime, 0.2)

        assert hook.before_send_action.call_count == 2

    def test_callback_raises_does_not_crash_loop(self) -> None:
        """A callback that raises is tolerated and the loop keeps stepping."""
        faulty = MagicMock()
        faulty.before_send_action.side_effect = RuntimeError("oops")
        runtime = _make_runtime(
            robot=_make_mock_robot(),
            model=_make_mock_model(chunk_size=10),
            execution=SyncExecution(),
            fps=10.0,
            callbacks=[faulty],
        )

        stats = self._run_frozen(runtime, 0.3)

        assert stats.steps == 3

    def test_on_hold_called_when_queue_empty(self) -> None:
        """``on_hold`` fires once the single-action chunk has been used up."""
        only_chunk = np.array([[1.0, 2.0]], dtype=np.float32)
        model = _make_mock_model()
        model.predict_action_chunk.side_effect = _exhaustible_side_effect([only_chunk], action_dim=2)

        hook = MagicMock()
        hook.before_send_action.return_value = None
        hook.on_hold.return_value = None

        runtime = _make_runtime(
            robot=_make_mock_robot(),
            model=model,
            execution=SyncExecution(),
            fps=10.0,
            callbacks=[hook],
        )

        self._run_frozen(runtime, 0.3)

        assert hook.on_hold.call_count >= 1


class TestRunStats:
    """Field round-trip for the ``RunStats`` result object."""

    def test_fields_populated(self) -> None:
        """Every constructor argument is readable back as an attribute."""
        expected = {"steps": 10, "total_pops": 8, "total_holds": 2, "inference_count": 3}
        stats = RunStats(**expected)
        for field_name, value in expected.items():
            assert getattr(stats, field_name) == value
def _f32(rows: list[list[float]]) -> np.ndarray:
    """Build a float32 action array from nested row lists."""
    return np.array(rows, dtype=np.float32)


class TestLerpSmoother:
    """Numerical checks for ``LerpSmoother.merge``."""

    def test_lerp_weights_match_queue_mixer_formula(self) -> None:
        """With no offset the overlap fades linearly over the remaining rows."""
        old = _f32([[10.0, 10.0], [20.0, 20.0], [30.0, 30.0]])
        new = _f32([[100.0, 100.0], [110.0, 110.0], [120.0, 120.0], [130.0, 130.0]])

        merged = LerpSmoother(duration_frames=5).merge(old, new, offset=0)

        np.testing.assert_array_equal(
            merged,
            _f32([[10.0, 10.0], [50.0, 50.0], [90.0, 90.0], [130.0, 130.0]]),
        )

    def test_offset_aware_duration(self) -> None:
        """A positive offset, not ``duration_frames``, sets the blend window."""
        old = _f32([[1.0, 1.0], [2.0, 2.0], [3.0, 3.0]])
        new = _f32([[4.0, 4.0], [5.0, 5.0], [6.0, 6.0], [7.0, 7.0]])

        merged = LerpSmoother(duration_frames=99).merge(old, new, offset=2)

        np.testing.assert_array_equal(merged, _f32([[1.0, 1.0], [4.5, 4.5]]))

    def test_edge_cases_empty_remaining_single_element_and_offset_beyond_chunk(self) -> None:
        """Empty remainder, one-row inputs, and an offset past the chunk end."""
        smoother = LerpSmoother(duration_frames=5)
        one_row_a = _f32([[1.0, 1.0]])
        one_row_b = _f32([[2.0, 2.0]])

        # Nothing left over: the incoming chunk passes through.
        from_empty = smoother.merge(np.empty((0, 2), dtype=np.float32), _f32([[1.0, 1.0]]), offset=0)
        np.testing.assert_array_equal(from_empty, _f32([[1.0, 1.0]]))

        # One row each: the first blended row keeps the old action entirely.
        from_single = smoother.merge(one_row_a, one_row_b, offset=0)
        np.testing.assert_array_equal(from_single, _f32([[1.0, 1.0]]))

        # Offset larger than the chunk: no rows survive.
        past_end = smoother.merge(_f32([[1.0, 1.0]]), _f32([[2.0, 2.0]]), offset=5)
        assert past_end.shape == (0, 2)

    def test_exact_numerical_blend_values(self) -> None:
        """With ``offset=1`` only the first row keeps the old value."""
        old = np.full((3, 2), 1.0, dtype=np.float32)
        new = np.full((4, 2), 2.0, dtype=np.float32)

        merged = LerpSmoother(duration_frames=5).merge(old, new, offset=1)

        np.testing.assert_array_equal(merged, _f32([[1.0, 1.0], [2.0, 2.0], [2.0, 2.0]]))

    def test_merge_is_stateless_for_same_arguments(self) -> None:
        """Two identical calls on one smoother give identical results."""
        smoother = LerpSmoother(duration_frames=5)
        old = _f32([[1.0, 1.0], [2.0, 2.0]])
        new = _f32([[3.0, 3.0], [4.0, 4.0], [5.0, 5.0]])

        once = smoother.merge(old, new, offset=1)
        twice = smoother.merge(old, new, offset=1)

        np.testing.assert_array_equal(once, twice)


def test_input_validation_mismatched_action_dim_raises_value_error() -> None:
    """Arrays with different action dimensions are rejected with ``ValueError``."""
    two_wide = _f32([[1.0, 1.0]])
    three_wide = _f32([[2.0, 2.0, 2.0]])
    smoother = ReplaceSmoother()

    with pytest.raises(ValueError, match="action_dim"):
        smoother.merge(two_wide, three_wide, offset=0)