openvinotoolkit/physicalai

Physical AI

Runtime package for deploying robot policies trained with Physical AI Studio

Installation | Camera API | Robot API | Inference | Docs


Physical AI Runtime provides the deployment-side components for running trained policies on real hardware. It handles camera capture, robot control, and policy inference with a unified API that works across different hardware vendors.

Key Features:

  • Unified Camera API — Same interface for UVC, RealSense, Basler, and IP cameras
  • Robot Protocol — Structural typing for any robot; no inheritance required
  • Inference Engine — Load exported policies from Studio with auto-detected backends
  • Policy Runtime — Control loop with observation building and action dispatch

Installation

pip install physicalai

With hardware-specific extras:

pip install physicalai[realsense]   # Intel RealSense cameras
pip install physicalai[basler]      # Basler industrial cameras
pip install physicalai[so101]       # SO-101 robot arm
pip install physicalai[trossen]     # Trossen WidowX robots

Camera API

All cameras share a unified interface: connect(), read(), read_latest(), and context manager support. Switch hardware without changing application code.

from physicalai.capture import UVCCamera

with UVCCamera(device="/dev/video0", width=640, height=480, fps=30) as camera:
    frame = camera.read_latest()
    print(frame.data.shape)  # (480, 640, 3)
    print(frame.timestamp)   # monotonic timestamp

Intel RealSense (RGB + Depth)

from physicalai.capture import RealSenseCamera

with RealSenseCamera(serial_number="123456789", width=640, height=480, fps=30) as camera:
    rgb, depth = camera.read_rgbd()
    print(rgb.data.shape)    # (480, 640, 3) RGB
    print(depth.data.shape)  # (480, 640) depth in mm

Basler Industrial Camera

from physicalai.capture import BaslerCamera

with BaslerCamera(serial_number="12345678", width=1920, height=1080, fps=60) as camera:
    frame = camera.read_latest()
    print(frame.data.shape)  # (1080, 1920, 3)

Multi-Camera Sync

from physicalai.capture import UVCCamera, RealSenseCamera, read_cameras

cameras = {
    "wrist": UVCCamera(device="/dev/video0"),
    "overhead": RealSenseCamera(serial_number="123456789"),
}

# Connect all
for cam in cameras.values():
    cam.connect()

# Read from all cameras concurrently
synced = read_cameras(cameras)
print(synced.frames["wrist"].data.shape)
print(synced.frames["overhead"].data.shape)

# Cleanup
for cam in cameras.values():
    cam.disconnect()

Camera Discovery

from physicalai.capture import discover_all, UVCCamera

# Discover all connected cameras (returns dict of camera_type -> list of devices)
all_devices = discover_all()
for camera_type, devices in all_devices.items():
    for dev in devices:
        print(f"{camera_type}: {dev.device_id} - {dev.name}")

# Discover specific type
uvc_devices = UVCCamera.discover()
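
Because every camera exposes the same connect(), read(), read_latest(), and context-manager surface, application code can be written once against that surface and tested without hardware. The sketch below illustrates the idea with typing.Protocol and a synthetic stand-in. CameraLike, Frame, FakeCamera, and grab_shape are hypothetical names invented for this illustration and are not part of physicalai.

```python
import time
from dataclasses import dataclass
from typing import Protocol


@dataclass
class Frame:
    """Hypothetical frame container mirroring the .data / .timestamp fields used above."""
    data: list  # nested lists standing in for an image array
    timestamp: float


class CameraLike(Protocol):
    """Hypothetical description of the shared camera surface."""
    def connect(self) -> None: ...
    def disconnect(self) -> None: ...
    def read(self) -> Frame: ...
    def read_latest(self) -> Frame: ...


class FakeCamera:
    """Synthetic camera that returns black frames; handy for tests without hardware."""

    def __init__(self, width: int = 4, height: int = 3) -> None:
        self.width, self.height = width, height
        self.connected = False

    def connect(self) -> None:
        self.connected = True

    def disconnect(self) -> None:
        self.connected = False

    def read(self) -> Frame:
        if not self.connected:
            raise RuntimeError("camera is not connected")
        rows = [[(0, 0, 0)] * self.width for _ in range(self.height)]
        return Frame(data=rows, timestamp=time.monotonic())

    def read_latest(self) -> Frame:
        # A real driver would drop stale buffered frames; the fake has no buffer.
        return self.read()

    def __enter__(self) -> "FakeCamera":
        self.connect()
        return self

    def __exit__(self, *exc) -> None:
        self.disconnect()


def grab_shape(camera: CameraLike) -> tuple:
    """Application code that depends only on the shared interface."""
    frame = camera.read_latest()
    return (len(frame.data), len(frame.data[0]))


with FakeCamera(width=4, height=3) as cam:
    shape = grab_shape(cam)
print(shape)  # (3, 4): rows, then columns
```

Swapping FakeCamera for a real camera class would leave grab_shape unchanged, which is the property the unified interface is meant to provide.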

Robot API

Robots implement a Protocol-based interface. Any class with connect(), disconnect(), get_observation(), send_action(), and joint_names works — no inheritance required.

from physicalai.robot import SO101

robot = SO101(port="/dev/ttyUSB0")
robot.connect()

obs = robot.get_observation()
print(obs.joint_positions)  # [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
print(robot.joint_names)    # ['shoulder_pan', 'shoulder_lift', ...]

robot.send_action(target_positions, goal_time=0.1)
robot.disconnect()

Trossen WidowX-AI

from physicalai.robot import WidowXAI

robot = WidowXAI()
robot.connect()

obs = robot.get_observation()
print(obs.joint_positions)

robot.send_action(target_positions)
robot.disconnect()

Bimanual WidowX-AI

from physicalai.robot import BimanualWidowXAI

robot = BimanualWidowXAI()
robot.connect()

obs = robot.get_observation()
# Joint positions for both arms concatenated
print(obs.joint_positions.shape)

robot.send_action(bimanual_targets)
robot.disconnect()

Robot Verification

from physicalai.robot import SO101, verify_robot

robot = SO101(port="/dev/ttyUSB0")
verify_robot(robot)  # Interactive joint-by-joint check
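
The Protocol-based interface described at the start of this section relies on Python's structural typing: a class qualifies by having the right members, not by subclassing anything. A minimal sketch of how that works with typing.Protocol follows. RobotLike, Observation, and SimulatedArm are hypothetical names for illustration; the actual protocol class in physicalai may be named and typed differently.

```python
from dataclasses import dataclass
from typing import Protocol, Sequence, runtime_checkable


@dataclass
class Observation:
    """Hypothetical observation container with the joint_positions field shown above."""
    joint_positions: list


@runtime_checkable
class RobotLike(Protocol):
    """Hypothetical Protocol listing the members named in this section."""
    joint_names: Sequence[str]

    def connect(self) -> None: ...
    def disconnect(self) -> None: ...
    def get_observation(self) -> Observation: ...
    def send_action(self, action: Sequence[float]) -> None: ...


class SimulatedArm:
    """Two-joint simulated arm. Note that it does not inherit from RobotLike."""

    joint_names = ("shoulder_pan", "shoulder_lift")

    def __init__(self) -> None:
        self._positions = [0.0, 0.0]
        self._connected = False

    def connect(self) -> None:
        self._connected = True

    def disconnect(self) -> None:
        self._connected = False

    def get_observation(self) -> Observation:
        return Observation(joint_positions=list(self._positions))

    def send_action(self, action: Sequence[float]) -> None:
        if len(action) != len(self.joint_names):
            raise ValueError("expected one target per joint")
        # The simulation jumps straight to the target; real hardware moves over time.
        self._positions = list(action)


arm = SimulatedArm()
# isinstance() on a runtime_checkable Protocol checks that the members exist,
# not that their signatures match.
print(isinstance(arm, RobotLike))  # True
arm.connect()
arm.send_action([0.1, 0.2])
print(arm.get_observation().joint_positions)  # [0.1, 0.2]
arm.disconnect()
```

A static type checker such as mypy or pyright would verify the signatures as well, which the runtime isinstance() check does not.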

Inference

Load exported policies from Physical AI Studio. The InferenceModel class auto-detects the backend (OpenVINO, ONNX, TorchScript) and handles action chunking automatically.

from physicalai.inference import InferenceModel

# Load exported policy
model = InferenceModel.load("./exports/act_policy")

# Reset state for new episode
model.reset()

# Run inference
action = model.select_action(observation)

With Explicit Backend

from physicalai.inference import InferenceModel

# Force specific backend
model = InferenceModel.load(
    "./exports/act_policy",
    backend="openvino",
    device="GPU",
)
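
Action chunking, in general terms, means the policy predicts several future actions in one forward pass and the caller consumes them one step at a time, re-running the model only when the queue is empty. The sketch below shows that general pattern under this assumption; it is not a description of InferenceModel's internals. ChunkedPolicy, predict_chunk, and toy_model are hypothetical names.

```python
from collections import deque
from typing import Callable, Deque, List


class ChunkedPolicy:
    """Wraps a chunk-predicting function and serves one action per call."""

    def __init__(self, predict_chunk: Callable[[float], List[float]]) -> None:
        self._predict_chunk = predict_chunk
        self._queue: Deque[float] = deque()
        self.forward_passes = 0

    def reset(self) -> None:
        """Drop leftover actions so a new episode starts from a fresh prediction."""
        self._queue.clear()

    def select_action(self, observation: float) -> float:
        if not self._queue:
            # Queue exhausted: run the model once and buffer the whole chunk.
            self._queue.extend(self._predict_chunk(observation))
            self.forward_passes += 1
        return self._queue.popleft()


def toy_model(observation: float) -> List[float]:
    # Stand-in for a real policy: predicts three actions from one observation.
    return [observation + 0.1 * step for step in range(3)]


policy = ChunkedPolicy(toy_model)
policy.reset()
actions = [policy.select_action(float(t)) for t in range(6)]
print(policy.forward_passes)  # 2: six steps served by two chunks of three
```

This also suggests why reset() is called at the start of each episode: without it, actions left over from the previous episode would be replayed.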

Policy Runtime

The PolicyRuntime orchestrates the full control loop: connecting hardware, reading cameras, building observations, running inference, and dispatching actions to the robot.
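
As a rough illustration of what such a loop involves, the sketch below runs observe, infer, and act steps at a fixed rate using only the standard library. It is an assumption about the general shape of a control loop, not PolicyRuntime's implementation; run_control_loop and its callback parameters are hypothetical.

```python
import time
from typing import Callable, List


def run_control_loop(
    read_observation: Callable[[], float],
    select_action: Callable[[float], float],
    send_action: Callable[[float], None],
    fps: float,
    duration_s: float,
) -> int:
    """Run observe -> infer -> act at roughly `fps` for `duration_s` seconds."""
    period = 1.0 / fps
    start = time.monotonic()
    next_tick = start
    steps = 0
    while time.monotonic() - start < duration_s:
        observation = read_observation()
        action = select_action(observation)
        send_action(action)
        steps += 1
        # Schedule against absolute deadlines so per-step jitter does not accumulate.
        next_tick += period
        sleep_for = next_tick - time.monotonic()
        if sleep_for > 0:
            time.sleep(sleep_for)
    return steps


sent: List[float] = []
steps = run_control_loop(
    read_observation=lambda: 1.0,
    select_action=lambda obs: obs * 2.0,
    send_action=sent.append,
    fps=50,
    duration_s=0.2,
)
print(steps, sent[:3])
```

If a step takes longer than one period, this sketch skips the sleep and starts the next step immediately; a production loop would likely also log or count such overruns.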

Preview: This API is a work in progress.

from physicalai.runtime import PolicyRuntime, SyncExecution
from physicalai.inference import InferenceModel
from physicalai.capture import UVCCamera, RealSenseCamera
from physicalai.robot import SO101

runtime = PolicyRuntime(
    fps=30,
    robot=SO101(port="/dev/ttyACM0"),
    model=InferenceModel.load("./exports/act_policy"),
    cameras={
        "wrist": UVCCamera(device="/dev/video0", width=640, height=480),
        "overhead": RealSenseCamera(serial_number="123456789"),
    },
    execution=SyncExecution(mode="chunk"),
)

runtime.run(duration_s=60)

From YAML Config

Preview: This API is not yet implemented.

runtime = PolicyRuntime.from_config("runtime.yaml")
runtime.run(duration_s=60)

# runtime.yaml
runtime:
  class_path: physicalai.runtime.PolicyRuntime
  init_args:
    fps: 30
    robot:
      class_path: physicalai.robot.so101.SO101
      init_args:
        port: /dev/ttyACM0
    model:
      class_path: physicalai.inference.InferenceModel
      init_args:
        export_dir: ./exports/act_policy
    cameras:
      wrist:
        class_path: physicalai.capture.UVCCamera
        init_args:
          device: /dev/video0
          width: 640
          height: 480
    execution:
      class_path: physicalai.runtime.SyncExecution
      init_args:
        mode: chunk

CLI

Preview: This API is not yet implemented.

physicalai run --config runtime.yaml --duration-s 60

Async Execution

Async execution runs inference in a background thread while the main loop handles camera reads and robot commands at a fixed frequency. Useful when inference is slower than the control rate.
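
The pattern can be sketched with a worker thread that always processes the newest observation while the control loop keeps applying the most recent finished action. This is a generic illustration under stated assumptions, not the planned AsyncExecution design; BackgroundInference and slow_model are hypothetical names.

```python
import threading
import time
from typing import Callable, List, Optional


class BackgroundInference:
    """Runs a slow model in a worker thread and exposes its most recent action."""

    def __init__(self, model: Callable[[float], float]) -> None:
        self._model = model
        self._lock = threading.Lock()
        self._observation: Optional[float] = None
        self._action: Optional[float] = None
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._worker, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()

    def submit(self, observation: float) -> None:
        """Called by the control loop; overwrites any observation not yet consumed."""
        with self._lock:
            self._observation = observation

    def latest_action(self) -> Optional[float]:
        with self._lock:
            return self._action

    def _worker(self) -> None:
        while not self._stop.is_set():
            with self._lock:
                observation, self._observation = self._observation, None
            if observation is None:
                time.sleep(0.001)  # nothing new yet
                continue
            action = self._model(observation)  # slow call runs outside the lock
            with self._lock:
                self._action = action


def slow_model(observation: float) -> float:
    time.sleep(0.05)  # pretend inference takes 50 ms
    return observation + 1.0


engine = BackgroundInference(slow_model)
engine.start()
applied: List[Optional[float]] = []
for step in range(20):  # control loop at about 100 Hz, faster than the model
    engine.submit(float(step))
    applied.append(engine.latest_action())  # None until the first result arrives
    time.sleep(0.01)
engine.stop()
print(len(applied), applied[-1] is not None)
```

The trade-off is staleness: the applied action was computed from an observation that may be several control steps old, so the control loop stays on schedule at the cost of reacting later.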

Preview: This API is not yet implemented.

from physicalai.runtime import PolicyRuntime, AsyncExecution

runtime = PolicyRuntime(
    fps=30,
    robot=robot,
    model=model,
    cameras=cameras,
    execution=AsyncExecution(),
)

runtime.run(duration_s=60)

Remote Execution

Remote execution sends observations to an inference server and receives actions over the network. Useful for running large models on a separate GPU machine.
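
Since the wire format is not specified here, the sketch below assumes a simple JSON-over-HTTP exchange purely for illustration: the client POSTs an observation and reads an action from the reply. The payload keys (joint_positions, action), InferHandler, and request_action are all hypothetical, and the toy server runs locally so the example is self-contained.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import List


class InferHandler(BaseHTTPRequestHandler):
    """Toy inference server: echoes joint positions back as the action."""

    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        observation = json.loads(self.rfile.read(length))
        body = json.dumps({"action": observation["joint_positions"]}).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args) -> None:
        pass  # keep the example output quiet


# Ignore any system proxy settings; this demo only talks to localhost.
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))


def request_action(
    endpoint: str, joint_positions: List[float], timeout_s: float = 1.0
) -> List[float]:
    """POST one observation and return the action from the JSON reply."""
    payload = json.dumps({"joint_positions": joint_positions}).encode()
    request = urllib.request.Request(
        endpoint, data=payload, headers={"Content-Type": "application/json"}
    )
    with opener.open(request, timeout=timeout_s) as response:
        return json.loads(response.read())["action"]


server = HTTPServer(("127.0.0.1", 0), InferHandler)  # port 0 picks a free port
threading.Thread(target=server.serve_forever, daemon=True).start()
endpoint = f"http://127.0.0.1:{server.server_port}/infer"

action = request_action(endpoint, [0.1, 0.2, 0.3])
print(action)  # [0.1, 0.2, 0.3]
server.shutdown()
server.server_close()
```

The explicit timeout matters in a control loop: a stalled network call should fail quickly so the caller can fall back to a safe action instead of blocking the robot.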

Preview: This API is not yet implemented.

from physicalai.runtime import PolicyRuntime, RemoteExecution

runtime = PolicyRuntime(
    fps=30,
    robot=robot,
    cameras=cameras,
    execution=RemoteExecution(endpoint="http://gpu-server:8080/infer"),
)

runtime.run(duration_s=60)

Documentation

Home | Getting Started | How-To Guides | Concepts | API Reference

Contributing

See CONTRIBUTING.md.
