diff --git a/PRPs/PRP-24-forecastops-control-center.md b/PRPs/PRP-24-forecastops-control-center.md new file mode 100644 index 00000000..123b09e5 --- /dev/null +++ b/PRPs/PRP-24-forecastops-control-center.md @@ -0,0 +1,924 @@ +name: "PRP-24 — ForecastOps Control Center" +description: | + Context-rich PRP for a new read-only `ops` backend slice + an `/ops` frontend + Control Center page that aggregates operational state (jobs, runs, aliases, + data freshness) and ranks retraining candidates. One-pass implementation target. + +## Purpose + +Add an operator-facing dashboard that connects ForecastLabAI's isolated Explorer/Visualize +pages into one workflow. A new read-only vertical slice `app/features/ops/` exposes two +server-side aggregation endpoints; a new `frontend/src/pages/ops.tsx` page consumes them. + +--- + +## Goal + +Ship a fully working ForecastOps Control Center: + +- **Backend** — a new read-only slice `app/features/ops/` with two endpoints: + - `GET /ops/summary` — system health, job-status counts, run/alias health, data + freshness, and a "needs attention" list (failed jobs/runs + stale aliases). + - `GET /ops/retraining-candidates?limit=` — a `(store, product)` queue ranked by a + deterministic retraining-priority score. +- **Frontend** — a new `/ops` page wired into the top nav, consuming both endpoints, + reusing existing `KPICard` / `StatusBadge` / `Card` / `Table` / loading-error-empty + components, with attention items linking to existing Explorer detail pages. + +End state: `docker compose up` → seed → open `/ops` → operator sees, at a glance, what +needs attention. No new tables, no Alembic migration, no new external dependency. + +## Why + +- **User value** — operators can answer "which forecasts need attention?" without + cross-referencing four CRUD pages. Failed jobs and stale models become visible before + they affect decisions. Retraining candidates are ranked by recency + error. +- **Demo value** — reviewers see a mature ForecastOps story instead of isolated CRUD pages. +- **Integration** — this is the natural layer above the existing `jobs`, `registry`, + `backtesting`, and `analytics` slices and the Explorer pages. It reads their state; it + does not duplicate it. +- **Source docs** — `docs/optional-features/02-forecastops-control-center.md` (feature + brief) and `.agents/plans/forecastops-control-center.md` (the 17-task implementation + plan this PRP is derived from). + +## What + +### User-visible behavior + +A new **Control Center** nav item opens `/ops`, a dense single page with: + +1. **System Health** card — API up, database connected, embedding-provider reachability, + timestamp of the latest successful job. +2. **KPI row** — Active Jobs, Failed Jobs, Run Success Rate, Stale Aliases. +3. **Data Freshness** card — latest sales date, latest completed job, latest successful run. +4. **Needs Attention** table — recent failed jobs, failed runs, and stale aliases; each row + links to the matching Explorer detail page. +5. **Retraining Queue** table — `(store, product)` pairs ranked by priority score, showing + staleness, WAPE, and a human-readable reason. + +The page polls every 15 s, shows loading/error/empty states, and degrades gracefully when +fields are null (no sales yet, metrics missing, etc.). + +### Technical requirements + +- New vertical slice `app/features/ops/` with `__init__.py`, `schemas.py`, `service.py`, + `routes.py`, `tests/`. **No `models.py`, no migration** (read-only — mirrors `analytics`). +- Server-side SQL aggregation (`COUNT … GROUP BY`, `DISTINCT ON`) — never fetch lists and + count in Python. +- RFC 7807 errors, Pydantic v2 response models, SQLAlchemy 2.0 async, `mypy --strict` + + `pyright --strict` clean. +- Frontend: new page + hook module + pure util module (+ vitest tests) + route + nav item + + API response types. + +### Success Criteria + +- [ ] `GET /ops/summary` → 200 with `system`, `jobs`, `runs`, `aliases`, `freshness`, + `attention_items`, `generated_at`. +- [ ] `GET /ops/retraining-candidates` → 200, candidates sorted by `priority_score` desc, + honoring `limit`; 422 when `limit` is outside `[1, 100]`. +- [ ] `GET /ops/summary` → 200 (never 500) when the database has no jobs/runs/aliases. +- [ ] `/ops` page renders all five sections, appears in the top nav, and attention items + link to the correct Explorer detail routes. +- [ ] Backend reads sibling slices via **ORM models only** (no `service.py`/`schemas.py` + cross-slice imports); the vertical-slice tension is called out in the PR description. +- [ ] All validation gates pass: `ruff`, `mypy --strict`, `pyright --strict`, `pytest` + (unit + integration), frontend `tsc` + `lint` + `test`. +- [ ] No new external dependency, no new table, no Alembic migration. + +--- + +## All Needed Context + +### DECISIONS LOCKED (resolved during planning — do NOT re-litigate) + +1. **Backend approach** — a real `app/features/ops/` slice that **imports the ORM models** + of sibling slices (`Job`, `ModelRun`, `DeploymentAlias`, `SalesDaily`) for server-side + SQL aggregation. This is a deliberate, accepted tension with the *"a slice may NOT import + from another slice"* rule (`AGENTS.md` § Architecture). `data_platform` ORM is already a + sanctioned cross-slice import (`analytics` uses it); importing `jobs`/`registry` ORM is + the new tension. **Restrict imports to ORM models + read-only `select()` — NEVER import a + sibling `service.py` or `schemas.py`.** The chosen alternative over an `ASGITransport` + in-process-HTTP approach (the `demo` slice's pattern). **MUST be called out in the PR + description** per `.claude/rules/product-vision.md` § "When Ideas Don't Align". +2. **Scope** — feature-doc MVP **plus** the retraining-candidate queue: two endpoints total. + `/ops/model-health` and `/ops/job-health` from the feature brief are **folded into + `/ops/summary`** (model-health → the alias section; job-health → the jobs section). + **Deferred — do NOT build:** drift indicators, bulk-action queue, action drawer, + WebSocket live updates, exportable incident report. +3. **Provider health** — `config.service.get_provider_health()` is a *service function*, so + the `ops` backend does NOT import it. The frontend reuses the **existing** + `useProviderHealth()` hook from `frontend/src/hooks/use-config.ts` (it already calls + `GET /config/providers/health`). + +### Documentation & References + +```yaml +# MUST READ — backend slice pattern to mirror exactly +- file: app/features/analytics/routes.py + why: Canonical read-only aggregation router. Router decl: `router = APIRouter(prefix="/analytics", tags=["analytics"])`. Endpoint signatures, Query() validation, `db: AsyncSession = Depends(get_db)`, `response_model=`. Imports header lines 1-23. +- file: app/features/analytics/service.py + why: `AnalyticsService` class; SQLAlchemy 2.0 `select()` + `func.sum/count`; `.where()` before `.group_by()`; `DISTINCT ON` latest-per-grain in `compute_inventory_status`; `result.one()/.all()`; `logger.info("analytics.", ...)`. +- file: app/features/analytics/schemas.py + why: Pydantic v2 response models — `model_config = ConfigDict(from_attributes=True)`, `Field(..., description=...)`, str-Enum pattern. +- file: app/features/analytics/__init__.py + why: EXACT slice `__init__.py` shape to mirror (docstring + imports + `__all__`). +- file: app/features/analytics/tests/conftest.py + why: `db_session` + `client` fixtures; `app.dependency_overrides[get_db]`; `AsyncClient(transport=ASGITransport(app=app), base_url="http://test")`; `TEST-`-prefixed sample data; FK-safe cleanup. +- file: app/features/analytics/tests/test_routes_integration.py + why: `@pytest.mark.integration` + `@pytest.mark.asyncio`, `client.get(path, params=...)`, status + JSON assertions. +- file: app/features/analytics/tests/test_schemas.py + why: Unmarked unit tests for Pydantic construction/validation. + +# MUST READ — data-source ORM models (the `ops` service queries these) +- file: app/features/jobs/models.py + why: `Job` table `job`; `JobStatus`/`JobType` enums; columns incl. `job_id`, `status`, `completed_at`, `created_at`, `error_message`, `error_type`, `run_id`. +- file: app/features/registry/models.py + why: `ModelRun` table `model_run` (status, model_type, metrics JSONB, data_window_end, store_id, product_id, completed_at, created_at); `RunStatus` enum; `DeploymentAlias` table `deployment_alias` (alias_name, run_id FK→model_run.id, relationship `.run`). +- file: app/features/data_platform/models.py + why: `SalesDaily` (table `sales_daily`, column `date: Mapped[datetime.date]`) — for the latest-sales-date freshness query. +- file: app/core/database.py + why: `get_db` async dependency (auto-commit/rollback) and `Base`. Import: `from app.core.database import get_db`. +- file: app/core/exceptions.py + why: `BadRequestError` etc.; RFC 7807 handler already registered in `app/main.py`. +- file: app/core/health.py + why: DB-connectivity check pattern — `await db.execute(text("SELECT 1"))` in try/except. +- file: app/main.py + why: router import + `app.include_router(...)` wiring (analytics: import line 17, include line 133). + +# MUST READ — frontend patterns +- file: frontend/src/pages/visualize/demand.tsx + why: Closest dense data page — header, error→loading→empty early returns, hooks, useMemo, Card/Table, inline helper subcomponents, `@/` imports. +- file: frontend/src/hooks/use-runs.ts + why: TanStack Query hook module pattern to mirror for `use-ops.ts`. +- file: frontend/src/hooks/use-jobs.ts + why: `refetchInterval` polling pattern. +- file: frontend/src/hooks/use-config.ts + why: ALREADY EXPORTS `useProviderHealth()` — reuse it, do NOT duplicate. +- file: frontend/src/lib/api.ts + why: `api(endpoint, config)` generic client; `ApiError`; `formatNumber`/`formatPercent`. +- file: frontend/src/types/api.ts + why: response-type conventions; `ProviderHealth` already at line ~575; reuse `JobStatus`/`RunStatus` unions. +- file: frontend/src/lib/constants.ts + why: `ROUTES` object + `NAV_ITEMS` array — add `ROUTES.OPS` + a single-link nav entry. +- file: frontend/src/lib/status-utils.ts + why: ALREADY EXPORTS `getStatusVariant(status)` → StatusBadge variant — reuse for job/run status badges. +- file: frontend/src/App.tsx + why: lazy-load + `` + `` inside `}>`. +- file: frontend/src/components/charts/kpi-card.tsx + why: `KPICard` props — `title`, `value:string|number`, `description?`, `icon?:LucideIcon`, `trend?`, `isLoading?`. +- file: frontend/src/components/common/status-badge.tsx + why: `StatusBadge` variants — `default|success|warning|error|info|pending`. +- file: frontend/src/components/common/error-display.tsx + why: `ErrorDisplay({error,title?,onRetry?})` + `EmptyState({title,description?,action?,icon?})`. +- file: frontend/src/lib/knowledge-utils.ts + why: pattern for a PURE util module + colocated `*.test.ts` vitest file. + +# External docs +- url: https://docs.sqlalchemy.org/en/20/tutorial/data_select.html#aggregate-functions-with-group-by-having + why: `select(Job.status, func.count()).group_by(Job.status)` aggregation. +- url: https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#postgresql-distinct-on + why: `.distinct(col, col)` DISTINCT ON — order_by MUST lead with the same columns. +- url: https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html + why: `await db.execute(select(...))`, `.scalar()`, `.scalars()`, `.all()`. +- url: https://fastapi.tiangolo.com/tutorial/query-params-str-validations/ + why: `Query(default=20, ge=1, le=100)` bounded validation. +- url: https://tanstack.com/query/latest/docs/framework/react/reference/useQuery + why: `refetchInterval`, `enabled`, `queryKey` for polling hooks. +- url: https://docs.pydantic.dev/latest/concepts/config/ + why: `ConfigDict(from_attributes=True)` for ORM-row → Pydantic response models. + +- docfile: .agents/plans/forecastops-control-center.md + why: The full 17-task implementation plan with per-task IMPLEMENT/PATTERN/GOTCHA/VALIDATE. +- docfile: docs/optional-features/02-forecastops-control-center.md + why: Original feature brief — UX sections, risk model, validation plan. +``` + +### Current Codebase tree (relevant subset) + +```bash +app/ +├── main.py # router wiring — UPDATE +├── core/ +│ ├── database.py # get_db, Base +│ ├── exceptions.py # BadRequestError, RFC 7807 +│ └── health.py # SELECT 1 connectivity pattern +├── features/ +│ ├── analytics/ # ← MIRROR THIS SLICE (read-only aggregation) +│ │ ├── __init__.py routes.py schemas.py service.py +│ │ └── tests/ (conftest.py, test_routes_integration.py, test_schemas.py) +│ ├── jobs/models.py # Job, JobStatus, JobType +│ ├── registry/models.py # ModelRun, RunStatus, DeploymentAlias +│ └── data_platform/models.py # SalesDaily +frontend/src/ +├── App.tsx # route registration — UPDATE +├── hooks/ (use-runs.ts, use-jobs.ts, use-config.ts, index.ts) +├── lib/ (api.ts, constants.ts, status-utils.ts, knowledge-utils.ts) +├── types/api.ts # response types — UPDATE +├── pages/visualize/demand.tsx # ← MIRROR for the dense page layout +└── components/ (charts/kpi-card.tsx, common/status-badge.tsx, common/error-display.tsx) +``` + +### Desired Codebase tree (files to add / touch) + +```bash +app/features/ops/ # NEW SLICE — read-only, no models.py, no migration +├── __init__.py # NEW — slice exports (mirror analytics/__init__.py) +├── schemas.py # NEW — Pydantic v2 response models +├── service.py # NEW — OpsService + pure score/extract helpers +├── routes.py # NEW — APIRouter(prefix="/ops") + 2 endpoints +└── tests/ + ├── __init__.py # NEW — empty package marker + ├── conftest.py # NEW — db_session, client, sample-data fixtures + ├── test_schemas.py # NEW — unit (unmarked) + ├── test_service.py # NEW — unit for score_retraining_candidate/extract_wape + └── test_routes_integration.py # NEW — @pytest.mark.integration + +app/main.py # UPDATE — import + include_router(ops_router) + +frontend/src/ +├── hooks/use-ops.ts # NEW — useOpsSummary, useRetrainingCandidates +├── hooks/index.ts # UPDATE — export * from './use-ops' +├── lib/ops-utils.ts # NEW — pure helpers +├── lib/ops-utils.test.ts # NEW — vitest unit tests +├── pages/ops.tsx # NEW — the Control Center page +├── types/api.ts # UPDATE — Ops* response interfaces +├── lib/constants.ts # UPDATE — ROUTES.OPS + NAV_ITEMS entry +└── App.tsx # UPDATE — lazy import + +``` + +### Known Gotchas & Library Quirks + +```python +# CRITICAL: ORM status columns are String, NOT enum-typed. Compare against the +# `.value`, never the enum object — mirror registry/service.py & jobs/service.py: +# select(Job.status, func.count()).where(Job.status == JobStatus.COMPLETED.value) +# +# CRITICAL: ruff DTZ rules — do NOT use `date.today()` or naive `datetime.now()`. +# Use timezone-aware forms: +# now = datetime.now(UTC) # from `datetime import UTC` +# today = datetime.now(UTC).date() # for staleness math +# +# CRITICAL: PostgreSQL DISTINCT ON — `.distinct(a, b)` REQUIRES `order_by` to lead +# with the SAME columns. Order the "latest" tiebreaker by `created_at.desc()` +# (TimestampMixin, always non-null) — NOT `completed_at` (nullable; DESC puts +# NULLs first in Postgres and would pick a NULL-completed row): +# select(ModelRun).where(ModelRun.status == RunStatus.SUCCESS.value) +# .distinct(ModelRun.store_id, ModelRun.product_id) +# .order_by(ModelRun.store_id, ModelRun.product_id, ModelRun.created_at.desc()) +# +# CRITICAL: `func.count()` with no arg = COUNT(*) — valid, used by analytics. +# `select(Col, func.count()).group_by(Col)` returns only EXISTING statuses; +# zero-fill the missing enum members in Python. +# +# CRITICAL: AsyncSession FORBIDS implicit IO / lazy-loading (SQLAlchemy async +# docs). The alias query MUST select BOTH entities — +# `select(DeploymentAlias, ModelRun).join(ModelRun, DeploymentAlias.run_id == ModelRun.id)` +# — rows come back as (alias, run) tuples; use the joined `ModelRun` row +# DIRECTLY. NEVER touch the `DeploymentAlias.run` relationship attribute — it +# triggers a lazy load → `MissingGreenlet` error. Same rule for every +# relationship: eager-select it or `selectinload()` it; never access it lazily. +# +# CRITICAL: model_run.metrics JSONB is frequently None or lacks WAPE — backtest +# metrics persist to job.result, NOT model_run.metrics (only an explicit +# update_run writes run metrics). `extract_wape()` MUST tolerate None / unrelated +# dicts / non-numeric values. Scoring MUST NEVER raise on missing data. +# +# CRITICAL: DeploymentAlias.run_id is the INTEGER model_run.id (FK), NOT the +# 32-char run_id string. In fixtures set it from the persisted run's `.id`. +# Insert ModelRun before DeploymentAlias; clean up DeploymentAlias first. +# +# CRITICAL: Pydantic strict-mode linter (app/core/tests/test_strict_mode_policy.py) +# only inspects request models with ConfigDict(strict=True). Ops schemas are +# RESPONSE models with ConfigDict(from_attributes=True) — date/datetime fields +# need NO Field(strict=False). Do NOT add strict=True. +# +# CRITICAL: Cross-slice ORM import is the ACCEPTED design (decision #1). No CI +# import-linter enforces slice boundaries, so this will not fail the build — +# but it MUST be flagged in the PR description. +# +# GOTCHA: commit-format scope allow-list has NO `ops` scope. Use feat(api): for +# backend commits, feat(ui): for frontend, feat(api,ui): for the wiring commit. +# +# GOTCHA: the /ops page renders inside AppShell () — do NOT add nav, +# container, or Toaster. Never hardcode raw colors — use shadcn variants / +# semantic tokens (.claude/rules/shadcn-ui.md). +``` + +--- + +## External Research Findings + +Verified May 2026 against the docs the feature brief cited. Each finding ends with a +**verdict** — what it changes (or confirms) for this PRP. + +### 1. SQLAlchemy async ORM — `https://docs.sqlalchemy.org/en/21/orm/extensions/asyncio.html` + +The repo pins `sqlalchemy[asyncio]>=2.0.36`; the doc page is the 2.1 line — the async +contract below is identical across 2.0/2.1. + +- Execution idioms confirmed: `await session.execute(stmt)` → `.scalars()` / `.all()` / + `.one()`; single aggregate value via `await session.scalar(select(func.max(col)))`. +- **CRITICAL — implicit IO is forbidden.** *"the application needs to avoid any points at + which IO-on-attribute access may occur."* Accessing an un-loaded relationship under + `AsyncSession` raises `MissingGreenlet`. Two safe patterns: eager-select the related + entity in the same `select(...)`, or `selectinload()` it. +- A single `AsyncSession` is not safe across concurrent tasks — irrelevant here (one + request = one `get_db` session), but do not `asyncio.gather` queries on the same session. +- **Verdict — applied.** The alias query already selects both entities + (`select(DeploymentAlias, ModelRun).join(...)`). Added a CRITICAL gotcha: iterate the + `(alias, run)` tuples and use the joined `ModelRun` directly; **never touch + `DeploymentAlias.run`**. No other relationship is accessed, so no `selectinload()` is + needed. No code-shape change beyond the explicit warning. + +### 2. FastAPI query-param validation — `https://fastapi.tiangolo.com/tutorial/query-params-str-validations/` + +- Bounded numeric query params: `Query(ge=…, le=…)`; a violation returns **HTTP 422** + (`Unprocessable Entity`) automatically — confirms the `?limit=0` / `?limit=200` → 422 + test cases. +- Current docs favour the `Annotated[int, Query(ge=1, le=100)] = 20` form over the + legacy `limit: int = Query(default=20, ge=1, le=100)` form. +- **Verdict — mirror the repo, not the docs.** `analytics/routes.py` uses the non-`Annotated` + `Query(...)` form. Consistency with the existing slice wins (`.claude/rules` § "don't + create new patterns when existing ones work"). Keep the PRP's `Query(default=20, ge=1, + le=100, …)` signature; if `analytics/routes.py` is found to use `Annotated`, match that + instead. + +### 3. TanStack Query v5 — `https://tanstack.com/query/.../guides/important-defaults` + +- v4→v5 renames: `cacheTime` → `gcTime`; `isLoading` → `isPending` (`isLoading` still + exists = `isPending && isFetching`); `keepPreviousData` is now + `placeholderData: keepPreviousData`. `use-runs.ts` already uses the v5 form — mirror it. +- Polling: pair `refetchInterval` with `refetchOnWindowFocus: false` to avoid focus-storms. + The repo's `query-client.ts` already sets `refetchOnWindowFocus: false` and + `staleTime: 5min` globally. +- **Verdict — applied.** `useOpsSummary` keeps `refetchInterval: 15000` (operational state + worth polling); `useRetrainingCandidates` gets **no** `refetchInterval` (slow-moving — + refetch-on-mount suffices). Task 12 updated accordingly. + +### 4. MLflow Model Registry — `https://www.mlflow.org/docs/latest/ml/model-registry/` + +- MLflow defines an **alias** as *"a mutable, named reference to a particular version of a + registered model"*; the `champion`/production-alias promotion pattern decouples + deployment from a specific version. +- ForecastLabAI's `DeploymentAlias` is the same concept. MLflow frames alias governance as + managing **staleness** — an alias is "stale" when it still points at an old version + after a better one exists. +- **Verdict — confirms the design.** The PRP's `is_stale` detection (alias → non-`success` + run, or a newer `success` run exists for the same store/product) is the industry-standard + alias-staleness check. No change; cite MLflow as the conceptual basis in the PR. + +### 5. NIST AI RMF — `https://www.nist.gov/itl/ai-risk-management-framework` + +- Four core functions: **Govern, Map, Measure, Manage**. *Measure* = continuously track + trustworthiness/performance of deployed AI; *Manage* = act on what monitoring surfaces. + (Operational depth lives in the AI RMF 1.0 PDF + Playbook, not the overview page.) +- **Verdict — framing only.** The Control Center operationalises *Measure* (model-health + metrics, freshness) and *Manage* (the "needs attention" + retraining queue). Useful + one-line justification for the PR description; no implementation impact. + +### 6. Model-retraining triggers (MLOps best practice — web search, May 2026) + +- Established taxonomy: **time-based** (simple, predictable, but "may lead to unnecessary + retraining"), **performance-based** (retrain on metric degradation — needs monitoring), + **drift-based** (data/concept drift — needs drift detection). Sources recommend combining + signals over a pure time-based trigger. +- **Verdict — confirms the heuristic.** The PRP's score blends a **time-based** signal + (staleness) with a **performance-based** signal (WAPE) — exactly the recommended hybrid. + Drift-based is correctly deferred (needs infra the repo doesn't have). When WAPE is + unknown the score degrades to time-based only — an acceptable, documented fallback. The + 60/40 staleness/error weighting is a defensible, deterministic heuristic; keep it. +- Sources: [When to Retrain Your ML Models](https://tech.flowblog.io/blog/when-to-retrain-your-ml-models-for-success), + [Model Retraining 2026 (AIMultiple)](https://research.aimultiple.com/model-retraining/), + [CMU SEI — Automated Retraining](https://www.sei.cmu.edu/blog/improving-automated-retraining-of-machine-learning-models/). + +### 7. WAPE as the error signal (web search, May 2026) + +- WAPE/WMAPE is volume-weighted — a miss on a high-volume SKU counts more — and is not + destabilised by low-demand items; it is the recommended single accuracy metric for + demand forecasting. sMAPE is widely considered broken (unstable near zero, can go + negative). MAE is interpretable but scale-blind. +- **Verdict — confirms the choice.** Using WAPE as the score's error component is correct. + There is no universal "bad WAPE" threshold; the score's cap at WAPE 100 (total error = + total demand) is a reasonable normalisation ceiling. Keep it. +- Sources: [Forecast Accuracy Metrics 2026](https://prospeo.io/s/forecast-accuracy-metrics), + [MAPE vs WMAPE vs SMAPE](https://medium.com/@vinitkothari.24/time-series-evaluation-metrics-mape-vs-wmape-vs-smape-which-one-to-use-why-and-when-part1-32d3852b4779). + +### 8. Cited docs assessed as out-of-scope (do NOT pull these in) + +- **OpenTelemetry** (`opentelemetry.io`, `opentelemetry-python-contrib…fastapi`) — the repo + deliberately ships **no metrics/traces** (`docs/_base/SECURITY.md`: "Metrics — none … + Traces — none"). The `/ops` Control Center **is** the observability surface. Adding OTel + would be a new dependency and a scope violation — **do not add it.** +- **scikit-learn model persistence / `TimeSeriesSplit`** — the `ops` slice does no training + or cross-validation; those belong to `forecasting`/`backtesting`. Not relevant here. +- **Recharts** — MVP is cards + tables (no charts). Recharts is already available + (`frontend/src/components/ui/chart.tsx`) if a sparkline is wanted in a later iteration; + deferred, not part of this PRP. + +--- + +## Implementation Blueprint + +### Data models and structure — `app/features/ops/schemas.py` + +All response models. Every model: `model_config = ConfigDict(from_attributes=True)`. +Every field: `Field(..., description="...")`. Counts: `Field(..., ge=0, ...)`. + +```python +# Pydantic v2 response models (NOT request bodies — no strict=True) +class SystemHealth(BaseModel): + api_ok: bool + database_connected: bool + latest_successful_job_at: datetime | None + +class StatusCount(BaseModel): + status: str + count: int = Field(..., ge=0) + +class JobHealth(BaseModel): + counts: list[StatusCount] # one per JobStatus, zero-filled + completed_today: int = Field(..., ge=0) + failed_total: int = Field(..., ge=0) + active_total: int = Field(..., ge=0) # pending + running + +class RunHealth(BaseModel): + counts: list[StatusCount] # one per RunStatus, zero-filled + success_rate: float | None # success / (total - archived); None if denom 0 + failed_total: int = Field(..., ge=0) + +class AliasHealth(BaseModel): + alias_name: str + run_id: str + run_status: str + model_type: str + store_id: int + product_id: int + is_stale: bool + stale_reason: str | None + wape: float | None + +class DataFreshness(BaseModel): + latest_sales_date: date | None + latest_job_completed_at: datetime | None + latest_run_completed_at: datetime | None + +class AttentionItem(BaseModel): + item_type: Literal["failed_job", "failed_run", "stale_alias"] + entity_id: str # job_id for failed_job; run_id for failed_run AND stale_alias + label: str + detail: str + occurred_at: datetime | None + +class OpsSummaryResponse(BaseModel): + system: SystemHealth + jobs: JobHealth + runs: RunHealth + aliases: list[AliasHealth] + freshness: DataFreshness + attention_items: list[AttentionItem] + generated_at: datetime + +class RetrainingCandidate(BaseModel): + store_id: int + product_id: int + priority_score: float = Field(..., ge=0.0, le=1.0) + staleness_days: int = Field(..., ge=0) + wape: float | None + latest_run_id: str | None + latest_run_status: str | None + reason: str + +class RetrainingCandidatesResponse(BaseModel): + candidates: list[RetrainingCandidate] + total_evaluated: int = Field(..., ge=0) + generated_at: datetime +``` + +### Per-task pseudocode (critical details — not full code) + +```python +# ── app/features/ops/service.py — pure helpers (module scope, above OpsService) ── + +def extract_wape(metrics: dict[str, Any] | None) -> float | None: + # GOTCHA: match the param type to ModelRun.metrics' Mapped[...] annotation. + # Try "wape", "wape_mean", "WAPE"; return first numeric (int|float, not bool); else None. + if not metrics: + return None + for key in ("wape", "wape_mean", "WAPE"): + v = metrics.get(key) + if isinstance(v, (int, float)) and not isinstance(v, bool): + return float(v) + return None + +def score_retraining_candidate(staleness_days: int, wape: float | None) -> float: + """Retraining priority in [0.0, 1.0]; higher = more urgent. + 60% staleness (cap 90 days) + 40% error (cap WAPE 100).""" + staleness_norm = min(max(staleness_days, 0), 90) / 90.0 + error_norm = min(max(wape, 0.0), 100.0) / 100.0 if wape is not None else 0.0 + return round(0.6 * staleness_norm + 0.4 * error_norm, 4) + + +# ── OpsService — no custom __init__; just two async methods ── + +class OpsService: + async def get_summary(self, db: AsyncSession) -> OpsSummaryResponse: + now = datetime.now(UTC) + + # SYSTEM + try: + await db.execute(text("SELECT 1")) + db_ok = True + except Exception: # noqa: BLE001 — connectivity probe + db_ok = False + latest_job = await db.scalar( + select(func.max(Job.completed_at)).where(Job.status == JobStatus.COMPLETED.value) + ) + + # JOBS — server-side GROUP BY, zero-fill the enum + job_rows = (await db.execute( + select(Job.status, func.count()).group_by(Job.status))).all() + job_map = {s: c for s, c in job_rows} + job_counts = [StatusCount(status=s.value, count=job_map.get(s.value, 0)) + for s in JobStatus] + start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0) + completed_today = await db.scalar(select(func.count()).select_from(Job).where( + Job.status == JobStatus.COMPLETED.value, Job.completed_at >= start_of_day)) or 0 + # active_total = pending + running ; failed_total = failed (from job_map) + + # RUNS — same GROUP BY pattern over RunStatus; success_rate = success/(total-archived) + + # ALIASES — join, compute staleness + alias_rows = (await db.execute( + select(DeploymentAlias, ModelRun) + .join(ModelRun, DeploymentAlias.run_id == ModelRun.id))).all() + # For each (alias, run): is_stale via _is_alias_stale(run, db-derived newer-success), + # wape = extract_wape(run.metrics). + + # FRESHNESS + latest_sales_date = await db.scalar(select(func.max(SalesDaily.date))) + # latest_job_completed_at, latest_run_completed_at (status==SUCCESS) likewise. + + # ATTENTION ITEMS — 10 most-recent failed jobs + 10 failed runs + every stale alias + failed_jobs = (await db.execute( + select(Job).where(Job.status == JobStatus.FAILED.value) + .order_by(Job.created_at.desc()).limit(10))).scalars().all() + # failed_job → AttentionItem(entity_id=job.job_id, occurred_at=job.created_at, ...) + # failed_run → AttentionItem(entity_id=run.run_id, ...) + # stale_alias → AttentionItem(entity_id=, label="alias ''", ...) + + logger.info("ops.summary_computed", db_ok=db_ok, failed_jobs=..., stale_aliases=...) + return OpsSummaryResponse(...) + + async def get_retraining_candidates(self, db: AsyncSession, limit: int + ) -> RetrainingCandidatesResponse: + today = datetime.now(UTC).date() + # latest SUCCESS run per (store, product) — DISTINCT ON, order_by created_at.desc() + runs = (await db.execute( + select(ModelRun).where(ModelRun.status == RunStatus.SUCCESS.value) + .distinct(ModelRun.store_id, ModelRun.product_id) + .order_by(ModelRun.store_id, ModelRun.product_id, ModelRun.created_at.desc()) + )).scalars().all() + candidates = [] + for run in runs: + staleness = (today - run.data_window_end).days + wape = extract_wape(run.metrics) + score = score_retraining_candidate(staleness, wape) + reason = f"{staleness}d since last train window" + ( + f"; WAPE {wape:.1f}" if wape is not None else "; WAPE unknown") + candidates.append(RetrainingCandidate( + store_id=run.store_id, product_id=run.product_id, priority_score=score, + staleness_days=max(staleness, 0), wape=wape, latest_run_id=run.run_id, + latest_run_status=run.status, reason=reason)) + candidates.sort(key=lambda c: c.priority_score, reverse=True) + return RetrainingCandidatesResponse( + candidates=candidates[:limit], total_evaluated=len(candidates), + generated_at=datetime.now(UTC)) +``` + +```python +# ── app/features/ops/routes.py ── +router = APIRouter(prefix="/ops", tags=["ops"]) + +@router.get("/summary", response_model=OpsSummaryResponse, + summary="Operational summary for the Control Center") +async def get_ops_summary(db: AsyncSession = Depends(get_db)) -> OpsSummaryResponse: + return await OpsService().get_summary(db) + +@router.get("/retraining-candidates", response_model=RetrainingCandidatesResponse, + summary="Ranked retraining-candidate queue") +async def get_retraining_candidates( + limit: int = Query(default=20, ge=1, le=100, description="Max candidates to return"), + db: AsyncSession = Depends(get_db), +) -> RetrainingCandidatesResponse: + return await OpsService().get_retraining_candidates(db, limit) +``` + +```typescript +// ── frontend/src/lib/ops-utils.ts — PURE (no React, no fetch) ── +import { ROUTES } from '@/lib/constants' +import type { AttentionItem, RetrainingCandidate, SystemHealth } from '@/types/api' + +export function summaryHealthVariant(s: SystemHealth): 'success' | 'error' { + return s.api_ok && s.database_connected ? 'success' : 'error' +} +export function attentionItemLink(item: AttentionItem): string { + // failed_job → /explorer/jobs/:id ; failed_run + stale_alias → /explorer/runs/:id + if (item.item_type === 'failed_job') return `/explorer/jobs/${item.entity_id}` + return `/explorer/runs/${item.entity_id}` +} +export function attentionBadgeVariant(t: AttentionItem['item_type']): 'error' | 'warning' { + return t === 'stale_alias' ? 'warning' : 'error' +} +export function formatStaleness(days: number): string { + return days <= 0 ? 'today' : `${days}d` +} +export function sortRetrainingCandidates(rows: RetrainingCandidate[]): RetrainingCandidate[] { + return [...rows].sort((a, b) => b.priority_score - a.priority_score) +} +``` + +### list of tasks to be completed (in order) + +```yaml +Task 1 — CREATE app/features/ops/schemas.py: + - MIRROR pattern from: app/features/analytics/schemas.py + - DEFINE the 11 response models above; every model ConfigDict(from_attributes=True) + - IMPORTS: from datetime import date, datetime; from typing import Literal; + from pydantic import BaseModel, ConfigDict, Field + - GOTCHA: response models — NO ConfigDict(strict=True), NO Field(strict=False) + - VALIDATE: uv run python -c "from app.features.ops.schemas import OpsSummaryResponse, RetrainingCandidatesResponse; print('ok')" + +Task 2 — CREATE app/features/ops/service.py (pure helpers first): + - ADD module-scope `extract_wape` and `score_retraining_candidate` (pseudocode above) + - IMPORTS: from typing import Any + - GOTCHA: never raise on None/missing metrics + - VALIDATE: uv run python -c "from app.features.ops.service import score_retraining_candidate as s; assert s(90,100.0)==1.0 and s(0,None)==0.0; print('ok')" + +Task 3 — CREATE app/features/ops/tests/__init__.py + test_schemas.py + test_service.py: + - __init__.py empty + - test_schemas.py: construct each model; assert ge=0 rejects negatives (pytest.raises(ValidationError)) + - test_service.py: score boundaries (0,None)->0.0, (90,100.0)->1.0, mid, negative clamp, + wape>100 clamp; extract_wape each key / None / {} / non-numeric / bool + - MIRROR: app/features/analytics/tests/test_schemas.py — plain def test_*(), UNMARKED + - VALIDATE: uv run pytest -v -m "not integration" app/features/ops/tests/test_schemas.py app/features/ops/tests/test_service.py + +Task 4 — UPDATE app/features/ops/service.py — implement OpsService: + - ADD class OpsService (no custom __init__) with get_summary + get_retraining_candidates + - MIRROR: AnalyticsService.compute_inventory_status (DISTINCT ON), compute_kpis (func + scalar) + - IMPORTS: from datetime import UTC, datetime; + from sqlalchemy import func, select, text; + from sqlalchemy.ext.asyncio import AsyncSession; + from app.core.logging import get_logger; + from app.features.jobs.models import Job, JobStatus; + from app.features.registry.models import DeploymentAlias, ModelRun, RunStatus; + from app.features.data_platform.models import SalesDaily; + from app.features.ops.schemas import (... all ...) + - GOTCHA: compare status == Enum.X.value; use datetime.now(UTC); created_at.desc() for DISTINCT ON + - VALIDATE: uv run mypy app/features/ops/ && uv run pyright app/features/ops/ + +Task 5 — CREATE app/features/ops/routes.py: + - MIRROR: app/features/analytics/routes.py header + endpoint signatures + - router = APIRouter(prefix="/ops", tags=["ops"]); 2 endpoints (pseudocode above) + - VALIDATE: uv run python -c "from app.features.ops.routes import router; print(sorted(r.path for r in router.routes))" + +Task 6 — CREATE app/features/ops/__init__.py: + - MIRROR: app/features/analytics/__init__.py — docstring + imports + __all__ + - VALIDATE: uv run python -c "from app.features.ops import router, OpsService; print('ok')" + +Task 7 — UPDATE app/main.py: + - FIND the block of `from app.features..routes import router as _router` imports + - INJECT: from app.features.ops.routes import router as ops_router + - FIND in create_app(): the run of `app.include_router(...)` calls + - INJECT: app.include_router(ops_router) (e.g. after analytics_router) + - PRESERVE ruff import sorting (keep grouped with app.features.* imports) + - VALIDATE: uv run python -c "from app.main import app; p={r.path for r in app.routes}; assert '/ops/summary' in p and '/ops/retraining-candidates' in p; print('wired')" + +Task 8 — CREATE app/features/ops/tests/conftest.py: + - MIRROR: app/features/analytics/tests/conftest.py (db_session, client fixtures verbatim; + extend TEST- cleanup to Job/ModelRun/DeploymentAlias — DeploymentAlias before ModelRun) + - ADD fixtures: sample_jobs (statuses incl. failed+error_message), + sample_runs (statuses incl. success with metrics={"wape":31.0} + failed; varied + store_id/product_id/data_window_end), sample_alias (DeploymentAlias→success run), + sample_sales (a couple SalesDaily rows) + - GOTCHA: DeploymentAlias.run_id = persisted run.id (int); insert ModelRun first + - VALIDATE: uv run pytest -m integration app/features/ops/tests/ --collect-only + +Task 9 — CREATE app/features/ops/tests/test_routes_integration.py: + - @pytest.mark.integration + @pytest.mark.asyncio + - tests: /ops/summary 200 happy (seeded) ; /ops/summary 200 resilient (no fixtures → + counts >= 0, status keys all present) ; /ops/retraining-candidates 200 sorted desc, + len <= limit ; ?limit=0 → 422 ; ?limit=200 → 422 + - MIRROR: app/features/analytics/tests/test_routes_integration.py + - GOTCHA: idempotent — assert structural invariants, not exact global totals + - VALIDATE: docker compose up -d && uv run pytest -v -m integration app/features/ops/ + +Task 10 — UPDATE frontend/src/types/api.ts: + - ADD interfaces: SystemHealth, StatusCount, JobHealth, RunHealth, AliasHealth, + DataFreshness, AttentionItem, OpsSummaryResponse, RetrainingCandidate, + RetrainingCandidatesResponse (dates as string; nullable → `| null`) + - VALIDATE: cd frontend && pnpm tsc --noEmit + +Task 11 — CREATE frontend/src/lib/ops-utils.ts + ops-utils.test.ts: + - IMPLEMENT pure functions (pseudocode above) + - MIRROR: frontend/src/lib/knowledge-utils.ts + knowledge-utils.test.ts + - VALIDATE: cd frontend && pnpm test --run src/lib/ops-utils.test.ts + +Task 12 — CREATE frontend/src/hooks/use-ops.ts + UPDATE hooks/index.ts: + - useOpsSummary(enabled=true): queryKey ['ops','summary'], api('/ops/summary'), + refetchInterval: 15000 (operational state — poll. Global query-client already + sets refetchOnWindowFocus:false, so this won't double-fire on tab focus.) + - useRetrainingCandidates(limit=20, enabled=true): queryKey ['ops','retraining',limit], + api('/ops/retraining-candidates', {params:{limit}}) + — NO refetchInterval: the queue moves slowly (changes only on a new run); + refetch-on-mount + manual invalidation is sufficient. Avoids needless load. + - DO NOT add useProviderHealth — it already exists in use-config.ts; reuse that. + - index.ts: add `export * from './use-ops'` + - MIRROR: frontend/src/hooks/use-runs.ts, use-jobs.ts + - VALIDATE: cd frontend && pnpm tsc --noEmit + +Task 13 — UPDATE frontend/src/lib/constants.ts: + - ADD `OPS: '/ops'` to ROUTES (after SHOWCASE) + - ADD `{ label: 'Control Center', href: ROUTES.OPS }` to NAV_ITEMS (after Showcase) + - PRESERVE the `as const` literal types + - VALIDATE: cd frontend && pnpm tsc --noEmit + +Task 14 — UPDATE frontend/src/App.tsx: + - ADD `const OpsPage = lazy(() => import('@/pages/ops'))` + - ADD `}>} />` + inside the }> block + - VALIDATE: cd frontend && pnpm tsc --noEmit && pnpm lint + +Task 15 — CREATE frontend/src/pages/ops.tsx: + - export default function OpsPage() + - hooks: useOpsSummary(), useRetrainingCandidates(), useProviderHealth() [from use-config] + - early returns: ErrorDisplay(onRetry) → LoadingState → EmptyState (zero jobs AND runs) + - sections: System Health card, KPI row (KPICard ×4), Data Freshness card, + Needs Attention table (Link to attentionItemLink(item)), Retraining Queue table + - reuse getStatusVariant from @/lib/status-utils for job/run status badges + - MIRROR: frontend/src/pages/visualize/demand.tsx + - GOTCHA: renders inside AppShell — no nav/container; no raw colors + - VALIDATE: cd frontend && pnpm tsc --noEmit && pnpm lint + +Task 16 — FULL validation sweep (all gates — see Validation Loop) + +Task 17 — Browser dogfood per .claude/rules/ui-design.md (webapp-testing / agent-browser) +``` + +### Integration Points + +```yaml +DATABASE: + - migration: NONE — read-only slice, no schema change. + - tables read (existing): job, model_run, deployment_alias, sales_daily. + +ROUTES (backend): + - add to: app/main.py + - import: from app.features.ops.routes import router as ops_router + - wire: app.include_router(ops_router) + +ROUTES (frontend): + - add to: frontend/src/lib/constants.ts → ROUTES.OPS = '/ops' ; NAV_ITEMS entry + - add to: frontend/src/App.tsx → lazy import + + +HOOKS: + - new: frontend/src/hooks/use-ops.ts + - update: frontend/src/hooks/index.ts → export * from './use-ops' + - reuse: useProviderHealth from frontend/src/hooks/use-config.ts (do NOT duplicate) + +CONFIG: none — no new settings, no new env var. +``` + +--- + +## Validation Loop + +### Level 1: Syntax & Style + +```bash +uv run ruff check . --fix +uv run ruff format --check . +cd frontend && pnpm lint +# Expected: no errors. Common trap: date.today() → ruff DTZ — use datetime.now(UTC).date(). +``` + +### Level 2: Type Checks + +```bash +uv run mypy app/ && uv run pyright app/ # both --strict +cd frontend && pnpm tsc --noEmit +# Expected: no errors. +``` + +### Level 3: Unit Tests + +```bash +uv run pytest -v -m "not integration" app/features/ops/ +cd frontend && pnpm test --run src/lib/ops-utils.test.ts +``` + +Backend unit cases (`test_service.py`, pure — no DB, no mocks): +```python +def test_score_zero_when_fresh_and_no_error(): + assert score_retraining_candidate(0, None) == 0.0 + +def test_score_max_when_fully_stale_and_max_error(): + assert score_retraining_candidate(90, 100.0) == 1.0 + +def test_score_clamps_negative_staleness_and_high_wape(): + assert score_retraining_candidate(-5, 250.0) == 0.4 # staleness→0, error→1.0, *0.4 + +def test_extract_wape_prefers_wape_then_wape_mean(): + assert extract_wape({"wape": 12.0}) == 12.0 + assert extract_wape({"wape_mean": 8.5}) == 8.5 + assert extract_wape(None) is None + assert extract_wape({}) is None + assert extract_wape({"wape": "bad"}) is None + assert extract_wape({"wape": True}) is None # bool is not a metric +``` + +### Level 4: Integration Tests + +```bash +docker compose up -d +uv run alembic upgrade head +uv run pytest -v -m integration app/features/ops/ +``` + +`test_routes_integration.py` (`@pytest.mark.integration` + `@pytest.mark.asyncio`): +- `/ops/summary` → 200; `system.database_connected is True`; job & run `counts` cover + every status key; seeded failed job appears in `attention_items`; `freshness.latest_sales_date` set. +- `/ops/summary` with no fixtures → 200 (never 500); all counts `>= 0`; `attention_items` is a list. +- `/ops/retraining-candidates` → 200; `candidates` sorted by `priority_score` desc; `len <= limit`. +- `/ops/retraining-candidates?limit=0` → 422. +- `/ops/retraining-candidates?limit=200` → 422. + +### Level 5: Manual Validation + +```bash +uv run uvicorn app.main:app --reload --port 8123 & +curl -s http://localhost:8123/ops/summary | head -c 400 +curl -s "http://localhost:8123/ops/retraining-candidates?limit=5" | head -c 400 +curl -s -o /dev/null -w '%{http_code}\n' "http://localhost:8123/ops/retraining-candidates?limit=0" # 422 +# Frontend: seed first (make demo), then open http://localhost:5173/ops via +# the webapp-testing skill / agent-browser — verify all 5 sections, nav item, +# attention-item links route to Explorer detail pages, retraining table sorted, +# empty-state on a fresh DB. Type-check passing ≠ UI works. +``` + +--- + +## Final Validation Checklist + +- [ ] `uv run ruff check . && uv run ruff format --check .` — clean +- [ ] `uv run mypy app/ && uv run pyright app/` — clean (`--strict`) +- [ ] `uv run pytest -v -m "not integration"` — green +- [ ] `docker compose up -d && uv run pytest -v -m integration` — green +- [ ] `cd frontend && pnpm tsc --noEmit && pnpm lint && pnpm test --run` — green +- [ ] `GET /ops/summary` and `GET /ops/retraining-candidates` behave per Success Criteria +- [ ] `/ops` page renders all 5 sections in a real browser; nav item present; links work +- [ ] No new dependency, no new table, no migration +- [ ] Backend cross-slice imports are ORM-models-only; PR description flags the tension +- [ ] Commits use `feat(api)` / `feat(ui)` scopes (no `ops` scope exists) and reference an open issue + +--- + +## Anti-Patterns to Avoid + +- ❌ Don't import a sibling slice's `service.py` or `schemas.py` — ORM models only. +- ❌ Don't fetch full lists and count in Python — use `func.count()` + `GROUP BY`. +- ❌ Don't use `date.today()` / naive `datetime.now()` — ruff DTZ; use `datetime.now(UTC)`. +- ❌ Don't add `ConfigDict(strict=True)` to response models. +- ❌ Don't duplicate `useProviderHealth` — reuse the one in `use-config.ts`. +- ❌ Don't re-implement a status→badge mapper — reuse `getStatusVariant` from `status-utils.ts`. +- ❌ Don't create `app/features/ops/models.py` or an Alembic migration. +- ❌ Don't let scoring raise on `None`/missing `metrics` — degrade to staleness-only. +- ❌ Don't claim the UI works on a green type-check — dogfood it in a browser. +- ❌ Don't catch-all silently except for the deliberate DB-connectivity probe. + +--- + +## Workflow Notes + +- Open a GitHub issue first (`gh issue list` / `gh issue create`); branch + `feat/ops-control-center` off `dev` (`.claude/rules/branch-naming.md`); every commit + references the issue and uses `feat(api)` / `feat(ui)` / `feat(api,ai)` scopes; PR into `dev`. +- The cross-slice ORM import (`jobs`, `registry`) is a deliberate, accepted tension with the + vertical-slice rule — **state it explicitly in the PR description** per + `.claude/rules/product-vision.md`. + +## Confidence Score + +**9 / 10** for one-pass implementation success. + +Rationale: the `analytics` slice is a near-exact backend template; every data source, ORM +column, enum, and frontend pattern is verified against the live codebase; all three open +items the plan flagged (`SalesDaily`, `useProviderHealth`, `getStatusVariant`) are resolved +inline. The external-research pass (§ External Research Findings) validated the retraining +heuristic (hybrid time+performance trigger, WAPE error signal) and the alias-staleness +design against MLOps/MLflow guidance, and caught one latent bug — the `AsyncSession` +lazy-load trap on `DeploymentAlias.run` — now fixed with an explicit CRITICAL gotcha. +Residual risk: (1) integration-test fixture FK ordering for `DeploymentAlias`→`ModelRun`; +(2) `model_run.metrics` shape variability — both mitigated by defensive `extract_wape` and +structural (not exact-total) test assertions; (3) minor SQLAlchemy `DISTINCT ON` / typing +friction under `--strict`, mitigated by the explicit gotcha and `created_at`-ordering +guidance. diff --git a/app/features/ops/__init__.py b/app/features/ops/__init__.py new file mode 100644 index 00000000..79f4d56f --- /dev/null +++ b/app/features/ops/__init__.py @@ -0,0 +1,18 @@ +"""ForecastOps Control Center slice. + +A read-only vertical slice that aggregates operational state across the +``jobs``, ``registry``, and ``data_platform`` slices: system health, job / run / +alias health, data freshness, a needs-attention list, and a ranked +retraining-candidate queue. Has no models and no migration — it only reads. +""" + +from app.features.ops.routes import router +from app.features.ops.schemas import OpsSummaryResponse, RetrainingCandidatesResponse +from app.features.ops.service import OpsService + +__all__ = [ + "OpsService", + "OpsSummaryResponse", + "RetrainingCandidatesResponse", + "router", +] diff --git a/app/features/ops/routes.py b/app/features/ops/routes.py new file mode 100644 index 00000000..a3f0abeb --- /dev/null +++ b/app/features/ops/routes.py @@ -0,0 +1,83 @@ +"""API routes for the ForecastOps Control Center. + +Two read-only aggregation endpoints backing the ``/ops`` Control Center page: +operational summary and the ranked retraining-candidate queue. +""" + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.database import get_db +from app.features.ops.schemas import OpsSummaryResponse, RetrainingCandidatesResponse +from app.features.ops.service import OpsService + +router = APIRouter(prefix="/ops", tags=["ops"]) + + +@router.get( + "/summary", + response_model=OpsSummaryResponse, + summary="Operational summary for the Control Center", + description=""" +Aggregate the system's operational state into one response. + +**Sections**: +- `system`: API liveness, database connectivity, latest completed job. +- `jobs`: per-status job histogram plus active / failed / completed-today counts. +- `runs`: per-status model-run histogram plus success rate and failed count. +- `aliases`: every deployment alias with a staleness verdict. +- `freshness`: latest sales date, latest completed job, latest successful run. +- `attention_items`: recent failed jobs, failed runs, and stale aliases. + +Returns HTTP 200 even on an empty database — every section degrades to +zeros / nulls / empty lists rather than erroring. +""", +) +async def get_ops_summary( + db: AsyncSession = Depends(get_db), +) -> OpsSummaryResponse: + """Return the aggregated operational summary. + + Args: + db: Database session. + + Returns: + The full operational summary. + """ + return await OpsService().get_summary(db) + + +@router.get( + "/retraining-candidates", + response_model=RetrainingCandidatesResponse, + summary="Ranked retraining-candidate queue", + description=""" +Rank `(store, product)` grains by a deterministic retraining-priority score. + +Each grain is evaluated from its latest successful model run. The score blends +a time-based signal (staleness since the training-data window ended) with a +performance-based signal (WAPE), so the highest-scoring rows are the most +overdue and/or least accurate. + +Candidates are sorted by `priority_score` descending and capped at `limit`. +""", +) +async def get_retraining_candidates( + limit: int = Query( + default=20, + ge=1, + le=100, + description="Maximum number of candidates to return (1-100, default 20).", + ), + db: AsyncSession = Depends(get_db), +) -> RetrainingCandidatesResponse: + """Return the ranked retraining-candidate queue. + + Args: + limit: Maximum number of candidates to return. + db: Database session. + + Returns: + Candidates sorted by priority score (highest first). + """ + return await OpsService().get_retraining_candidates(db, limit) diff --git a/app/features/ops/schemas.py b/app/features/ops/schemas.py new file mode 100644 index 00000000..e58a9b78 --- /dev/null +++ b/app/features/ops/schemas.py @@ -0,0 +1,246 @@ +"""Pydantic schemas for the ForecastOps Control Center. + +All models here are HTTP **response** models — they are built from aggregated +operational state, never parsed from a request body. They therefore use +``ConfigDict(from_attributes=True)`` and deliberately do NOT set +``strict=True`` (the strict-mode request-body policy does not apply). +""" + +from datetime import date, datetime +from typing import Literal + +from pydantic import BaseModel, ConfigDict, Field + +# ============================================================================= +# System & freshness +# ============================================================================= + + +class SystemHealth(BaseModel): + """Liveness snapshot for the Control Center header.""" + + model_config = ConfigDict(from_attributes=True) + + api_ok: bool = Field( + ..., + description="True whenever this response was produced (the API served the request).", + ) + database_connected: bool = Field( + ..., + description="True when a 'SELECT 1' probe against PostgreSQL succeeded.", + ) + latest_successful_job_at: datetime | None = Field( + None, + description="Completion timestamp of the most recent completed job. " + "Null when no job has completed yet.", + ) + + +class DataFreshness(BaseModel): + """How current the underlying data and model state are.""" + + model_config = ConfigDict(from_attributes=True) + + latest_sales_date: date | None = Field( + None, + description="Most recent date present in sales_daily. Null when no sales exist.", + ) + latest_job_completed_at: datetime | None = Field( + None, + description="Completion timestamp of the most recently finished job (any outcome).", + ) + latest_run_completed_at: datetime | None = Field( + None, + description="Completion timestamp of the most recent successful model run.", + ) + + +# ============================================================================= +# Job & run health +# ============================================================================= + + +class StatusCount(BaseModel): + """One row of a status histogram (e.g. how many jobs are 'failed').""" + + model_config = ConfigDict(from_attributes=True) + + status: str = Field(..., description="The lifecycle status value.") + count: int = Field(..., ge=0, description="Number of entities in that status.") + + +class JobHealth(BaseModel): + """Aggregated job-execution health.""" + + model_config = ConfigDict(from_attributes=True) + + counts: list[StatusCount] = Field( + ..., + description="One entry per JobStatus, zero-filled for statuses with no rows.", + ) + completed_today: int = Field( + ..., + ge=0, + description="Jobs that completed since 00:00 UTC today.", + ) + failed_total: int = Field(..., ge=0, description="Total jobs in the 'failed' status.") + active_total: int = Field( + ..., + ge=0, + description="Jobs currently pending or running.", + ) + + +class RunHealth(BaseModel): + """Aggregated model-run health.""" + + model_config = ConfigDict(from_attributes=True) + + counts: list[StatusCount] = Field( + ..., + description="One entry per RunStatus, zero-filled for statuses with no rows.", + ) + success_rate: float | None = Field( + None, + description="Successful runs divided by non-archived runs. " + "Null when there are no non-archived runs.", + ) + failed_total: int = Field(..., ge=0, description="Total runs in the 'failed' status.") + + +# ============================================================================= +# Alias health +# ============================================================================= + + +class AliasHealth(BaseModel): + """Deployment-alias health, including a staleness verdict.""" + + model_config = ConfigDict(from_attributes=True) + + alias_name: str = Field(..., description="The deployment alias name.") + run_id: str = Field(..., description="External run_id of the aliased model run.") + run_status: str = Field(..., description="Lifecycle status of the aliased run.") + model_type: str = Field(..., description="Model type of the aliased run.") + store_id: int = Field(..., description="Store the aliased run targets.") + product_id: int = Field(..., description="Product the aliased run targets.") + is_stale: bool = Field( + ..., + description="True when the alias points at a non-successful run, or a newer " + "successful run exists for the same store/product.", + ) + stale_reason: str | None = Field( + None, + description="Human-readable explanation when is_stale is true; null otherwise.", + ) + wape: float | None = Field( + None, + description="WAPE of the aliased run, when present in its metrics; null otherwise.", + ) + + +# ============================================================================= +# Attention items +# ============================================================================= + + +class AttentionItem(BaseModel): + """One entry in the 'needs attention' list.""" + + model_config = ConfigDict(from_attributes=True) + + item_type: Literal["failed_job", "failed_run", "stale_alias"] = Field( + ..., + description="What kind of problem this row represents.", + ) + entity_id: str = Field( + ..., + description="job_id for failed_job; run_id for failed_run and stale_alias. " + "Used to deep-link to the matching Explorer detail page.", + ) + label: str = Field(..., description="Short title for the row.") + detail: str = Field(..., description="Longer explanation (error message or stale reason).") + occurred_at: datetime | None = Field( + None, + description="When the entity was created. Null when unknown.", + ) + + +# ============================================================================= +# Summary response +# ============================================================================= + + +class OpsSummaryResponse(BaseModel): + """Aggregated operational summary for the Control Center page.""" + + model_config = ConfigDict(from_attributes=True) + + system: SystemHealth = Field(..., description="Liveness and connectivity.") + jobs: JobHealth = Field(..., description="Job-execution health.") + runs: RunHealth = Field(..., description="Model-run health.") + aliases: list[AliasHealth] = Field( + ..., + description="Every deployment alias with its staleness verdict.", + ) + freshness: DataFreshness = Field(..., description="How current data and models are.") + attention_items: list[AttentionItem] = Field( + ..., + description="Recent failed jobs, failed runs, and stale aliases.", + ) + generated_at: datetime = Field(..., description="When this summary was computed (UTC).") + + +# ============================================================================= +# Retraining-candidate queue +# ============================================================================= + + +class RetrainingCandidate(BaseModel): + """One (store, product) pair ranked for retraining.""" + + model_config = ConfigDict(from_attributes=True) + + store_id: int = Field(..., description="Store the candidate covers.") + product_id: int = Field(..., description="Product the candidate covers.") + priority_score: float = Field( + ..., + ge=0.0, + le=1.0, + description="Retraining-priority score in [0, 1]; higher means more urgent.", + ) + staleness_days: int = Field( + ..., + ge=0, + description="Days since the latest successful run's training-data window ended.", + ) + wape: float | None = Field( + None, + description="WAPE of the latest successful run, when known; null otherwise.", + ) + latest_run_id: str | None = Field( + None, + description="External run_id of the latest successful run for this grain.", + ) + latest_run_status: str | None = Field( + None, + description="Status of that run (always 'success' for evaluated candidates).", + ) + reason: str = Field(..., description="Human-readable rationale for the score.") + + +class RetrainingCandidatesResponse(BaseModel): + """Ranked retraining-candidate queue.""" + + model_config = ConfigDict(from_attributes=True) + + candidates: list[RetrainingCandidate] = Field( + ..., + description="Candidates sorted by priority_score (descending), capped at the limit.", + ) + total_evaluated: int = Field( + ..., + ge=0, + description="Total (store, product) grains evaluated before applying the limit.", + ) + generated_at: datetime = Field(..., description="When this queue was computed (UTC).") diff --git a/app/features/ops/service.py b/app/features/ops/service.py new file mode 100644 index 00000000..d9e8cd18 --- /dev/null +++ b/app/features/ops/service.py @@ -0,0 +1,419 @@ +"""Service layer for the ForecastOps Control Center. + +Read-only aggregation across sibling slices. This module imports the ORM +**models** of the ``jobs``, ``registry``, and ``data_platform`` slices and runs +read-only ``select()`` queries against them. It deliberately does NOT import any +sibling ``service.py`` or ``schemas.py`` — the cross-slice coupling is confined +to the verified, read-only ORM surface (see PRP-24, decision #1). +""" + +from datetime import UTC, datetime +from typing import Any + +from sqlalchemy import func, select, text +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.logging import get_logger +from app.features.data_platform.models import SalesDaily +from app.features.jobs.models import Job, JobStatus +from app.features.ops.schemas import ( + AliasHealth, + AttentionItem, + DataFreshness, + JobHealth, + OpsSummaryResponse, + RetrainingCandidate, + RetrainingCandidatesResponse, + RunHealth, + StatusCount, + SystemHealth, +) +from app.features.registry.models import DeploymentAlias, ModelRun, RunStatus + +logger = get_logger(__name__) + +# Staleness (days) at which the time-based component of the score saturates. +_STALENESS_CAP_DAYS = 90 +# WAPE value at which the error-based component of the score saturates. +_WAPE_CAP = 100.0 +# How many recent failed jobs / runs to surface in the attention list. +_ATTENTION_LIMIT = 10 + + +# ============================================================================= +# Pure helpers (no DB, no I/O — unit-tested directly) +# ============================================================================= + + +def extract_wape(metrics: dict[str, Any] | None) -> float | None: + """Pull a WAPE value out of a model run's ``metrics`` JSONB blob. + + Tolerant by design: ``model_run.metrics`` is frequently None or carries an + unrelated metric set (backtest WAPE persists to ``job.result``, not run + metrics), so this returns None rather than raising whenever a numeric WAPE + cannot be found. Booleans are rejected — ``bool`` is an ``int`` subclass but + is never a valid metric value. + + Args: + metrics: The ``ModelRun.metrics`` JSONB dict, or None. + + Returns: + The WAPE as a float, or None when absent / non-numeric. + """ + if not metrics: + return None + for key in ("wape", "wape_mean", "WAPE"): + value = metrics.get(key) + if isinstance(value, bool): + continue + if isinstance(value, (int, float)): + return float(value) + return None + + +def score_retraining_candidate(staleness_days: int, wape: float | None) -> float: + """Compute a deterministic retraining-priority score in ``[0.0, 1.0]``. + + Blends a time-based signal (staleness, capped at 90 days, 60% weight) with a + performance-based signal (WAPE, capped at 100, 40% weight) — the hybrid + trigger recommended by MLOps retraining guidance. When WAPE is unknown the + score degrades gracefully to staleness-only. Never raises. + + Args: + staleness_days: Days since the run's training-data window ended. + wape: The run's WAPE, or None when unknown. + + Returns: + Priority score rounded to 4 decimals; higher means more urgent. + """ + staleness_norm = min(max(staleness_days, 0), _STALENESS_CAP_DAYS) / _STALENESS_CAP_DAYS + error_norm = min(max(wape, 0.0), _WAPE_CAP) / _WAPE_CAP if wape is not None else 0.0 + return round(0.6 * staleness_norm + 0.4 * error_norm, 4) + + +def _alias_staleness( + run: ModelRun, + latest_success_by_grain: dict[tuple[int, int], ModelRun], +) -> tuple[bool, str | None]: + """Decide whether an aliased run is stale, and why. + + An alias is stale when its run is no longer a successful run, or when a + newer successful run exists for the same ``(store, product)`` grain — the + industry-standard alias-staleness check (cf. MLflow alias governance). + + Args: + run: The model run the alias points at. + latest_success_by_grain: Latest successful run keyed by (store, product). + + Returns: + A ``(is_stale, reason)`` tuple; ``reason`` is None when not stale. + """ + if run.status != RunStatus.SUCCESS.value: + return True, f"aliased run status is '{run.status}', not 'success'" + latest = latest_success_by_grain.get((run.store_id, run.product_id)) + if latest is not None and latest.id != run.id and latest.created_at > run.created_at: + return True, "a newer successful run exists for this store/product" + return False, None + + +# ============================================================================= +# Service +# ============================================================================= + + +class OpsService: + """Read-only operational aggregation for the Control Center.""" + + async def get_summary(self, db: AsyncSession) -> OpsSummaryResponse: + """Aggregate system, job, run, alias, and freshness state. + + Args: + db: Database session. + + Returns: + The full operational summary. Never raises on an empty database — + every section degrades to zeros / nulls / empty lists. + """ + now = datetime.now(UTC) + + # ---- System health ------------------------------------------------ + try: + await db.execute(text("SELECT 1")) + database_connected = True + except Exception: + # Deliberate connectivity probe: any failure means "not connected". + database_connected = False + + latest_successful_job_at = await db.scalar( + select(func.max(Job.completed_at)).where(Job.status == JobStatus.COMPLETED.value) + ) + + # ---- Job health --------------------------------------------------- + job_count_rows = ( + await db.execute(select(Job.status, func.count()).group_by(Job.status)) + ).all() + job_count_map: dict[str, int] = {str(row[0]): int(row[1]) for row in job_count_rows} + job_counts = [ + StatusCount(status=status.value, count=job_count_map.get(status.value, 0)) + for status in JobStatus + ] + start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0) + completed_today = int( + await db.scalar( + select(func.count()) + .select_from(Job) + .where( + Job.status == JobStatus.COMPLETED.value, + Job.completed_at >= start_of_day, + ) + ) + or 0 + ) + jobs = JobHealth( + counts=job_counts, + completed_today=completed_today, + failed_total=job_count_map.get(JobStatus.FAILED.value, 0), + active_total=( + job_count_map.get(JobStatus.PENDING.value, 0) + + job_count_map.get(JobStatus.RUNNING.value, 0) + ), + ) + + # ---- Run health --------------------------------------------------- + run_count_rows = ( + await db.execute(select(ModelRun.status, func.count()).group_by(ModelRun.status)) + ).all() + run_count_map: dict[str, int] = {str(row[0]): int(row[1]) for row in run_count_rows} + run_counts = [ + StatusCount(status=status.value, count=run_count_map.get(status.value, 0)) + for status in RunStatus + ] + eligible = sum(run_count_map.values()) - run_count_map.get(RunStatus.ARCHIVED.value, 0) + success_rate = ( + run_count_map.get(RunStatus.SUCCESS.value, 0) / eligible if eligible > 0 else None + ) + runs = RunHealth( + counts=run_counts, + success_rate=success_rate, + failed_total=run_count_map.get(RunStatus.FAILED.value, 0), + ) + + # ---- Alias health ------------------------------------------------- + # Latest successful run per (store, product) — the staleness baseline. + latest_success_runs = ( + ( + await db.execute( + select(ModelRun) + .where(ModelRun.status == RunStatus.SUCCESS.value) + .distinct(ModelRun.store_id, ModelRun.product_id) + .order_by( + ModelRun.store_id, + ModelRun.product_id, + ModelRun.created_at.desc(), + ) + ) + ) + .scalars() + .all() + ) + latest_success_by_grain: dict[tuple[int, int], ModelRun] = { + (run.store_id, run.product_id): run for run in latest_success_runs + } + + # Two-query alias load. NEVER touch DeploymentAlias.run — accessing that + # relationship under AsyncSession triggers a lazy load (MissingGreenlet). + # Resolve the integer FK into a typed map of single-entity rows instead. + alias_rows = (await db.execute(select(DeploymentAlias))).scalars().all() + alias_run_ids = {alias.run_id for alias in alias_rows} + runs_by_id: dict[int, ModelRun] = {} + if alias_run_ids: + runs_by_id = { + run.id: run + for run in ( + (await db.execute(select(ModelRun).where(ModelRun.id.in_(alias_run_ids)))) + .scalars() + .all() + ) + } + + aliases: list[AliasHealth] = [] + stale_alias_items: list[AttentionItem] = [] + for alias in alias_rows: + run = runs_by_id.get(alias.run_id) + if run is None: # orphan FK — defensive; the FK constraint forbids it + continue + is_stale, stale_reason = _alias_staleness(run, latest_success_by_grain) + aliases.append( + AliasHealth( + alias_name=alias.alias_name, + run_id=run.run_id, + run_status=run.status, + model_type=run.model_type, + store_id=run.store_id, + product_id=run.product_id, + is_stale=is_stale, + stale_reason=stale_reason, + wape=extract_wape(run.metrics), + ) + ) + if is_stale: + stale_alias_items.append( + AttentionItem( + item_type="stale_alias", + entity_id=run.run_id, + label=f"alias '{alias.alias_name}' is stale", + detail=stale_reason or "alias is stale", + occurred_at=run.created_at, + ) + ) + + # ---- Data freshness ----------------------------------------------- + freshness = DataFreshness( + latest_sales_date=await db.scalar(select(func.max(SalesDaily.date))), + latest_job_completed_at=await db.scalar(select(func.max(Job.completed_at))), + latest_run_completed_at=await db.scalar( + select(func.max(ModelRun.completed_at)).where( + ModelRun.status == RunStatus.SUCCESS.value + ) + ), + ) + + # ---- Attention items ---------------------------------------------- + failed_jobs = ( + ( + await db.execute( + select(Job) + .where(Job.status == JobStatus.FAILED.value) + .order_by(Job.created_at.desc()) + .limit(_ATTENTION_LIMIT) + ) + ) + .scalars() + .all() + ) + failed_runs = ( + ( + await db.execute( + select(ModelRun) + .where(ModelRun.status == RunStatus.FAILED.value) + .order_by(ModelRun.created_at.desc()) + .limit(_ATTENTION_LIMIT) + ) + ) + .scalars() + .all() + ) + + attention_items: list[AttentionItem] = [ + AttentionItem( + item_type="failed_job", + entity_id=job.job_id, + label=f"{job.job_type} job failed", + detail=job.error_message or job.error_type or "Job failed", + occurred_at=job.created_at, + ) + for job in failed_jobs + ] + attention_items.extend( + AttentionItem( + item_type="failed_run", + entity_id=run.run_id, + label=f"{run.model_type} run failed", + detail=run.error_message or "Run failed", + occurred_at=run.created_at, + ) + for run in failed_runs + ) + attention_items.extend(stale_alias_items) + + logger.info( + "ops.summary_computed", + database_connected=database_connected, + failed_jobs=len(failed_jobs), + failed_runs=len(failed_runs), + stale_aliases=len(stale_alias_items), + ) + + return OpsSummaryResponse( + system=SystemHealth( + api_ok=True, + database_connected=database_connected, + latest_successful_job_at=latest_successful_job_at, + ), + jobs=jobs, + runs=runs, + aliases=aliases, + freshness=freshness, + attention_items=attention_items, + generated_at=now, + ) + + async def get_retraining_candidates( + self, db: AsyncSession, limit: int + ) -> RetrainingCandidatesResponse: + """Rank ``(store, product)`` grains by retraining priority. + + One candidate per grain — derived from its latest successful run. + + Args: + db: Database session. + limit: Maximum candidates to return (bounded 1..100 by the route). + + Returns: + Candidates sorted by ``priority_score`` descending, capped at limit. + """ + today = datetime.now(UTC).date() + + # Latest successful run per (store, product) — DISTINCT ON requires the + # ORDER BY to lead with the DISTINCT ON columns; created_at (non-null + # TimestampMixin column) is the "latest" tiebreaker. + latest_success_runs = ( + ( + await db.execute( + select(ModelRun) + .where(ModelRun.status == RunStatus.SUCCESS.value) + .distinct(ModelRun.store_id, ModelRun.product_id) + .order_by( + ModelRun.store_id, + ModelRun.product_id, + ModelRun.created_at.desc(), + ) + ) + ) + .scalars() + .all() + ) + + candidates: list[RetrainingCandidate] = [] + for run in latest_success_runs: + raw_staleness = (today - run.data_window_end).days + staleness_days = max(raw_staleness, 0) + wape = extract_wape(run.metrics) + score = score_retraining_candidate(raw_staleness, wape) + wape_part = f"WAPE {wape:.1f}" if wape is not None else "WAPE unknown" + candidates.append( + RetrainingCandidate( + store_id=run.store_id, + product_id=run.product_id, + priority_score=score, + staleness_days=staleness_days, + wape=wape, + latest_run_id=run.run_id, + latest_run_status=run.status, + reason=f"{staleness_days}d since last training window; {wape_part}", + ) + ) + + candidates.sort(key=lambda candidate: candidate.priority_score, reverse=True) + + logger.info( + "ops.retraining_candidates_computed", + total_evaluated=len(candidates), + returned=min(limit, len(candidates)), + ) + + return RetrainingCandidatesResponse( + candidates=candidates[:limit], + total_evaluated=len(candidates), + generated_at=datetime.now(UTC), + ) diff --git a/app/features/ops/tests/__init__.py b/app/features/ops/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/app/features/ops/tests/conftest.py b/app/features/ops/tests/conftest.py new file mode 100644 index 00000000..6bbb40de --- /dev/null +++ b/app/features/ops/tests/conftest.py @@ -0,0 +1,265 @@ +"""Test fixtures for the ops slice. + +Mirrors ``app/features/analytics/tests/conftest.py``: a real PostgreSQL session +(integration tests need ``docker-compose up -d``) with FK-safe, scoped cleanup. + +All seeded rows carry a ``test-`` / ``TEST-`` marker so the teardown never +touches a shared dev or CI dataset. +""" + +import uuid +from collections.abc import AsyncGenerator +from datetime import date + +import pytest +from httpx import ASGITransport, AsyncClient +from sqlalchemy import delete, select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine + +from app.core.config import get_settings +from app.core.database import get_db +from app.features.data_platform.models import Calendar, Product, SalesDaily, Store +from app.features.jobs.models import Job, JobStatus, JobType +from app.features.registry.models import DeploymentAlias, ModelRun, RunStatus +from app.main import app + +# Calendar dates the ops sales fixture occupies — deleted on teardown. +_SALES_DATES = [date(2026, 3, 1), date(2026, 3, 2), date(2026, 3, 3)] + + +def _short_id() -> str: + """Return a short unique hex token for test natural keys.""" + return uuid.uuid4().hex[:12] + + +# ============================================================================= +# Database + client fixtures +# ============================================================================= + + +@pytest.fixture +async def db_session() -> AsyncGenerator[AsyncSession, None]: + """Yield an async session, then clean up every ``test-``/``TEST-`` row. + + Cleanup runs in FK-safe order: DeploymentAlias before ModelRun (alias FKs + the run), and Sales before its Store/Product parents. Jobs are independent. + Calendar rows are intentionally left in place — the sales fixture's dates + fall inside the seeder's window, so a seeded dataset may already reference + them; deleting them would hit a foreign-key violation. + """ + settings = get_settings() + engine = create_async_engine(settings.database_url, echo=False) + async_session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + async with async_session_maker() as session: + try: + yield session + finally: + test_store_ids = select(Store.id).where(Store.code.like("TEST-%")) + test_product_ids = select(Product.id).where(Product.sku.like("TEST-%")) + # Aliases first — they FK-reference model_run. + await session.execute( + delete(DeploymentAlias).where(DeploymentAlias.alias_name.like("test-%")) + ) + await session.execute(delete(ModelRun).where(ModelRun.run_id.like("test-%"))) + await session.execute(delete(Job).where(Job.job_id.like("test-%"))) + await session.execute( + delete(SalesDaily).where( + SalesDaily.store_id.in_(test_store_ids) + | SalesDaily.product_id.in_(test_product_ids) + ) + ) + await session.execute(delete(Product).where(Product.sku.like("TEST-%"))) + await session.execute(delete(Store).where(Store.code.like("TEST-%"))) + await session.commit() + + await engine.dispose() + + +@pytest.fixture +async def client(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]: + """Create a test client with the database dependency overridden.""" + + async def override_get_db() -> AsyncGenerator[AsyncSession, None]: + yield db_session + + app.dependency_overrides[get_db] = override_get_db + + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + yield ac + + app.dependency_overrides.pop(get_db, None) + + +# ============================================================================= +# Sample-data fixtures +# ============================================================================= + + +@pytest.fixture +async def sample_jobs(db_session: AsyncSession) -> list[Job]: + """Create one job per lifecycle status; the failed job carries an error.""" + jobs = [ + Job( + job_id=f"test-{_short_id()}", + job_type=JobType.TRAIN.value, + status=JobStatus.PENDING.value, + params={"_test": True}, + ), + Job( + job_id=f"test-{_short_id()}", + job_type=JobType.PREDICT.value, + status=JobStatus.RUNNING.value, + params={"_test": True}, + ), + Job( + job_id=f"test-{_short_id()}", + job_type=JobType.BACKTEST.value, + status=JobStatus.COMPLETED.value, + params={"_test": True}, + result={"ok": True}, + ), + Job( + job_id=f"test-{_short_id()}", + job_type=JobType.TRAIN.value, + status=JobStatus.FAILED.value, + params={"_test": True}, + error_message="seeded failure", + error_type="ValueError", + ), + ] + for job in jobs: + db_session.add(job) + await db_session.commit() + for job in jobs: + await db_session.refresh(job) + return jobs + + +@pytest.fixture +async def sample_runs(db_session: AsyncSession) -> dict[str, ModelRun]: + """Create model runs across grains and statuses. + + ``success_old`` and ``success_new`` share grain (9001, 8001) so an alias to + ``success_old`` is provably stale (a newer successful run exists). + """ + + def _run( + status: str, + store_id: int, + product_id: int, + window_end: date, + metrics: dict[str, float] | None, + error_message: str | None = None, + ) -> ModelRun: + return ModelRun( + run_id=f"test-{_short_id()}", + status=status, + model_type="naive", + model_config={"_test": True}, + config_hash=_short_id()[:16], + data_window_start=date(2025, 1, 1), + data_window_end=window_end, + store_id=store_id, + product_id=product_id, + metrics=metrics, + error_message=error_message, + ) + + runs = { + "success_old": _run(RunStatus.SUCCESS.value, 9001, 8001, date(2026, 1, 1), {"wape": 31.0}), + "failed": _run( + RunStatus.FAILED.value, + 9002, + 8002, + date(2026, 2, 1), + None, + error_message="seeded run failure", + ), + "success_other": _run(RunStatus.SUCCESS.value, 9003, 8003, date(2026, 2, 15), None), + } + for run in runs.values(): + db_session.add(run) + await db_session.commit() + for run in runs.values(): + await db_session.refresh(run) + + # success_new is committed after success_old so its created_at is strictly + # later — making success_old the stale one for grain (9001, 8001). + success_new = _run(RunStatus.SUCCESS.value, 9001, 8001, date(2026, 4, 1), {"wape": 12.0}) + db_session.add(success_new) + await db_session.commit() + await db_session.refresh(success_new) + runs["success_new"] = success_new + return runs + + +@pytest.fixture +async def sample_alias( + db_session: AsyncSession, sample_runs: dict[str, ModelRun] +) -> DeploymentAlias: + """Alias pointing at the OLDER successful run — provably stale.""" + alias = DeploymentAlias( + alias_name=f"test-{_short_id()}", + run_id=sample_runs["success_old"].id, + description="ops slice test alias", + ) + db_session.add(alias) + await db_session.commit() + await db_session.refresh(alias) + return alias + + +@pytest.fixture +async def sample_sales(db_session: AsyncSession) -> list[SalesDaily]: + """Create a TEST- store/product, calendar rows, and a few sales days.""" + store = Store( + code=f"TEST-{_short_id()}", + name="Ops Test Store", + region="Test Region", + city="Test City", + store_type="supermarket", + ) + product = Product( + sku=f"TEST-{_short_id()}", + name="Ops Test Product", + category="Test Category", + brand="Test Brand", + base_price=10, + base_cost=5, + ) + db_session.add_all([store, product]) + await db_session.commit() + await db_session.refresh(store) + await db_session.refresh(product) + + for day in _SALES_DATES: + await db_session.merge( + Calendar( + date=day, + day_of_week=day.weekday(), + month=day.month, + quarter=(day.month - 1) // 3 + 1, + year=day.year, + is_holiday=False, + ) + ) + await db_session.commit() + + sales = [ + SalesDaily( + date=day, + store_id=store.id, + product_id=product.id, + quantity=5, + unit_price=10, + total_amount=50, + ) + for day in _SALES_DATES + ] + for row in sales: + db_session.add(row) + await db_session.commit() + for row in sales: + await db_session.refresh(row) + return sales diff --git a/app/features/ops/tests/test_routes_integration.py b/app/features/ops/tests/test_routes_integration.py new file mode 100644 index 00000000..ec048d8c --- /dev/null +++ b/app/features/ops/tests/test_routes_integration.py @@ -0,0 +1,132 @@ +"""Integration tests for the ops Control Center routes. + +Runs against a real PostgreSQL database — the full path from HTTP request +through SQL aggregation to response. Requires ``docker-compose up -d``. + +Assertions are structural (status-key coverage, sort order, bounds) rather than +exact global totals, so the tests stay idempotent against a shared dataset. +""" + +import pytest +from httpx import AsyncClient + +from app.features.data_platform.models import SalesDaily +from app.features.jobs.models import Job, JobStatus +from app.features.registry.models import DeploymentAlias, ModelRun, RunStatus + +_JOB_STATUSES = {s.value for s in JobStatus} +_RUN_STATUSES = {s.value for s in RunStatus} + + +@pytest.mark.integration +@pytest.mark.asyncio +class TestOpsSummary: + """Integration tests for GET /ops/summary.""" + + async def test_summary_happy_path( + self, + client: AsyncClient, + sample_jobs: list[Job], + sample_runs: dict[str, ModelRun], + sample_alias: DeploymentAlias, + sample_sales: list[SalesDaily], + ) -> None: + """A seeded database yields a fully populated summary.""" + response = await client.get("/ops/summary") + + assert response.status_code == 200 + data = response.json() + + assert data["system"]["api_ok"] is True + assert data["system"]["database_connected"] is True + + # Job and run histograms cover every status key (zero-filled). + assert {c["status"] for c in data["jobs"]["counts"]} == _JOB_STATUSES + assert {c["status"] for c in data["runs"]["counts"]} == _RUN_STATUSES + + # The seeded failed job surfaces in the attention list. It is the most + # recently created failed job, so the limit-10 window always includes it. + failed_job_id = next(j.job_id for j in sample_jobs if j.status == JobStatus.FAILED.value) + failed_job_ids = { + item["entity_id"] + for item in data["attention_items"] + if item["item_type"] == "failed_job" + } + assert failed_job_id in failed_job_ids + + # Freshness reflects the seeded sales. + assert data["freshness"]["latest_sales_date"] is not None + assert data["freshness"]["latest_sales_date"] >= "2026-03-03" + + # The alias seeded against the older successful run is reported stale. + stale_alias = next(a for a in data["aliases"] if a["alias_name"] == sample_alias.alias_name) + assert stale_alias["is_stale"] is True + assert stale_alias["stale_reason"] is not None + assert stale_alias["wape"] == 31.0 + + async def test_summary_resilient_structural(self, client: AsyncClient) -> None: + """Without any seeded fixtures the summary still returns 200, never 500.""" + response = await client.get("/ops/summary") + + assert response.status_code == 200 + data = response.json() + + # Every histogram bucket is non-negative and every status key present. + for section in ("jobs", "runs"): + for count in data[section]["counts"]: + assert count["count"] >= 0 + assert {c["status"] for c in data["jobs"]["counts"]} == _JOB_STATUSES + assert {c["status"] for c in data["runs"]["counts"]} == _RUN_STATUSES + + assert data["jobs"]["completed_today"] >= 0 + assert data["jobs"]["active_total"] >= 0 + assert data["jobs"]["failed_total"] >= 0 + assert data["runs"]["failed_total"] >= 0 + assert isinstance(data["attention_items"], list) + assert isinstance(data["aliases"], list) + + +@pytest.mark.integration +@pytest.mark.asyncio +class TestRetrainingCandidates: + """Integration tests for GET /ops/retraining-candidates.""" + + async def test_candidates_sorted_and_limited( + self, + client: AsyncClient, + sample_runs: dict[str, ModelRun], + ) -> None: + """Candidates are sorted by priority_score desc and capped at limit.""" + response = await client.get("/ops/retraining-candidates", params={"limit": 5}) + + assert response.status_code == 200 + data = response.json() + + candidates = data["candidates"] + assert len(candidates) <= 5 + assert data["total_evaluated"] >= len(candidates) + + scores = [c["priority_score"] for c in candidates] + assert scores == sorted(scores, reverse=True), "candidates must be sorted desc" + + for candidate in candidates: + assert 0.0 <= candidate["priority_score"] <= 1.0 + assert candidate["staleness_days"] >= 0 + assert candidate["latest_run_status"] == RunStatus.SUCCESS.value + + async def test_candidates_default_limit(self, client: AsyncClient) -> None: + """The endpoint works with no explicit limit (default 20).""" + response = await client.get("/ops/retraining-candidates") + + assert response.status_code == 200 + assert len(response.json()["candidates"]) <= 20 + + async def test_candidates_limit_zero_rejected(self, client: AsyncClient) -> None: + """limit=0 is below the ge=1 bound and returns 422.""" + response = await client.get("/ops/retraining-candidates", params={"limit": 0}) + assert response.status_code == 422 + + async def test_candidates_limit_too_high_rejected(self, client: AsyncClient) -> None: + """limit=200 is above the le=100 bound and returns 422.""" + response = await client.get("/ops/retraining-candidates", params={"limit": 200}) + assert response.status_code == 422 diff --git a/app/features/ops/tests/test_schemas.py b/app/features/ops/tests/test_schemas.py new file mode 100644 index 00000000..0a26ae97 --- /dev/null +++ b/app/features/ops/tests/test_schemas.py @@ -0,0 +1,182 @@ +"""Unit tests for the ops slice's Pydantic response schemas. + +These run without a database (-m "not integration"). +""" + +from datetime import UTC, date, datetime + +import pytest +from pydantic import ValidationError + +from app.features.ops.schemas import ( + AliasHealth, + AttentionItem, + DataFreshness, + JobHealth, + OpsSummaryResponse, + RetrainingCandidate, + RetrainingCandidatesResponse, + RunHealth, + StatusCount, + SystemHealth, +) + +_NOW = datetime(2026, 5, 19, 12, 0, tzinfo=UTC) + + +def test_system_health_construct() -> None: + """SystemHealth carries liveness flags and an optional job timestamp.""" + system = SystemHealth(api_ok=True, database_connected=True, latest_successful_job_at=_NOW) + assert system.api_ok is True + assert system.latest_successful_job_at == _NOW + + +def test_system_health_allows_null_job_timestamp() -> None: + """latest_successful_job_at defaults to None when no job has completed.""" + system = SystemHealth(api_ok=True, database_connected=False) + assert system.latest_successful_job_at is None + + +def test_status_count_rejects_negative_count() -> None: + """A negative count violates the ge=0 constraint.""" + with pytest.raises(ValidationError): + StatusCount(status="failed", count=-1) + + +def test_job_health_construct_and_reject_negative() -> None: + """JobHealth aggregates counts; negative totals are rejected.""" + health = JobHealth( + counts=[StatusCount(status="completed", count=3)], + completed_today=2, + failed_total=1, + active_total=0, + ) + assert health.completed_today == 2 + with pytest.raises(ValidationError): + JobHealth(counts=[], completed_today=-1, failed_total=0, active_total=0) + + +def test_run_health_allows_null_success_rate() -> None: + """success_rate is None when there are no eligible runs.""" + health = RunHealth(counts=[], success_rate=None, failed_total=0) + assert health.success_rate is None + + +def test_alias_health_construct() -> None: + """AliasHealth carries the staleness verdict and an optional WAPE.""" + alias = AliasHealth( + alias_name="production", + run_id="abc123", + run_status="success", + model_type="naive", + store_id=1, + product_id=2, + is_stale=True, + stale_reason="a newer successful run exists for this store/product", + wape=18.4, + ) + assert alias.is_stale is True + assert alias.wape == 18.4 + + +def test_data_freshness_defaults_to_null() -> None: + """Every freshness field is optional and defaults to None.""" + freshness = DataFreshness() + assert freshness.latest_sales_date is None + assert freshness.latest_job_completed_at is None + assert freshness.latest_run_completed_at is None + + +def test_attention_item_rejects_unknown_type() -> None: + """item_type is constrained to the three known literals.""" + with pytest.raises(ValidationError): + AttentionItem( + item_type="something_else", # type: ignore[arg-type] + entity_id="x", + label="x", + detail="x", + ) + + +def test_attention_item_construct() -> None: + """A valid AttentionItem accepts the known literals.""" + item = AttentionItem( + item_type="failed_job", + entity_id="job-1", + label="train job failed", + detail="boom", + occurred_at=_NOW, + ) + assert item.item_type == "failed_job" + + +def test_ops_summary_response_construct() -> None: + """OpsSummaryResponse nests every section.""" + summary = OpsSummaryResponse( + system=SystemHealth(api_ok=True, database_connected=True), + jobs=JobHealth(counts=[], completed_today=0, failed_total=0, active_total=0), + runs=RunHealth(counts=[], success_rate=None, failed_total=0), + aliases=[], + freshness=DataFreshness(), + attention_items=[], + generated_at=_NOW, + ) + assert summary.generated_at == _NOW + assert summary.aliases == [] + + +def test_retraining_candidate_rejects_out_of_range_score() -> None: + """priority_score is bounded to [0.0, 1.0].""" + with pytest.raises(ValidationError): + RetrainingCandidate( + store_id=1, + product_id=2, + priority_score=1.5, + staleness_days=10, + wape=None, + latest_run_id="r1", + latest_run_status="success", + reason="x", + ) + + +def test_retraining_candidate_rejects_negative_staleness() -> None: + """staleness_days violates ge=0 when negative.""" + with pytest.raises(ValidationError): + RetrainingCandidate( + store_id=1, + product_id=2, + priority_score=0.5, + staleness_days=-1, + wape=None, + latest_run_id="r1", + latest_run_status="success", + reason="x", + ) + + +def test_retraining_candidates_response_construct() -> None: + """RetrainingCandidatesResponse wraps candidates with a total and timestamp.""" + candidate = RetrainingCandidate( + store_id=1, + product_id=2, + priority_score=0.75, + staleness_days=30, + wape=12.0, + latest_run_id="r1", + latest_run_status="success", + reason="30d since last training window; WAPE 12.0", + ) + response = RetrainingCandidatesResponse( + candidates=[candidate], + total_evaluated=1, + generated_at=_NOW, + ) + assert response.total_evaluated == 1 + assert response.candidates[0].priority_score == 0.75 + + +def test_data_freshness_accepts_date() -> None: + """latest_sales_date accepts a date value.""" + freshness = DataFreshness(latest_sales_date=date(2026, 5, 1)) + assert freshness.latest_sales_date == date(2026, 5, 1) diff --git a/app/features/ops/tests/test_service.py b/app/features/ops/tests/test_service.py new file mode 100644 index 00000000..0589efaa --- /dev/null +++ b/app/features/ops/tests/test_service.py @@ -0,0 +1,80 @@ +"""Unit tests for the ops slice's pure scoring helpers. + +These run without a database (-m "not integration"): the helpers are pure +functions with no I/O. +""" + +from app.features.ops.service import extract_wape, score_retraining_candidate + +# ============================================================================= +# score_retraining_candidate +# ============================================================================= + + +def test_score_zero_when_fresh_and_no_error() -> None: + """A brand-new run with no WAPE scores 0.0.""" + assert score_retraining_candidate(0, None) == 0.0 + + +def test_score_max_when_fully_stale_and_max_error() -> None: + """90+ days stale with WAPE 100 saturates both terms to 1.0.""" + assert score_retraining_candidate(90, 100.0) == 1.0 + + +def test_score_clamps_negative_staleness_and_high_wape() -> None: + """Negative staleness clamps to 0; WAPE above the cap clamps to 1.0.""" + # staleness -> 0.0, error -> 1.0; score = 0.6*0 + 0.4*1.0 = 0.4 + assert score_retraining_candidate(-5, 250.0) == 0.4 + + +def test_score_midpoint() -> None: + """Half-stale with half-max WAPE lands at the weighted midpoint.""" + # staleness 45/90 -> 0.5, error 50/100 -> 0.5; score = 0.6*0.5 + 0.4*0.5 = 0.5 + assert score_retraining_candidate(45, 50.0) == 0.5 + + +def test_score_staleness_only_when_wape_unknown() -> None: + """With WAPE unknown the score degrades to the staleness term alone.""" + # staleness 90 -> 1.0, error -> 0.0; score = 0.6 + assert score_retraining_candidate(90, None) == 0.6 + + +def test_score_is_bounded() -> None: + """The score never escapes [0.0, 1.0] for extreme inputs.""" + assert score_retraining_candidate(10_000, 10_000.0) == 1.0 + assert score_retraining_candidate(-10_000, -10_000.0) == 0.0 + + +# ============================================================================= +# extract_wape +# ============================================================================= + + +def test_extract_wape_prefers_wape_then_wape_mean() -> None: + """The 'wape' key wins; 'wape_mean' and 'WAPE' are fallbacks.""" + assert extract_wape({"wape": 12.0}) == 12.0 + assert extract_wape({"wape_mean": 8.5}) == 8.5 + assert extract_wape({"WAPE": 4.0}) == 4.0 + assert extract_wape({"wape": 1.0, "wape_mean": 99.0}) == 1.0 + + +def test_extract_wape_returns_none_for_missing_or_empty() -> None: + """None and an empty / unrelated dict yield None — never an exception.""" + assert extract_wape(None) is None + assert extract_wape({}) is None + assert extract_wape({"mae": 3.2}) is None + + +def test_extract_wape_rejects_non_numeric_and_bool() -> None: + """A non-numeric value yields None; bool is rejected (it is not a metric).""" + assert extract_wape({"wape": "bad"}) is None + assert extract_wape({"wape": None}) is None + assert extract_wape({"wape": True}) is None + assert extract_wape({"wape": False}) is None + + +def test_extract_wape_coerces_int_to_float() -> None: + """An integer WAPE is returned as a float.""" + result = extract_wape({"wape": 25}) + assert result == 25.0 + assert isinstance(result, float) diff --git a/app/main.py b/app/main.py index f473127d..9aaa4882 100644 --- a/app/main.py +++ b/app/main.py @@ -24,6 +24,7 @@ from app.features.forecasting.routes import router as forecasting_router from app.features.ingest.routes import router as ingest_router from app.features.jobs.routes import router as jobs_router +from app.features.ops.routes import router as ops_router from app.features.rag.routes import router as rag_router from app.features.registry.routes import router as registry_router from app.features.seeder.routes import router as seeder_router @@ -131,6 +132,7 @@ def create_app() -> FastAPI: app.include_router(health_router) app.include_router(dimensions_router) app.include_router(analytics_router) + app.include_router(ops_router) app.include_router(jobs_router) app.include_router(ingest_router) app.include_router(featuresets_router) diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 82ad780b..be5b288f 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -10,6 +10,7 @@ import { ROUTES } from '@/lib/constants' // Lazy-loaded page components const DashboardPage = lazy(() => import('@/pages/dashboard')) const ShowcasePage = lazy(() => import('@/pages/showcase')) +const OpsPage = lazy(() => import('@/pages/ops')) const SalesExplorerPage = lazy(() => import('@/pages/explorer/sales')) const StoresExplorerPage = lazy(() => import('@/pages/explorer/stores')) const StoreDetailPage = lazy(() => import('@/pages/explorer/store-detail')) @@ -55,6 +56,14 @@ function App() { } /> + }> + + + } + /> api('/ops/summary'), + refetchInterval: 15000, + enabled, + }) +} + +/** + * Ranked retraining-candidate queue. Deliberately NOT polled — the queue only + * changes when a new run lands, so refetch-on-mount is sufficient. + */ +export function useRetrainingCandidates(limit = 20, enabled = true) { + return useQuery({ + queryKey: ['ops', 'retraining', limit], + queryFn: () => + api('/ops/retraining-candidates', { params: { limit } }), + enabled, + }) +} diff --git a/frontend/src/lib/constants.ts b/frontend/src/lib/constants.ts index d802c664..50306a81 100644 --- a/frontend/src/lib/constants.ts +++ b/frontend/src/lib/constants.ts @@ -2,6 +2,7 @@ export const ROUTES = { DASHBOARD: '/', SHOWCASE: '/showcase', + OPS: '/ops', EXPLORER: { SALES: '/explorer/sales', STORES: '/explorer/stores', @@ -33,6 +34,7 @@ export const ROUTES = { export const NAV_ITEMS = [ { label: 'Dashboard', href: ROUTES.DASHBOARD }, { label: 'Showcase', href: ROUTES.SHOWCASE }, + { label: 'Control Center', href: ROUTES.OPS }, { label: 'Explorer', items: [ diff --git a/frontend/src/lib/ops-utils.test.ts b/frontend/src/lib/ops-utils.test.ts new file mode 100644 index 00000000..5e156396 --- /dev/null +++ b/frontend/src/lib/ops-utils.test.ts @@ -0,0 +1,119 @@ +import { describe, it, expect } from 'vitest' +import { + attentionBadgeVariant, + attentionItemLink, + formatStaleness, + sortRetrainingCandidates, + summaryHealthVariant, +} from './ops-utils' +import type { AttentionItem, RetrainingCandidate, SystemHealth } from '@/types/api' + +/** Build an AttentionItem with sensible defaults for fields not under test. */ +function makeItem(partial: Partial & Pick): AttentionItem { + return { + item_type: partial.item_type, + entity_id: partial.entity_id ?? 'entity-1', + label: partial.label ?? 'label', + detail: partial.detail ?? 'detail', + occurred_at: partial.occurred_at ?? null, + } +} + +/** Build a RetrainingCandidate with sensible defaults. */ +function makeCandidate( + partial: Partial & Pick, +): RetrainingCandidate { + return { + store_id: partial.store_id ?? 1, + product_id: partial.product_id ?? 1, + priority_score: partial.priority_score, + staleness_days: partial.staleness_days ?? 0, + wape: partial.wape ?? null, + latest_run_id: partial.latest_run_id ?? 'run-1', + latest_run_status: partial.latest_run_status ?? 'success', + reason: partial.reason ?? 'reason', + } +} + +describe('summaryHealthVariant', () => { + it('is success when API and database are both up', () => { + const system: SystemHealth = { + api_ok: true, + database_connected: true, + latest_successful_job_at: null, + } + expect(summaryHealthVariant(system)).toBe('success') + }) + + it('is error when the database is down', () => { + const system: SystemHealth = { + api_ok: true, + database_connected: false, + latest_successful_job_at: null, + } + expect(summaryHealthVariant(system)).toBe('error') + }) +}) + +describe('attentionItemLink', () => { + it('links a failed job to the job detail page', () => { + expect(attentionItemLink(makeItem({ item_type: 'failed_job', entity_id: 'job-9' }))).toBe( + '/explorer/jobs/job-9', + ) + }) + + it('links a failed run to the run detail page', () => { + expect(attentionItemLink(makeItem({ item_type: 'failed_run', entity_id: 'run-9' }))).toBe( + '/explorer/runs/run-9', + ) + }) + + it('links a stale alias to the run detail page', () => { + expect(attentionItemLink(makeItem({ item_type: 'stale_alias', entity_id: 'run-3' }))).toBe( + '/explorer/runs/run-3', + ) + }) +}) + +describe('attentionBadgeVariant', () => { + it('warns for a stale alias', () => { + expect(attentionBadgeVariant('stale_alias')).toBe('warning') + }) + + it('errors for failed jobs and runs', () => { + expect(attentionBadgeVariant('failed_job')).toBe('error') + expect(attentionBadgeVariant('failed_run')).toBe('error') + }) +}) + +describe('formatStaleness', () => { + it('renders a positive day count', () => { + expect(formatStaleness(12)).toBe('12d') + }) + + it('renders "today" at zero or negative days', () => { + expect(formatStaleness(0)).toBe('today') + expect(formatStaleness(-3)).toBe('today') + }) +}) + +describe('sortRetrainingCandidates', () => { + it('sorts by priority score descending', () => { + const sorted = sortRetrainingCandidates([ + makeCandidate({ priority_score: 0.2 }), + makeCandidate({ priority_score: 0.9 }), + makeCandidate({ priority_score: 0.5 }), + ]) + expect(sorted.map((c) => c.priority_score)).toEqual([0.9, 0.5, 0.2]) + }) + + it('does not mutate the input array', () => { + const input = [makeCandidate({ priority_score: 0.1 }), makeCandidate({ priority_score: 0.8 })] + sortRetrainingCandidates(input) + expect(input.map((c) => c.priority_score)).toEqual([0.1, 0.8]) + }) + + it('returns an empty array unchanged', () => { + expect(sortRetrainingCandidates([])).toEqual([]) + }) +}) diff --git a/frontend/src/lib/ops-utils.ts b/frontend/src/lib/ops-utils.ts new file mode 100644 index 00000000..1d0d42eb --- /dev/null +++ b/frontend/src/lib/ops-utils.ts @@ -0,0 +1,51 @@ +// Pure, React-free helpers for the ForecastOps Control Center page. Kept +// separate from the page component so they are cheap to unit-test (see +// ops-utils.test.ts) — mirrors the knowledge-utils.ts / status-utils.ts precedent. +import { ROUTES } from '@/lib/constants' +import type { AttentionItem, RetrainingCandidate, SystemHealth } from '@/types/api' + +/** + * System-health badge variant: 'success' only when the API and database are + * both up, 'error' otherwise. + */ +export function summaryHealthVariant(system: SystemHealth): 'success' | 'error' { + return system.api_ok && system.database_connected ? 'success' : 'error' +} + +/** + * Deep-link an attention item to its Explorer detail page. A failed job links + * to the job detail page; a failed run and a stale alias both carry a run_id + * and link to the run detail page. + */ +export function attentionItemLink(item: AttentionItem): string { + if (item.item_type === 'failed_job') { + return `${ROUTES.EXPLORER.JOBS}/${item.entity_id}` + } + return `${ROUTES.EXPLORER.RUNS}/${item.entity_id}` +} + +/** + * Badge variant for an attention row — a stale alias is a 'warning', a failed + * job or run is an 'error'. + */ +export function attentionBadgeVariant( + itemType: AttentionItem['item_type'], +): 'error' | 'warning' { + return itemType === 'stale_alias' ? 'warning' : 'error' +} + +/** + * Human-readable staleness: "today" at zero or negative days, "{n}d" otherwise. + */ +export function formatStaleness(days: number): string { + return days <= 0 ? 'today' : `${days}d` +} + +/** + * Return a copy of the candidates sorted by priority score, most urgent first. + * The backend already sorts, but sorting again keeps the page correct if the + * order ever changes upstream. + */ +export function sortRetrainingCandidates(rows: RetrainingCandidate[]): RetrainingCandidate[] { + return [...rows].sort((a, b) => b.priority_score - a.priority_score) +} diff --git a/frontend/src/pages/ops.tsx b/frontend/src/pages/ops.tsx new file mode 100644 index 00000000..aa1b0180 --- /dev/null +++ b/frontend/src/pages/ops.tsx @@ -0,0 +1,327 @@ +import { useNavigate, Link } from 'react-router-dom' +import { Activity, AlertTriangle, CheckCircle2, Clock } from 'lucide-react' +import { useOpsSummary, useRetrainingCandidates } from '@/hooks/use-ops' +import { useProviderHealth } from '@/hooks/use-config' +import { + attentionBadgeVariant, + attentionItemLink, + formatStaleness, + sortRetrainingCandidates, + summaryHealthVariant, +} from '@/lib/ops-utils' +import { getStatusVariant } from '@/lib/status-utils' +import { KPICard } from '@/components/charts/kpi-card' +import { EmptyState, ErrorDisplay } from '@/components/common/error-display' +import { LoadingState } from '@/components/common/loading-state' +import { StatusBadge } from '@/components/common/status-badge' +import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card' +import { + Table, + TableBody, + TableCell, + TableHead, + TableHeader, + TableRow, +} from '@/components/ui/table' +import { formatPercent } from '@/lib/api' +import { ROUTES } from '@/lib/constants' + +/** Format an ISO timestamp / date string for display; '—' when null. */ +function formatWhen(value: string | null): string { + if (!value) return '—' + const parsed = new Date(value) + return Number.isNaN(parsed.getTime()) ? value : parsed.toLocaleString() +} + +/** A labelled health row inside the System Health card. */ +function HealthRow({ label, ok, detail }: { label: string; ok: boolean; detail?: string }) { + return ( +
+ {label} + + {detail && {detail}} + {ok ? 'ok' : 'down'} + +
+ ) +} + +/** A labelled value pair for the Data Freshness card. */ +function FreshnessRow({ label, value }: { label: string; value: string }) { + return ( +
+ {label} + {value} +
+ ) +} + +export default function OpsPage() { + const navigate = useNavigate() + const summaryQuery = useOpsSummary() + const candidatesQuery = useRetrainingCandidates() + const providerQuery = useProviderHealth() + + if (summaryQuery.error) { + return ( +
+

Control Center

+ void summaryQuery.refetch()} /> +
+ ) + } + + if (summaryQuery.isLoading || !summaryQuery.data) { + return ( +
+

Control Center

+ +
+ ) + } + + const summary = summaryQuery.data + const providers = providerQuery.data ?? [] + const totalJobs = summary.jobs.counts.reduce((sum, c) => sum + c.count, 0) + const totalRuns = summary.runs.counts.reduce((sum, c) => sum + c.count, 0) + const staleAliases = summary.aliases.filter((a) => a.is_stale).length + const candidates = sortRetrainingCandidates(candidatesQuery.data?.candidates ?? []) + + return ( +
+
+

Control Center

+

+ One operational view across jobs, model runs, deployment aliases, and data + freshness — surfacing what needs attention before it affects decisions. +

+
+ + {totalJobs === 0 && totalRuns === 0 ? ( + } + action={{ label: 'Go to Showcase', onClick: () => navigate(ROUTES.SHOWCASE) }} + /> + ) : ( + <> +
+ {/* Section 1 — System Health */} + + +
+ System Health + + {summaryHealthVariant(summary.system) === 'success' ? 'healthy' : 'degraded'} + +
+ API, database, and embedding-provider reachability. +
+ +
+ + + {providers.map((p) => ( + + ))} +
+
+ Latest successful job: + + {formatWhen(summary.system.latest_successful_job_at)} + +
+
+
+ Jobs + {summary.jobs.counts.map((c) => ( + + {c.status} {c.count} + + ))} +
+
+ Runs + {summary.runs.counts.map((c) => ( + + {c.status} {c.count} + + ))} +
+
+
+
+ + {/* Section 3 — Data Freshness */} + + + Data Freshness + How current the data and model state are. + + +
+ + + +
+
+
+
+ + {/* Section 2 — KPI row */} +
+ + + + +
+ + {/* Section 4 — Needs Attention */} + + + Needs Attention + + Recent failed jobs, failed runs, and stale deployment aliases. Each row links + to its Explorer detail page. + + + + {summary.attention_items.length === 0 ? ( +

+ Nothing needs attention — no failed jobs, failed runs, or stale aliases. +

+ ) : ( + + + + Type + Item + Detail + When + + + + {summary.attention_items.map((item) => ( + + + + {item.item_type.replace('_', ' ')} + + + + + {item.label} + + + + {item.detail} + + + {formatWhen(item.occurred_at)} + + + ))} + +
+ )} +
+
+ + {/* Section 5 — Retraining Queue */} + + + Retraining Queue + + Store / product pairs ranked by a retraining-priority score that blends + staleness with forecast error (WAPE). + + + + {candidatesQuery.isLoading ? ( + + ) : candidates.length === 0 ? ( +

+ No retraining candidates — no successful model runs to evaluate yet. +

+ ) : ( + + + + Store + Product + Priority + Staleness + WAPE + Reason + + + + {candidates.map((c) => ( + + {c.store_id} + {c.product_id} + + {c.priority_score.toFixed(2)} + + + {formatStaleness(c.staleness_days)} + + + {c.wape === null ? '—' : c.wape.toFixed(1)} + + + {c.reason} + + + ))} + +
+ )} +
+
+ + )} +
+ ) +} diff --git a/frontend/src/types/api.ts b/frontend/src/types/api.ts index e37539ad..d1179291 100644 --- a/frontend/src/types/api.ts +++ b/frontend/src/types/api.ts @@ -578,3 +578,94 @@ export interface ProviderHealth { detail: string models: string[] } + +// ============================================================================= +// ForecastOps Control Center — GET /ops/summary, GET /ops/retraining-candidates +// ============================================================================= + +// Liveness snapshot for the Control Center header. +export interface SystemHealth { + api_ok: boolean + database_connected: boolean + latest_successful_job_at: string | null +} + +// One bucket of a status histogram. +export interface StatusCount { + status: string + count: number +} + +// Aggregated job-execution health. +export interface JobHealth { + counts: StatusCount[] + completed_today: number + failed_total: number + active_total: number +} + +// Aggregated model-run health. +export interface RunHealth { + counts: StatusCount[] + success_rate: number | null + failed_total: number +} + +// Deployment-alias health with a staleness verdict. +export interface AliasHealth { + alias_name: string + run_id: string + run_status: string + model_type: string + store_id: number + product_id: number + is_stale: boolean + stale_reason: string | null + wape: number | null +} + +// How current the underlying data and model state are. +export interface DataFreshness { + latest_sales_date: string | null + latest_job_completed_at: string | null + latest_run_completed_at: string | null +} + +// One entry in the "needs attention" list. +export interface AttentionItem { + item_type: 'failed_job' | 'failed_run' | 'stale_alias' + entity_id: string + label: string + detail: string + occurred_at: string | null +} + +// Aggregated operational summary — GET /ops/summary. +export interface OpsSummaryResponse { + system: SystemHealth + jobs: JobHealth + runs: RunHealth + aliases: AliasHealth[] + freshness: DataFreshness + attention_items: AttentionItem[] + generated_at: string +} + +// One (store, product) pair ranked for retraining. +export interface RetrainingCandidate { + store_id: number + product_id: number + priority_score: number + staleness_days: number + wape: number | null + latest_run_id: string | null + latest_run_status: string | null + reason: string +} + +// Ranked retraining-candidate queue — GET /ops/retraining-candidates. +export interface RetrainingCandidatesResponse { + candidates: RetrainingCandidate[] + total_evaluated: number + generated_at: string +}