Open-source Python SDK for IoT observability and hardware telemetry. Stream sensor data, CAN bus, MAVLink, cameras, and MQTT from any device to Plexus — the HardwareOps platform for real-time monitoring and fleet management.
from plexus import Plexus
px = Plexus()
px.send("engine.rpm", 3450, tags={"unit": "A"})

Works on any Linux system: Raspberry Pi, edge compute nodes, test rigs, fleet vehicles, ground stations.

pip install plexus-agent

With extras:
pip install plexus-agent[sensors] # I2C sensors (IMU, environmental)
pip install plexus-agent[can] # CAN bus with DBC decoding
pip install plexus-agent[mavlink] # MAVLink for drones/UAVs
pip install plexus-agent[mqtt] # MQTT bridge
pip install plexus-agent[camera] # USB cameras (OpenCV)
pip install plexus-agent[picamera] # Raspberry Pi Camera Module
pip install plexus-agent[serial] # Serial/UART (GPS, custom devices)
pip install plexus-agent[tui] # Live terminal dashboard
pip install plexus-agent[system] # System health (psutil)
pip install plexus-agent[all] # Everything

One command from install to streaming:

pip install plexus-agent && plexus start --key plx_xxxxx

plexus start handles auth, hardware detection, dependency installation, and sensor selection interactively:
Found 3 sensors on I2C bus 1:
[1] ✓ BME280 temperature, humidity, pressure
[2] ✓ MPU6050 accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z
[3] ✓ INA219 bus_voltage, shunt_voltage, current_ma, power_mw
Stream all? [Y/n] or enter numbers to select (e.g., 1,3):
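The prompt above accepts Y, n, or a comma-separated list of numbers. As an illustration only, the sketch below shows one way such an answer could be interpreted. `parse_selection` is a hypothetical helper, not part of plexus-agent, and the real CLI may handle edge cases differently.

```python
def parse_selection(answer: str, count: int) -> list[int]:
    """Return the 1-based indices of the chosen sensors (hypothetical helper)."""
    text = answer.strip().lower()
    if text in ("", "y", "yes"):
        return list(range(1, count + 1))  # empty input or Y: stream everything
    if text in ("n", "no"):
        return []
    chosen = []
    for part in text.split(","):
        part = part.strip()
        if not part:
            continue
        index = int(part)  # raises ValueError on non-numeric input
        if not 1 <= index <= count:
            raise ValueError(f"selection {index} is out of range 1..{count}")
        if index not in chosen:
            chosen.append(index)
    return chosen
```

With the three sensors listed above, `parse_selection("1,3", 3)` would pick the BME280 and the INA219.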
Get an API key from app.plexus.company → Devices → Add Device.
plexus start --key plx_xxxxx

# 1. Pair (one-time): get your API key from app.plexus.company/devices
plexus pair --key plx_xxxxx
# 2. Run the agent
plexus run

The agent auto-detects connected sensors, cameras, and CAN interfaces. Control everything from the dashboard.
# Name the device for fleet identification
plexus run --name "test-rig-01"
# Stream system health (CPU, memory, disk, thermals)
plexus run --sensor system
# Bridge an MQTT broker
plexus run --mqtt localhost:1883
# Skip sensor/camera auto-detection
plexus run --no-sensors --no-cameras

Send data programmatically without the managed agent. Good for scripts, batch uploads, and custom integrations.
- Create an API key at app.plexus.company → Settings → Developer
- Send data:
from plexus import Plexus
px = Plexus(api_key="plx_xxxxx", source_id="test-rig-01")
# Numeric telemetry
px.send("engine.rpm", 3450, tags={"unit": "A"})
px.send("coolant.temperature", 82.3)
# State and configuration
px.send("vehicle.state", "RUNNING")
px.send("motor.enabled", True)
px.send("position", {"x": 1.5, "y": 2.3, "z": 0.8})
# Batch send
px.send_batch([
    ("temperature", 72.5),
    ("pressure", 1013.25),
    ("vibration.rms", 0.42),
])

See API.md for curl, JavaScript, Go, and Bash examples.
| Method | How to get it | Used by |
|---|---|---|
| API key (plx_*) | Dashboard → Devices → Add Device, or Settings → Developer | plexus run and Plexus() client |
Two ways to pair:
- API key (recommended): plexus pair --key plx_xxxxx
- Browser login: plexus pair (opens browser for OAuth device flow)
Credentials are stored in ~/.plexus/config.json or can be set via environment variables:
export PLEXUS_API_KEY=plx_xxxxx
export PLEXUS_ENDPOINT=https://app.plexus.company # default

plexus start [--key KEY] [--bus N] [--name NAME]    Set up and stream (interactive)
plexus add [CAPABILITY...]                          Install capabilities (sensors, can, mqtt, ...)
plexus run [--live] [--auto-install] [OPTIONS]      Start the agent
plexus pair [--key KEY]                             Pair device with your account
plexus scan [--all] [--setup] [--json]              Detect connected hardware
plexus status                                       Check connection and config
plexus doctor                                       Diagnose issues
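
As noted under pairing, credentials can come from ~/.plexus/config.json or from the PLEXUS_API_KEY environment variable. The sketch below shows one plausible lookup order. Both the precedence (environment first) and the `api_key` field name in the JSON file are assumptions made for illustration, and `resolve_api_key` is a hypothetical helper, not an SDK function.

```python
import json
import os
from pathlib import Path

# Path and variable name come from this README; the "api_key" field is assumed.
CONFIG_PATH = Path.home() / ".plexus" / "config.json"


def resolve_api_key(env=None, config_path=CONFIG_PATH):
    """Return the key from the environment, else the config file, else None."""
    env = os.environ if env is None else env
    key = env.get("PLEXUS_API_KEY")
    if key:
        return key
    try:
        with open(config_path, encoding="utf-8") as fh:
            data = json.load(fh)
    except (FileNotFoundError, json.JSONDecodeError):
        return None
    if not isinstance(data, dict):
        return None
    return data.get("api_key")  # assumed field name
```

To see which source actually applies on a device, plexus status and plexus doctor are the commands to use.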
Set up and start streaming in one command. Handles auth, hardware detection, dependency installation, and sensor selection interactively.
plexus start # Interactive setup
plexus start --key plx_xxx # Non-interactive auth
plexus start --key plx_xxx -b 0 # Specify I2C bus
plexus start --name "robot-01" # Name the device

| Flag | Description |
|---|---|
| -k, --key | API key (skips interactive auth prompt) |
| -n, --name | Device name for fleet identification |
| -b, --bus | I2C bus number (default: 1) |
Install capabilities — like shadcn add for hardware. Without arguments, shows an interactive picker with install status.
plexus add # Interactive picker
plexus add can # Add CAN bus support
plexus add sensors camera # Add multiple

Available capabilities: sensors, camera, picamera, mqtt, can, mavlink, serial, system, tui.
Start the agent. It connects to Plexus and streams telemetry, with streaming controlled from the dashboard.
plexus run # Start with auto-detected hardware
plexus run --live # Live terminal dashboard (like htop)
plexus run --sensor system # Stream CPU, memory, disk, thermals
plexus run --auto-install # Auto-install missing dependencies
plexus run --mqtt localhost:1883 # Bridge MQTT data
plexus run --no-sensors --no-cameras # Skip hardware auto-detection

| Flag | Description |
|---|---|
| -n, --name | Device name for fleet identification |
| --live | Show live terminal dashboard with real-time metrics |
| --auto-install | Auto-install missing Python dependencies on demand |
| --no-sensors | Disable I2C sensor auto-detection |
| --no-cameras | Disable camera auto-detection |
| -b, --bus | I2C bus number (default: 1) |
| -s, --sensor | Sensor type to use (e.g. system). Repeatable. |
| --mqtt | MQTT broker to bridge (e.g. localhost:1883) |
| --mqtt-topic | MQTT topic to subscribe to (default: sensors/#) |
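
The default --mqtt-topic value, sensors/#, is an MQTT topic filter: `#` matches the parent level and every level below it, and `+` matches exactly one level. The sketch below implements those standard wildcard rules so you can check which topics a filter would capture. `topic_matches` is a hypothetical helper written for this example, not a plexus-agent function, and it does not handle the special case of topics that start with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Check a topic against an MQTT filter using the + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level of the filter.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)
```

Under these rules, sensors/# captures sensors/room1/temperature and also the bare topic sensors, while a filter such as sensors/+/temperature captures only three-level topics.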
Detect all connected hardware — I2C sensors, cameras, serial ports, USB devices, network interfaces, GPIO, Bluetooth, and system info.
plexus scan # Full hardware scan
plexus scan --all # Show all I2C addresses (including unknown)
plexus scan --setup # Auto-configure CAN interfaces
plexus scan --json # Machine-readable JSON output

Diagnose connectivity, configuration, and dependency issues. Checks config files, network reachability, authentication, installed dependencies, and hardware permissions.

plexus doctor # Run all diagnostics

Run plexus <command> --help for full options.
Declare typed commands on your device. The dashboard auto-generates UI controls — sliders, dropdowns, toggles — from the schema.
from plexus import Plexus, param
px = Plexus()

@px.command("set_speed", description="Set motor speed")
@param("rpm", type="float", min=0, max=10000, unit="rpm")
@param("ramp_time", type="float", min=0.1, max=10.0, default=1.0, unit="s")
async def set_speed(rpm, ramp_time):
    motor.set_rpm(rpm, ramp=ramp_time)
    return {"actual_rpm": motor.read_rpm()}

@px.command("set_mode", description="Switch operating mode")
@param("mode", type="enum", choices=["idle", "run", "calibrate"])
async def set_mode(mode):
    controller.set_mode(mode)

Commands are sent to the device over WebSocket and executed in real time. The dashboard shows:
- Parameter inputs with validation (min/max, type checking, required fields)
- Execution status and results
- Command history
This works the same way in the C SDK — see the C SDK README for the equivalent API.
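
To make the validation rules concrete (min/max, type checking, required fields), here is a standalone sketch of the checks a schema like the one declared for set_speed implies. The dictionary layout and the `validate_param` and `validate_args` helpers are hypothetical, chosen to mirror the @param arguments. The sketch does not describe how the SDK or the dashboard implements validation.

```python
# Hypothetical schema mirroring the @param declarations for set_speed.
SET_SPEED_SCHEMA = {
    "rpm": {"type": "float", "min": 0, "max": 10000},
    "ramp_time": {"type": "float", "min": 0.1, "max": 10.0, "default": 1.0},
}


def validate_param(value, schema):
    """Validate one value against a schema entry and return the cleaned value."""
    kind = schema.get("type")
    if kind == "float":
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise TypeError(f"expected a number, got {type(value).__name__}")
        value = float(value)
        if "min" in schema and value < schema["min"]:
            raise ValueError(f"{value} is below the minimum {schema['min']}")
        if "max" in schema and value > schema["max"]:
            raise ValueError(f"{value} is above the maximum {schema['max']}")
        return value
    if kind == "enum":
        if value not in schema["choices"]:
            raise ValueError(f"{value!r} is not one of {schema['choices']}")
        return value
    raise ValueError(f"unsupported parameter type {kind!r}")


def validate_args(args, schemas):
    """Apply defaults, reject missing required fields, and validate each value."""
    clean = {}
    for name, schema in schemas.items():
        if name in args:
            clean[name] = validate_param(args[name], schema)
        elif "default" in schema:
            clean[name] = schema["default"]
        else:
            raise ValueError(f"missing required parameter {name!r}")
    return clean
```

Calling `validate_args({"rpm": 3450}, SET_SPEED_SCHEMA)` fills in the ramp_time default, while an rpm of 20000 or a missing rpm raises ValueError.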
Group related data for analysis and playback:
import time

with px.session("thermal-cycle-001"):
    while running:
        px.send("temperature", read_temp())
        px.send("vibration.rms", read_accel())
        time.sleep(0.01)

Auto-detect all connected I2C sensors:
from plexus import Plexus
from plexus.sensors import auto_sensors
hub = auto_sensors() # finds IMU, environmental, etc.
hub.run(Plexus()) # streams forever

Or configure manually:

from plexus import Plexus
from plexus.sensors import SensorHub, MPU6050, BME280
hub = SensorHub()
hub.add(MPU6050(sample_rate=100))
hub.add(BME280(sample_rate=1))
hub.run(Plexus())

Built-in sensor drivers:
| Sensor | Type | Metrics | Interface |
|---|---|---|---|
| MPU6050 | 6-axis IMU | accel_x/y/z, gyro_x/y/z | I2C (0x68) |
| MPU9250 | 9-axis IMU | accel_x/y/z, gyro_x/y/z | I2C (0x68) |
| BME280 | Environmental | temperature, humidity, pressure | I2C (0x76) |
| INA219 | Current/Power | bus_voltage, shunt_voltage, current_ma, power_mw | I2C (0x40) |
| SHT3x | Temp/Humidity | temperature, humidity | I2C (0x44) |
| BH1750 | Ambient Light | illuminance | I2C (0x23) |
| VL53L0X | Time-of-Flight | distance_mm | I2C (0x29) |
| ADS1115 | 16-bit ADC | channel_0, channel_1, channel_2, channel_3 | I2C (0x48) |
| QMC5883L | Magnetometer | mag_x, mag_y, mag_z, heading | I2C (0x0D) |
| HMC5883L | Magnetometer | mag_x, mag_y, mag_z, heading | I2C (0x1E) |
| GPS | GPS Receiver | lat, lon, altitude, speed | Serial |
| System | System health | cpu.temperature, memory.used_pct, disk.used_pct, cpu.load | None |
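
The default addresses in the table can be turned into a simple lookup from a responding I2C address to the drivers that might own it. The sketch below does that with the table's values. `KNOWN_ADDRESSES` and `candidates` are hypothetical names for this illustration and say nothing about how plexus scan or auto_sensors identify devices internally.

```python
# Default I2C addresses copied from the driver table; 0x68 is shared by two drivers.
KNOWN_ADDRESSES = {
    0x68: ["MPU6050", "MPU9250"],
    0x76: ["BME280"],
    0x40: ["INA219"],
    0x44: ["SHT3x"],
    0x23: ["BH1750"],
    0x29: ["VL53L0X"],
    0x48: ["ADS1115"],
    0x0D: ["QMC5883L"],
    0x1E: ["HMC5883L"],
}


def candidates(found_addresses):
    """Map each responding address to possible drivers; unknown ones map to []."""
    return {addr: KNOWN_ADDRESSES.get(addr, []) for addr in sorted(found_addresses)}
```

An address alone is not always enough: 0x68 is ambiguous between the two MPU parts, and many sensors can be strapped to an alternate address, so a real detector would typically also read a chip identification register before choosing a driver.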
Write a driver for any hardware by extending BaseSensor:
from plexus.sensors import BaseSensor, SensorReading

class StrainGauge(BaseSensor):
    name = "StrainGauge"
    description = "Load cell strain gauge via ADC"
    metrics = ["strain", "force_n"]

    def read(self):
        raw = self.adc.read_channel(0)
        strain = (raw / 4096.0) * self.calibration_factor
        return [
            SensorReading("strain", round(strain, 6)),
            SensorReading("force_n", round(strain * self.k_factor, 2)),
        ]

Read CAN bus data with optional DBC signal decoding:
from plexus import Plexus
from plexus.adapters import CANAdapter
px = Plexus(api_key="plx_xxx", source_id="vehicle-001")
adapter = CANAdapter(
    interface="socketcan",
    channel="can0",
    dbc_path="vehicle.dbc", # optional: decode signals
)
with adapter:
    while True:
        for metric in adapter.poll():
            px.send(metric.name, metric.value, tags=metric.tags)

Supports socketcan, pcan, vector, kvaser, and slcan interfaces. See examples/can_basic.py for more.
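
For readers new to DBC files: decoding a signal generally means extracting a bit field from the frame payload and applying a scale and offset. The sketch below shows that arithmetic for little-endian (Intel byte order) signals only. The signal layouts in the usage note are invented for illustration, and `decode_signal` is a hypothetical helper, not the routine CANAdapter uses.

```python
def decode_signal(
    data: bytes,
    start_bit: int,
    length: int,
    scale: float = 1.0,
    offset: float = 0.0,
    signed: bool = False,
) -> float:
    """Decode one little-endian signal: physical = raw * scale + offset."""
    frame = int.from_bytes(data, byteorder="little")
    raw = (frame >> start_bit) & ((1 << length) - 1)
    if signed and raw & (1 << (length - 1)):
        raw -= 1 << length  # two's-complement sign extension
    return raw * scale + offset
```

For example, a hypothetical 16-bit RPM signal at bit 0 with a scale of 0.25 decodes the payload bytes E8 35 to 3450.0. Big-endian (Motorola) signals use a different bit numbering and are not covered here.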
Stream telemetry from MAVLink-speaking vehicles — ArduPilot, PX4, and other autopilots:
from plexus import Plexus
from plexus.adapters import MAVLinkAdapter
px = Plexus(api_key="plx_xxx", source_id="drone-001")
adapter = MAVLinkAdapter(
    connection_string="udpin:0.0.0.0:14550", # SITL or GCS
)
with adapter:
    while True:
        for metric in adapter.poll():
            px.send(metric.name, metric.value, tags=metric.tags)

Decoded metrics include attitude (roll/pitch/yaw), GPS, battery, airspeed, RC channels, and more. Supports UDP, TCP, and serial connections. See examples/mavlink_basic.py for more.
Forward MQTT messages to Plexus:
from plexus.adapters import MQTTAdapter
adapter = MQTTAdapter(broker="localhost", topic="sensors/#")
adapter.connect()
adapter.run(on_data=my_callback)

Or bridge directly from the CLI:

plexus run --mqtt localhost:1883 --mqtt-topic "sensors/#"

The client buffers data locally when the network is unavailable:
- In-memory buffer (default, up to 10,000 points)
- Persistent SQLite buffer for surviving restarts
- Automatic retry with exponential backoff
- Buffered points are sent with the next successful request
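
The in-memory buffer and retry behavior listed above can be pictured with a small standalone sketch. It assumes that the oldest points are dropped once the 10,000-point limit is reached and that retry delays double up to a cap. Neither detail is stated here, and `BoundedBuffer` and `backoff_delays` are hypothetical names, not SDK classes.

```python
import random
from collections import deque


class BoundedBuffer:
    """In-memory buffer that drops the oldest points once max_points is reached."""

    def __init__(self, max_points=10_000):
        self._points = deque(maxlen=max_points)

    def add(self, point):
        self._points.append(point)  # deque discards from the left when full

    def drain(self):
        """Return all buffered points in order and empty the buffer."""
        points = list(self._points)
        self._points.clear()
        return points

    def __len__(self):
        return len(self._points)


def backoff_delays(base=1.0, factor=2.0, cap=60.0, attempts=6, jitter=0.0):
    """Yield retry delays in seconds that grow exponentially up to cap."""
    delay = base
    for _ in range(attempts):
        yield min(cap, delay) + random.uniform(0, jitter)
        delay *= factor
```

A sender built on this would call `drain()` after the first successful request and prepend the drained points to that upload, which matches the last bullet above.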
# Enable persistent buffering
px = Plexus(persistent_buffer=True)
# Check buffer state
print(px.buffer_size())
px.flush_buffer()

Run plexus run --live to get a real-time terminal UI, like htop for your hardware:
┌──────────────────────────────────────────────────────────────┐
│ Plexus Live Dashboard ● online ↑ 4m 32s │
├──────────────┬──────────┬────────┬────────┬─────────────────┤
│ Metric │ Value │ Rate │ Buffer │ Status │
├──────────────┼──────────┼────────┼────────┼─────────────────┤
│ cpu.temp │ 62.3 │ 1.0 Hz │ 0 │ ● streaming │
│ engine.rpm │ 3,450 │ 10 Hz │ 0 │ ● streaming │
│ pressure │ 1013.2 │ 1.0 Hz │ 0 │ ● streaming │
└──────────────┴──────────┴────────┴────────┴─────────────────┘
│ Throughput: 12 pts/min Total: 847 Errors: 0 │
└──────────────────────────────────────────────────────────────┘
Requires the tui extra: pip install plexus-agent[tui]
Device (plexus run)
├── WebSocket → PartyKit Server → Dashboard (real-time)
└── HTTP POST → /api/ingest → ClickHouse (storage)
- WebSocket path: Used by plexus run for real-time streaming controlled from the dashboard. Data flows through the PartyKit relay to connected browsers.
- HTTP path: Used by the Plexus() client for direct data ingestion. Data is stored in ClickHouse for historical queries.
When recording a session, both paths are used — WebSocket for live view, HTTP for persistence.
See API.md for the full HTTP and WebSocket protocol specification, including:
- Request/response formats
- All message types
- Code examples in Python, JavaScript, Go, and Bash
- Error codes
- Best practices
Apache 2.0