Rust monorepo for EMWIN protocol decoding, live ingest orchestration, HTTP API serving, and CLI tooling.
Install latest release via script:
curl --proto '=https' --tlsv1.2 -LsSf https://github.com/bradsjm/emwin-rs/releases/latest/download/emwin-cli-installer.sh | sh
Run via Docker (no local Rust toolchain required):
docker run --rm ghcr.io/bradsjm/emwin-rs/emwin-cli:latest --help
Development compose stack (ephemeral Postgres + MinIO + emwin-cli server):
cp .env.compose.example .env.compose
docker compose up --build
- compose.yml provisions postgis/postgis, MinIO, and emwin-cli server.
- Postgres data and MinIO object storage use tmpfs, so the stack is intentionally non-persistent.
- emwin-cli runs with EMWIN_OUTPUT_DIR=s3://emwin/emwin, AWS_ENDPOINT_URL=http://minio:9000, and EMWIN_PERSIST_DATABASE_URL=postgresql://emwin:emwin@postgres:5432/emwin?sslmode=disable by default.
- Set EMWIN_USERNAME in .env.compose; set EMWIN_RECEIVER=wxwire and EMWIN_PASSWORD only when using Weather Wire.
- To point compose at a different object-store target, set EMWIN_OUTPUT_DIR to an S3-style URI such as s3://bucket[/prefix] or an HTTP(S) URI such as https://host/path. Backend credentials and endpoint settings come from the environment variables that object_store recognizes for that scheme. For MinIO and other S3-compatible targets, keep using AWS_ENDPOINT_URL, AWS_REGION or AWS_DEFAULT_REGION, and the standard AWS credential variables.
- The HTTP server is exposed on http://127.0.0.1:8080, Postgres on 127.0.0.1:5432, MinIO S3 on http://127.0.0.1:9000, and the MinIO console on http://127.0.0.1:9001 by default.
- emwin-protocol: protocol decoding, ingest runtimes, and relay primitives
- emwin-service: shared service contracts and DTOs for live/archive adapters
- emwin-live: headless live ingest runtime for QBT and Weather Wire
- emwin-api: HTTP/SSE/OpenAPI adapter over emwin-live
- emwin-cli: CLI entrypoint for archive queries, live server startup, and relay mode
- emwin-db: persistence runtime and Postgres-backed archive service implementation
- emwin-parser: text product enrichment and parsing
Add the crate from this monorepo:
[dependencies]
emwin-protocol = { git = "https://github.com/bradsjm/emwin-rs", tag = "v0.3.1", package = "emwin-protocol" }
Use the unified ingest API from the crate root:
use emwin_protocol::ingest::{IngestConfig, IngestReceiver};
use emwin_protocol::qbt_receiver::{QbtDecodeConfig, QbtReceiverConfig, default_qbt_upstream_servers};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut receiver = IngestReceiver::build(IngestConfig::Qbt(QbtReceiverConfig {
        email: "you@example.com".to_string(),
        servers: default_qbt_upstream_servers(),
        server_list_path: None, // automatic server-list mode only
        follow_server_list_updates: true,
        reconnect_delay_secs: 5, // delay after one full failed pass
        connection_timeout_secs: 5,
        watchdog_timeout_secs: 49,
        max_exceptions: 10,
        decode: QbtDecodeConfig::default(),
    }))?;
    receiver.start()?;
    receiver.stop().await?;
    Ok(())
}
For active development against local changes, use a path dependency instead:
[dependencies]
emwin-protocol = { path = "../emwin-rs/crates/emwin-protocol", default-features = false, features = ["qbt"] }
emwin-protocol feature flags:
[dependencies]
emwin-protocol = { path = "../emwin-rs/crates/emwin-protocol", default-features = false, features = ["qbt"] }
Receiver implementations are opt-in. emwin-protocol now defaults to no transport features, so downstream crates must explicitly enable qbt and/or wxwire.
Archive query mode:
cargo run -p emwin-cli -- query --database-url postgres://localhost/emwin incidents --office KOAX
cargo run -p emwin-cli -- query --database-url postgres://localhost/emwin incident KOAX FF W 2001
cargo run -p emwin-cli -- query --database-url postgres://localhost/emwin incident-products KOAX FF W 2001 --limit 50
cargo run -p emwin-cli -- query --database-url postgres://localhost/emwin products --office KOAX --artifact-kind nws_text_product --min-lat 41 --max-lat 42 --min-lon -97 --max-lon -95 --limit 25
cargo run -p emwin-cli -- query --database-url postgres://localhost/emwin product 42
cargo run -p emwin-cli -- query --database-url postgres://localhost/emwin features --kind polygon --artifact-kind nws_text_product --limit 25
cargo run -p emwin-cli -- query --database-url postgres://localhost/emwin features-geojson --kind search_point --limit 100
cargo run -p emwin-cli -- query --database-url postgres://localhost/emwin aggregate-facets office --artifact-kind nws_text_product --limit 20
cargo run -p emwin-cli -- query --database-url postgres://localhost/emwin aggregate-timeseries product_count --start 2025-03-05T12:00:00Z --end 2025-03-05T15:00:00Z --bucket hour
cargo run -p emwin-cli -- query --database-url postgres://localhost/emwin aggregate-cells product_count --precision 5 --limit 100
cargo run -p emwin-cli -- query --database-url postgres://localhost/emwin issues --product-id 42 --kind text_product_parse
cargo run -p emwin-cli -- query --database-url postgres://localhost/emwin issue 7
cargo run -p emwin-cli -- query --database-url postgres://localhost/emwin product-raw 42 --output ./product.bin
Live server mode:
cargo run -p emwin-cli -- server --username you@example.com --bind 127.0.0.1:8080
cargo run -p emwin-cli -- server --username you@example.com --output-dir file:///tmp/emwin
cargo run -p emwin-cli -- server --username you@example.com --output-dir file:///tmp/emwin --post-process-archives false
cargo run -p emwin-cli -- server --username you@example.com --output-dir file:///tmp/emwin --persist-database-url postgres://localhost/emwin
cargo run -p emwin-cli -- server --username you@example.com --output-dir file:///tmp/emwin --persist-database-url postgres://localhost/emwin --max-db-connections 16
cargo run -p emwin-cli -- server --username you@example.com --output-dir s3://my-bucket/emwin --persist-database-url postgres://localhost/emwin
cargo run -p emwin-cli -- server --receiver wxwire --username you@example.com --password 'secret'
emwin-cli server composes emwin-live and emwin-api.
Optional file persistence:
- server --output-dir <OBJECT_STORE_ROOT_URI> writes each completed assembled file and a sibling .JSON metadata sidecar under canonical archival paths such as qbt/2026/03/16/BOX/nws_text_product/20260316T021530Z-4f2c9d91-AFDBOX.TXT.
- Local filesystem persistence must use file:// URIs such as file:///tmp/emwin; plain paths are rejected.
- When --persist-database-url is also set, blob writes still succeed even if the Postgres metadata upsert fails.
- When Postgres is unavailable at startup or during runtime, the server stays up, retries metadata persistence in the background with backoff, and resumes writing after connectivity returns.
- When filesystem writes fail transiently, including ENOSPC, or the configured object store returns transient service/network failures, the background persistence worker retries with throttled warning logs while live ingest and connected clients remain online.
- ZIP/ZIS archive entry directories are not preserved in persisted blob paths; the original delivered filename remains available in metadata and /v1/files responses.
- server defaults to --post-process-archives true, which extracts the first entry from completed .ZIP and .ZIS products before parsing and downstream delivery.
- Corrupt .ZIP and .ZIS payloads are logged as Corrupt Zip File Received and dropped when archive post-processing is enabled.
- server serves retained payloads over HTTP from the in-memory retention cache while optionally persisting payloads and metadata asynchronously in the background.
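The URI rules above (file:// for local paths, s3:// or HTTP(S) for object stores, plain paths rejected) can be pictured with a small classifier. This is a minimal sketch, not the emwin-db implementation: OutputRoot and classify_output_root are hypothetical names, and the real backend is built through object_store.

```rust
/// Kinds of object-store root recognized by this sketch (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum OutputRoot {
    /// `file:///abs/path` on the local filesystem.
    LocalFile(String),
    /// `s3://bucket[/prefix]`.
    S3 { bucket: String, prefix: String },
    /// `http://` or `https://` endpoint, kept as the full URI.
    Http(String),
}

/// Hypothetical helper: classify an `--output-dir` value by its scheme.
/// Values without `scheme://` are treated as plain paths and rejected.
pub fn classify_output_root(uri: &str) -> Result<OutputRoot, String> {
    let (scheme, rest) = uri
        .split_once("://")
        .ok_or_else(|| format!("plain path rejected, use a file:// URI: {uri}"))?;
    match scheme {
        "file" => Ok(OutputRoot::LocalFile(rest.to_string())),
        "s3" => {
            // Everything up to the first '/' is the bucket; the rest is the prefix.
            let (bucket, prefix) = rest.split_once('/').unwrap_or((rest, ""));
            if bucket.is_empty() {
                return Err(format!("missing bucket in {uri}"));
            }
            Ok(OutputRoot::S3 {
                bucket: bucket.to_string(),
                prefix: prefix.to_string(),
            })
        }
        "http" | "https" => Ok(OutputRoot::Http(uri.to_string())),
        other => Err(format!("unsupported scheme: {other}")),
    }
}
```

With this sketch, classify_output_root("file:///tmp/emwin") yields a LocalFile root with path /tmp/emwin, while classify_output_root("/tmp/emwin") returns an error.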
CLI logging format:
- Diagnostics/logging use canonical tracing-subscriber formatting and RUST_LOG filtering.
- Process startup logs include the emwin-cli crate version and selected subcommand.
- Command payloads remain on stdout; diagnostics/logging remain on stderr.
- This stdout/stderr split applies to all modes, including relay.
Live server mode (SSE + JSON endpoints):
cargo run -p emwin-cli -- server --username you@example.com --bind 127.0.0.1:8080
cargo run -p emwin-cli -- server --receiver wxwire --username you@example.com --password 'secret' --bind 127.0.0.1:8080
cargo run -p emwin-cli -- server --post-process-archives false --username you@example.com --bind 127.0.0.1:8080
Useful server flags:
- --stats-interval-secs 30 (set 0 to disable periodic stats logging)
- --quiet (suppress non-error logs)
- --max-clients 100 (cap concurrent SSE clients)
- --file-retention-secs 300 (in-memory completed-file TTL)
- --max-retained-files 1000 (in-memory completed-file capacity)
- --max-db-connections 10 (Postgres pool size for archive reads and persistence writes)
- --openapi-auth-token "secret-token" (require Authorization: Bearer <token> on /v1/*)
- --alerting-apprise-api-url "http://127.0.0.1:8000" (used by POST /v1/alerting/contact-points/{id}/test for Apprise targets)
- --cors-origin "*" or --cors-origin "https://your-ui.example" (cross-origin browser clients can send Authorization when bearer auth is enabled)
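To make the interaction between --file-retention-secs and --max-retained-files concrete, here is a rough sketch of a buffer bounded by both age and count. RetentionCache is a hypothetical type, and oldest-first eviction is an assumption; the README does not state which entries the server drops first.

```rust
use std::collections::VecDeque;

/// Hypothetical in-memory retention buffer bounded by TTL and capacity.
pub struct RetentionCache {
    ttl_secs: u64,
    max_files: usize,
    // Oldest entries sit at the front: (completed_at_secs, filename).
    entries: VecDeque<(u64, String)>,
}

impl RetentionCache {
    pub fn new(ttl_secs: u64, max_files: usize) -> Self {
        Self { ttl_secs, max_files, entries: VecDeque::new() }
    }

    /// Drop entries whose age has reached the TTL.
    pub fn evict_expired(&mut self, now_secs: u64) {
        while self
            .entries
            .front()
            .map_or(false, |(at, _)| now_secs.saturating_sub(*at) >= self.ttl_secs)
        {
            self.entries.pop_front();
        }
    }

    /// Record a completed file, then enforce the capacity cap oldest-first
    /// (eviction order is an assumption of this sketch).
    pub fn insert(&mut self, now_secs: u64, filename: &str) {
        self.evict_expired(now_secs);
        self.entries.push_back((now_secs, filename.to_string()));
        while self.entries.len() > self.max_files {
            self.entries.pop_front();
        }
    }

    /// Filenames currently retained, oldest first.
    pub fn filenames(&self) -> Vec<&str> {
        self.entries.iter().map(|(_, name)| name.as_str()).collect()
    }
}
```

Timestamps are passed in as plain seconds so the behavior can be exercised without a real clock.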
Server endpoints:
- GET / - Swagger UI for the server API
- GET /openapi.json - generated OpenAPI document
- GET /v1/incidents - live incident projection backed by persisted Postgres metadata
- GET /v1/incidents/{office}/{phenomena}/{significance}/{etn} - incident detail with related product links
- GET /v1/incidents/{office}/{phenomena}/{significance}/{etn}/products - archived products linked to one incident
- GET /v1/products - archived product list/search with cursor pagination
- GET /v1/products/{product_id} - persisted archived product detail
- GET /v1/products/{product_id}/raw - persisted archived payload bytes
- GET /v1/features - archived spatial feature list with cursor pagination
- GET /v1/features/geojson - bounded GeoJSON FeatureCollection over archived features
- GET /v1/aggregates/facets - archive facet buckets with completeness metadata
- GET /v1/aggregates/timeseries - archive time buckets with completeness metadata
- GET /v1/aggregates/cells - archive geohash cell buckets with completeness metadata, counting distinct products per intersected cell across persisted spatial features
- GET /v1/issues - archived issue list
- GET /v1/issues/{issue_id} - archived issue detail
- GET /v1/streams/products?event=product_available&lat=41.42&lon=-96.17&distance_miles=5 - SSE product stream with parsed metadata and spatial filters
- GET /v1/streams/incidents?action=created,updated&office=KOAX&phenomena=FF&significance=W&etn=2001&status=active - SSE stream of persisted incident projection changes with incident-native filters
- GET|POST /v1/alerting/contact-points, GET|PATCH|DELETE /v1/alerting/contact-points/{id}, POST /v1/alerting/contact-points/{id}/test - contact-point CRUD and test delivery
- GET|POST /v1/alerting/rules, GET|PATCH|DELETE /v1/alerting/rules/{id}, POST /v1/alerting/rules/simulate, POST /v1/alerting/rules/{id}/simulate, GET /v1/alerting/rules/{id}/events - rule CRUD, simulation, and audit
- GET /v1/alerting/deliveries, GET|POST /v1/alerting/silences, DELETE /v1/alerting/silences/{id} - delivery audit and silence management
- GET /v1/files - retained completed-file payloads using the same shape as product_available events, including parsed product metadata and download_url
- GET /v1/files/{*filename} - retained file download (URL-encoded path segment)
- GET /v1/health - server health summary
- GET /v1/metrics - JSON telemetry snapshot
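Because GET /v1/files/{*filename} expects a URL-encoded path segment, a client may need to percent-encode the filename before requesting it. The sketch below uses only the standard library; encode_path_segment and file_download_url are hypothetical helper names. In practice the download_url returned by /v1/files is likely the simpler route.

```rust
/// Percent-encode one path segment, leaving RFC 3986 unreserved
/// characters (letters, digits, '-', '.', '_', '~') untouched.
pub fn encode_path_segment(segment: &str) -> String {
    let mut out = String::with_capacity(segment.len());
    for byte in segment.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            // Everything else, including '/' and spaces, becomes %XX.
            other => out.push_str(&format!("%{other:02X}")),
        }
    }
    out
}

/// Hypothetical helper: build a retained-file download URL from a base
/// address (for example the default bind, http://127.0.0.1:8080).
pub fn file_download_url(base: &str, filename: &str) -> String {
    format!(
        "{}/v1/files/{}",
        base.trim_end_matches('/'),
        encode_path_segment(filename)
    )
}
```

Encoding '/' as %2F is a choice made for this sketch; whether the server also accepts unencoded slashes in the wildcard segment is not stated here.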
Authentication notes:
- When --openapi-auth-token or EMWIN_OPENAPI_AUTH_TOKEN is set, all /v1/* requests must include Authorization: Bearer <token>.
- /openapi.json advertises bearer auth only when --openapi-auth-token is configured.
- GET /, GET /openapi.json, and Swagger UI asset routes remain public so the browser docs continue to load.
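The gating rule above can be summarized as a small decision function. This is an illustrative sketch only: is_authorized is a hypothetical name, it is not the emwin-api middleware, and a production check would normally use a constant-time comparison.

```rust
/// Decide whether a request passes the bearer-token gate in this sketch.
/// `configured_token` is None when no token is set on the server.
pub fn is_authorized(
    path: &str,
    authorization: Option<&str>,
    configured_token: Option<&str>,
) -> bool {
    let expected = match configured_token {
        Some(token) => token,
        None => return true, // auth disabled: everything is open
    };
    if !path.starts_with("/v1/") {
        return true; // docs routes such as / and /openapi.json stay public
    }
    // Plain string comparison for brevity; not constant-time.
    match authorization.and_then(|header| header.strip_prefix("Bearer ")) {
        Some(presented) => presented == expected,
        None => false,
    }
}
```

For example, with a configured token, a request to /v1/products without an Authorization header is refused, while /openapi.json still loads.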
Archive/incident notes:
- /v1/incidents, /v1/products/*, and /v1/issues/* require --persist-database-url; they return 503 when Postgres-backed archive metadata is not configured.
- /v1/alerting/* also requires --persist-database-url; the control plane is backed by the same Postgres deployment.
- /v1/incidents exposes the mutable incident projection from the incidents table; /v1/products/* exposes persisted product records and raw payload retrieval.
- /v1/products, /v1/features, and /v1/aggregates/* share the archive filter grammar, including artifact_kind.
- Incident alert simulations only use retained alerting.source_events; requests earlier than the first retained incident source event are rejected instead of fabricating history.
- POST /v1/alerting/contact-points/{id}/test can send Apprise tests only when --alerting-apprise-api-url or EMWIN_APPRISE_API_URL is configured on the server process.
- Archive resource endpoints accept flat query parameters such as office=MKX, lat=41.42, and source_timestamp_after=1775586000; nested forms such as filters.office=... and filters[office]=... are rejected with 400.
- /v1/streams/incidents also requires --persist-database-url; it emits incident_change SSE frames only after incident projection writes or cleanup updates succeed in Postgres.
- /v1/streams/products and /v1/streams/incidents are incremental streams, not durable replay logs.
- Clients should fetch an initial snapshot from the corresponding resource endpoints, then attach the SSE stream.
- Last-Event-ID is best-effort for short reconnect gaps only; if the server emits a lag warning or the client detects a gap, the client must resync from the resource endpoints.
- /v1/health returns status: "degraded" and includes an archive status object when archive persistence is configured but archive access is failing.
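The flat-versus-nested query rule can be checked on the client side before sending a request. The sketch below is an assumption about how such a check might look, not the server's parser: parse_flat_query is a hypothetical name, the Err value stands in for a 400 response, and percent-decoding is deliberately left out.

```rust
/// Split a raw query string into flat `key=value` pairs.
/// Keys that look nested (`filters.office`, `filters[office]`) are refused;
/// the Err value stands in for an HTTP 400 in this sketch.
pub fn parse_flat_query(query: &str) -> Result<Vec<(String, String)>, String> {
    let mut pairs = Vec::new();
    for part in query.split('&').filter(|p| !p.is_empty()) {
        // A key without '=' is kept with an empty value.
        let (key, value) = part.split_once('=').unwrap_or((part, ""));
        if key.contains('.') || key.contains('[') {
            return Err(format!("400: nested parameter not supported: {key}"));
        }
        pairs.push((key.to_string(), value.to_string()));
    }
    Ok(pairs)
}
```

Under these assumptions, office=MKX&lat=41.42 parses into two pairs, while filters.office=MKX and filters[office]=MKX both produce an error.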
/v1/streams/incidents filter parameters:
- action - comma-delimited incident mutation types: created, updated
- office, phenomena, significance, status - incident identity and lifecycle filters using canonical values such as KOAX, FF, W, and active
- etn - comma-delimited event tracking number filter such as 2001,2002
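The comma-delimited action and etn values above could be parsed along these lines. IncidentAction, parse_actions, and parse_etns are hypothetical names for illustration, and treating etn as a u32 is an assumption; the server's own types may differ.

```rust
/// Incident mutation types named in the filter list (hypothetical enum).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IncidentAction {
    Created,
    Updated,
}

/// Parse a value such as "created,updated" into a list of actions.
pub fn parse_actions(raw: &str) -> Result<Vec<IncidentAction>, String> {
    raw.split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(|s| match s {
            "created" => Ok(IncidentAction::Created),
            "updated" => Ok(IncidentAction::Updated),
            other => Err(format!("unknown action: {other}")),
        })
        .collect()
}

/// Parse a value such as "2001,2002" into event tracking numbers.
/// Using u32 here is an assumption of this sketch.
pub fn parse_etns(raw: &str) -> Result<Vec<u32>, String> {
    raw.split(',')
        .map(|s| {
            s.trim()
                .parse::<u32>()
                .map_err(|e| format!("invalid etn {s:?}: {e}"))
        })
        .collect()
}
```

Collecting into a Result stops at the first invalid element, so one bad value rejects the whole filter rather than being silently skipped.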
/v1/streams/products filter parameters:
- event - comma-delimited event names such as product_available
- filename - wildcard filename match such as *.TXT or A_*
- source, pil, family, container, wmo_prefix, office, office_city, office_state, bbb_kind - product metadata filters (source uses parsed enrichment sources such as text_header or wmo_taf_bulletin; office matches the normalized 3-letter office code; container reflects parsed container values such as raw or zip)
- cccc, ttaaii, afos, bbb - header filters (cccc, ttaaii, and bbb match both AFOS-backed headers and WMO-only bulletin headers when present)
- has_issues, issue_kind, issue_code - parse/QC issue filters
- has_vtec, has_ugc, has_hvtec, has_latlon, has_time_mot_loc, has_wind_hail - parsed body presence filters using true/false or 1/0
- state, county, zone, fire_zone, marine_zone - UGC geographic filters using canonical codes such as NE, IAC001, CAZ041, COF214, AMZ250
- vtec_phenomena, vtec_significance, vtec_action, vtec_office, etn - VTEC filters using canonical codes such as TO, W, NEW, KDMX, and 123
- hvtec_nwslid, hvtec_severity, hvtec_cause, hvtec_record - HVTEC filters using values such as MSRM1, major, excessive_rainfall, and no_record
- wind_hail_kind, min_wind_mph, min_hail_inches - severe-tag filters using kinds such as max_wind_gust, hail_threat, legacy_hail
- lat, lon, distance_miles - parsed location filters; lat/lon are required together, distance_miles defaults to 5.0, products match if the point falls inside any parsed LAT...LON polygon or within range of any parsed TIME...MOT...LOC, UGC, or HVTEC point
- min_lat, max_lat, min_lon, max_lon - parsed bounding-box filters; all four values are required together, and products match when any parsed polygon, motion path, or parsed point intersects the box
- min_size, max_size - completed file size bounds in bytes
- Invalid archive boolean filters now return 400 instead of being ignored.
- Invalid archive size ranges where min_size > max_size now return 400.
- /v1/features, /v1/features/geojson, and /v1/aggregates/cells apply spatial filters to each returned geometry or counted feature contribution, not just to product admission.
- /v1/aggregates/cells counts each product once per geohash cell; polygons and paths may contribute to multiple cells, while representative points contribute to their containing cell.
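Several of the rules above are simple validations: booleans accept true/false or 1/0, min_size must not exceed max_size, and lat/lon travel together with distance_miles defaulting to 5.0. A minimal sketch follows, with hypothetical function names and Err strings standing in for 400 responses. How the stream endpoint reacts to a lone lat or lon is an assumption here.

```rust
/// Parse a boolean filter that accepts true/false or 1/0.
pub fn parse_bool_filter(raw: &str) -> Result<bool, String> {
    match raw {
        "true" | "1" => Ok(true),
        "false" | "0" => Ok(false),
        other => Err(format!("400: invalid boolean filter value: {other}")),
    }
}

/// Validate size bounds: min_size must not exceed max_size.
pub fn check_size_range(min_size: Option<u64>, max_size: Option<u64>) -> Result<(), String> {
    match (min_size, max_size) {
        (Some(min), Some(max)) if min > max => {
            Err(format!("400: min_size {min} > max_size {max}"))
        }
        _ => Ok(()),
    }
}

/// Resolve point-radius parameters into (lat, lon, distance_miles).
/// lat and lon must appear together; distance_miles defaults to 5.0.
pub fn resolve_point_filter(
    lat: Option<f64>,
    lon: Option<f64>,
    distance_miles: Option<f64>,
) -> Result<Option<(f64, f64, f64)>, String> {
    match (lat, lon) {
        (Some(lat), Some(lon)) => Ok(Some((lat, lon, distance_miles.unwrap_or(5.0)))),
        (None, None) => Ok(None),
        // Rejecting a lone coordinate is an assumption of this sketch.
        _ => Err("400: lat and lon are required together".to_string()),
    }
}
```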
Examples:
- GET /v1/streams/products?event=product_available&pil=TAF,AFD
- GET /v1/streams/products?event=product_available&family=nws_text_product&container=raw
- GET /v1/streams/products?event=product_available&source=wmo_taf_bulletin&cccc=KWBC
- GET /v1/streams/products?event=product_available&office=FFC&office_state=GA
- GET /v1/streams/products?event=product_available&has_issues=true&issue_code=invalid_wmo_header
- GET /v1/streams/products?event=product_available&cccc=KBOX&ttaaii=FXUS61
- GET /v1/streams/products?event=product_available&county=IAC001&vtec_phenomena=TO&vtec_significance=W
- GET /v1/streams/products?event=product_available&has_hvtec=true&hvtec_cause=excessive_rainfall
- GET /v1/streams/products?event=product_available&has_wind_hail=true&min_wind_mph=50&min_hail_inches=1.00
- GET /v1/streams/products?event=product_available&state=NE&vtec_office=KOAX&vtec_action=NEW
- GET /v1/streams/products?event=product_available&lat=41.42&lon=-96.17
- GET /v1/streams/products?event=product_available&lat=41.42&lon=-96.17&distance_miles=15
- GET /v1/streams/products?event=product_available&lat=41.42&lon=-96.17&distance_miles=15
- GET /v1/streams/products?event=product_available&min_lat=41.0&max_lat=42.0&min_lon=-97.0&max_lon=-95.0
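For intuition about the lat/lon/distance_miles examples, the "within range of a parsed point" test can be approximated with a great-circle distance. This is only a sketch: the haversine formula and the Earth-radius constant are assumptions, and the README does not say which distance calculation the server uses.

```rust
/// Approximate mean Earth radius in statute miles (assumed constant).
const EARTH_RADIUS_MILES: f64 = 3958.8;

/// Great-circle distance between two lat/lon points, in miles (haversine).
pub fn haversine_miles(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
    let (phi1, phi2) = (lat1.to_radians(), lat2.to_radians());
    let d_phi = (lat2 - lat1).to_radians();
    let d_lambda = (lon2 - lon1).to_radians();
    let a = (d_phi / 2.0).sin().powi(2)
        + phi1.cos() * phi2.cos() * (d_lambda / 2.0).sin().powi(2);
    2.0 * EARTH_RADIUS_MILES * a.sqrt().atan2((1.0 - a).sqrt())
}

/// True when `point` lies within `distance_miles` of `query`.
/// Both tuples are (lat, lon) in decimal degrees.
pub fn within_radius(query: (f64, f64), point: (f64, f64), distance_miles: f64) -> bool {
    haversine_miles(query.0, query.1, point.0, point.1) <= distance_miles
}
```

One degree of latitude comes out at roughly 69 miles with this constant, so a default 5-mile radius around 41.42,-96.17 covers points only a few hundredths of a degree away.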
Optional live-mode endpoint/persistence overrides:
- --server host:port (repeatable or comma-delimited)
- --server-list-path ./servers.json
For QBT live mode:
- omitting --server starts from the built-in EMWIN list and allows automatic server-list updates
- providing --server pins the runtime to that list and disables automatic server-list load/save/update behavior
- --server and --server-list-path cannot be combined
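The --server rules above (repeatable or comma-delimited host:port values, mutually exclusive with --server-list-path) might be validated roughly as follows. parse_servers and check_server_options are hypothetical helpers, not the emwin-cli argument parser.

```rust
/// Flatten repeated and comma-delimited `--server` values into (host, port) pairs.
pub fn parse_servers(values: &[&str]) -> Result<Vec<(String, u16)>, String> {
    let mut servers = Vec::new();
    for value in values {
        for entry in value.split(',').map(str::trim).filter(|e| !e.is_empty()) {
            // Split on the last ':' so the port is always the final component.
            let (host, port) = entry
                .rsplit_once(':')
                .ok_or_else(|| format!("expected host:port, got {entry}"))?;
            if host.is_empty() {
                return Err(format!("missing host in {entry}"));
            }
            let port: u16 = port
                .parse()
                .map_err(|_| format!("invalid port in {entry}"))?;
            servers.push((host.to_string(), port));
        }
    }
    Ok(servers)
}

/// Enforce that pinned servers and a server-list path are not combined.
pub fn check_server_options(
    servers: &[(String, u16)],
    server_list_path: Option<&str>,
) -> Result<(), String> {
    if !servers.is_empty() && server_list_path.is_some() {
        return Err("--server and --server-list-path cannot be combined".to_string());
    }
    Ok(())
}
```

An empty server list with a list path is accepted, which corresponds to the automatic server-list mode described above.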
Environment and .env support:
- .env from the current working directory is loaded before CLI parsing.
- CLI args override process env; process env overrides .env.
- Useful variables include EMWIN_RECEIVER, EMWIN_USERNAME, EMWIN_PASSWORD, EMWIN_SERVER, EMWIN_SERVER_LIST_PATH, EMWIN_OUTPUT_DIR, EMWIN_PERSIST_DATABASE_URL, EMWIN_OPENAPI_AUTH_TOKEN, EMWIN_APPRISE_API_URL, EMWIN_ALERT_SOURCE_BATCH_SIZE, EMWIN_ALERT_DELIVERY_BATCH_SIZE, EMWIN_ALERT_IDLE_POLL_SECS, EMWIN_ALERT_SOURCE_CLAIM_LEASE_SECS, EMWIN_ALERT_DELIVERY_CLAIM_LEASE_SECS, EMWIN_ALERT_HTTP_TIMEOUT_SECS, EMWIN_ALERT_MAX_DELIVERY_ATTEMPTS, EMWIN_MAX_EVENTS, EMWIN_IDLE_TIMEOUT_SECS, EMWIN_BIND, EMWIN_CORS_ORIGIN, EMWIN_MAX_CLIENTS, EMWIN_STATS_INTERVAL_SECS, EMWIN_FILE_RETENTION_SECS, EMWIN_MAX_RETAINED_FILES, EMWIN_QUIET, and EMWIN_POST_PROCESS_ARCHIVES.
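The precedence order (CLI argument, then process environment, then .env) amounts to a first-match lookup. The sketch below models the two environment layers as plain maps; resolve_setting is a hypothetical function, and emwin-cli's own loading mechanism is not shown here.

```rust
use std::collections::HashMap;

/// Resolve one setting with the precedence: CLI arg > process env > .env.
/// Both environment layers are passed in as maps to keep the sketch testable.
pub fn resolve_setting(
    key: &str,
    cli: Option<&str>,
    process_env: &HashMap<String, String>,
    dotenv: &HashMap<String, String>,
) -> Option<String> {
    cli.map(str::to_string)
        .or_else(|| process_env.get(key).cloned())
        .or_else(|| dotenv.get(key).cloned())
}
```

For instance, if EMWIN_BIND is present in both the process environment and .env, the process value wins unless --bind is given on the command line.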
Alert worker mode:
cargo run -p emwin-cli -- alert-worker --database-url postgres://localhost/emwin
cargo run -p emwin-cli -- alert-worker --database-url postgres://localhost/emwin --apprise-api-url http://127.0.0.1:8000
cargo run -p emwin-cli -- alert-worker --database-url postgres://localhost/emwin --source-claim-lease-secs 300 --delivery-claim-lease-secs 300 --http-timeout-secs 30
- EMWIN_MAX_DB_CONNECTIONS overrides the default Postgres pool size used by server when archive persistence is enabled.
- query uses EMWIN_DATABASE_URL for direct archive reads from Postgres-backed metadata.
- When EMWIN_OUTPUT_DIR uses an object-store URL such as s3://bucket/prefix or https://example.com/path, emwin-db builds the backend through object_store using environment variables for that scheme. S3-compatible targets still use the AWS environment variables shown above. The target must already exist; automatic bucket/container creation is no longer attempted.
- Filters are CLI-only and are not loaded from environment variables.
Relay mode (raw TCP passthrough + metrics):
cargo run -p emwin-cli -- relay --username you@example.com
Useful relay flags:
- --bind 0.0.0.0:2211 (downstream client listener)
- --max-clients 100 (connection cap; over-capacity clients receive server-list frame then disconnect)
- --auth-timeout-secs 720 (downstream re-authentication window)
- --client-buffer-bytes 65536 (per-client backpressure budget)
- --metrics-bind 127.0.0.1:9090 (metrics listener)
Relay endpoints:
- GET /health - relay health summary
- GET /metrics - relay telemetry snapshot (connections, auth, buffering, and quality state)