feat(runtime): callbacks, telemetry via rerun, fault tolerance#119
maxxgx wants to merge 51 commits into
…tion() and predict_action_chunk()
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Pull request overview
Introduces a new physicalai.runtime subsystem for running trained policies in a control loop with pluggable execution strategies, callbacks, telemetry/observer tooling, and improved fault tolerance. It also updates inference action buffering and image preprocessing, adds runtime examples, and expands unit test coverage around the new runtime components.
Changes:
- Add `PolicyRuntime` control loop with action queue smoothing, callback bus, sync/async inference execution, telemetry emitter, and fault-tolerance paths.
- Add telemetry observer utilities (subscriber, console handler, recorder handler, CLI) plus an optional Rerun-based visualization callback.
- Update inference model action dispensing to use an internal deque (removing `ActionCursor`) and adjust image preprocessing; add examples and extensive unit tests.
Reviewed changes
Copilot reviewed 25 out of 26 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| uv.lock | Adds locked dependencies for the new optional observer/Rerun functionality. |
| pyproject.toml | Adds observer-rerun extra for rerun-sdk. |
| src/physicalai/runtime/__init__.py | Exposes new runtime public API (runtime, execution, queue, events, callbacks). |
| src/physicalai/runtime/_action_queue.py | New thread-safe action queue with chunk smoothing integration. |
| src/physicalai/runtime/_callback_bus.py | New internal callback dispatcher with inference-event draining. |
| src/physicalai/runtime/_telemetry.py | New Zenoh/msgpack telemetry emitter with numpy encoding. |
| src/physicalai/runtime/callbacks.py | Adds runtime callbacks: console/jsonl/async wrapper + RerunCallback. |
| src/physicalai/runtime/events.py | Defines Tick/Inference/Lifecycle event dataclasses. |
| src/physicalai/runtime/execution.py | Adds SyncExecution and AsyncExecution scheduling strategies with health monitoring. |
| src/physicalai/runtime/runtime.py | Implements PolicyRuntime, connect/disconnect lifecycle, fault tolerance, and stats. |
| src/physicalai/runtime/smoothers.py | Adds ReplaceSmoother and LerpSmoother for chunk merge behavior. |
| src/physicalai/runtime/observer/__init__.py | Exposes telemetry observer entry points. |
| src/physicalai/runtime/observer/__main__.py | Adds `python -m physicalai.runtime.observer` CLI. |
| src/physicalai/runtime/observer/_console.py | Adds console handler for live telemetry display. |
| src/physicalai/runtime/observer/_recorder.py | Adds JSONL recorder handler with numpy JSON encoding. |
| src/physicalai/runtime/observer/_subscriber.py | Adds Zenoh subscriber that decodes telemetry payloads (including numpy). |
| src/physicalai/inference/model.py | Replaces ActionCursor usage with an internal deque; tightens chunk shape handling. |
| src/physicalai/inference/preprocessors/pi05.py | Adjusts image dtype normalization and enforces BCHW output. |
| src/physicalai/robot/__init__.py | Extends exports (`RobotObservation`) and keeps lazy-loading robot implementations. |
| src/physicalai/capture/factory.py | Extends camera factory with shared option and adds interactive camera selection. |
| src/physicalai/capture/__init__.py | Exports `select_cameras_interactive`. |
| examples/runtime/utils.py | Adds shared helpers for building robots and parsing camera specs for runtime examples. |
| examples/runtime/async_inference.py | New example showcasing async runtime execution + optional Rerun visualization. |
| tests/unit/runtime/__init__.py | Adds runtime unit test package marker. |
| tests/unit/runtime/test_action_queue.py | New unit tests for ActionQueue behavior and thread-safety. |
| tests/unit/runtime/test_execution.py | New unit tests for sync/async execution strategies and worker failure handling. |
| tests/unit/runtime/test_fault_tolerance.py | New unit tests for resilient observe/send/warmup/shutdown paths. |
| tests/unit/runtime/test_observer.py | New unit tests for observer console/recorder handlers. |
| tests/unit/runtime/test_rerun_callback.py | New unit tests for RerunCallback behavior via mocked rerun module. |
| tests/unit/runtime/test_runtime.py | New unit tests for PolicyRuntime loop behavior and callbacks. |
| tests/unit/runtime/test_smoothers.py | New unit tests for action chunk smoothers. |
| tests/unit/runtime/test_telemetry.py | New unit tests for telemetry packing behavior and callback bus dispatch. |
| tests/unit/inference/test_model.py | Copyright header update. |
| tests/unit/inference/test_adapters.py | Copyright header update. |
| tests/unit/inference/test_action_cursor.py | Removes tests for deleted ActionCursor. |
| src/physicalai/inference/utils/action_cursor.py | Deletes ActionCursor implementation. |
| src/physicalai/inference/utils/__init__.py | Deletes inference utils export of `ActionCursor`. |
| src/physicalai/inference/adapters/openvino.py | Copyright header update. |
| src/physicalai/inference/adapters/_discovery.py | Copyright header update. |
| src/physicalai/inference/adapters/__init__.py | Copyright header update. |
| src/physicalai/inference/__init__.py | Copyright header update. |
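
The table only names `ReplaceSmoother` and `LerpSmoother` without showing how they merge chunks. As a rough illustration of the two merge behaviors, and not the PR's implementation, the sketch below blends the overlap between the actions still pending in the queue and a freshly predicted chunk. The function names, the scalar action type, and the linear weight ramp are all assumptions made for this example.

```python
from typing import List


def replace_merge(pending: List[float], incoming: List[float]) -> List[float]:
    """Discard whatever is still pending and use the new chunk as-is."""
    return list(incoming)


def lerp_merge(pending: List[float], incoming: List[float]) -> List[float]:
    """Linearly blend the overlapping steps, then append the rest of the new chunk.

    The weight of the incoming chunk ramps up across the overlap (assumed
    schedule), so early steps stay close to the old plan and later steps
    follow the new one.
    """
    overlap = min(len(pending), len(incoming))
    merged = []
    for i in range(overlap):
        w = (i + 1) / (overlap + 1)  # weight given to the incoming action
        merged.append((1 - w) * pending[i] + w * incoming[i])
    merged.extend(incoming[overlap:])
    return merged
```

With three pending actions at 0.0 and a new chunk of four actions at 4.0, `lerp_merge` ramps through the overlap instead of jumping, which is the kind of discontinuity a replace-style merge would produce.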
Comments suppressed due to low confidence (2)
src/physicalai/runtime/execution.py:153
- AsyncExecution accepts max_consecutive_holds but the value is never read anywhere. Either enforce it (e.g., raise/emit lifecycle when holds exceed the threshold) or remove the parameter/state to avoid a misleading API surface.
        max_consecutive_holds: int | None = None,
    ) -> None:
        self._threshold_frac = threshold
        self._fps = fps
        self._watchdog_timeout_s = watchdog_timeout_s
        self._max_consecutive_holds = max_consecutive_holds or 3 * fps
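
One way the "enforce it" option from the comment above could look is sketched below. `HoldMonitor`, `HoldLimitExceededError`, and `record_tick` are hypothetical names introduced for this example; the PR may prefer emitting a lifecycle event over raising. The default of `3 * fps` is taken from the snippet above.

```python
from typing import Optional


class HoldLimitExceededError(RuntimeError):
    """Raised when too many consecutive ticks reused a held action."""


class HoldMonitor:
    """Hypothetical helper that actually reads the hold threshold."""

    def __init__(self, fps: int, max_consecutive_holds: Optional[int] = None) -> None:
        # `is None` keeps an explicit 0 meaningful; `value or 3 * fps` would
        # silently replace 0 with the default.
        self._limit = 3 * fps if max_consecutive_holds is None else max_consecutive_holds
        self._consecutive = 0

    def record_tick(self, *, held: bool) -> None:
        """Update the streak and raise once it exceeds the limit."""
        if not held:
            self._consecutive = 0
            return
        self._consecutive += 1
        if self._consecutive > self._limit:
            msg = f"{self._consecutive} consecutive holds exceeds limit {self._limit}"
            raise HoldLimitExceededError(msg)
```

If the parameter is not meant to be enforced, removing it (as the comment suggests) is the simpler fix.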
src/physicalai/runtime/_action_queue.py:71
- ActionQueue advertises thread-safety, but consecutive_holds/total_holds/total_pops properties read mutable state without acquiring the lock while push/pop mutate them under the lock. Add locking (or atomics) around these getters to avoid data races when the queue is used/monitored from multiple threads.
    @property
    def consecutive_holds(self) -> int:
        return self._consecutive_holds

    @property
    def total_holds(self) -> int:
        return self._total_holds

    @property
    def total_pops(self) -> int:
        return self._total_pops
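
A minimal sketch of the locking suggested above, assuming the queue guards its counters with a single `threading.Lock`. `CountingQueue`, `record_pop`, and `stats` are illustrative names, not the PR's `ActionQueue` API; only the three property names come from the snippet.

```python
import threading
from typing import Tuple


class CountingQueue:
    """Illustrative stand-in that keeps only the hold/pop counters."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._consecutive_holds = 0
        self._total_holds = 0
        self._total_pops = 0

    def record_pop(self, *, held: bool) -> None:
        """Mutate all counters under the lock, as push/pop already do."""
        with self._lock:
            self._total_pops += 1
            if held:
                self._consecutive_holds += 1
                self._total_holds += 1
            else:
                self._consecutive_holds = 0

    @property
    def consecutive_holds(self) -> int:
        with self._lock:
            return self._consecutive_holds

    @property
    def total_holds(self) -> int:
        with self._lock:
            return self._total_holds

    @property
    def total_pops(self) -> int:
        with self._lock:
            return self._total_pops

    def stats(self) -> Tuple[int, int, int]:
        """Return a consistent snapshot of all three counters at once."""
        with self._lock:
            return self._consecutive_holds, self._total_holds, self._total_pops
```

Reading the three properties one after another can still observe values from different moments; a single snapshot method like `stats()` avoids that when a monitor needs the counters to agree with each other.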
            return float(obj)
        return obj

    return self._msgpack.packb(payload, default=_default)

        )
        robot_obs = self._last_robot_obs
    else:
        self._consecutive_error_ticks = 0

    def __init__(self, fps: int = 30) -> None:  # noqa: D107
        self._model: InferenceModel | None = None
        self._queue: ActionQueue | None = None
        self._chunk_size: int = 0
        self._fps = fps
        self._inference_count: int = 0

    except ImportError:
        raise SystemExit(1) from None

    subscriber = TelemetrySubscriber(session_id=args.session_id)

Pull request overview
Copilot reviewed 25 out of 26 changed files in this pull request and generated 8 comments.
Comments suppressed due to low confidence (1)
src/physicalai/runtime/runtime.py:423
- On successful `send_action()`, `_consecutive_error_ticks` is reset to 0. Because the same counter is also used for observation failures, this reset can interfere with observation-failure escalation (and vice versa). Use independent counters (e.g., `_consecutive_obs_error_ticks` / `_consecutive_send_error_ticks`) or otherwise avoid cross-resetting unrelated failure modes.
        else:
            self._consecutive_error_ticks = 0
        return
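
To make the cross-reset concern concrete, here is a small sketch with one counter per failure mode. `FailureCounters`, its methods, and the `limit` parameter are hypothetical; only the idea of separate observation and send counters comes from the comment above.

```python
class FailureCounters:
    """Hypothetical helper that tracks each failure mode independently."""

    def __init__(self, limit: int) -> None:
        self._limit = limit
        self.obs_errors = 0
        self.send_errors = 0

    def record_observe(self, ok: bool) -> bool:
        """Update the observation streak; return True when it should escalate."""
        self.obs_errors = 0 if ok else self.obs_errors + 1
        return self.obs_errors >= self._limit

    def record_send(self, ok: bool) -> bool:
        """Update the send streak; a successful send never touches obs_errors."""
        self.send_errors = 0 if ok else self.send_errors + 1
        return self.send_errors >= self._limit
```

With a shared counter, a tick where the observation fails but the send succeeds would reset the streak every time, so escalation would never trigger. With separate counters the observation streak keeps growing.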
    else:
        self._consecutive_error_ticks = 0
        self._last_robot_obs = robot_obs

    self._session_id = uuid.uuid4().hex[:8]
    self._execution.set_bus(self._bus, self._session_id)

    self._execution.start(self._model, self._action_queue)

    if self._queue.below_threshold(1):
        t0 = time.perf_counter()
        actions = self._model.predict_action_chunk(observation)
        self._queue.push_chunk(actions, offset=0)
        latency = time.perf_counter() - t0
        offset = int(latency * self._fps)
        self._queue.push_chunk(actions, offset=offset)
        self._inference_count += 1

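
The hunk above derives an offset from inference latency as `int(latency * self._fps)`. A minimal sketch of that arithmetic follows, assuming `offset` means "number of leading actions that are already stale when the chunk arrives"; that reading of `push_chunk` is an assumption, and both function names are made up for this example.

```python
from typing import List, Sequence


def stale_action_count(latency_s: float, fps: int) -> int:
    """Number of control ticks that elapsed while inference was running."""
    return max(0, int(latency_s * fps))


def drop_stale_actions(chunk: Sequence[float], latency_s: float, fps: int) -> List[float]:
    """Skip the leading actions whose time slot has already passed."""
    offset = min(stale_action_count(latency_s, fps), len(chunk))
    return list(chunk[offset:])
```

For example, 0.25 s of latency at 20 fps corresponds to 5 elapsed ticks, so the first 5 actions of the chunk would be skipped. If latency exceeds the chunk duration, nothing usable remains and the caller has to decide whether to hold or re-infer.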
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    return obj

    try:
        from physicalai.runtime.observer._subscriber import TelemetrySubscriber  # noqa: PLC0415, PLC2701
    except ImportError:
        raise SystemExit(1) from None

    subscriber = TelemetrySubscriber(session_id=args.session_id)

    from __future__ import annotations

    import argparse
    import os

    event = LifecycleEvent(session_id="t", timestamp=0.0, event="start", metadata={})
    cb.on_lifecycle(event)
    time.sleep(0.1)
    inner.on_lifecycle.assert_called_once_with(event)

    Thread safety: only ``emit_inference`` is called from the inference thread.
    It appends to a bounded deque (CPython atomic). All other methods run on
    the control thread.
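
The docstring above relies on a single-producer pattern: the inference thread only appends to a bounded deque, and the control thread drains it. A minimal sketch of that pattern is below. `InferenceEventBuffer` and `drain` are hypothetical names (only `emit_inference` appears in the source), and the choice of `maxlen` is arbitrary.

```python
import threading
from collections import deque
from typing import Any, List


class InferenceEventBuffer:
    """Illustrative buffer: one thread appends, another drains."""

    def __init__(self, maxlen: int = 64) -> None:
        # A bounded deque silently drops the oldest item once it is full.
        self._events: deque = deque(maxlen=maxlen)

    def emit_inference(self, event: Any) -> None:
        """Called from the inference thread; deque.append is thread-safe in CPython."""
        self._events.append(event)

    def drain(self) -> List[Any]:
        """Called from the control thread; pops until the deque is empty."""
        drained = []
        while True:
            try:
                drained.append(self._events.popleft())
            except IndexError:
                return drained
```

The trade-off of this design is that events are lost rather than blocking the inference thread when the control thread falls behind, which seems acceptable for telemetry but would not be for data that must not be dropped.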