Skip to content

feat(runtime): callbacks, telemetry via rerun, fault tolerance#119

Open
maxxgx wants to merge 51 commits into
mainfrom
max/runtime-cli-telemetry
Open

feat(runtime): callbacks, telemetry via rerun, fault tolerance#119
maxxgx wants to merge 51 commits into
mainfrom
max/runtime-cli-telemetry

Conversation

@maxxgx
Copy link
Copy Markdown
Contributor

@maxxgx maxxgx commented May 17, 2026

Summary

Why

Validation

Breaking changes

  • None

Related issues

  • Closes #

@maxxgx maxxgx requested a review from a team as a code owner May 17, 2026 20:11
@maxxgx maxxgx force-pushed the max/runtime-cli-telemetry branch from 6c1c3e3 to 57d3126 Compare May 17, 2026 20:15
@maxxgx maxxgx force-pushed the max/runtime-cli-telemetry branch from 57d3126 to 9b6567d Compare May 17, 2026 20:22
Comment thread src/physicalai/cli/__init__.py Outdated
Comment thread src/physicalai/runtime/observer/__init__.py
Comment thread src/physicalai/runtime/runtime.py Outdated
@maxxgx maxxgx force-pushed the max/runtime-cli-telemetry branch from ce16d44 to 4267c50 Compare May 19, 2026 18:51
maxxgx and others added 7 commits May 19, 2026 22:10
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
@maxxgx maxxgx force-pushed the max/runtime-cli-telemetry branch from b15bbdd to 79cd93d Compare May 19, 2026 22:06
@maxxgx maxxgx changed the title feat(runtime): cli, telemetry (zenoh), fault tolerance feat(runtime): callbacks, telemetry via rerun, fault tolerance May 20, 2026
@maxxgx maxxgx force-pushed the max/runtime-cli-telemetry branch from 8112d57 to 1e45ec5 Compare May 20, 2026 13:56
Base automatically changed from max/runtime to main May 21, 2026 12:02
Copilot AI review requested due to automatic review settings May 21, 2026 12:08
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Introduces a new physicalai.runtime subsystem for running trained policies in a control loop with pluggable execution strategies, callbacks, telemetry/observer tooling, and improved fault tolerance. It also updates inference action buffering and image preprocessing, adds runtime examples, and expands unit test coverage around the new runtime components.

Changes:

  • Add PolicyRuntime control loop with action queue smoothing, callback bus, sync/async inference execution, telemetry emitter, and fault-tolerance paths.
  • Add telemetry observer utilities (subscriber, console handler, recorder handler, CLI) plus an optional Rerun-based visualization callback.
  • Update inference model action dispensing to use an internal deque (removing ActionCursor) and adjust image preprocessing; add examples and extensive unit tests.

Reviewed changes

Copilot reviewed 25 out of 26 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
uv.lock Adds locked dependencies for the new optional observer/Rerun functionality.
pyproject.toml Adds observer-rerun extra for rerun-sdk.
src/physicalai/runtime/init.py Exposes new runtime public API (runtime, execution, queue, events, callbacks).
src/physicalai/runtime/_action_queue.py New thread-safe action queue with chunk smoothing integration.
src/physicalai/runtime/_callback_bus.py New internal callback dispatcher with inference-event draining.
src/physicalai/runtime/_telemetry.py New Zenoh/msgpack telemetry emitter with numpy encoding.
src/physicalai/runtime/callbacks.py Adds runtime callbacks: console/jsonl/async wrapper + RerunCallback.
src/physicalai/runtime/events.py Defines Tick/Inference/Lifecycle event dataclasses.
src/physicalai/runtime/execution.py Adds SyncExecution and AsyncExecution scheduling strategies with health monitoring.
src/physicalai/runtime/runtime.py Implements PolicyRuntime, connect/disconnect lifecycle, fault tolerance, and stats.
src/physicalai/runtime/smoothers.py Adds ReplaceSmoother and LerpSmoother for chunk merge behavior.
src/physicalai/runtime/observer/init.py Exposes telemetry observer entry points.
src/physicalai/runtime/observer/main.py Adds python -m physicalai.runtime.observer CLI.
src/physicalai/runtime/observer/_console.py Adds console handler for live telemetry display.
src/physicalai/runtime/observer/_recorder.py Adds JSONL recorder handler with numpy JSON encoding.
src/physicalai/runtime/observer/_subscriber.py Adds Zenoh subscriber that decodes telemetry payloads (including numpy).
src/physicalai/inference/model.py Replaces ActionCursor usage with an internal deque; tightens chunk shape handling.
src/physicalai/inference/preprocessors/pi05.py Adjusts image dtype normalization and enforces BCHW output.
src/physicalai/robot/init.py Extends exports (RobotObservation) and keeps lazy-loading robot implementations.
src/physicalai/capture/factory.py Extends camera factory with shared option and adds interactive camera selection.
src/physicalai/capture/init.py Exports select_cameras_interactive.
examples/runtime/utils.py Adds shared helpers for building robots and parsing camera specs for runtime examples.
examples/runtime/async_inference.py New example showcasing async runtime execution + optional Rerun visualization.
tests/unit/runtime/init.py Adds runtime unit test package marker.
tests/unit/runtime/test_action_queue.py New unit tests for ActionQueue behavior and thread-safety.
tests/unit/runtime/test_execution.py New unit tests for sync/async execution strategies and worker failure handling.
tests/unit/runtime/test_fault_tolerance.py New unit tests for resilient observe/send/warmup/shutdown paths.
tests/unit/runtime/test_observer.py New unit tests for observer console/recorder handlers.
tests/unit/runtime/test_rerun_callback.py New unit tests for RerunCallback behavior via mocked rerun module.
tests/unit/runtime/test_runtime.py New unit tests for PolicyRuntime loop behavior and callbacks.
tests/unit/runtime/test_smoothers.py New unit tests for action chunk smoothers.
tests/unit/runtime/test_telemetry.py New unit tests for telemetry packing behavior and callback bus dispatch.
tests/unit/inference/test_model.py Copyright header update.
tests/unit/inference/test_adapters.py Copyright header update.
tests/unit/inference/test_action_cursor.py Removes tests for deleted ActionCursor.
src/physicalai/inference/utils/action_cursor.py Deletes ActionCursor implementation.
src/physicalai/inference/utils/init.py Deletes inference utils export of ActionCursor.
src/physicalai/inference/adapters/openvino.py Copyright header update.
src/physicalai/inference/adapters/_discovery.py Copyright header update.
src/physicalai/inference/adapters/init.py Copyright header update.
src/physicalai/inference/init.py Copyright header update.
Comments suppressed due to low confidence (2)

src/physicalai/runtime/execution.py:153

  • AsyncExecution accepts max_consecutive_holds but the value is never read anywhere. Either enforce it (e.g., raise/emit lifecycle when holds exceed the threshold) or remove the parameter/state to avoid a misleading API surface.
        max_consecutive_holds: int | None = None,
    ) -> None:
        self._threshold_frac = threshold
        self._fps = fps
        self._watchdog_timeout_s = watchdog_timeout_s
        self._max_consecutive_holds = max_consecutive_holds or 3 * fps

src/physicalai/runtime/_action_queue.py:71

  • ActionQueue advertises thread-safety, but consecutive_holds/total_holds/total_pops properties read mutable state without acquiring the lock while push/pop mutate them under the lock. Add locking (or atomics) around these getters to avoid data races when the queue is used/monitored from multiple threads.
    @property
    def consecutive_holds(self) -> int:
        return self._consecutive_holds

    @property
    def total_holds(self) -> int:
        return self._total_holds

    @property
    def total_pops(self) -> int:
        return self._total_pops


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

return float(obj)
return obj

return self._msgpack.packb(payload, default=_default)
)
robot_obs = self._last_robot_obs
else:
self._consecutive_error_ticks = 0
Comment on lines +71 to +76
def __init__(self, fps: int = 30) -> None: # noqa: D107
self._model: InferenceModel | None = None
self._queue: ActionQueue | None = None
self._chunk_size: int = 0
self._fps = fps
self._inference_count: int = 0
Comment on lines +28 to +31
except ImportError:
raise SystemExit(1) from None

subscriber = TelemetrySubscriber(session_id=args.session_id)
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 25 out of 26 changed files in this pull request and generated 8 comments.

Comments suppressed due to low confidence (1)

src/physicalai/runtime/runtime.py:423

  • On successful send_action(), _consecutive_error_ticks is reset to 0. Because the same counter is also used for observation failures, this reset can interfere with observation-failure escalation (and vice versa). Use independent counters (e.g., _consecutive_obs_error_ticks / _consecutive_send_error_ticks) or otherwise avoid cross-resetting unrelated failure modes.
            else:
                self._consecutive_error_ticks = 0
                return

Comment on lines +380 to +383
else:
self._consecutive_error_ticks = 0
self._last_robot_obs = robot_obs

Comment on lines +197 to 200
self._session_id = uuid.uuid4().hex[:8]
self._execution.set_bus(self._bus, self._session_id)

self._execution.start(self._model, self._action_queue)
Comment on lines 105 to +111
if self._queue.below_threshold(1):
t0 = time.perf_counter()
actions = self._model.predict_action_chunk(observation)
self._queue.push_chunk(actions, offset=0)
latency = time.perf_counter() - t0
offset = int(latency * self._fps)
self._queue.push_chunk(actions, offset=offset)
self._inference_count += 1
return int(obj)
if isinstance(obj, np.floating):
return float(obj)
return obj
Comment on lines +26 to +31
try:
from physicalai.runtime.observer._subscriber import TelemetrySubscriber # noqa: PLC0415, PLC2701
except ImportError:
raise SystemExit(1) from None

subscriber = TelemetrySubscriber(session_id=args.session_id)
from __future__ import annotations

import argparse
import os
Comment on lines +286 to +289
event = LifecycleEvent(session_id="t", timestamp=0.0, event="start", metadata={})
cb.on_lifecycle(event)
time.sleep(0.1)
inner.on_lifecycle.assert_called_once_with(event)
Comment on lines +28 to +31

Thread safety: only ``emit_inference`` is called from the inference thread.
It appends to a bounded deque (CPython atomic). All other methods run on
the control thread.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants