This repository provides a working, dockerized reference implementation of end-to-end observability for distributed energy resources (DER): solar inverters, batteries, and EV chargers. It demonstrates how to ingest → normalize → store → analyze → visualize high-frequency telemetry with production-like patterns.
- FastAPI ingestion API with `/health` and `/ingest` endpoints (`app/ingestion_api.py`).
- Pydantic `TelemetryPacket` model with `ts`, `site_id`, `vendor`, `device_type`, `device_id`, `data`, `meta` (`app/models.py`).
- Normalization pipeline to coerce mixed vendor packets into canonical rows with explicit `metric`, `value`, `unit` (`app/normalize.py`).
- TimescaleDB/Postgres persistence with JSONB `meta` and upsert for latest telemetry (`app/db.py`, `docker-compose.yml`).
- Streamlit dashboard with live refresh showing Fleet Power Flow, Power Metrics, Battery SOC & Inverter Temp, Recent Telemetry, and Telemetry Gaps (`dashboard/app.py`).
- Gap/anomaly utilities for data quality and availability risk estimation (`app/anomalies.py`).
- Deterministic simulators for solar inverter, battery, and EVSE to generate test telemetry (`simulator/*.py`).
- Dockerized stack: TimescaleDB, API (uvicorn), Dashboard (Streamlit) wired via `docker-compose.yml`.
[Simulators] ──HTTP JSON──▶ [FastAPI Ingestion] ──normalize──▶ [TimescaleDB]
                                                                     │
                                                                     └──▶ [Streamlit Dashboard]
- Ingestion API: Receives a list of `TelemetryPacket` JSON objects and normalizes them into tidy rows.
- Storage: Persists to Postgres/TimescaleDB (time-series optimized). JSONB `meta` preserves vendor specifics.
- Analysis: Gap and outlier detection (`anomalies.py`), including estimated lost DR capacity from gaps.
- Visualization: Real-time charts plus Telemetry Gaps panel for data quality and availability.
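The normalization step described above can be sketched in plain Python. This is an illustration only, not the code in `app/normalize.py`: the function names `infer_unit` and `normalize_packet` and the suffix-to-unit table are assumptions made for this example, and the real pipeline may derive units differently.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

# Hypothetical suffix-to-unit table; the real mapping may differ.
UNIT_SUFFIXES = {"_kw": "kW", "_v": "V", "_pct": "%", "_c": "degC"}


def infer_unit(metric: str) -> Optional[str]:
    """Guess a unit from the metric name's suffix (an assumed convention)."""
    for suffix, unit in UNIT_SUFFIXES.items():
        if metric.endswith(suffix):
            return unit
    return None


def normalize_packet(packet: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Expand one packet into one tidy row per numeric entry in `data`."""
    # Parse the ISO 8601 timestamp; a trailing "Z" means UTC.
    ts = datetime.fromisoformat(packet["ts"].replace("Z", "+00:00"))
    rows = []
    for metric, value in packet.get("data", {}).items():
        # Skip non-numeric readings (bool is excluded because it subclasses int).
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            continue
        rows.append({
            "ts": ts.astimezone(timezone.utc),
            "site_id": packet["site_id"],
            "device_type": packet["device_type"],
            "device_id": packet["device_id"],
            "metric": metric,
            "value": float(value),
            "unit": infer_unit(metric),
            "meta": packet.get("meta", {}),
        })
    return rows
```

Each key in `data` becomes its own row, so a packet carrying `ac_power_kw` and `dc_voltage_v` yields two rows that share the same timestamp, device identifiers, and `meta`.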
Prereqs: Docker + Docker Compose
# build and start containers in background
docker compose up --build -d
# check logs
docker compose logs -f api
docker compose logs -f dashboard
docker compose logs -f simulator
# stop and remove containers/volumes
docker compose down -v

This starts three services:
- db – TimescaleDB on `localhost:5432` (db: `telemetry`, user/pass: `postgres`/`postgres`)
- api – FastAPI on `http://localhost:8000`
- dashboard – Streamlit on `http://localhost:8501`
Health: `GET http://localhost:8000/health` → `{"status":"ok"}`
Schema is defined in `sql/init.sql`. To load it:
docker compose exec db psql -U postgres -d telemetry -f sql/init.sql

`POST /ingest` accepts a JSON array of packets:
[
{
"ts": "2025-01-01T00:00:01Z",
"site_id": "SITE_001",
"vendor": "acme",
"device_type": "inverter",
"device_id": "INV-01",
"data": {"ac_power_kw": 4.2, "dc_voltage_v": 380},
"meta": {"fw": "1.2.3"}
}
]

curl -X POST http://localhost:8000/ingest -H "Content-Type: application/json" -d '[{"ts":"2025-01-01T00:00:01Z","site_id":"SITE_001","vendor":"acme",
"device_type":"inverter","device_id":"INV-01",
"data":{"ac_power_kw":4.2},"meta":{"fw":"1.2.3"}}]'

Environment variables:
- `DATABASE_URL` – API DSN (default: `postgresql://postgres:postgres@db:5432/telemetry`)
- `DSN` – dashboard DSN (default: same as above)
- `API_URL` – simulator target API (default: `http://api:8000/ingest`)
- `HEALTH_URL` – simulator health check (default: `http://api:8000/health`)
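As a rough sketch of how these variables and their documented defaults might be resolved, the helper below reads each one from the environment and falls back to the default. `load_settings` is a hypothetical name for this example; the repository's services may read their configuration differently.

```python
import os
from typing import Dict, Mapping, Optional

# Defaults as documented for each variable.
DEFAULT_DSN = "postgresql://postgres:postgres@db:5432/telemetry"
DEFAULT_API_URL = "http://api:8000/ingest"
DEFAULT_HEALTH_URL = "http://api:8000/health"


def load_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Resolve each variable from `env`, falling back to the documented default."""
    # Accept an explicit mapping so the function is easy to test.
    env = os.environ if env is None else env
    return {
        "DATABASE_URL": env.get("DATABASE_URL", DEFAULT_DSN),
        "DSN": env.get("DSN", DEFAULT_DSN),
        "API_URL": env.get("API_URL", DEFAULT_API_URL),
        "HEALTH_URL": env.get("HEALTH_URL", DEFAULT_HEALTH_URL),
    }
```

The defaults use the Compose service hostnames `db` and `api`, which only resolve inside the Compose network. When running a service directly on the host, override them with `localhost` equivalents.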
Generate realistic test data without hardware:
- `simulator/solar_inverter.py`
- `simulator/battery.py`
- `simulator/ev_charger.py`
- `simulator/run_all.py` (runs all of the above; waits for API health, then streams packets at configurable Hz)
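The "wait for API health, then stream at a fixed rate" behavior can be illustrated with a minimal sketch. It is not the code in `simulator/run_all.py`: the names `wait_for_health` and `stream`, their parameters, and the injectable `check`, `send`, and `sleep` callables are all assumptions chosen so the example runs without a network.

```python
import time
from typing import Callable, Iterable


def wait_for_health(check: Callable[[], bool], timeout_s: float = 30.0,
                    interval_s: float = 1.0,
                    sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll `check` until it returns True or `timeout_s` elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            if check():
                return True
        except OSError:
            # Connection refused or similar: the API is not up yet.
            pass
        if time.monotonic() >= deadline:
            return False
        sleep(interval_s)


def stream(packets: Iterable[dict], send: Callable[[dict], None], hz: float,
           sleep: Callable[[float], None] = time.sleep) -> int:
    """Send packets one at a time, pausing 1/hz seconds after each send."""
    if hz <= 0:
        raise ValueError("hz must be positive")
    period = 1.0 / hz
    sent = 0
    for packet in packets:
        send(packet)
        sent += 1
        sleep(period)
    return sent
```

In practice `check` could wrap an HTTP GET against `HEALTH_URL` and `send` could POST a one-element JSON array to `API_URL`. Sleeping a fixed `1/hz` after each send ignores the time the send itself takes, so the achieved rate will sit slightly below the requested one.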
Example:
python -m distributed-energy-observability.simulator.run_all --hz 15 --site SITE_001

Open `http://localhost:8501` to view:
- Fleet Power Flow (1s avg)
- Power Metrics (1s avg)
- Battery SOC & Inverter Temp (past 3 min)
- Recent Telemetry
- Telemetry Gaps (with `max_gap_s` threshold)
Auto-refresh cadence and lookback window are adjustable in the UI.
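The "1s avg" panels imply that high-frequency samples are downsampled into one-second buckets before plotting. The sketch below shows that idea in plain Python. It is an assumption about the approach, not the dashboard's actual query (which may well do the bucketing in SQL), and `one_second_average` is a hypothetical name.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, List, Tuple


def one_second_average(
    samples: Iterable[Tuple[datetime, float]],
) -> List[Tuple[datetime, float]]:
    """Average samples that fall in the same whole second, sorted by time."""
    buckets: Dict[datetime, List[float]] = defaultdict(list)
    for ts, value in samples:
        # Truncate to the start of the second to form the bucket key.
        buckets[ts.replace(microsecond=0)].append(value)
    return [(sec, sum(vals) / len(vals)) for sec, vals in sorted(buckets.items())]
```

At a simulator rate of 15 Hz, each bucket would average roughly 15 readings per metric, which smooths sample-to-sample noise while keeping the chart responsive.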
Columns: `ts`, `site_id`, `device_type`, `device_id`, `metric`, `value`, `unit`, `meta`
Utilities in `app/anomalies.py`:
- `gap_windows(df, max_gap_s=5.0)`
- `zscore_outliers(df, threshold=3.0)`
- `estimate_lost_dr_kw(gap_df)`
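To make the gap and outlier checks concrete, here is a standard-library sketch of both ideas. The real utilities take DataFrames and may be implemented differently; `find_gaps` and `find_zscore_outliers` are hypothetical stand-ins that only reuse the documented default thresholds (`max_gap_s=5.0`, `threshold=3.0`). The lost-DR estimate is not sketched because its formula is not described here.

```python
from datetime import datetime
from statistics import mean, pstdev
from typing import List, Sequence, Tuple


def find_gaps(timestamps: Sequence[datetime],
              max_gap_s: float = 5.0) -> List[Tuple[datetime, datetime, float]]:
    """Return (start, end, seconds) for consecutive samples more than max_gap_s apart."""
    ordered = sorted(timestamps)
    gaps = []
    for prev, curr in zip(ordered, ordered[1:]):
        delta = (curr - prev).total_seconds()
        if delta > max_gap_s:
            gaps.append((prev, curr, delta))
    return gaps


def find_zscore_outliers(values: Sequence[float],
                         threshold: float = 3.0) -> List[int]:
    """Return indices whose absolute z-score exceeds `threshold`.

    Uses the population standard deviation (an assumption of this sketch).
    """
    if len(values) < 2:
        return []
    mu = mean(values)
    sigma = pstdev(values)
    if sigma == 0:
        return []  # all values identical: nothing can be an outlier
    return [i for i, v in enumerate(values) if abs(v - mu) / sigma > threshold]
```

With the population standard deviation, the largest possible z-score among `n` values is `sqrt(n - 1)`, so a threshold of 3.0 can only flag anything once a window holds more than 10 samples. Gap detection should run per device, since interleaving timestamps from several devices would hide an outage on any single one.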
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt

Run services manually:
export DATABASE_URL=postgresql://postgres:postgres@localhost:5432/telemetry
uvicorn distributed-energy-observability.app.ingestion_api:app --reload --port 8000
streamlit run distributed-energy-observability/dashboard/app.py

Use Black, Ruff, and isort with the config in `pyproject.toml`.
- Add Kafka/Redpanda buffer for backpressure
- Add IsolationForest anomaly detection with labeled faults
- Add Grafana dashboards alongside Streamlit
- Add DR event simulator to show derated capacity under telemetry loss
- Add multi-site roll-ups and service-level objectives (% devices meeting freshness targets)
distributed-energy-observability/
├─ app/
├─ dashboard/
├─ simulator/
├─ sql/
│ └─ init.sql
├─ assets/
│ ├─ dashboard-hero.png
│ └─ docker-up-demo.gif
├─ Dockerfile
├─ docker-compose.yml
└─ requirements.txt
MIT

