From df30ded40ad4bcad5dbe1438b97d1822d3f30f5c Mon Sep 17 00:00:00 2001 From: Gabor Szabo Date: Tue, 19 May 2026 06:21:46 +0200 Subject: [PATCH 1/2] docs: add PRP-25 forecastops control center full plan (#219) --- .../PRP-25-forecastops-control-center-full.md | 827 ++++++++++++++++++ 1 file changed, 827 insertions(+) create mode 100644 PRPs/PRP-25-forecastops-control-center-full.md diff --git a/PRPs/PRP-25-forecastops-control-center-full.md b/PRPs/PRP-25-forecastops-control-center-full.md new file mode 100644 index 00000000..62bd1bf3 --- /dev/null +++ b/PRPs/PRP-25-forecastops-control-center-full.md @@ -0,0 +1,827 @@ +name: "PRP-25 — ForecastOps Control Center (Full Version)" +description: | + Context-rich PRP that takes the ForecastOps Control Center from its PRP-24 MVP + to the "Full Version" of docs/optional-features/02-forecastops-control-center.md: + model-health + performance-drift indicators, an exportable incident report, and + an operator action layer (bulk retrain + promote-to-alias). Phased so each phase + is independently shippable and one-pass implementable. + +## Purpose + +PRP-24 shipped the Control Center MVP — a read-only `app/features/ops/` slice +(`GET /ops/summary`, `GET /ops/retraining-candidates`) and a `/ops` page. This PRP +delivers the remaining "Full Version" capabilities from the feature brief: + +- **Phase A — Model Health & Drift**: a new `GET /ops/model-health` endpoint that + classifies forecast-error *performance drift* per `(store, product)` from run + history, plus a Model Health section on `/ops`. +- **Phase B — Incident Report Export**: client-side CSV + Markdown export of the + operational snapshot. +- **Phase C — Action Layer**: operator bulk-retrain (multi-select the retraining + queue → fan out to `POST /jobs`) and promote-to-alias (`POST /registry/aliases`), + both behind a confirmation dialog. + +--- + +## DEPENDS ON — read before starting + +This PRP **builds directly on PRP-24** (`PRPs/PRP-24-forecastops-control-center.md`, +issue #217, PR #218). It modifies files PRP-24 created. **PR #218 is merged to +`dev`** (dev tip `aac7735 Merge pull request #218`), so the dependency gate is +satisfied — cut this PRP's branch from `dev`. Sanity-check before starting: if +`app/features/ops/service.py` does not already define `OpsService`, +`extract_wape`, and `score_retraining_candidate`, stop — the dependency is missing. + +--- + +## Goal + +Ship the Full Version of the Control Center as three independently-validatable +phases on top of the PRP-24 slice: + +- **Backend** — one new read-only endpoint (`GET /ops/model-health`). The `ops` + slice stays read-only: **no new table, no migration, no new mutating endpoint.** +- **Frontend** — a Model Health section, an incident-report export control, and an + action layer on the Retraining Queue, all on the existing `/ops` page. + +End state: `docker compose up` → seed → open `/ops` → operator sees drift signals, +can export an incident report, and can trigger retraining / promote a model — +without leaving the page. + +## Why + +- **User value** — the MVP shows *what* needs attention; the Full Version lets the + operator *understand why* (drift), *communicate it* (export), and *act on it* + (retrain / promote) in one place. This is the "operator workflow" the feature + brief calls for (`docs/optional-features/02-forecastops-control-center.md`). +- **Demo value** — a closed-loop ForecastOps story (observe → diagnose → act) + instead of a read-only dashboard. +- **Integration** — Phase A extends the `ops` slice's existing aggregation + pattern; Phases B & C are frontend layers over **already-shipped endpoints** + (`POST /jobs`, `POST /registry/aliases`) — no new backend mutation surface. + +## What + +### User-visible behavior + +On the existing `/ops` page, the Full Version adds: + +1. **Model Health section** (Phase A) — a table of `(store, product)` grains, each + showing its forecast-error (WAPE) history, a **drift badge** + (`improving` / `stable` / `degrading` / `unknown`), and the WAPE delta. Backed + by `GET /ops/model-health`. +2. **Export control** (Phase B) — an "Export report" button in the page header + offering **CSV** (attention items) and **Markdown** (full incident report) + downloads, generated entirely client-side. +3. **Action layer** (Phase C) — the Retraining Queue gains row checkboxes and a + **"Retrain selected (N)"** button; each Model Health / candidate row gains a + **"Promote to alias"** action. Both open a confirmation dialog, then call the + existing job / alias endpoints, reporting per-item success/failure via toasts. + +### Technical requirements + +- Phase A: extend `app/features/ops/{schemas,service,routes,__init__}.py` and the + slice's tests. Server-side SQL, `mypy --strict` + `pyright --strict` clean, + RFC 7807 errors, Pydantic v2 response models (`from_attributes=True`, no + `strict=True`). +- Phases B & C: **frontend only** — new pure util modules (+ vitest), new TanStack + Query hooks reusing existing ones, page wiring. No backend changes. +- No new external dependency, no new table, no Alembic migration. + +### Success Criteria + +- [ ] `GET /ops/model-health?limit=` → 200 with `entries` (each carrying + `drift_direction`, `latest_wape`, `wape_delta`, `wape_history`), sorted with + `degrading` grains first; `422` when `limit` is outside `[1, 100]`. +- [ ] `GET /ops/model-health` → 200 (never 500) on an empty database. +- [ ] `classify_drift` is a pure, unit-tested function that never raises on + missing / sparse WAPE history. +- [ ] `/ops` renders a Model Health section with drift badges and a delta. +- [ ] An "Export report" control downloads a valid CSV and a Markdown report + built entirely client-side from already-loaded data. +- [ ] The Retraining Queue supports multi-select; "Retrain selected" opens a + confirm dialog and creates one `train` job per selected grain via + `POST /jobs`, reporting per-item outcome. +- [ ] A "Promote to alias" action creates/updates an alias via + `POST /registry/aliases` behind a confirm dialog. +- [ ] The `ops` backend slice remains read-only — no new `models.py`, no + migration, no new mutating endpoint. +- [ ] All gates pass: `ruff`, `mypy --strict`, `pyright --strict`, `pytest` + (unit + integration), frontend `tsc` + `lint` + `test`. + +--- + +## All Needed Context + +### DECISIONS LOCKED (resolved during planning — do NOT re-litigate) + +1. **The `ops` backend stays read-only.** Phase C's "action layer" is a + *frontend* feature: the `/ops` page calls the **existing, already-sanctioned** + `POST /jobs` and `POST /registry/aliases` endpoints. No `POST /ops/*` mutating + endpoint is added; the slice keeps no `models.py` and ships no migration. This + keeps the PRP-24 "read-only slice" invariant intact. The mild tension — the + `/ops` *page* becomes an action launcher — is **accepted** (the Forecast page + already triggers train jobs the same way) and **MUST be noted in the PR + description** per `.claude/rules/product-vision.md`. + +2. **Drift = performance drift, not data drift.** Phase A classifies the **trend + of forecast error (WAPE) across a grain's successful-run history** — a + performance-based signal computable read-only from `model_run.metrics`. True + *data drift* (input-feature distribution shift, PSI/KL tests) is **OUT OF + SCOPE**: featuresets are computed in-memory and never persisted, so there are + no feature snapshots to compare. Do NOT add drift infrastructure. + +3. **WebSocket live job updates — DECLINED, do NOT build.** The feature brief's + Full Version lists "WebSocket updates for running jobs". This conflicts with + the `.claude/rules/product-vision.md` guardrail *"Not a real-time streaming + system … the agent WebSocket is for response streaming only."* PRP-24 already + polls `GET /ops/summary` every 15 s and `use-jobs.ts` polls every 5 s — live + job state is already covered. Keeping polling is the deliberate decision; a WS + would be a new streaming surface for no real gain. If a WS is ever wanted it is + a **separate PRP** mirroring the `demo/stream` pattern — not this one. + +4. **Retraining scoring is unchanged.** PRP-24's `score_retraining_candidate` + (60% staleness / 40% WAPE) is locked. Phase A adds drift as a *separate* + `/ops/model-health` signal; it does NOT fold drift into the retraining score. + +5. **No new dependencies.** Recharts, shadcn `Checkbox`, `AlertDialog`, and + `sonner` are already installed (`frontend/package.json`, + `frontend/src/components/ui/{chart,checkbox,alert-dialog,sonner}.tsx`). Reuse + them; do not `pnpm add` anything. + +### Documentation & References + +```yaml +# MUST READ — the PRP-24 slice this PRP extends (already on dev after #218) +- file: PRPs/PRP-24-forecastops-control-center.md + why: The MVP PRP. Its "Known Gotchas" section applies verbatim here — status + columns are String (compare `Enum.X.value`); `datetime.now(UTC)` (ruff DTZ); + DISTINCT ON order_by must lead with the distinct cols; AsyncSession forbids + lazy-loading; response models use ConfigDict(from_attributes=True), NEVER + strict=True; do NOT add a `# noqa: BLE001` (BLE is not in the ruff select). +- file: app/features/ops/service.py + why: EXTEND THIS. Already defines `extract_wape(metrics)` — REUSE it, do not + redefine. `OpsService.get_retraining_candidates` is the near-exact mirror + for the new `get_model_health` (DISTINCT ON vs. full-history difference + noted in Gotchas). Module-scope pure helpers (`score_retraining_candidate`) + are the pattern for the new `classify_drift`. +- file: app/features/ops/schemas.py + why: EXTEND THIS. `RetrainingCandidate` / `RetrainingCandidatesResponse` are the + exact shape to mirror for `ModelHealthEntry` / `ModelHealthResponse`. +- file: app/features/ops/routes.py + why: EXTEND THIS. `get_retraining_candidates` is the exact mirror for the new + `get_model_health` route — same `Query(default=20, ge=1, le=100)` bound. +- file: app/features/ops/__init__.py + why: EXTEND `__all__` with the new response model. +- file: app/features/ops/tests/conftest.py + why: EXTEND. `sample_runs` already creates two success runs for grain + (9001, 8001) — that grain already has a 2-point WAPE history (31.0 → 12.0, + i.e. `improving`). Add a third run if a `degrading` case is wanted. +- file: app/features/ops/tests/{test_service,test_schemas,test_routes_integration}.py + why: EXTEND. `test_service.py` is the pattern for pure-function tests of + `classify_drift`; `test_routes_integration.py` for the new endpoint. +- file: app/features/registry/models.py + why: `ModelRun.metrics` (JSONB, nullable), `status`, `store_id`, `product_id`, + `created_at`, `run_id`. `RunStatus.SUCCESS`. + +# MUST READ — endpoints Phase C reuses (no backend changes — frontend calls these) +- file: app/features/jobs/routes.py + why: `POST /jobs` (202) creates+executes a job. Body is `JobCreate` + {job_type, params}. The train-job `params` contract is documented in the + route docstring — model_type, store_id, product_id, start_date, end_date. +- file: app/features/jobs/schemas.py + why: `JobCreate` = {job_type: JobType, params: dict}. `JobResponse` shape. +- file: app/features/jobs/service.py + why: VERIFY the exact train-job `params` keys `JobService` consumes BEFORE + writing `buildRetrainJob` — this is Phase C's main risk. +- file: app/features/forecasting/schemas.py + why: `ModelConfig` (discriminated union on `model_type`) — the shape of + `model_run.model_config`; tells you what to flatten into the retrain params. +- file: app/features/registry/routes.py + why: `POST /registry/aliases` (201) body `AliasCreate` {alias_name, run_id, + description?}; aliases only point at SUCCESS runs (400 otherwise). +- file: app/features/registry/schemas.py + why: `AliasCreate`, `AliasResponse`, `RunResponse` field names. + +# MUST READ — frontend patterns +- file: frontend/src/pages/ops.tsx + why: MODIFY THIS in every phase. PRP-24's page — header, error/loading/empty + early returns, Card/Table sections, `@/` imports. +- file: frontend/src/hooks/use-ops.ts + why: EXTEND. `useOpsSummary` / `useRetrainingCandidates` are the exact pattern + for `useModelHealth`. +- file: frontend/src/hooks/use-jobs.ts + why: `useCreateJob()` — REUSE for Phase C bulk retrain. Do not write a new + job-creation hook. +- file: frontend/src/hooks/use-runs.ts + why: `useCreateAlias()` — REUSE for Phase C promote. `useRun(runId)` fetches a + run's detail (needed to clone model_config for a retrain). +- file: frontend/src/lib/csv-export.ts + why: `toCsv` / `downloadCsv` / `CsvColumn` — REUSE for Phase B CSV export. + Already CSV-injection-safe. +- file: frontend/src/lib/ops-utils.ts + ops-utils.test.ts + why: PRP-24's pure util module + colocated vitest test — the exact pattern for + the new `incident-report.ts` and `ops-actions.ts` modules. +- file: frontend/src/pages/visualize/demand.tsx + why: A dense data page that already does CSV export (`downloadCsv`/`toCsv`) and + row interaction — mirror its export-button placement and table patterns. +- file: frontend/src/components/ui/checkbox.tsx + why: shadcn Checkbox — Phase C row selection. Already installed. +- file: frontend/src/components/ui/alert-dialog.tsx + why: shadcn AlertDialog — Phase C confirm dialogs. Already installed. +- file: frontend/src/components/ui/sonner.tsx + why: `sonner` toast. VERIFY a `` is mounted (app-shell / main) before + calling `toast()`; if not, mount it once in the app shell. +- file: frontend/src/components/ui/chart.tsx + why: shadcn Recharts wrapper — optional WAPE sparkline in Model Health. +- file: frontend/src/components/charts/time-series-chart.tsx + why: existing Recharts usage pattern if a sparkline is added. +- file: frontend/src/types/api.ts + why: EXTEND. The `Ops*` interfaces PRP-24 added (`OpsSummaryResponse`, + `RetrainingCandidate`, …) are the mirror for `ModelHealth*`. + +# External docs +- url: https://docs.sqlalchemy.org/en/20/tutorial/data_select.html#order-by + why: ordering the full run history per grain (NOT DISTINCT ON — see Gotchas). +- url: https://tanstack.com/query/latest/docs/framework/react/guides/mutations + why: reusing `useMutation` (useCreateJob/useCreateAlias) for the action layer. +- url: https://ui.shadcn.com/docs/components/alert-dialog + why: AlertDialog composition for the confirm gates. +- url: https://recharts.org/en-US/api/LineChart + why: minimal WAPE sparkline (optional Phase A polish). +- url: https://www.mlflow.org/docs/latest/ml/model-registry/ + why: alias-promotion governance — the conceptual basis for Phase C promote. + +- docfile: docs/optional-features/02-forecastops-control-center.md + why: the feature brief — § "Full Version" (lines 76-83) is exactly this PRP's + scope, minus retraining scoring (done in PRP-24) and WebSocket (declined). +``` + +### Current Codebase tree (post-PRP-24, relevant subset) + +```bash +app/features/ops/ # read-only slice from PRP-24 +├── __init__.py schemas.py service.py routes.py +└── tests/ (__init__.py conftest.py test_schemas.py test_service.py + test_routes_integration.py) +frontend/src/ +├── pages/ops.tsx # PRP-24 Control Center page (5 sections) +├── hooks/use-ops.ts # useOpsSummary, useRetrainingCandidates +├── lib/ops-utils.ts (+ .test.ts) # pure helpers +├── types/api.ts # Ops* response interfaces +├── hooks/use-jobs.ts # useCreateJob (REUSE) +├── hooks/use-runs.ts # useCreateAlias, useRun (REUSE) +└── lib/csv-export.ts # toCsv, downloadCsv (REUSE) +``` + +### Desired Codebase tree (files to add / touch) + +```bash +# ── Phase A — Model Health & Drift ── +app/features/ops/schemas.py # MODIFY — add WapePoint, ModelHealthEntry, + # ModelHealthResponse +app/features/ops/service.py # MODIFY — add classify_drift() + get_model_health() +app/features/ops/routes.py # MODIFY — add GET /ops/model-health +app/features/ops/__init__.py # MODIFY — export ModelHealthResponse +app/features/ops/tests/test_schemas.py # MODIFY — new models +app/features/ops/tests/test_service.py # MODIFY — classify_drift +app/features/ops/tests/conftest.py # MODIFY — degrading-grain fixture +app/features/ops/tests/test_routes_integration.py # MODIFY — /ops/model-health +frontend/src/types/api.ts # MODIFY — ModelHealth* interfaces +frontend/src/hooks/use-ops.ts # MODIFY — useModelHealth +frontend/src/pages/ops.tsx # MODIFY — Model Health section + +# ── Phase B — Incident Report Export ── +frontend/src/lib/incident-report.ts # NEW — pure builders (CSV cols + markdown) +frontend/src/lib/incident-report.test.ts # NEW — vitest +frontend/src/pages/ops.tsx # MODIFY — export control in header + +# ── Phase C — Action Layer ── +frontend/src/lib/ops-actions.ts # NEW — pure buildRetrainJob() +frontend/src/lib/ops-actions.test.ts # NEW — vitest +frontend/src/pages/ops.tsx # MODIFY — selection, dialogs, actions + +# NOT created: any app/features/ops/models.py, any Alembic migration, +# any POST /ops/* endpoint, any WebSocket. +``` + +### Known Gotchas & Library Quirks + +```python +# CRITICAL: ALL PRP-24 gotchas still apply. Re-read PRP-24 § "Known Gotchas". +# Headlines: compare String status columns against `Enum.X.value`; use +# `datetime.now(UTC)` (ruff DTZ bans date.today()/naive now()); response models +# use ConfigDict(from_attributes=True) and NEVER strict=True; never add a +# `# noqa: BLE001` (BLE is not in the ruff select — it would trip RUF100). +# +# CRITICAL (Phase A): model-health needs the FULL run history per grain, NOT the +# latest-per-grain. Do NOT use DISTINCT ON here. Query every SUCCESS run, ordered +# by (store_id, product_id, created_at ASC), and group in Python: +# select(ModelRun).where(ModelRun.status == RunStatus.SUCCESS.value) +# .order_by(ModelRun.store_id, ModelRun.product_id, ModelRun.created_at) +# Then itertools.groupby over (store_id, product_id) — rows are already ordered. +# +# CRITICAL (Phase A): REUSE the existing `extract_wape` from ops/service.py — it +# already tolerates None / non-numeric / bool. Do NOT redefine it. WAPE history +# will contain None entries (runs whose metrics lack WAPE); classify_drift MUST +# tolerate a list with None gaps and never raise. +# +# CRITICAL (Phase C): POST /jobs executes SYNCHRONOUSLY (returns 202 but runs the +# job inline before responding). Bulk-retrain of N grains = N blocking calls on +# a single-process backend. Fire them SEQUENTIALLY (await each before the next), +# show per-item progress, and keep N modest. Do NOT Promise.all() them. +# +# CRITICAL (Phase C): a train job's `params` shape is consumed by JobService — +# VERIFY the exact keys in app/features/jobs/service.py + forecasting/schemas.py +# BEFORE writing buildRetrainJob(). `_execute_train` in jobs/service.py reads a +# FLAT params dict: model_type, store_id, product_id, start_date, end_date, plus +# model-specific keys `season_length` (seasonal_naive) / `window_size` +# (moving_average) — there is NO `period` key. Pick those keys explicitly from +# the source run's `model_config`; do NOT blind-spread `model_config` (it also +# carries `schema_version` + a duplicate `model_type`). +# +# CRITICAL (Phase C): aliases may only point at SUCCESS runs (registry returns 400 +# otherwise). Only offer "Promote to alias" on rows whose run status is success. +# +# GOTCHA (Phase C): retrain window — clone the source run's `model_type` + +# `model_config`; set start_date = source run `data_window_start`, end_date = +# `summary.freshness.latest_sales_date` (the freshest data). If latest_sales_date +# is null, fall back to the run's own data_window_end and surface a warning. +# +# GOTCHA: `sonner` `toast()` needs a mounted ``. It is ALREADY mounted — +# `frontend/src/components/layout/app-shell.tsx` renders `` from +# `@/components/ui/sonner`. Task C4 is verification-only; do NOT add a second one. +# +# GOTCHA: commit-format scope allow-list has NO `ops` scope. Use feat(api) for the +# backend phase, feat(ui) for the frontend phases. +# +# GOTCHA: the /ops page renders inside AppShell — no nav/container/Toaster added by +# the page; semantic shadcn tokens only, never raw colors. +``` + +--- + +## External Research Findings + +Verified May 2026. Each finding ends with a **verdict**. + +### 1. Performance-drift vs. data-drift triggers (web search, May 2026) + +MLOps practice splits retraining triggers into **performance-based** (monitor a +core error metric; retrain when it degrades past a threshold) and **drift-based** +(statistical tests — PSI, KL divergence — on input/target distributions). 2025 +reviews report models left unmonitored for 6+ months saw error rates rise ~35%, +and that proactive performance-trigger policies outperform reactive ones. + +- **Verdict — Phase A is a performance-drift indicator.** It tracks the WAPE trend + across a grain's run history and classifies `improving / stable / degrading`. + This is exactly the recommended performance-based signal and needs **no new + infrastructure** — it reads `model_run.metrics`. PSI/KL **data drift** is + correctly out of scope (no persisted feature snapshots; see Decision #2). +- Sources: [What Is Model Drift?](https://www.articsledge.com/post/model-drift), + [Advanced ML Model Monitoring](https://enhancedmlops.com/advanced-ml-model-monitoring-drift-detection-explainability-and-automated-retraining/), + [MLOps Model Monitoring](https://durapid.com/blog/mlops-model-monitoring-how-to-track-model-drift-and-performance-in-production/). + +### 2. Drift threshold (heuristic) + +There is no universal "drift threshold"; teams pick a relative tolerance band. +A ±10% relative change in the error metric is a defensible, deterministic default +for a portfolio system. + +- **Verdict — applied.** `classify_drift` uses a ±10% relative band: latest WAPE + vs. the mean of prior WAPEs. `degrading` if latest is >10% worse, `improving` if + >10% better, `stable` within the band, `unknown` if fewer than two numeric WAPEs. + +### 3. Alias-promotion governance (MLflow) + +MLflow models alias promotion (`champion` / production alias) as a deliberate, +human-gated step decoupling deployment from a specific version. + +- **Verdict — confirms Phase C.** "Promote to alias" reuses `POST /registry/aliases` + behind a confirmation `AlertDialog` — the human gate. No new backend gate is + needed; the registry already restricts aliases to SUCCESS runs. +- Source: [MLflow Model Registry](https://www.mlflow.org/docs/latest/ml/model-registry/). + +### 4. WebSocket job updates — assessed and DECLINED + +The feature brief lists WS job updates under "Full Version". `product-vision.md` +forbids new streaming surfaces ("Not a real-time streaming system"). PRP-24's +`/ops/summary` 15 s poll + `use-jobs.ts` 5 s poll already deliver live job state. + +- **Verdict — do NOT build (Decision #3).** Polling is the deliberate choice. + +--- + +## Implementation Blueprint + +### Phase A — data models (`app/features/ops/schemas.py`, additions) + +All response models — `ConfigDict(from_attributes=True)`, every field a +`Field(..., description=...)`, counts `ge=0`, **no `strict=True`**. + +```python +from typing import Literal +DriftDirection = Literal["improving", "stable", "degrading", "unknown"] + +class WapePoint(BaseModel): # one run's WAPE observation + run_id: str + created_at: datetime + wape: float | None # None when the run lacks WAPE + +class ModelHealthEntry(BaseModel): + store_id: int + product_id: int + run_count: int = Field(..., ge=0) + latest_run_id: str | None + latest_run_status: str | None + latest_wape: float | None + previous_wape: float | None # the prior numeric WAPE + wape_delta: float | None # latest - previous (numeric only) + drift_direction: DriftDirection + last_trained_at: datetime | None + staleness_days: int = Field(..., ge=0) + wape_history: list[WapePoint] # chronological, may hold gaps + +class ModelHealthResponse(BaseModel): + entries: list[ModelHealthEntry] # degrading-first sort + total_evaluated: int = Field(..., ge=0) + generated_at: datetime +``` + +### Phase A — pseudocode (`app/features/ops/service.py`, additions) + +```python +# ── module-scope pure helper (mirror of score_retraining_candidate) ── +_DRIFT_BAND = 0.10 # ±10% relative WAPE change + +def classify_drift(wape_history: list[float | None]) -> tuple[str, float | None]: + """Classify the WAPE trend. Pure; never raises. Returns (direction, delta). + direction ∈ improving|stable|degrading|unknown; delta = latest - previous.""" + numeric = [w for w in wape_history if w is not None] + if len(numeric) < 2: + return "unknown", None + latest = numeric[-1] + prior = numeric[:-1] + baseline = sum(prior) / len(prior) + delta = round(latest - prior[-1], 4) + if baseline <= 0: # avoid div-by-zero on a 0 WAPE + return ("degrading" if latest > 0 else "stable"), delta + rel = (latest - baseline) / baseline + if rel > _DRIFT_BAND: + return "degrading", delta + if rel < -_DRIFT_BAND: + return "improving", delta + return "stable", delta + +# ── OpsService.get_model_health ── +async def get_model_health(self, db, limit: int) -> ModelHealthResponse: + today = datetime.now(UTC).date() + # FULL history — NOT DISTINCT ON. Ordered so itertools.groupby works. + runs = (await db.execute( + select(ModelRun).where(ModelRun.status == RunStatus.SUCCESS.value) + .order_by(ModelRun.store_id, ModelRun.product_id, ModelRun.created_at) + )).scalars().all() + entries = [] + for (store_id, product_id), grain_runs in groupby(runs, key=lambda r: (r.store_id, r.product_id)): + grain_runs = list(grain_runs) # already chronological + history = [WapePoint(run_id=r.run_id, created_at=r.created_at, + wape=extract_wape(r.metrics)) for r in grain_runs] + direction, delta = classify_drift([p.wape for p in history]) + numeric = [p.wape for p in history if p.wape is not None] + latest_run = grain_runs[-1] + entries.append(ModelHealthEntry( + store_id=store_id, product_id=product_id, run_count=len(grain_runs), + latest_run_id=latest_run.run_id, latest_run_status=latest_run.status, + latest_wape=(numeric[-1] if numeric else None), + previous_wape=(numeric[-2] if len(numeric) > 1 else None), + wape_delta=delta, drift_direction=direction, + last_trained_at=latest_run.created_at, + staleness_days=max((today - latest_run.data_window_end).days, 0), + wape_history=history)) + # degrading first, then by |wape_delta| desc; unknown/stable last + _rank = {"degrading": 0, "improving": 1, "stable": 2, "unknown": 3} + entries.sort(key=lambda e: (_rank[e.drift_direction], -abs(e.wape_delta or 0.0))) + logger.info("ops.model_health_computed", grains=len(entries)) + return ModelHealthResponse(entries=entries[:limit], total_evaluated=len(entries), + generated_at=datetime.now(UTC)) +``` + +```python +# ── app/features/ops/routes.py (add; mirror get_retraining_candidates) ── +@router.get("/model-health", response_model=ModelHealthResponse, + summary="Per-(store,product) forecast-error health and drift") +async def get_model_health( + limit: int = Query(default=20, ge=1, le=100, description="Max grains to return"), + db: AsyncSession = Depends(get_db), +) -> ModelHealthResponse: + return await OpsService().get_model_health(db, limit) +``` + +### Phase B — pseudocode (`frontend/src/lib/incident-report.ts`, NEW, pure) + +```typescript +import type { CsvColumn } from '@/lib/csv-export' +import type { AttentionItem, OpsSummaryResponse, RetrainingCandidate } from '@/types/api' + +// CSV column set for the attention-items export (reuse toCsv/downloadCsv). +export const attentionCsvColumns: CsvColumn[] = [ + { key: 'item_type', header: 'Type' }, { key: 'entity_id', header: 'Entity' }, + { key: 'label', header: 'Item' }, { key: 'detail', header: 'Detail' }, + { key: 'occurred_at', header: 'When' }, +] + +// Build a human-readable Markdown incident report from already-loaded page data. +export function buildIncidentMarkdown( + summary: OpsSummaryResponse, candidates: RetrainingCandidate[], +): string { + // Sections: # ForecastOps Incident Report (generated_at) ; System Health + // (api/db + provider lines) ; KPIs (active/failed jobs, success rate, stale + // aliases) ; Data Freshness ; Needs Attention (a markdown table) ; Top + // Retraining Candidates (a markdown table). Pure string assembly — no fetch. + // Return the assembled string. +} +``` + +### Phase C — pseudocode (`frontend/src/lib/ops-actions.ts`, NEW, pure) + +```typescript +import type { JobCreate, ModelRun, RetrainingCandidate } from '@/types/api' + +// Build the POST /jobs body that retrains a grain from its latest run. +// VERIFY param keys against app/features/jobs/service.py before finalizing. +export function buildRetrainJob( + run: ModelRun, // GET /registry/runs/{latest_run_id} + latestSalesDate: string | null, // summary.freshness.latest_sales_date +): JobCreate { + return { + job_type: 'train', + params: { + model_type: run.model_type, + store_id: run.store_id, + product_id: run.product_id, + start_date: run.data_window_start, + end_date: latestSalesDate ?? run.data_window_end, // freshest data + // model-specific keys picked explicitly — NOT a blind ...model_config spread: + ...(run.model_config.season_length != null + ? { season_length: run.model_config.season_length } : {}), + ...(run.model_config.window_size != null + ? { window_size: run.model_config.window_size } : {}), + }, + } +} +``` + +Page wiring (`ops.tsx`): Retraining Queue rows get a `Checkbox`; selection is +`useState>`. "Retrain selected (N)" opens an `AlertDialog`; on confirm, +**sequentially** for each selected candidate: `useRun`-fetch its `latest_run_id`, +`buildRetrainJob(...)`, `useCreateJob().mutateAsync(...)`, `toast` the outcome. +"Promote to alias" (on success-status rows only) opens an `AlertDialog` with an +alias-name input, then `useCreateAlias().mutateAsync({alias_name, run_id})`. + +### Tasks (in order) + +```yaml +# ════════ PHASE A — Model Health & Drift (backend + page section) ════════ +Task A1 — MODIFY app/features/ops/schemas.py: + - ADD DriftDirection Literal, WapePoint, ModelHealthEntry, ModelHealthResponse + - VALIDATE: uv run python -c "from app.features.ops.schemas import ModelHealthResponse; print('ok')" + +Task A2 — MODIFY app/features/ops/service.py: + - ADD module-scope `classify_drift` (pseudocode above); import `groupby` from itertools + - ADD `OpsService.get_model_health` (full-history query — NOT DISTINCT ON; reuse extract_wape) + - VALIDATE: uv run mypy app/features/ops/ && uv run pyright app/features/ops/ + +Task A3 — MODIFY app/features/ops/routes.py + __init__.py: + - ADD GET /ops/model-health (mirror get_retraining_candidates); export ModelHealthResponse + - VALIDATE: uv run python -c "from app.main import app; assert '/ops/model-health' in {r.path for r in app.routes}; print('wired')" + +Task A4 — MODIFY ops tests (test_schemas.py, test_service.py, conftest.py): + - test_service.py: classify_drift cases — <2 numeric → unknown; degrading; + improving; stable within band; None-gap tolerance; zero-baseline guard + - test_schemas.py: construct ModelHealthEntry/Response; ge=0 rejects negatives + - conftest.py: extend sample_runs (or a new fixture) so one grain has a + degrading 3-point WAPE history + - VALIDATE: uv run pytest -v -m "not integration" app/features/ops/tests/test_service.py app/features/ops/tests/test_schemas.py + +Task A5 — MODIFY app/features/ops/tests/test_routes_integration.py: + - /ops/model-health 200 happy (seeded), entries carry drift_direction; + 200 resilient (empty); ?limit=0 → 422; ?limit=200 → 422; degrading-first sort + - VALIDATE: docker compose up -d && uv run pytest -v -m integration app/features/ops/ + +Task A6 — MODIFY frontend/src/types/api.ts: + - ADD DriftDirection, WapePoint, ModelHealthEntry, ModelHealthResponse (dates as string) + - VALIDATE: cd frontend && pnpm tsc --noEmit + +Task A7 — MODIFY frontend/src/hooks/use-ops.ts: + - ADD useModelHealth(limit=20, enabled=true) — queryKey ['ops','model-health',limit]; + no refetchInterval (slow-moving). MIRROR useRetrainingCandidates. + - VALIDATE: cd frontend && pnpm tsc --noEmit + +Task A8 — MODIFY frontend/src/pages/ops.tsx: + - ADD a "Model Health" Card+Table section: grain, drift StatusBadge + (degrading→error, improving→success, stable→info, unknown→default), + latest WAPE, wape_delta, run_count. Optional: a Recharts sparkline of + wape_history. GOTCHA: renders inside AppShell — no raw colors. + - VALIDATE: cd frontend && pnpm tsc --noEmit && pnpm lint + +# ════════ PHASE B — Incident Report Export (frontend only) ════════ +Task B1 — CREATE frontend/src/lib/incident-report.ts + incident-report.test.ts: + - attentionCsvColumns + buildIncidentMarkdown (pure; pseudocode above) + - MIRROR: csv-export.ts + ops-utils.test.ts + - VALIDATE: cd frontend && pnpm test --run src/lib/incident-report.test.ts + +Task B2 — MODIFY frontend/src/pages/ops.tsx: + - ADD an "Export report" control in the page header — a dropdown (or two + buttons): "CSV (attention items)" → downloadCsv(toCsv(...)); "Markdown + report" → download buildIncidentMarkdown(...) as ops-incident-report.md + - MIRROR: demand.tsx export button + - VALIDATE: cd frontend && pnpm tsc --noEmit && pnpm lint + +# ════════ PHASE C — Action Layer (frontend only) ════════ +Task C0 — RESEARCH (no code): read app/features/jobs/service.py + + app/features/forecasting/schemas.py — confirm the exact train-job `params` + keys. Adjust buildRetrainJob accordingly. This de-risks the whole phase. + +Task C1 — CREATE frontend/src/lib/ops-actions.ts + ops-actions.test.ts: + - buildRetrainJob(run, latestSalesDate) (pseudocode above) + - VALIDATE: cd frontend && pnpm test --run src/lib/ops-actions.test.ts + +Task C2 — MODIFY frontend/src/pages/ops.tsx — bulk retrain: + - Retraining Queue rows get a shadcn Checkbox; selection via useState + - "Retrain selected (N)" → AlertDialog confirm → SEQUENTIALLY per candidate: + fetch run (useRun / api), buildRetrainJob, useCreateJob().mutateAsync, toast + - GOTCHA: sequential awaits, not Promise.all (POST /jobs runs synchronously) + - VALIDATE: cd frontend && pnpm tsc --noEmit && pnpm lint + +Task C3 — MODIFY frontend/src/pages/ops.tsx — promote to alias: + - "Promote to alias" action on success-status rows → AlertDialog with an + alias-name input → useCreateAlias().mutateAsync({alias_name, run_id}) → toast + - GOTCHA: only success runs are promotable (registry returns 400 otherwise) + - VALIDATE: cd frontend && pnpm tsc --noEmit && pnpm lint + +Task C4 — VERIFY the in frontend/src/components/layout/app-shell.tsx + is mounted (it already is). Verification-only — do NOT add a second Toaster. + - VALIDATE: cd frontend && pnpm tsc --noEmit + +# ════════ FINAL ════════ +Task D1 — FULL validation sweep (all gates — see Validation Loop). +Task D2 — Browser dogfood per .claude/rules/ui-design.md (webapp-testing / + agent-browser): Model Health renders with drift badges; export downloads a + valid CSV + Markdown; bulk-retrain creates jobs (verify on /explorer/jobs); + promote creates an alias (verify on /ops summary aliases). +``` + +### Integration Points + +```yaml +DATABASE: + - migration: NONE — Phase A is a read-only query; Phases B/C touch no schema. + - tables read (existing): model_run (Phase A). + +ROUTES (backend): + - add to app/features/ops/routes.py: GET /ops/model-health + - already wired: app/main.py includes ops_router (PRP-24) — no main.py change. + +ROUTES (frontend): + - none — no new page, no new route; all work lands on the existing /ops page. + +HOOKS: + - new: useModelHealth in frontend/src/hooks/use-ops.ts + - reuse: useCreateJob (use-jobs.ts), useCreateAlias + useRun (use-runs.ts) + +CONFIG: none — no new settings, no new env var, no new dependency. +``` + +--- + +## Validation Loop + +### Level 1: Syntax & Style + +```bash +uv run ruff check . --fix && uv run ruff format --check . +cd frontend && pnpm lint +# Trap: date.today() → ruff DTZ; a stray `# noqa: BLE001` → RUF100. +``` + +### Level 2: Type Checks + +```bash +uv run mypy app/ && uv run pyright app/ # both --strict +cd frontend && pnpm tsc --noEmit +``` + +### Level 3: Unit Tests + +```bash +uv run pytest -v -m "not integration" app/features/ops/ +cd frontend && pnpm test --run src/lib/incident-report.test.ts src/lib/ops-actions.test.ts +``` + +Pure-function cases that MUST exist (`test_service.py`): +```python +def test_classify_drift_unknown_when_under_two_numeric(): + assert classify_drift([None, 10.0]) == ("unknown", None) + +def test_classify_drift_degrading(): + d, delta = classify_drift([10.0, 10.0, 20.0]) # latest 20 vs baseline 10 + assert d == "degrading" + +def test_classify_drift_improving(): + d, _ = classify_drift([20.0, 20.0, 10.0]) + assert d == "improving" + +def test_classify_drift_stable_within_band(): + d, _ = classify_drift([10.0, 10.5]) # +5% < 10% band + assert d == "stable" + +def test_classify_drift_tolerates_none_gaps(): + assert classify_drift([None, 10.0, None, 12.0])[0] in {"stable", "degrading"} +``` + +### Level 4: Integration Tests + +```bash +docker compose up -d && uv run alembic upgrade head +uv run pytest -v -m integration app/features/ops/ +``` + +`/ops/model-health` → 200; entries cover seeded grains; `drift_direction` present; +empty DB → 200 (never 500); `?limit=0` and `?limit=200` → 422; degrading-first sort. + +### Level 5: Manual Validation + +```bash +uv run uvicorn app.main:app --reload --port 8123 & +curl -s "http://localhost:8123/ops/model-health?limit=5" | head -c 400 +curl -s -o /dev/null -w '%{http_code}\n' "http://localhost:8123/ops/model-health?limit=0" # 422 +# Frontend: seed (make demo), open http://localhost:5173/ops via the +# webapp-testing skill / agent-browser — verify the Model Health section, +# CSV + Markdown export downloads, bulk-retrain → new jobs on /explorer/jobs, +# promote → alias on the summary. Type-check passing ≠ UI works. +``` + +--- + +## Final Validation Checklist + +- [ ] `uv run ruff check . && uv run ruff format --check .` — clean +- [ ] `uv run mypy app/ && uv run pyright app/` — clean (`--strict`) +- [ ] `uv run pytest -v -m "not integration"` — green +- [ ] `docker compose up -d && uv run pytest -v -m integration` — green +- [ ] `cd frontend && pnpm tsc --noEmit && pnpm lint && pnpm test --run` — green +- [ ] `GET /ops/model-health` behaves per Success Criteria (sort, 422, empty-DB) +- [ ] `/ops` shows Model Health with drift badges; export downloads CSV + Markdown; + bulk-retrain creates jobs; promote creates an alias — dogfooded in a browser +- [ ] `ops` backend slice still read-only — no `models.py`, no migration, no + `POST /ops/*` +- [ ] No new dependency +- [ ] PR description flags: the `/ops` page is now an action launcher (calls + existing `POST /jobs` / `POST /registry/aliases`); WebSocket job updates + were assessed and deliberately declined (Decision #3) +- [ ] Commits use `feat(api)` (Phase A) / `feat(ui)` (Phases B, C) and reference + an open issue + +--- + +## Anti-Patterns to Avoid + +- ❌ Don't add a `POST /ops/*` mutating endpoint — Phase C is frontend-only over + existing endpoints; the `ops` slice stays read-only. +- ❌ Don't build the WebSocket — it was assessed and declined (Decision #3). +- ❌ Don't add data-drift / PSI infrastructure — performance drift only (Decision #2). +- ❌ Don't use DISTINCT ON for model-health — it needs the full per-grain history. +- ❌ Don't redefine `extract_wape` — reuse the one in `ops/service.py`. +- ❌ Don't `Promise.all()` the bulk retrains — `POST /jobs` runs synchronously; + go sequential. +- ❌ Don't change `score_retraining_candidate` — PRP-24's scoring is locked. +- ❌ Don't `pnpm add` anything — Recharts/Checkbox/AlertDialog/sonner are installed. +- ❌ Don't add `ConfigDict(strict=True)` to the new response models. +- ❌ Don't claim the UI works on a green type-check — dogfood it in a browser. + +## Workflow Notes + +- Open a GitHub issue first; branch `feat/ops-control-center-full` off `dev` + (`.claude/rules/branch-naming.md`) — **only after PR #218 (PRP-24) is merged**. +- The phases are independently shippable. Prefer **one PR per phase** (smaller + reviews) or a single phased PR — either way: `feat(api)` for Phase A, + `feat(ui)` for Phases B and C. +- The PR description MUST state (a) the `/ops` page becomes an action launcher + via existing endpoints, and (b) WebSocket job updates were deliberately + declined — per `.claude/rules/product-vision.md` § "When Ideas Don't Align". + +## Confidence Score + +**8 / 10** for one-pass implementation success. + +Rationale: Phase A is a near-exact mirror of PRP-24's verified `get_retraining_candidates` +pattern (the one structural difference — full history vs. DISTINCT ON — is called +out as a CRITICAL gotcha). Phase B is pure frontend over the existing, CSV-safe +`csv-export.ts`. Both score ~9/10. Phase C is the residual risk: it depends on the +exact `train`-job `params` contract, mitigated by the mandatory Task C0 research +step (read `jobs/service.py` + `forecasting/schemas.py` first) and by reusing the +existing `useCreateJob` / `useCreateAlias` hooks rather than inventing mutation +code. The biggest scope risks of the feature brief's "Full Version" — a WebSocket +streaming surface and turning `ops` into a backend mutation slice — are removed by +Decisions #1 and #3, keeping every phase aligned with the single-host, non-streaming +product vision. From 91731631d7027553b08c0edb0d126414531ab794 Mon Sep 17 00:00:00 2001 From: Gabor Szabo Date: Tue, 19 May 2026 06:52:41 +0200 Subject: [PATCH 2/2] feat(api,ui): add ops control center model health, export, and actions (#219) --- app/features/ops/__init__.py | 7 +- app/features/ops/routes.py | 51 ++- app/features/ops/schemas.py | 90 ++++ app/features/ops/service.py | 124 ++++++ app/features/ops/tests/conftest.py | 32 ++ .../ops/tests/test_routes_integration.py | 64 +++ app/features/ops/tests/test_schemas.py | 90 ++++ app/features/ops/tests/test_service.py | 57 ++- frontend/src/hooks/use-ops.ts | 19 +- frontend/src/lib/incident-report.test.ts | 138 +++++++ frontend/src/lib/incident-report.ts | 140 +++++++ frontend/src/lib/ops-actions.test.ts | 79 ++++ frontend/src/lib/ops-actions.ts | 43 ++ frontend/src/lib/ops-utils.test.ts | 25 ++ frontend/src/lib/ops-utils.ts | 31 +- frontend/src/pages/ops.tsx | 387 ++++++++++++++++-- frontend/src/types/api.ts | 33 ++ 17 files changed, 1372 insertions(+), 38 deletions(-) create mode 100644 frontend/src/lib/incident-report.test.ts create mode 100644 frontend/src/lib/incident-report.ts create mode 100644 frontend/src/lib/ops-actions.test.ts create mode 100644 frontend/src/lib/ops-actions.ts diff --git a/app/features/ops/__init__.py b/app/features/ops/__init__.py index 79f4d56f..2c534a50 100644 --- a/app/features/ops/__init__.py +++ b/app/features/ops/__init__.py @@ -7,10 +7,15 @@ """ from app.features.ops.routes import router -from app.features.ops.schemas import OpsSummaryResponse, RetrainingCandidatesResponse +from app.features.ops.schemas import ( + ModelHealthResponse, + OpsSummaryResponse, + RetrainingCandidatesResponse, +) from app.features.ops.service import OpsService __all__ = [ + "ModelHealthResponse", "OpsService", "OpsSummaryResponse", "RetrainingCandidatesResponse", diff --git a/app/features/ops/routes.py b/app/features/ops/routes.py index a3f0abeb..fd7381b6 100644 --- a/app/features/ops/routes.py +++ b/app/features/ops/routes.py @@ -1,14 +1,19 @@ """API routes for the ForecastOps Control Center. -Two read-only aggregation endpoints backing the ``/ops`` Control Center page: -operational summary and the ranked retraining-candidate queue. +Three read-only aggregation endpoints backing the ``/ops`` Control Center page: +operational summary, the ranked retraining-candidate queue, and per-grain +forecast-error health with a drift verdict. """ from fastapi import APIRouter, Depends, Query from sqlalchemy.ext.asyncio import AsyncSession from app.core.database import get_db -from app.features.ops.schemas import OpsSummaryResponse, RetrainingCandidatesResponse +from app.features.ops.schemas import ( + ModelHealthResponse, + OpsSummaryResponse, + RetrainingCandidatesResponse, +) from app.features.ops.service import OpsService router = APIRouter(prefix="/ops", tags=["ops"]) @@ -81,3 +86,43 @@ async def get_retraining_candidates( Candidates sorted by priority score (highest first). """ return await OpsService().get_retraining_candidates(db, limit) + + +@router.get( + "/model-health", + response_model=ModelHealthResponse, + summary="Per-(store, product) forecast-error health and drift", + description=""" +Classify forecast-error **performance drift** for every `(store, product)` grain. + +For each grain the endpoint reads the **full** successful-run history, extracts +each run's WAPE, and compares the latest WAPE against the mean of the prior +WAPEs within a ±10% relative band — yielding a drift verdict +(`improving` / `stable` / `degrading` / `unknown`). + +Entries are sorted **degrading-first**, then by the magnitude of the WAPE +change, and capped at `limit`. Returns HTTP 200 even on an empty database. + +This is a performance-drift signal, not data drift — it needs no feature +snapshots and adds no new table or migration. +""", +) +async def get_model_health( + limit: int = Query( + default=20, + ge=1, + le=100, + description="Maximum number of grains to return (1-100, default 20).", + ), + db: AsyncSession = Depends(get_db), +) -> ModelHealthResponse: + """Return per-grain forecast-error health and drift. + + Args: + limit: Maximum number of grains to return. + db: Database session. + + Returns: + Grains sorted degrading-first, then by absolute WAPE change. + """ + return await OpsService().get_model_health(db, limit) diff --git a/app/features/ops/schemas.py b/app/features/ops/schemas.py index e58a9b78..02a8405c 100644 --- a/app/features/ops/schemas.py +++ b/app/features/ops/schemas.py @@ -244,3 +244,93 @@ class RetrainingCandidatesResponse(BaseModel): description="Total (store, product) grains evaluated before applying the limit.", ) generated_at: datetime = Field(..., description="When this queue was computed (UTC).") + + +# ============================================================================= +# Model health & drift +# ============================================================================= + + +# Forecast-error trend verdict for a (store, product) grain. +DriftDirection = Literal["improving", "stable", "degrading", "unknown"] + + +class WapePoint(BaseModel): + """One run's WAPE observation in a grain's chronological history.""" + + model_config = ConfigDict(from_attributes=True) + + run_id: str = Field(..., description="External run_id of the observed run.") + created_at: datetime = Field(..., description="When the run was created (UTC).") + wape: float | None = Field( + None, + description="WAPE of the run, when present in its metrics; null otherwise.", + ) + + +class ModelHealthEntry(BaseModel): + """Forecast-error health and drift verdict for one (store, product) grain.""" + + model_config = ConfigDict(from_attributes=True) + + store_id: int = Field(..., description="Store the grain covers.") + product_id: int = Field(..., description="Product the grain covers.") + run_count: int = Field( + ..., + ge=0, + description="Number of successful runs evaluated for this grain.", + ) + latest_run_id: str | None = Field( + None, + description="External run_id of the most recent successful run.", + ) + latest_run_status: str | None = Field( + None, + description="Status of that run (always 'success' for evaluated grains).", + ) + latest_wape: float | None = Field( + None, + description="Most recent numeric WAPE in the grain's history; null when none.", + ) + previous_wape: float | None = Field( + None, + description="The prior numeric WAPE, used as the drift baseline; null when none.", + ) + wape_delta: float | None = Field( + None, + description="latest_wape minus previous_wape; null when fewer than two numeric WAPEs.", + ) + drift_direction: DriftDirection = Field( + ..., + description="Forecast-error trend: improving / stable / degrading / unknown.", + ) + last_trained_at: datetime | None = Field( + None, + description="created_at of the latest successful run; null when none.", + ) + staleness_days: int = Field( + ..., + ge=0, + description="Days since the latest run's training-data window ended.", + ) + wape_history: list[WapePoint] = Field( + ..., + description="Chronological WAPE observations; may carry null gaps.", + ) + + +class ModelHealthResponse(BaseModel): + """Per-grain forecast-error health, degrading grains first.""" + + model_config = ConfigDict(from_attributes=True) + + entries: list[ModelHealthEntry] = Field( + ..., + description="Grains sorted degrading-first, then by |wape_delta| descending.", + ) + total_evaluated: int = Field( + ..., + ge=0, + description="Total grains evaluated before applying the limit.", + ) + generated_at: datetime = Field(..., description="When this report was computed (UTC).") diff --git a/app/features/ops/service.py b/app/features/ops/service.py index d9e8cd18..e88e2054 100644 --- a/app/features/ops/service.py +++ b/app/features/ops/service.py @@ -8,6 +8,7 @@ """ from datetime import UTC, datetime +from itertools import groupby from typing import Any from sqlalchemy import func, select, text @@ -20,13 +21,17 @@ AliasHealth, AttentionItem, DataFreshness, + DriftDirection, JobHealth, + ModelHealthEntry, + ModelHealthResponse, OpsSummaryResponse, RetrainingCandidate, RetrainingCandidatesResponse, RunHealth, StatusCount, SystemHealth, + WapePoint, ) from app.features.registry.models import DeploymentAlias, ModelRun, RunStatus @@ -38,6 +43,8 @@ _WAPE_CAP = 100.0 # How many recent failed jobs / runs to surface in the attention list. _ATTENTION_LIMIT = 10 +# Relative WAPE-change band: forecast-error drift is only flagged outside ±10%. +_DRIFT_BAND = 0.10 # ============================================================================= @@ -91,6 +98,42 @@ def score_retraining_candidate(staleness_days: int, wape: float | None) -> float return round(0.6 * staleness_norm + 0.4 * error_norm, 4) +def classify_drift( + wape_history: list[float | None], +) -> tuple[DriftDirection, float | None]: + """Classify a grain's forecast-error (WAPE) trend. + + Pure and total: never raises, tolerates None gaps and sparse history. + Compares the latest numeric WAPE against the mean of all prior numeric + WAPEs, applying a ±10% relative band — the heuristic drift tolerance from + MLOps monitoring guidance (a universal threshold does not exist). + + Args: + wape_history: Chronological WAPE values; None marks a run with no WAPE. + + Returns: + A ``(direction, delta)`` tuple. ``direction`` is improving / stable / + degrading / unknown; ``delta`` is the latest numeric WAPE minus the + previous numeric WAPE, or None when fewer than two numeric values exist. + """ + numeric = [wape for wape in wape_history if wape is not None] + if len(numeric) < 2: + return "unknown", None + latest = numeric[-1] + prior = numeric[:-1] + baseline = sum(prior) / len(prior) + delta = round(latest - prior[-1], 4) + if baseline <= 0: + # Avoid div-by-zero on a zero baseline: any positive error is degrading. + return ("degrading" if latest > 0 else "stable"), delta + relative = (latest - baseline) / baseline + if relative > _DRIFT_BAND: + return "degrading", delta + if relative < -_DRIFT_BAND: + return "improving", delta + return "stable", delta + + def _alias_staleness( run: ModelRun, latest_success_by_grain: dict[tuple[int, int], ModelRun], @@ -417,3 +460,84 @@ async def get_retraining_candidates( total_evaluated=len(candidates), generated_at=datetime.now(UTC), ) + + async def get_model_health(self, db: AsyncSession, limit: int) -> ModelHealthResponse: + """Classify per-grain forecast-error drift from full run history. + + Unlike the retraining queue, this needs the *full* WAPE history per + grain (not just the latest run), so it queries every successful run + ordered by grain then creation time and groups in Python with + ``itertools.groupby`` — NOT ``DISTINCT ON``. + + Args: + db: Database session. + limit: Maximum grains to return (bounded 1..100 by the route). + + Returns: + Grains sorted degrading-first, then by ``|wape_delta|`` descending, + capped at ``limit``. Never raises on an empty database. + """ + today = datetime.now(UTC).date() + + # FULL history — NOT DISTINCT ON. Ordered by (store, product, created_at) + # so itertools.groupby batches each grain in chronological order. + success_runs = ( + ( + await db.execute( + select(ModelRun) + .where(ModelRun.status == RunStatus.SUCCESS.value) + .order_by( + ModelRun.store_id, + ModelRun.product_id, + ModelRun.created_at, + ) + ) + ) + .scalars() + .all() + ) + + entries: list[ModelHealthEntry] = [] + for (store_id, product_id), grain_iter in groupby( + success_runs, key=lambda run: (run.store_id, run.product_id) + ): + grain_runs = list(grain_iter) # already chronological + history = [ + WapePoint( + run_id=run.run_id, + created_at=run.created_at, + wape=extract_wape(run.metrics), + ) + for run in grain_runs + ] + direction, delta = classify_drift([point.wape for point in history]) + numeric = [point.wape for point in history if point.wape is not None] + latest_run = grain_runs[-1] + entries.append( + ModelHealthEntry( + store_id=store_id, + product_id=product_id, + run_count=len(grain_runs), + latest_run_id=latest_run.run_id, + latest_run_status=latest_run.status, + latest_wape=(numeric[-1] if numeric else None), + previous_wape=(numeric[-2] if len(numeric) > 1 else None), + wape_delta=delta, + drift_direction=direction, + last_trained_at=latest_run.created_at, + staleness_days=max((today - latest_run.data_window_end).days, 0), + wape_history=history, + ) + ) + + # Degrading grains first; within a tier, the largest WAPE move leads. + rank: dict[str, int] = {"degrading": 0, "improving": 1, "stable": 2, "unknown": 3} + entries.sort(key=lambda entry: (rank[entry.drift_direction], -abs(entry.wape_delta or 0.0))) + + logger.info("ops.model_health_computed", grains=len(entries)) + + return ModelHealthResponse( + entries=entries[:limit], + total_evaluated=len(entries), + generated_at=datetime.now(UTC), + ) diff --git a/app/features/ops/tests/conftest.py b/app/features/ops/tests/conftest.py index 6bbb40de..f3663f92 100644 --- a/app/features/ops/tests/conftest.py +++ b/app/features/ops/tests/conftest.py @@ -263,3 +263,35 @@ async def sample_sales(db_session: AsyncSession) -> list[SalesDaily]: for row in sales: await db_session.refresh(row) return sales + + +@pytest.fixture +async def sample_health_runs(db_session: AsyncSession) -> list[ModelRun]: + """Three successful runs for one grain forming a degrading WAPE history. + + Grain (9101, 8101): WAPE 10.0 -> 11.0 -> 25.0. Each run is committed in its + own transaction so its server-side ``created_at`` is strictly later than + the prior one — making the chronological history deterministic for the + model-health endpoint's ``itertools.groupby`` ordering. + """ + wapes = [10.0, 11.0, 25.0] + window_ends = [date(2026, 1, 1), date(2026, 2, 1), date(2026, 3, 1)] + runs: list[ModelRun] = [] + for wape, window_end in zip(wapes, window_ends, strict=True): + run = ModelRun( + run_id=f"test-{_short_id()}", + status=RunStatus.SUCCESS.value, + model_type="naive", + model_config={"_test": True}, + config_hash=_short_id()[:16], + data_window_start=date(2025, 1, 1), + data_window_end=window_end, + store_id=9101, + product_id=8101, + metrics={"wape": wape}, + ) + db_session.add(run) + await db_session.commit() + await db_session.refresh(run) + runs.append(run) + return runs diff --git a/app/features/ops/tests/test_routes_integration.py b/app/features/ops/tests/test_routes_integration.py index ec048d8c..e6b96ec1 100644 --- a/app/features/ops/tests/test_routes_integration.py +++ b/app/features/ops/tests/test_routes_integration.py @@ -16,6 +16,7 @@ _JOB_STATUSES = {s.value for s in JobStatus} _RUN_STATUSES = {s.value for s in RunStatus} +_DRIFT_RANK = {"degrading": 0, "improving": 1, "stable": 2, "unknown": 3} @pytest.mark.integration @@ -130,3 +131,66 @@ async def test_candidates_limit_too_high_rejected(self, client: AsyncClient) -> """limit=200 is above the le=100 bound and returns 422.""" response = await client.get("/ops/retraining-candidates", params={"limit": 200}) assert response.status_code == 422 + + +@pytest.mark.integration +@pytest.mark.asyncio +class TestModelHealth: + """Integration tests for GET /ops/model-health.""" + + async def test_model_health_happy_path( + self, + client: AsyncClient, + sample_health_runs: list[ModelRun], + ) -> None: + """The seeded 3-run degrading grain surfaces with a drift verdict.""" + response = await client.get("/ops/model-health", params={"limit": 100}) + + assert response.status_code == 200 + data = response.json() + + entry = next( + e for e in data["entries"] if e["store_id"] == 9101 and e["product_id"] == 8101 + ) + assert entry["drift_direction"] == "degrading" + assert entry["run_count"] == 3 + assert entry["latest_wape"] == 25.0 + assert entry["previous_wape"] == 11.0 + assert entry["wape_delta"] == 14.0 + assert len(entry["wape_history"]) == 3 + assert data["total_evaluated"] >= 1 + + async def test_model_health_degrading_first_sort( + self, + client: AsyncClient, + sample_health_runs: list[ModelRun], + ) -> None: + """Entries are ordered degrading-first (drift rank non-decreasing).""" + response = await client.get("/ops/model-health", params={"limit": 100}) + + assert response.status_code == 200 + ranks = [_DRIFT_RANK[e["drift_direction"]] for e in response.json()["entries"]] + assert ranks == sorted(ranks), "entries must be sorted degrading-first" + + async def test_model_health_resilient_structural(self, client: AsyncClient) -> None: + """Without seeded fixtures the endpoint still returns 200, never 500.""" + response = await client.get("/ops/model-health") + + assert response.status_code == 200 + data = response.json() + assert isinstance(data["entries"], list) + assert data["total_evaluated"] >= 0 + for entry in data["entries"]: + assert entry["drift_direction"] in _DRIFT_RANK + assert entry["run_count"] >= 0 + assert entry["staleness_days"] >= 0 + + async def test_model_health_limit_zero_rejected(self, client: AsyncClient) -> None: + """limit=0 is below the ge=1 bound and returns 422.""" + response = await client.get("/ops/model-health", params={"limit": 0}) + assert response.status_code == 422 + + async def test_model_health_limit_too_high_rejected(self, client: AsyncClient) -> None: + """limit=200 is above the le=100 bound and returns 422.""" + response = await client.get("/ops/model-health", params={"limit": 200}) + assert response.status_code == 422 diff --git a/app/features/ops/tests/test_schemas.py b/app/features/ops/tests/test_schemas.py index 0a26ae97..4ee7314d 100644 --- a/app/features/ops/tests/test_schemas.py +++ b/app/features/ops/tests/test_schemas.py @@ -13,12 +13,15 @@ AttentionItem, DataFreshness, JobHealth, + ModelHealthEntry, + ModelHealthResponse, OpsSummaryResponse, RetrainingCandidate, RetrainingCandidatesResponse, RunHealth, StatusCount, SystemHealth, + WapePoint, ) _NOW = datetime(2026, 5, 19, 12, 0, tzinfo=UTC) @@ -180,3 +183,90 @@ def test_data_freshness_accepts_date() -> None: """latest_sales_date accepts a date value.""" freshness = DataFreshness(latest_sales_date=date(2026, 5, 1)) assert freshness.latest_sales_date == date(2026, 5, 1) + + +# ============================================================================= +# Model health & drift +# ============================================================================= + + +def test_wape_point_construct() -> None: + """WapePoint carries a run_id, timestamp, and an optional WAPE.""" + point = WapePoint(run_id="r1", created_at=_NOW, wape=14.0) + assert point.wape == 14.0 + null_point = WapePoint(run_id="r2", created_at=_NOW) + assert null_point.wape is None + + +def test_model_health_entry_construct() -> None: + """A valid ModelHealthEntry accepts a known drift direction and history.""" + entry = ModelHealthEntry( + store_id=1, + product_id=2, + run_count=3, + latest_run_id="r3", + latest_run_status="success", + latest_wape=25.0, + previous_wape=11.0, + wape_delta=14.0, + drift_direction="degrading", + last_trained_at=_NOW, + staleness_days=30, + wape_history=[WapePoint(run_id="r3", created_at=_NOW, wape=25.0)], + ) + assert entry.drift_direction == "degrading" + assert entry.wape_delta == 14.0 + + +def test_model_health_entry_rejects_negative_run_count() -> None: + """run_count violates the ge=0 constraint when negative.""" + with pytest.raises(ValidationError): + ModelHealthEntry( + store_id=1, + product_id=2, + run_count=-1, + drift_direction="unknown", + staleness_days=0, + wape_history=[], + ) + + +def test_model_health_entry_rejects_negative_staleness() -> None: + """staleness_days violates the ge=0 constraint when negative.""" + with pytest.raises(ValidationError): + ModelHealthEntry( + store_id=1, + product_id=2, + run_count=0, + drift_direction="unknown", + staleness_days=-1, + wape_history=[], + ) + + +def test_model_health_entry_rejects_unknown_drift_direction() -> None: + """drift_direction is constrained to the four known literals.""" + with pytest.raises(ValidationError): + ModelHealthEntry( + store_id=1, + product_id=2, + run_count=0, + drift_direction="exploding", # type: ignore[arg-type] + staleness_days=0, + wape_history=[], + ) + + +def test_model_health_response_construct() -> None: + """ModelHealthResponse wraps entries with a total and timestamp.""" + entry = ModelHealthEntry( + store_id=1, + product_id=2, + run_count=2, + drift_direction="stable", + staleness_days=5, + wape_history=[], + ) + response = ModelHealthResponse(entries=[entry], total_evaluated=1, generated_at=_NOW) + assert response.total_evaluated == 1 + assert response.entries[0].drift_direction == "stable" diff --git a/app/features/ops/tests/test_service.py b/app/features/ops/tests/test_service.py index 0589efaa..3c228660 100644 --- a/app/features/ops/tests/test_service.py +++ b/app/features/ops/tests/test_service.py @@ -4,7 +4,7 @@ functions with no I/O. """ -from app.features.ops.service import extract_wape, score_retraining_candidate +from app.features.ops.service import classify_drift, extract_wape, score_retraining_candidate # ============================================================================= # score_retraining_candidate @@ -78,3 +78,58 @@ def test_extract_wape_coerces_int_to_float() -> None: result = extract_wape({"wape": 25}) assert result == 25.0 assert isinstance(result, float) + + +# ============================================================================= +# classify_drift +# ============================================================================= + + +def test_classify_drift_unknown_when_empty() -> None: + """An empty history has no trend — direction is 'unknown', delta None.""" + assert classify_drift([]) == ("unknown", None) + + +def test_classify_drift_unknown_when_under_two_numeric() -> None: + """Fewer than two numeric WAPEs yields 'unknown' (None gaps don't count).""" + assert classify_drift([None, 10.0]) == ("unknown", None) + assert classify_drift([10.0]) == ("unknown", None) + + +def test_classify_drift_degrading() -> None: + """A latest WAPE far above the prior mean is 'degrading'; delta is positive.""" + direction, delta = classify_drift([10.0, 10.0, 20.0]) + assert direction == "degrading" + assert delta == 10.0 + + +def test_classify_drift_improving() -> None: + """A latest WAPE far below the prior mean is 'improving'; delta is negative.""" + direction, delta = classify_drift([20.0, 20.0, 10.0]) + assert direction == "improving" + assert delta == -10.0 + + +def test_classify_drift_stable_within_band() -> None: + """A change inside the ±10% relative band is 'stable'.""" + direction, delta = classify_drift([10.0, 10.5]) # +5% < 10% band + assert direction == "stable" + assert delta == 0.5 + + +def test_classify_drift_tolerates_none_gaps() -> None: + """None gaps are skipped; classification uses only numeric observations.""" + direction, delta = classify_drift([None, 10.0, None, 12.0]) # +20% over baseline 10 + assert direction == "degrading" + assert delta == 2.0 + + +def test_classify_drift_zero_baseline_guard() -> None: + """A zero baseline never divides by zero: positive error degrades, zero is stable.""" + assert classify_drift([0.0, 5.0])[0] == "degrading" + assert classify_drift([0.0, 0.0])[0] == "stable" + + +def test_classify_drift_never_raises_on_sparse_history() -> None: + """Sparse / all-None history degrades gracefully to 'unknown'.""" + assert classify_drift([None, None, None]) == ("unknown", None) diff --git a/frontend/src/hooks/use-ops.ts b/frontend/src/hooks/use-ops.ts index 74dd80dd..33141d64 100644 --- a/frontend/src/hooks/use-ops.ts +++ b/frontend/src/hooks/use-ops.ts @@ -1,6 +1,10 @@ import { useQuery } from '@tanstack/react-query' import { api } from '@/lib/api' -import type { OpsSummaryResponse, RetrainingCandidatesResponse } from '@/types/api' +import type { + ModelHealthResponse, + OpsSummaryResponse, + RetrainingCandidatesResponse, +} from '@/types/api' /** * Operational summary for the Control Center. Polled every 15s — job/run state @@ -28,3 +32,16 @@ export function useRetrainingCandidates(limit = 20, enabled = true) { enabled, }) } + +/** + * Per-(store, product) forecast-error health and drift. Deliberately NOT + * polled — drift is slow-moving and only changes when a new run lands, so + * refetch-on-mount is sufficient (mirrors useRetrainingCandidates). + */ +export function useModelHealth(limit = 20, enabled = true) { + return useQuery({ + queryKey: ['ops', 'model-health', limit], + queryFn: () => api('/ops/model-health', { params: { limit } }), + enabled, + }) +} diff --git a/frontend/src/lib/incident-report.test.ts b/frontend/src/lib/incident-report.test.ts new file mode 100644 index 00000000..7b80a66a --- /dev/null +++ b/frontend/src/lib/incident-report.test.ts @@ -0,0 +1,138 @@ +import { describe, it, expect } from 'vitest' +import { attentionCsvColumns, buildIncidentMarkdown } from './incident-report' +import type { + AttentionItem, + ModelHealthEntry, + OpsSummaryResponse, + RetrainingCandidate, +} from '@/types/api' + +/** Build an OpsSummaryResponse with sensible defaults for fields not under test. */ +function makeSummary(overrides: Partial = {}): OpsSummaryResponse { + return { + system: { + api_ok: true, + database_connected: true, + latest_successful_job_at: '2026-05-19T10:00:00Z', + }, + jobs: { counts: [], completed_today: 2, failed_total: 1, active_total: 3 }, + runs: { counts: [], success_rate: 0.8, failed_total: 1 }, + aliases: [], + freshness: { + latest_sales_date: '2026-05-18', + latest_job_completed_at: '2026-05-19T09:00:00Z', + latest_run_completed_at: '2026-05-19T08:00:00Z', + }, + attention_items: [], + generated_at: '2026-05-19T12:00:00Z', + ...overrides, + } +} + +/** Build an AttentionItem with sensible defaults. */ +function makeAttentionItem( + partial: Partial & Pick, +): AttentionItem { + return { + item_type: partial.item_type, + entity_id: partial.entity_id ?? 'e1', + label: partial.label ?? 'label', + detail: partial.detail ?? 'detail', + occurred_at: partial.occurred_at ?? null, + } +} + +/** Build a ModelHealthEntry with sensible defaults. */ +function makeHealthEntry(partial: Partial = {}): ModelHealthEntry { + return { + store_id: partial.store_id ?? 1, + product_id: partial.product_id ?? 2, + run_count: partial.run_count ?? 3, + latest_run_id: partial.latest_run_id ?? 'r1', + latest_run_status: partial.latest_run_status ?? 'success', + latest_wape: partial.latest_wape ?? 25, + previous_wape: partial.previous_wape ?? 11, + wape_delta: partial.wape_delta ?? 14, + drift_direction: partial.drift_direction ?? 'degrading', + last_trained_at: partial.last_trained_at ?? null, + staleness_days: partial.staleness_days ?? 10, + wape_history: partial.wape_history ?? [], + } +} + +/** Build a RetrainingCandidate with sensible defaults. */ +function makeCandidate(partial: Partial = {}): RetrainingCandidate { + return { + store_id: partial.store_id ?? 1, + product_id: partial.product_id ?? 2, + priority_score: partial.priority_score ?? 0.75, + staleness_days: partial.staleness_days ?? 30, + wape: partial.wape ?? 12, + latest_run_id: partial.latest_run_id ?? 'r1', + latest_run_status: partial.latest_run_status ?? 'success', + reason: partial.reason ?? 'reason', + } +} + +describe('attentionCsvColumns', () => { + it('exposes the five attention-item columns in order', () => { + expect(attentionCsvColumns.map((column) => column.key)).toEqual([ + 'item_type', + 'entity_id', + 'label', + 'detail', + 'occurred_at', + ]) + }) +}) + +describe('buildIncidentMarkdown', () => { + it('renders the report title and generated timestamp', () => { + const md = buildIncidentMarkdown(makeSummary(), [], []) + expect(md).toContain('# ForecastOps Incident Report') + expect(md).toContain('_Generated 2026-05-19T12:00:00Z_') + }) + + it('renders KPI lines from the summary', () => { + const md = buildIncidentMarkdown(makeSummary(), [], []) + expect(md).toContain('- Active jobs: 3') + expect(md).toContain('- Run success rate: 80.0%') + }) + + it('shows the empty-state line when nothing needs attention', () => { + const md = buildIncidentMarkdown(makeSummary(), [], []) + expect(md).toContain('## Needs Attention (0)') + expect(md).toContain('_Nothing needs attention._') + }) + + it('renders an attention table row and escapes pipe characters', () => { + const summary = makeSummary({ + attention_items: [ + makeAttentionItem({ item_type: 'failed_job', label: 'train', detail: 'a | b' }), + ], + }) + const md = buildIncidentMarkdown(summary, [], []) + expect(md).toContain('## Needs Attention (1)') + expect(md).toContain('a \\| b') + }) + + it('renders the model-health drift section with a signed delta', () => { + const md = buildIncidentMarkdown(makeSummary(), [], [ + makeHealthEntry({ drift_direction: 'degrading', wape_delta: 14 }), + ]) + expect(md).toContain('## Model Health — Drift (1)') + expect(md).toContain('degrading') + expect(md).toContain('+14.0') + }) + + it('renders the retraining-candidates section', () => { + const md = buildIncidentMarkdown(makeSummary(), [makeCandidate({ priority_score: 0.91 })], []) + expect(md).toContain('## Top Retraining Candidates (1)') + expect(md).toContain('0.91') + }) + + it('handles a null success rate', () => { + const summary = makeSummary({ runs: { counts: [], success_rate: null, failed_total: 0 } }) + expect(buildIncidentMarkdown(summary, [], [])).toContain('- Run success rate: —') + }) +}) diff --git a/frontend/src/lib/incident-report.ts b/frontend/src/lib/incident-report.ts new file mode 100644 index 00000000..d04e6783 --- /dev/null +++ b/frontend/src/lib/incident-report.ts @@ -0,0 +1,140 @@ +// Builders for the ForecastOps incident report — a client-side CSV + Markdown +// export of the operational snapshot already loaded on the /ops page. The +// builders take no I/O and are unit-tested (incident-report.test.ts); +// downloadMarkdown is the one DOM-touching helper (mirrors csv-export.ts). +import type { CsvColumn } from '@/lib/csv-export' +import { formatWapeDelta } from '@/lib/ops-utils' +import type { + AttentionItem, + ModelHealthEntry, + OpsSummaryResponse, + RetrainingCandidate, +} from '@/types/api' + +/** CSV column set for the attention-items export (feed to toCsv / downloadCsv). */ +export const attentionCsvColumns: CsvColumn[] = [ + { key: 'item_type', header: 'Type' }, + { key: 'entity_id', header: 'Entity' }, + { key: 'label', header: 'Item' }, + { key: 'detail', header: 'Detail' }, + { key: 'occurred_at', header: 'When' }, +] + +/** Render a value for a Markdown table cell: '—' for empty, pipes/newlines neutralised. */ +function mdCell(value: string | number | null | undefined): string { + if (value === null || value === undefined || value === '') return '—' + return String(value) + .replace(/\|/g, '\\|') + .replace(/[\r\n]+/g, ' ') +} + +/** Percentage display for the run success rate; '—' when null. */ +function pct(rate: number | null): string { + return rate === null ? '—' : `${(rate * 100).toFixed(1)}%` +} + +/** One-decimal WAPE display; '—' when null. */ +function wape(value: number | null): string { + return value === null ? '—' : value.toFixed(1) +} + +/** + * Build a human-readable Markdown incident report from already-loaded /ops + * page data. Pure: no fetch, no DOM, deterministic for a given input — the + * timestamps are emitted verbatim so the output is stable for unit tests. + */ +export function buildIncidentMarkdown( + summary: OpsSummaryResponse, + candidates: RetrainingCandidate[], + modelHealth: ModelHealthEntry[], +): string { + const staleAliases = summary.aliases.filter((alias) => alias.is_stale).length + const lines: string[] = [ + '# ForecastOps Incident Report', + '', + `_Generated ${summary.generated_at}_`, + '', + '## System Health', + '', + `- API: ${summary.system.api_ok ? 'ok' : 'down'}`, + `- Database: ${summary.system.database_connected ? 'connected' : 'down'}`, + `- Latest successful job: ${summary.system.latest_successful_job_at ?? '—'}`, + '', + '## KPIs', + '', + `- Active jobs: ${summary.jobs.active_total}`, + `- Failed jobs: ${summary.jobs.failed_total}`, + `- Completed today: ${summary.jobs.completed_today}`, + `- Run success rate: ${pct(summary.runs.success_rate)}`, + `- Failed runs: ${summary.runs.failed_total}`, + `- Stale aliases: ${staleAliases} of ${summary.aliases.length}`, + '', + '## Data Freshness', + '', + `- Latest sales date: ${summary.freshness.latest_sales_date ?? '—'}`, + `- Latest completed job: ${summary.freshness.latest_job_completed_at ?? '—'}`, + `- Latest successful run: ${summary.freshness.latest_run_completed_at ?? '—'}`, + '', + `## Needs Attention (${summary.attention_items.length})`, + '', + ] + + if (summary.attention_items.length === 0) { + lines.push('_Nothing needs attention._', '') + } else { + lines.push('| Type | Item | Detail | When |', '| --- | --- | --- | --- |') + for (const item of summary.attention_items) { + lines.push( + `| ${mdCell(item.item_type)} | ${mdCell(item.label)} | ${mdCell(item.detail)} | ${mdCell(item.occurred_at)} |`, + ) + } + lines.push('') + } + + lines.push(`## Model Health — Drift (${modelHealth.length})`, '') + if (modelHealth.length === 0) { + lines.push('_No model health to evaluate._', '') + } else { + lines.push( + '| Store | Product | Drift | Latest WAPE | Δ WAPE | Runs |', + '| --- | --- | --- | --- | --- | --- |', + ) + for (const entry of modelHealth) { + lines.push( + `| ${mdCell(entry.store_id)} | ${mdCell(entry.product_id)} | ${mdCell(entry.drift_direction)} | ${wape(entry.latest_wape)} | ${formatWapeDelta(entry.wape_delta)} | ${mdCell(entry.run_count)} |`, + ) + } + lines.push('') + } + + lines.push(`## Top Retraining Candidates (${candidates.length})`, '') + if (candidates.length === 0) { + lines.push('_No retraining candidates._', '') + } else { + lines.push( + '| Store | Product | Priority | Staleness (days) | WAPE | Reason |', + '| --- | --- | --- | --- | --- | --- |', + ) + for (const candidate of candidates) { + lines.push( + `| ${mdCell(candidate.store_id)} | ${mdCell(candidate.product_id)} | ${candidate.priority_score.toFixed(2)} | ${mdCell(candidate.staleness_days)} | ${wape(candidate.wape)} | ${mdCell(candidate.reason)} |`, + ) + } + lines.push('') + } + + return lines.join('\n') +} + +/** Trigger a browser download of `content` as a Markdown file. */ +export function downloadMarkdown(filename: string, content: string): void { + const blob = new Blob([content], { type: 'text/markdown;charset=utf-8;' }) + const url = URL.createObjectURL(blob) + const link = document.createElement('a') + link.href = url + link.download = filename + document.body.appendChild(link) + link.click() + document.body.removeChild(link) + URL.revokeObjectURL(url) +} diff --git a/frontend/src/lib/ops-actions.test.ts b/frontend/src/lib/ops-actions.test.ts new file mode 100644 index 00000000..a5bc2dcb --- /dev/null +++ b/frontend/src/lib/ops-actions.test.ts @@ -0,0 +1,79 @@ +import { describe, it, expect } from 'vitest' +import { buildRetrainJob } from './ops-actions' +import type { ModelRun } from '@/types/api' + +/** Build a ModelRun with sensible defaults for fields not under test. */ +function makeRun(overrides: Partial = {}): ModelRun { + return { + run_id: 'r1', + status: 'success', + model_type: 'naive', + model_config: { model_type: 'naive', schema_version: '1.0' }, + feature_config: null, + config_hash: 'abc', + data_window_start: '2025-01-01', + data_window_end: '2026-01-01', + store_id: 9, + product_id: 8, + metrics: null, + artifact_uri: null, + artifact_hash: null, + artifact_size_bytes: null, + runtime_info: null, + agent_context: null, + git_sha: null, + error_message: null, + started_at: null, + completed_at: null, + created_at: '2026-01-01T00:00:00Z', + updated_at: '2026-01-01T00:00:00Z', + ...overrides, + } +} + +describe('buildRetrainJob', () => { + it('builds a train job with the flat param contract', () => { + const job = buildRetrainJob(makeRun(), '2026-05-18') + expect(job.job_type).toBe('train') + expect(job.params).toEqual({ + model_type: 'naive', + store_id: 9, + product_id: 8, + start_date: '2025-01-01', + end_date: '2026-05-18', + }) + }) + + it('falls back to the run window end when no latest sales date is known', () => { + expect(buildRetrainJob(makeRun(), null).params.end_date).toBe('2026-01-01') + }) + + it('lifts season_length for a seasonal_naive run', () => { + const run = makeRun({ + model_type: 'seasonal_naive', + model_config: { model_type: 'seasonal_naive', schema_version: '1.0', season_length: 7 }, + }) + expect(buildRetrainJob(run, null).params.season_length).toBe(7) + }) + + it('lifts window_size for a moving_average run', () => { + const run = makeRun({ + model_type: 'moving_average', + model_config: { model_type: 'moving_average', schema_version: '1.0', window_size: 14 }, + }) + expect(buildRetrainJob(run, null).params.window_size).toBe(14) + }) + + it('omits model-specific keys when model_config carries none', () => { + const job = buildRetrainJob(makeRun(), null) + expect(job.params).not.toHaveProperty('season_length') + expect(job.params).not.toHaveProperty('window_size') + }) + + it('ignores non-numeric model_config values', () => { + const run = makeRun({ + model_config: { model_type: 'seasonal_naive', season_length: 'weekly' }, + }) + expect(buildRetrainJob(run, null).params).not.toHaveProperty('season_length') + }) +}) diff --git a/frontend/src/lib/ops-actions.ts b/frontend/src/lib/ops-actions.ts new file mode 100644 index 00000000..54723a99 --- /dev/null +++ b/frontend/src/lib/ops-actions.ts @@ -0,0 +1,43 @@ +// Pure builder for the ForecastOps action layer — turns a source model run +// into the POST /jobs body that retrains its (store, product) grain. Kept +// React-free and unit-tested (ops-actions.test.ts). +import type { JobCreate, ModelRun } from '@/types/api' + +/** Read a numeric field from a run's model_config JSONB, or null when absent. */ +function numericConfig(config: Record, key: string): number | null { + const value = config[key] + return typeof value === 'number' ? value : null +} + +/** + * Build the `POST /jobs` body that retrains a grain from its source run. + * + * The train job consumes a FLAT params dict — verified against + * `app/features/jobs/service.py::_execute_train`: `model_type`, `store_id`, + * `product_id`, `start_date`, `end_date`, plus the model-specific + * `season_length` (seasonal_naive) / `window_size` (moving_average) lifted + * from the source run's `model_config`. There is no `period` key. The + * end date is advanced to the freshest available sales date so the retrain + * sees every observation since the original training window. + * + * @param run - The source model run (GET /registry/runs/{latest_run_id}). + * @param latestSalesDate - summary.freshness.latest_sales_date, or null. + */ +export function buildRetrainJob(run: ModelRun, latestSalesDate: string | null): JobCreate { + const params: Record = { + model_type: run.model_type, + store_id: run.store_id, + product_id: run.product_id, + start_date: run.data_window_start, + end_date: latestSalesDate ?? run.data_window_end, + } + const seasonLength = numericConfig(run.model_config, 'season_length') + if (seasonLength !== null) { + params.season_length = seasonLength + } + const windowSize = numericConfig(run.model_config, 'window_size') + if (windowSize !== null) { + params.window_size = windowSize + } + return { job_type: 'train', params } +} diff --git a/frontend/src/lib/ops-utils.test.ts b/frontend/src/lib/ops-utils.test.ts index 5e156396..a3a49c60 100644 --- a/frontend/src/lib/ops-utils.test.ts +++ b/frontend/src/lib/ops-utils.test.ts @@ -2,7 +2,9 @@ import { describe, it, expect } from 'vitest' import { attentionBadgeVariant, attentionItemLink, + driftBadgeVariant, formatStaleness, + formatWapeDelta, sortRetrainingCandidates, summaryHealthVariant, } from './ops-utils' @@ -117,3 +119,26 @@ describe('sortRetrainingCandidates', () => { expect(sortRetrainingCandidates([])).toEqual([]) }) }) + +describe('driftBadgeVariant', () => { + it('maps each drift direction to its badge variant', () => { + expect(driftBadgeVariant('degrading')).toBe('error') + expect(driftBadgeVariant('improving')).toBe('success') + expect(driftBadgeVariant('stable')).toBe('info') + expect(driftBadgeVariant('unknown')).toBe('default') + }) +}) + +describe('formatWapeDelta', () => { + it('prefixes a positive delta with +', () => { + expect(formatWapeDelta(14)).toBe('+14.0') + }) + + it('keeps a negative delta sign', () => { + expect(formatWapeDelta(-9.3)).toBe('-9.3') + }) + + it('renders an em dash for a null delta', () => { + expect(formatWapeDelta(null)).toBe('—') + }) +}) diff --git a/frontend/src/lib/ops-utils.ts b/frontend/src/lib/ops-utils.ts index 1d0d42eb..20f48eaa 100644 --- a/frontend/src/lib/ops-utils.ts +++ b/frontend/src/lib/ops-utils.ts @@ -2,7 +2,7 @@ // separate from the page component so they are cheap to unit-test (see // ops-utils.test.ts) — mirrors the knowledge-utils.ts / status-utils.ts precedent. import { ROUTES } from '@/lib/constants' -import type { AttentionItem, RetrainingCandidate, SystemHealth } from '@/types/api' +import type { AttentionItem, DriftDirection, RetrainingCandidate, SystemHealth } from '@/types/api' /** * System-health badge variant: 'success' only when the API and database are @@ -49,3 +49,32 @@ export function formatStaleness(days: number): string { export function sortRetrainingCandidates(rows: RetrainingCandidate[]): RetrainingCandidate[] { return [...rows].sort((a, b) => b.priority_score - a.priority_score) } + +/** + * Badge variant for a drift verdict — 'degrading' is an error, 'improving' a + * success, 'stable' an info, and 'unknown' a neutral default. + */ +export function driftBadgeVariant( + direction: DriftDirection, +): 'success' | 'error' | 'info' | 'default' { + switch (direction) { + case 'degrading': + return 'error' + case 'improving': + return 'success' + case 'stable': + return 'info' + default: + return 'default' + } +} + +/** + * Signed, one-decimal WAPE delta for display ("+14.0" / "-9.3"); '—' when the + * grain has fewer than two numeric WAPEs (delta is null). + */ +export function formatWapeDelta(delta: number | null): string { + if (delta === null) return '—' + const sign = delta > 0 ? '+' : '' + return `${sign}${delta.toFixed(1)}` +} diff --git a/frontend/src/pages/ops.tsx b/frontend/src/pages/ops.tsx index aa1b0180..04a5ba1a 100644 --- a/frontend/src/pages/ops.tsx +++ b/frontend/src/pages/ops.tsx @@ -1,11 +1,17 @@ +import { useState } from 'react' import { useNavigate, Link } from 'react-router-dom' -import { Activity, AlertTriangle, CheckCircle2, Clock } from 'lucide-react' -import { useOpsSummary, useRetrainingCandidates } from '@/hooks/use-ops' +import { Activity, AlertTriangle, CheckCircle2, Clock, Download, RefreshCw } from 'lucide-react' +import { toast } from 'sonner' +import { useModelHealth, useOpsSummary, useRetrainingCandidates } from '@/hooks/use-ops' import { useProviderHealth } from '@/hooks/use-config' +import { useCreateJob } from '@/hooks/use-jobs' +import { useCreateAlias } from '@/hooks/use-runs' import { attentionBadgeVariant, attentionItemLink, + driftBadgeVariant, formatStaleness, + formatWapeDelta, sortRetrainingCandidates, summaryHealthVariant, } from '@/lib/ops-utils' @@ -23,8 +29,38 @@ import { TableHeader, TableRow, } from '@/components/ui/table' -import { formatPercent } from '@/lib/api' +import { Button } from '@/components/ui/button' +import { + DropdownMenu, + DropdownMenuContent, + DropdownMenuItem, + DropdownMenuTrigger, +} from '@/components/ui/dropdown-menu' +import { + AlertDialog, + AlertDialogAction, + AlertDialogCancel, + AlertDialogContent, + AlertDialogDescription, + AlertDialogFooter, + AlertDialogHeader, + AlertDialogTitle, +} from '@/components/ui/alert-dialog' +import { Checkbox } from '@/components/ui/checkbox' +import { Input } from '@/components/ui/input' +import { downloadCsv, toCsv } from '@/lib/csv-export' +import { attentionCsvColumns, buildIncidentMarkdown, downloadMarkdown } from '@/lib/incident-report' +import { buildRetrainJob } from '@/lib/ops-actions' +import { api, formatPercent, getErrorMessage } from '@/lib/api' import { ROUTES } from '@/lib/constants' +import type { ModelRun } from '@/types/api' + +/** The run + grain a "Promote to alias" dialog is currently targeting. */ +interface PromoteTarget { + runId: string + storeId: number + productId: number +} /** Format an ISO timestamp / date string for display; '—' when null. */ function formatWhen(value: string | null): string { @@ -60,7 +96,15 @@ export default function OpsPage() { const navigate = useNavigate() const summaryQuery = useOpsSummary() const candidatesQuery = useRetrainingCandidates() + const modelHealthQuery = useModelHealth() const providerQuery = useProviderHealth() + const createJob = useCreateJob() + const createAlias = useCreateAlias() + const [selected, setSelected] = useState>(new Set()) + const [retrainConfirmOpen, setRetrainConfirmOpen] = useState(false) + const [actionBusy, setActionBusy] = useState(false) + const [promoteTarget, setPromoteTarget] = useState(null) + const [aliasName, setAliasName] = useState('') if (summaryQuery.error) { return ( @@ -86,15 +130,120 @@ export default function OpsPage() { const totalRuns = summary.runs.counts.reduce((sum, c) => sum + c.count, 0) const staleAliases = summary.aliases.filter((a) => a.is_stale).length const candidates = sortRetrainingCandidates(candidatesQuery.data?.candidates ?? []) + const modelHealthEntries = modelHealthQuery.data?.entries ?? [] + + /** Download the needs-attention list as a CSV, built client-side. */ + function handleExportCsv() { + downloadCsv('ops-attention-items.csv', toCsv(summary.attention_items, attentionCsvColumns)) + } + + /** Download the full operational snapshot as a Markdown incident report. */ + function handleExportMarkdown() { + downloadMarkdown( + 'ops-incident-report.md', + buildIncidentMarkdown(summary, candidates, modelHealthEntries), + ) + } + + /** Stable selection key for a (store, product) grain. */ + const grainKey = (storeId: number, productId: number) => `${storeId}-${productId}` + + /** Toggle one grain in the bulk-retrain selection set. */ + function toggleSelected(key: string) { + setSelected((prev) => { + const next = new Set(prev) + if (next.has(key)) { + next.delete(key) + } else { + next.add(key) + } + return next + }) + } + + // Selected candidates that carry a source run — the bulk-retrain work list. + const selectedCandidates = candidates.filter( + (candidate) => + selected.has(grainKey(candidate.store_id, candidate.product_id)) && + candidate.latest_run_id !== null, + ) + + /** + * Bulk-retrain every selected grain. POST /jobs runs synchronously + * server-side, so jobs are fired SEQUENTIALLY (never Promise.all) with a + * per-item toast; the action layer reuses the existing /jobs endpoint. + */ + async function runBulkRetrain() { + setRetrainConfirmOpen(false) + setActionBusy(true) + let succeeded = 0 + let failed = 0 + for (const candidate of selectedCandidates) { + const runId = candidate.latest_run_id + if (runId === null) continue + const where = `store ${candidate.store_id} / product ${candidate.product_id}` + try { + const run = await api(`/registry/runs/${runId}`) + await createJob.mutateAsync(buildRetrainJob(run, summary.freshness.latest_sales_date)) + succeeded += 1 + toast.success(`Retrain queued — ${where}`) + } catch (error) { + failed += 1 + toast.error(`Retrain failed — ${where}: ${getErrorMessage(error)}`) + } + } + setSelected(new Set()) + setActionBusy(false) + toast.message(`Bulk retrain complete — ${succeeded} queued, ${failed} failed`) + } + + /** Open the promote-to-alias dialog for a grain's latest successful run. */ + function openPromote(runId: string | null, storeId: number, productId: number) { + if (runId === null) return + setAliasName('') + setPromoteTarget({ runId, storeId, productId }) + } + + /** Promote the targeted run to a deployment alias via POST /registry/aliases. */ + async function runPromote() { + if (promoteTarget === null) return + const target = promoteTarget + const name = aliasName.trim() + setActionBusy(true) + try { + await createAlias.mutateAsync({ alias_name: name, run_id: target.runId }) + toast.success(`Promoted run to alias '${name}'`) + } catch (error) { + toast.error(`Promote failed: ${getErrorMessage(error)}`) + } + setActionBusy(false) + setPromoteTarget(null) + } return (
-
-

Control Center

-

- One operational view across jobs, model runs, deployment aliases, and data - freshness — surfacing what needs attention before it affects decisions. -

+
+
+

Control Center

+

+ One operational view across jobs, model runs, deployment aliases, and data + freshness — surfacing what needs attention before it affects decisions. +

+
+ + + + + + CSV — attention items + + Markdown — full report + + +
{totalJobs === 0 && totalRuns === 0 ? ( @@ -268,21 +417,21 @@ export default function OpsPage() { - {/* Section 5 — Retraining Queue */} + {/* Section 5 — Model Health */} - Retraining Queue + Model Health - Store / product pairs ranked by a retraining-priority score that blends - staleness with forecast error (WAPE). + Forecast-error (WAPE) drift per store / product, classified from each grain's + successful-run history. Degrading grains are listed first. - {candidatesQuery.isLoading ? ( - - ) : candidates.length === 0 ? ( + {modelHealthQuery.isLoading ? ( + + ) : modelHealthEntries.length === 0 ? (

- No retraining candidates — no successful model runs to evaluate yet. + No model health to evaluate — no successful model runs yet.

) : ( @@ -290,28 +439,43 @@ export default function OpsPage() { Store Product - Priority - Staleness - WAPE - Reason + Drift + Latest WAPE + Δ WAPE + Runs + Action - {candidates.map((c) => ( - - {c.store_id} - {c.product_id} - - {c.priority_score.toFixed(2)} + {modelHealthEntries.map((entry) => ( + + {entry.store_id} + {entry.product_id} + + + {entry.drift_direction} + - {formatStaleness(c.staleness_days)} + {entry.latest_wape === null ? '—' : entry.latest_wape.toFixed(1)} - {c.wape === null ? '—' : c.wape.toFixed(1)} + {formatWapeDelta(entry.wape_delta)} - - {c.reason} + + {entry.run_count} + + + ))} @@ -320,8 +484,169 @@ export default function OpsPage() { )} + + {/* Section 6 — Retraining Queue */} + + +
+
+ Retraining Queue + + Store / product pairs ranked by a retraining-priority score that blends + staleness with forecast error (WAPE). Select rows to retrain in bulk. + +
+ +
+
+ + {candidatesQuery.isLoading ? ( + + ) : candidates.length === 0 ? ( +

+ No retraining candidates — no successful model runs to evaluate yet. +

+ ) : ( +
+ + + + Select + + Store + Product + Priority + Staleness + WAPE + Reason + Action + + + + {candidates.map((candidate) => { + const key = grainKey(candidate.store_id, candidate.product_id) + return ( + + + toggleSelected(key)} + aria-label={`Select store ${candidate.store_id} product ${candidate.product_id}`} + /> + + + {candidate.store_id} + + + {candidate.product_id} + + + {candidate.priority_score.toFixed(2)} + + + {formatStaleness(candidate.staleness_days)} + + + {candidate.wape === null ? '—' : candidate.wape.toFixed(1)} + + + {candidate.reason} + + + + + + ) + })} + +
+ )} +
+
)} + + {/* Confirm gate — bulk retrain of the selected grains. */} + + + + + Retrain {selectedCandidates.length} grain + {selectedCandidates.length === 1 ? '' : 's'}? + + + This creates one training job per selected store / product via the existing + POST /jobs endpoint. Jobs run sequentially and each may take a moment; the + outcome of every job is reported individually. + + + + Cancel + void runBulkRetrain()}>Retrain + + + + + {/* Confirm gate — promote a run to a deployment alias. */} + { + if (!open) setPromoteTarget(null) + }} + > + + + Promote to alias + + {promoteTarget + ? `Point a deployment alias at the latest successful run for store ${promoteTarget.storeId} / product ${promoteTarget.productId}. An existing alias of the same name is repointed.` + : ''} + + +
+ + setAliasName(event.target.value)} + placeholder="e.g. production" + autoComplete="off" + /> +
+ + Cancel + void runPromote()} + > + Promote + + +
+
) } diff --git a/frontend/src/types/api.ts b/frontend/src/types/api.ts index d1179291..b9276cde 100644 --- a/frontend/src/types/api.ts +++ b/frontend/src/types/api.ts @@ -669,3 +669,36 @@ export interface RetrainingCandidatesResponse { total_evaluated: number generated_at: string } + +// Forecast-error trend verdict for a (store, product) grain. +export type DriftDirection = 'improving' | 'stable' | 'degrading' | 'unknown' + +// One run's WAPE observation in a grain's chronological history. +export interface WapePoint { + run_id: string + created_at: string + wape: number | null +} + +// Forecast-error health and drift verdict for one (store, product) grain. +export interface ModelHealthEntry { + store_id: number + product_id: number + run_count: number + latest_run_id: string | null + latest_run_status: string | null + latest_wape: number | null + previous_wape: number | null + wape_delta: number | null + drift_direction: DriftDirection + last_trained_at: string | null + staleness_days: number + wape_history: WapePoint[] +} + +// Per-grain forecast-error health — GET /ops/model-health. +export interface ModelHealthResponse { + entries: ModelHealthEntry[] + total_evaluated: number + generated_at: string +}