Runtime package for deploying robot policies trained with Physical AI Studio
Physical AI Runtime provides the deployment-side components for running trained policies on real hardware. It handles camera capture, robot control, and policy inference with a unified API that works across different hardware vendors.
Key Features:
- Unified Camera API — Same interface for UVC, RealSense, Basler, and IP cameras
- Robot Protocol — Structural typing for any robot; no inheritance required
- Inference Engine — Load exported policies from Studio with auto-detected backends
- Policy Runtime — Control loop with observation building and action dispatch
pip install physicalai

With hardware-specific extras:

pip install physicalai[realsense]  # Intel RealSense cameras
pip install physicalai[basler]     # Basler industrial cameras
pip install physicalai[so101]      # SO-101 robot arm
pip install physicalai[trossen]    # Trossen WidowX robots

All cameras share a unified interface: connect(), read(), read_latest(), and context manager support. Switch hardware without changing application code.
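To make the idea of a shared camera interface concrete, here is a minimal sketch of application code that depends only on the method names listed above. `Frame`, `Camera`, `FakeCamera`, and `grab` are hypothetical stand-ins written for illustration, not physicalai's actual classes, and the fake device needs no hardware.

```python
import time
from dataclasses import dataclass
from typing import Protocol


@dataclass
class Frame:
    """Hypothetical frame container: pixel data plus a capture timestamp."""
    data: list
    timestamp: float


class Camera(Protocol):
    """Hypothetical structural interface mirroring the methods named above."""

    def connect(self) -> None: ...
    def disconnect(self) -> None: ...
    def read(self) -> Frame: ...
    def read_latest(self) -> Frame: ...


class FakeCamera:
    """Stand-in device that produces numbered frames."""

    def __init__(self) -> None:
        self.connected = False
        self._count = 0

    def connect(self) -> None:
        self.connected = True

    def disconnect(self) -> None:
        self.connected = False

    def read(self) -> Frame:
        if not self.connected:
            raise RuntimeError("camera is not connected")
        self._count += 1
        return Frame(data=[self._count], timestamp=time.monotonic())

    def read_latest(self) -> Frame:
        # A real driver would discard stale buffered frames; the fake just reads.
        return self.read()

    def __enter__(self) -> "FakeCamera":
        self.connect()
        return self

    def __exit__(self, *exc_info) -> None:
        self.disconnect()


def grab(camera: Camera) -> Frame:
    """Application code depends only on the interface, not on the vendor."""
    return camera.read_latest()
```

Any class with the same methods can be passed to `grab`, which is the property that lets hardware be swapped without touching application code.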
from physicalai.capture import UVCCamera
with UVCCamera(device="/dev/video0", width=640, height=480, fps=30) as camera:
    frame = camera.read_latest()
    print(frame.data.shape)  # (480, 640, 3)
    print(frame.timestamp)   # monotonic timestamp

Intel RealSense (RGB + Depth)
from physicalai.capture import RealSenseCamera
with RealSenseCamera(serial_number="123456789", width=640, height=480, fps=30) as camera:
    rgb, depth = camera.read_rgbd()
    print(rgb.data.shape)    # (480, 640, 3) RGB
    print(depth.data.shape)  # (480, 640) depth in mm

Basler Industrial Camera
from physicalai.capture import BaslerCamera
with BaslerCamera(serial_number="12345678", width=1920, height=1080, fps=60) as camera:
    frame = camera.read_latest()
    print(frame.data.shape)  # (1080, 1920, 3)

Multi-Camera Sync
from physicalai.capture import UVCCamera, RealSenseCamera, read_cameras
cameras = {
    "wrist": UVCCamera(device="/dev/video0"),
    "overhead": RealSenseCamera(serial_number="123456789"),
}

# Connect all
for cam in cameras.values():
    cam.connect()

# Read from all cameras concurrently
synced = read_cameras(cameras)
print(synced.frames["wrist"].data.shape)
print(synced.frames["overhead"].data.shape)

# Cleanup
for cam in cameras.values():
    cam.disconnect()

Camera Discovery
from physicalai.capture import discover_all, UVCCamera
# Discover all connected cameras (returns dict of camera_type -> list of devices)
all_devices = discover_all()
for camera_type, devices in all_devices.items():
    for dev in devices:
        print(f"{camera_type}: {dev.device_id} - {dev.name}")

# Discover specific type
uvc_devices = UVCCamera.discover()

Robots implement a Protocol-based interface. Any class with connect(), disconnect(), get_observation(), send_action(), and joint_names works; no inheritance is required.
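The structural-typing idea can be sketched with `typing.Protocol`. The `Robot`, `Observation`, and `ToyArm` definitions below are illustrative assumptions, not the library's real declarations; only the five member names come from the description above.

```python
from dataclasses import dataclass
from typing import List, Protocol, Sequence, runtime_checkable


@dataclass
class Observation:
    """Hypothetical observation type holding joint positions only."""
    joint_positions: List[float]


@runtime_checkable
class Robot(Protocol):
    """Assumed shape of the robot interface, based on the member names above."""

    @property
    def joint_names(self) -> Sequence[str]: ...

    def connect(self) -> None: ...
    def disconnect(self) -> None: ...
    def get_observation(self) -> Observation: ...
    def send_action(self, action: Sequence[float]) -> None: ...


class ToyArm:
    """Does not inherit from Robot; it only matches its shape."""

    joint_names = ("shoulder", "elbow")

    def __init__(self) -> None:
        self.connected = False
        self._positions = [0.0, 0.0]

    def connect(self) -> None:
        self.connected = True

    def disconnect(self) -> None:
        self.connected = False

    def get_observation(self) -> Observation:
        return Observation(joint_positions=list(self._positions))

    def send_action(self, action: Sequence[float]) -> None:
        if len(action) != len(self.joint_names):
            raise ValueError("expected one target per joint")
        # A real arm would move here; the toy jumps straight to the target.
        self._positions = list(action)
```

With `@runtime_checkable`, `isinstance(ToyArm(), Robot)` is true even though `ToyArm` never subclasses `Robot`. That runtime check only confirms the members exist; signatures are checked by a static type checker.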
from physicalai.robot import SO101
robot = SO101(port="/dev/ttyUSB0")
robot.connect()
obs = robot.get_observation()
print(obs.joint_positions) # [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
print(robot.joint_names) # ['shoulder_pan', 'shoulder_lift', ...]
robot.send_action(target_positions, goal_time=0.1)
robot.disconnect()

Trossen WidowX-AI
from physicalai.robot import WidowXAI
robot = WidowXAI()
robot.connect()
obs = robot.get_observation()
print(obs.joint_positions)
robot.send_action(target_positions)
robot.disconnect()

Bimanual WidowX-AI
from physicalai.robot import BimanualWidowXAI
robot = BimanualWidowXAI()
robot.connect()
obs = robot.get_observation()
# Joint positions for both arms concatenated
print(obs.joint_positions.shape)
robot.send_action(bimanual_targets)
robot.disconnect()

Robot Verification
from physicalai.robot import SO101, verify_robot
robot = SO101(port="/dev/ttyUSB0")
verify_robot(robot)  # Interactive joint-by-joint check

Load exported policies from Physical AI Studio. The InferenceModel class auto-detects the backend (OpenVINO, ONNX, TorchScript) and handles action chunking automatically.
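Action chunking means the policy predicts several future actions per forward pass and the caller consumes them one at a time. The sketch below shows one common way to do this with a queue; `ChunkedPolicy` and `fake_predict_chunk` are hypothetical, and whether InferenceModel works this way internally is an assumption.

```python
from collections import deque
from typing import Callable, Deque, List, Sequence

Action = List[float]


class ChunkedPolicy:
    """Serves one action per call, refilling from the model when empty."""

    def __init__(self, predict_chunk: Callable[[Sequence[float]], List[Action]]):
        self._predict_chunk = predict_chunk
        self._queue: Deque[Action] = deque()
        self.forward_passes = 0

    def reset(self) -> None:
        """Drop leftover actions so a new episode starts from a fresh prediction."""
        self._queue.clear()

    def select_action(self, observation: Sequence[float]) -> Action:
        if not self._queue:
            # Only run the (expensive) model when the previous chunk is used up.
            self._queue.extend(self._predict_chunk(observation))
            self.forward_passes += 1
        return self._queue.popleft()


def fake_predict_chunk(observation: Sequence[float]) -> List[Action]:
    """Stand-in model: three future actions offset from the observation."""
    return [[value + step for value in observation] for step in range(3)]
```

Four calls to `select_action` with this stand-in trigger only two forward passes, which is the saving that chunking is meant to provide.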
from physicalai.inference import InferenceModel
# Load exported policy
model = InferenceModel.load("./exports/act_policy")
# Reset state for new episode
model.reset()
# Run inference
action = model.select_action(observation)

With Explicit Backend
from physicalai.inference import InferenceModel
# Force specific backend
model = InferenceModel.load(
    "./exports/act_policy",
    backend="openvino",
    device="GPU",
)

The PolicyRuntime orchestrates the full control loop: connecting hardware, reading cameras, building observations, running inference, and dispatching actions to the robot.
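The loop described above can be sketched in plain Python. This is an illustration of the general pattern, not PolicyRuntime's implementation: `run_control_loop`, `StubRobot`, `StubCamera`, and the observation layout are all hypothetical.

```python
import time
from typing import Any, Callable, Dict


def run_control_loop(
    robot: Any,
    cameras: Dict[str, Any],
    select_action: Callable[[dict], Any],
    fps: float,
    num_steps: int,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Connect hardware, then observe, infer, and act at a fixed rate."""
    period = 1.0 / fps
    robot.connect()
    for camera in cameras.values():
        camera.connect()
    try:
        next_tick = clock()
        for _ in range(num_steps):
            # Build one observation from robot state and the newest frames.
            observation = {
                "state": robot.get_observation(),
                "images": {name: cam.read_latest() for name, cam in cameras.items()},
            }
            robot.send_action(select_action(observation))
            # Sleep until the next tick; skip sleeping if the step ran long.
            next_tick += period
            delay = next_tick - clock()
            if delay > 0:
                sleep(delay)
    finally:
        for camera in cameras.values():
            camera.disconnect()
        robot.disconnect()


class StubCamera:
    """Hardware-free camera used only to exercise the loop."""

    def connect(self) -> None: ...
    def disconnect(self) -> None: ...
    def read_latest(self) -> str:
        return "frame"


class StubRobot:
    """Hardware-free robot that records the actions it receives."""

    def __init__(self) -> None:
        self.sent = []

    def connect(self) -> None: ...
    def disconnect(self) -> None: ...
    def get_observation(self) -> list:
        return [0.0, 0.0]

    def send_action(self, action) -> None:
        self.sent.append(action)
```

Scheduling against an absolute `next_tick` rather than sleeping a fixed period keeps the loop rate from drifting when a step takes a variable amount of time.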
Preview: This API is a work in progress.
from physicalai.runtime import PolicyRuntime, SyncExecution
from physicalai.inference import InferenceModel
from physicalai.capture import UVCCamera, RealSenseCamera
from physicalai.robot import SO101
runtime = PolicyRuntime(
    fps=30,
    robot=SO101(port="/dev/ttyACM0"),
    model=InferenceModel.load("./exports/act_policy"),
    cameras={
        "wrist": UVCCamera(device="/dev/video0", width=640, height=480),
        "overhead": RealSenseCamera(serial_number="123456789"),
    },
    execution=SyncExecution(mode="chunk"),
)
runtime.run(duration_s=60)

From YAML Config
Preview: This API is not yet implemented.
runtime = PolicyRuntime.from_config("runtime.yaml")
runtime.run(duration_s=60)

# runtime.yaml
runtime:
  class_path: physicalai.runtime.PolicyRuntime
  init_args:
    fps: 30
    robot:
      class_path: physicalai.robot.so101.SO101
      init_args:
        port: /dev/ttyACM0
    model:
      class_path: physicalai.inference.InferenceModel
      init_args:
        export_dir: ./exports/act_policy
    cameras:
      wrist:
        class_path: physicalai.capture.UVCCamera
        init_args:
          device: /dev/video0
          width: 640
          height: 480
    execution:
      class_path: physicalai.runtime.SyncExecution
      init_args:
        mode: chunk

CLI
Preview: This API is not yet implemented.
physicalai run --config runtime.yaml --duration-s 60

Async Execution
Async execution runs inference in a background thread while the main loop handles camera reads and robot commands at a fixed frequency. Useful when inference is slower than the control rate.
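As a rough illustration of that pattern, the sketch below runs a slow inference function in a background thread while the caller keeps submitting observations and reading the most recent action. `BackgroundInference` and its methods are hypothetical names, not the planned AsyncExecution API.

```python
import threading
from typing import Any, Callable, Optional


class BackgroundInference:
    """Runs `infer` off the main thread and exposes the newest result."""

    def __init__(self, infer: Callable[[Any], Any]) -> None:
        self._infer = infer
        self._lock = threading.Lock()
        self._new_observation = threading.Event()
        self._stop = threading.Event()
        self._ready = threading.Event()
        self._observation: Any = None
        self._action: Any = None
        self._thread = threading.Thread(target=self._worker, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._new_observation.set()  # wake the worker so it can exit
        self._thread.join()

    def submit(self, observation: Any) -> None:
        """Called by the control loop every tick; never blocks on inference."""
        with self._lock:
            self._observation = observation
        self._new_observation.set()

    def latest_action(self) -> Optional[Any]:
        """Newest finished action, or None before the first result."""
        with self._lock:
            return self._action

    def wait_until_ready(self, timeout_s: float) -> bool:
        """Block until the first action exists; returns False on timeout."""
        return self._ready.wait(timeout_s)

    def _worker(self) -> None:
        while True:
            self._new_observation.wait()
            self._new_observation.clear()
            if self._stop.is_set():
                return
            with self._lock:
                observation = self._observation
            action = self._infer(observation)  # slow call runs outside the lock
            with self._lock:
                self._action = action
            self._ready.set()
```

In a control loop, each tick would call `submit(observation)` and then `latest_action()`, reusing the previous action whenever inference has not finished yet.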
Preview: This API is not yet implemented.
from physicalai.runtime import PolicyRuntime, AsyncExecution
runtime = PolicyRuntime(
    fps=30,
    robot=robot,
    model=model,
    cameras=cameras,
    execution=AsyncExecution(),
)
runtime.run(duration_s=60)

Remote Execution
Remote execution sends observations to an inference server and receives actions over the network. Useful for running large models on a separate GPU machine.
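The request and response exchange can be sketched with the standard library alone. The JSON wire format, the `joint_positions` and `action` keys, and the `InferHandler` and `request_action` names are assumptions made for illustration; the actual protocol is not specified here.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class InferHandler(BaseHTTPRequestHandler):
    """Toy inference server: replies to a POSTed observation with an action."""

    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        observation = json.loads(self.rfile.read(length))
        # Placeholder "model": echo the joint positions back as the action.
        body = json.dumps({"action": observation["joint_positions"]}).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args) -> None:
        pass  # silence per-request logging


def request_action(endpoint: str, observation: dict, timeout_s: float = 1.0):
    """Send one observation and return the action from the reply."""
    request = urllib.request.Request(
        endpoint,
        data=json.dumps(observation).encode(),
        headers={"Content-Type": "application/json"},
    )
    # Assumes a direct connection to the server, so system proxies are bypassed.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    with opener.open(request, timeout=timeout_s) as response:
        return json.loads(response.read())["action"]


def start_local_server() -> HTTPServer:
    """Start the toy server on a free local port in a background thread."""
    server = HTTPServer(("127.0.0.1", 0), InferHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

The timeout matters in a control loop: a stalled network call should fail quickly so the robot can hold position or stop instead of waiting indefinitely.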
Preview: This API is not yet implemented.
from physicalai.runtime import PolicyRuntime, RemoteExecution
runtime = PolicyRuntime(
    fps=30,
    robot=robot,
    cameras=cameras,
    execution=RemoteExecution(endpoint="http://gpu-server:8080/infer"),
)
runtime.run(duration_s=60)
See CONTRIBUTING.md.
