Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
827 changes: 827 additions & 0 deletions PRPs/PRP-25-forecastops-control-center-full.md

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion app/features/ops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@
"""

from app.features.ops.routes import router
from app.features.ops.schemas import OpsSummaryResponse, RetrainingCandidatesResponse
from app.features.ops.schemas import (
ModelHealthResponse,
OpsSummaryResponse,
RetrainingCandidatesResponse,
)
from app.features.ops.service import OpsService

__all__ = [
"ModelHealthResponse",
"OpsService",
"OpsSummaryResponse",
"RetrainingCandidatesResponse",
Expand Down
51 changes: 48 additions & 3 deletions app/features/ops/routes.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
"""API routes for the ForecastOps Control Center.

Two read-only aggregation endpoints backing the ``/ops`` Control Center page:
operational summary and the ranked retraining-candidate queue.
Three read-only aggregation endpoints backing the ``/ops`` Control Center page:
operational summary, the ranked retraining-candidate queue, and per-grain
forecast-error health with a drift verdict.
"""

from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.features.ops.schemas import OpsSummaryResponse, RetrainingCandidatesResponse
from app.features.ops.schemas import (
ModelHealthResponse,
OpsSummaryResponse,
RetrainingCandidatesResponse,
)
from app.features.ops.service import OpsService

router = APIRouter(prefix="/ops", tags=["ops"])
Expand Down Expand Up @@ -81,3 +86,43 @@ async def get_retraining_candidates(
Candidates sorted by priority score (highest first).
"""
return await OpsService().get_retraining_candidates(db, limit)


@router.get(
"/model-health",
response_model=ModelHealthResponse,
summary="Per-(store, product) forecast-error health and drift",
description="""
Classify forecast-error **performance drift** for every `(store, product)` grain.

For each grain the endpoint reads the **full** successful-run history, extracts
each run's WAPE, and compares the latest WAPE against the mean of the prior
WAPEs within a ±10% relative band — yielding a drift verdict
(`improving` / `stable` / `degrading` / `unknown`).

Entries are sorted **degrading-first**, then by the magnitude of the WAPE
change, and capped at `limit`. Returns HTTP 200 even on an empty database.

This is a performance-drift signal, not data drift — it needs no feature
snapshots and adds no new table or migration.
""",
)
async def get_model_health(
limit: int = Query(
default=20,
ge=1,
le=100,
description="Maximum number of grains to return (1-100, default 20).",
),
db: AsyncSession = Depends(get_db),
) -> ModelHealthResponse:
"""Return per-grain forecast-error health and drift.

Args:
limit: Maximum number of grains to return.
db: Database session.

Returns:
Grains sorted degrading-first, then by absolute WAPE change.
"""
return await OpsService().get_model_health(db, limit)
90 changes: 90 additions & 0 deletions app/features/ops/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,93 @@ class RetrainingCandidatesResponse(BaseModel):
description="Total (store, product) grains evaluated before applying the limit.",
)
generated_at: datetime = Field(..., description="When this queue was computed (UTC).")


# =============================================================================
# Model health & drift
# =============================================================================


# Forecast-error trend verdict for a (store, product) grain.
DriftDirection = Literal["improving", "stable", "degrading", "unknown"]


class WapePoint(BaseModel):
"""One run's WAPE observation in a grain's chronological history."""

model_config = ConfigDict(from_attributes=True)

run_id: str = Field(..., description="External run_id of the observed run.")
created_at: datetime = Field(..., description="When the run was created (UTC).")
wape: float | None = Field(
None,
description="WAPE of the run, when present in its metrics; null otherwise.",
)


class ModelHealthEntry(BaseModel):
"""Forecast-error health and drift verdict for one (store, product) grain."""

model_config = ConfigDict(from_attributes=True)

store_id: int = Field(..., description="Store the grain covers.")
product_id: int = Field(..., description="Product the grain covers.")
run_count: int = Field(
...,
ge=0,
description="Number of successful runs evaluated for this grain.",
)
latest_run_id: str | None = Field(
None,
description="External run_id of the most recent successful run.",
)
latest_run_status: str | None = Field(
None,
description="Status of that run (always 'success' for evaluated grains).",
)
latest_wape: float | None = Field(
None,
description="Most recent numeric WAPE in the grain's history; null when none.",
)
previous_wape: float | None = Field(
None,
description="The prior numeric WAPE, used as the drift baseline; null when none.",
)
wape_delta: float | None = Field(
None,
description="latest_wape minus previous_wape; null when fewer than two numeric WAPEs.",
)
drift_direction: DriftDirection = Field(
...,
description="Forecast-error trend: improving / stable / degrading / unknown.",
)
last_trained_at: datetime | None = Field(
None,
description="created_at of the latest successful run; null when none.",
)
staleness_days: int = Field(
...,
ge=0,
description="Days since the latest run's training-data window ended.",
)
wape_history: list[WapePoint] = Field(
...,
description="Chronological WAPE observations; may carry null gaps.",
)


class ModelHealthResponse(BaseModel):
"""Per-grain forecast-error health, degrading grains first."""

model_config = ConfigDict(from_attributes=True)

entries: list[ModelHealthEntry] = Field(
...,
description="Grains sorted degrading-first, then by |wape_delta| descending.",
)
total_evaluated: int = Field(
...,
ge=0,
description="Total grains evaluated before applying the limit.",
)
generated_at: datetime = Field(..., description="When this report was computed (UTC).")
124 changes: 124 additions & 0 deletions app/features/ops/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"""

from datetime import UTC, datetime
from itertools import groupby
from typing import Any

from sqlalchemy import func, select, text
Expand All @@ -20,13 +21,17 @@
AliasHealth,
AttentionItem,
DataFreshness,
DriftDirection,
JobHealth,
ModelHealthEntry,
ModelHealthResponse,
OpsSummaryResponse,
RetrainingCandidate,
RetrainingCandidatesResponse,
RunHealth,
StatusCount,
SystemHealth,
WapePoint,
)
from app.features.registry.models import DeploymentAlias, ModelRun, RunStatus

Expand All @@ -38,6 +43,8 @@
_WAPE_CAP = 100.0
# How many recent failed jobs / runs to surface in the attention list.
_ATTENTION_LIMIT = 10
# Relative WAPE-change band: forecast-error drift is only flagged outside ±10%.
_DRIFT_BAND = 0.10


# =============================================================================
Expand Down Expand Up @@ -91,6 +98,42 @@ def score_retraining_candidate(staleness_days: int, wape: float | None) -> float
return round(0.6 * staleness_norm + 0.4 * error_norm, 4)


def classify_drift(
wape_history: list[float | None],
) -> tuple[DriftDirection, float | None]:
"""Classify a grain's forecast-error (WAPE) trend.

Pure and total: never raises, tolerates None gaps and sparse history.
Compares the latest numeric WAPE against the mean of all prior numeric
WAPEs, applying a ±10% relative band — the heuristic drift tolerance from
MLOps monitoring guidance (a universal threshold does not exist).

Args:
wape_history: Chronological WAPE values; None marks a run with no WAPE.

Returns:
A ``(direction, delta)`` tuple. ``direction`` is improving / stable /
degrading / unknown; ``delta`` is the latest numeric WAPE minus the
previous numeric WAPE, or None when fewer than two numeric values exist.
"""
numeric = [wape for wape in wape_history if wape is not None]
if len(numeric) < 2:
return "unknown", None
latest = numeric[-1]
prior = numeric[:-1]
baseline = sum(prior) / len(prior)
delta = round(latest - prior[-1], 4)
if baseline <= 0:
# Avoid div-by-zero on a zero baseline: any positive error is degrading.
return ("degrading" if latest > 0 else "stable"), delta
relative = (latest - baseline) / baseline
if relative > _DRIFT_BAND:
return "degrading", delta
if relative < -_DRIFT_BAND:
return "improving", delta
return "stable", delta


def _alias_staleness(
run: ModelRun,
latest_success_by_grain: dict[tuple[int, int], ModelRun],
Expand Down Expand Up @@ -417,3 +460,84 @@ async def get_retraining_candidates(
total_evaluated=len(candidates),
generated_at=datetime.now(UTC),
)

async def get_model_health(self, db: AsyncSession, limit: int) -> ModelHealthResponse:
"""Classify per-grain forecast-error drift from full run history.

Unlike the retraining queue, this needs the *full* WAPE history per
grain (not just the latest run), so it queries every successful run
ordered by grain then creation time and groups in Python with
``itertools.groupby`` — NOT ``DISTINCT ON``.

Args:
db: Database session.
limit: Maximum grains to return (bounded 1..100 by the route).

Returns:
Grains sorted degrading-first, then by ``|wape_delta|`` descending,
capped at ``limit``. Never raises on an empty database.
"""
today = datetime.now(UTC).date()

# FULL history — NOT DISTINCT ON. Ordered by (store, product, created_at)
# so itertools.groupby batches each grain in chronological order.
success_runs = (
(
await db.execute(
select(ModelRun)
.where(ModelRun.status == RunStatus.SUCCESS.value)
.order_by(
ModelRun.store_id,
ModelRun.product_id,
ModelRun.created_at,
)
)
)
.scalars()
.all()
)

entries: list[ModelHealthEntry] = []
for (store_id, product_id), grain_iter in groupby(
success_runs, key=lambda run: (run.store_id, run.product_id)
):
grain_runs = list(grain_iter) # already chronological
history = [
WapePoint(
run_id=run.run_id,
created_at=run.created_at,
wape=extract_wape(run.metrics),
)
for run in grain_runs
]
direction, delta = classify_drift([point.wape for point in history])
numeric = [point.wape for point in history if point.wape is not None]
latest_run = grain_runs[-1]
entries.append(
ModelHealthEntry(
store_id=store_id,
product_id=product_id,
run_count=len(grain_runs),
latest_run_id=latest_run.run_id,
latest_run_status=latest_run.status,
latest_wape=(numeric[-1] if numeric else None),
previous_wape=(numeric[-2] if len(numeric) > 1 else None),
wape_delta=delta,
drift_direction=direction,
last_trained_at=latest_run.created_at,
staleness_days=max((today - latest_run.data_window_end).days, 0),
wape_history=history,
)
)

# Degrading grains first; within a tier, the largest WAPE move leads.
rank: dict[str, int] = {"degrading": 0, "improving": 1, "stable": 2, "unknown": 3}
entries.sort(key=lambda entry: (rank[entry.drift_direction], -abs(entry.wape_delta or 0.0)))

logger.info("ops.model_health_computed", grains=len(entries))

return ModelHealthResponse(
entries=entries[:limit],
total_evaluated=len(entries),
generated_at=datetime.now(UTC),
)
32 changes: 32 additions & 0 deletions app/features/ops/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,35 @@ async def sample_sales(db_session: AsyncSession) -> list[SalesDaily]:
for row in sales:
await db_session.refresh(row)
return sales


@pytest.fixture
async def sample_health_runs(db_session: AsyncSession) -> list[ModelRun]:
"""Three successful runs for one grain forming a degrading WAPE history.

Grain (9101, 8101): WAPE 10.0 -> 11.0 -> 25.0. Each run is committed in its
own transaction so its server-side ``created_at`` is strictly later than
the prior one — making the chronological history deterministic for the
model-health endpoint's ``itertools.groupby`` ordering.
"""
wapes = [10.0, 11.0, 25.0]
window_ends = [date(2026, 1, 1), date(2026, 2, 1), date(2026, 3, 1)]
runs: list[ModelRun] = []
for wape, window_end in zip(wapes, window_ends, strict=True):
run = ModelRun(
run_id=f"test-{_short_id()}",
status=RunStatus.SUCCESS.value,
model_type="naive",
model_config={"_test": True},
config_hash=_short_id()[:16],
data_window_start=date(2025, 1, 1),
data_window_end=window_end,
store_id=9101,
product_id=8101,
metrics={"wape": wape},
)
db_session.add(run)
await db_session.commit()
await db_session.refresh(run)
runs.append(run)
return runs
Loading