Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,047 changes: 1,047 additions & 0 deletions PRPs/PRP-26-scenario-simulation-what-if-planning.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Portfolio-grade end-to-end retail demand forecasting system.
- **Dashboard**: React 19 + Vite + Tailwind CSS 4 + shadcn/ui for data exploration and model management
- **Explorer**: Click-through detail pages for stores, products, model runs, and jobs; run-vs-run comparison and SHA-256 artifact integrity verification; server-side sortable, CSV-exportable tables with column-visibility toggles and URL-shareable filter/sort/page state across every Explorer page; date-scoped KPIs, revenue bar/line charts, and cross-filtering on the Sales page
- **Demand Planner**: `/visualize/demand` — every completed forecast rolled into a multi-SKU table (tomorrow / next-week / next-month demand + inventory requirement), with a lead-time selector and a single-SKU drill-in; the Forecast and Backtest pages run jobs in-page, export CSV, toggle a prediction-interval band, and cross-link to runs/jobs
- **What-If Planner**: `/visualize/planner` — take an existing forecast, apply deterministic price / promotion / holiday / inventory / lifecycle assumptions, and see the baseline-vs-scenario demand and revenue impact (clearly labelled heuristic); save, reload, and delete named scenario plans
- **RAG Knowledge Base**: Postgres pgvector embeddings + evidence-grounded answers with citations
- **Agentic Layer**: PydanticAI agents for autonomous experimentation and evidence-grounded Q&A with human-in-the-loop approval
- **Data Seeder (The Forge)**: Reproducible synthetic data generator with realistic time-series patterns, scenario presets, and retail effects
Expand Down
1 change: 1 addition & 0 deletions alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from app.features.jobs import models as jobs_models # noqa: F401
from app.features.rag import models as rag_models # noqa: F401
from app.features.registry import models as registry_models # noqa: F401
from app.features.scenarios import models as scenarios_models # noqa: F401

# Alembic Config object
config = context.config
Expand Down
97 changes: 97 additions & 0 deletions alembic/versions/43e35957a248_create_scenario_plan_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""create scenario plan table

Revision ID: 43e35957a248
Revises: 378c112e4b32
Create Date: 2026-05-19 07:34:30.545495

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
# ``revision`` is this migration's id; ``down_revision`` is the parent it
# revises (see "Revises:" in the module docstring). No branch label or
# cross-branch dependency is declared.
revision: str = '43e35957a248'
down_revision: Union[str, None] = '378c112e4b32'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Apply migration — create the ``scenario_plan`` table and its indexes.

    The table is created first, then seven indexes are added in a fixed order:
    four single-column indexes (only ``scenario_id`` is unique), two GIN
    indexes over the JSONB columns, and one composite
    ``(store_id, product_id)`` index.
    """
    table_name = 'scenario_plan'

    def jsonb_column(column_name):
        """Build a NOT NULL JSONB column."""
        return sa.Column(column_name, postgresql.JSONB(astext_type=sa.Text()), nullable=False)

    def timestamp_column(column_name):
        """Build a NOT NULL timezone-aware DateTime column defaulting to now()."""
        return sa.Column(
            column_name,
            sa.DateTime(timezone=True),
            server_default=sa.text('now()'),
            nullable=False,
        )

    op.create_table(
        table_name,
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('scenario_id', sa.String(length=32), nullable=False),
        sa.Column('name', sa.String(length=200), nullable=False),
        sa.Column('store_id', sa.Integer(), nullable=False),
        sa.Column('product_id', sa.Integer(), nullable=False),
        sa.Column('run_id', sa.String(length=32), nullable=False),
        sa.Column('horizon', sa.Integer(), nullable=False),
        jsonb_column('assumptions'),
        jsonb_column('comparison'),
        sa.Column('method', sa.String(length=20), nullable=False),
        timestamp_column('created_at'),
        timestamp_column('updated_at'),
        sa.CheckConstraint("method IN ('heuristic')", name='ck_scenario_plan_method'),
        sa.PrimaryKeyConstraint('id'),
    )

    # (index name, indexed column, unique?) — names are wrapped in op.f()
    # exactly as in the autogenerated form.
    single_column_indexes = (
        ('ix_scenario_plan_scenario_id', 'scenario_id', True),
        ('ix_scenario_plan_store_id', 'store_id', False),
        ('ix_scenario_plan_product_id', 'product_id', False),
        ('ix_scenario_plan_run_id', 'run_id', False),
    )
    for index_name, column_name, is_unique in single_column_indexes:
        op.create_index(op.f(index_name), table_name, [column_name], unique=is_unique)

    # GIN indexes over the two JSONB blobs.
    jsonb_indexes = (
        ('ix_scenario_plan_assumptions_gin', 'assumptions'),
        ('ix_scenario_plan_comparison_gin', 'comparison'),
    )
    for index_name, column_name in jsonb_indexes:
        op.create_index(
            index_name,
            table_name,
            [column_name],
            unique=False,
            postgresql_using='gin',
        )

    # Composite index on the (store, product) pair.
    op.create_index(
        'ix_scenario_plan_store_product',
        table_name,
        ['store_id', 'product_id'],
        unique=False,
    )


def downgrade() -> None:
    """Revert migration — drop the ``scenario_plan`` indexes, then the table.

    Indexes are dropped in the reverse of the order ``upgrade`` created them.
    """
    table_name = 'scenario_plan'

    # Composite index first (it was created last).
    op.drop_index('ix_scenario_plan_store_product', table_name=table_name)

    # GIN indexes over the JSONB columns.
    for gin_index in (
        'ix_scenario_plan_comparison_gin',
        'ix_scenario_plan_assumptions_gin',
    ):
        op.drop_index(gin_index, table_name=table_name, postgresql_using='gin')

    # Single-column indexes, names wrapped in op.f() as on creation.
    for column_index in (
        'ix_scenario_plan_run_id',
        'ix_scenario_plan_product_id',
        'ix_scenario_plan_store_id',
        'ix_scenario_plan_scenario_id',
    ):
        op.drop_index(op.f(column_index), table_name=table_name)

    op.drop_table(table_name)
31 changes: 31 additions & 0 deletions app/features/scenarios/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Scenario Simulation / What-If Planning slice.

A vertical slice that turns a baseline forecast into a *plan*: it loads an
already-trained baseline model, runs its forecast, applies deterministic,
transparent uplift / drag factors for future assumptions (price change,
promotion, holiday, inventory, lifecycle), and returns a baseline-vs-scenario
comparison. Comparisons can be persisted as named ``scenario_plan`` rows.

DECISIONS LOCKED (PRP-26): the baseline forecasters ignore exogenous
regressors, so a "what-if" is applied as a deterministic post-forecast
multiplier — never a leakage-prone re-training. Every result is explicitly
labelled ``method = "heuristic"`` with a fixed disclaimer.
"""

from app.features.scenarios.models import ScenarioPlan
from app.features.scenarios.routes import router
from app.features.scenarios.schemas import (
ScenarioComparison,
ScenarioListResponse,
ScenarioPlanResponse,
)
from app.features.scenarios.service import ScenarioService

# Public surface of the scenarios slice: the ORM model, the API router, the
# response schemas and the service, re-exported from their submodules so
# callers can import them from ``app.features.scenarios`` directly.
__all__ = [
    "ScenarioComparison",
    "ScenarioListResponse",
    "ScenarioPlan",
    "ScenarioPlanResponse",
    "ScenarioService",
    "router",
]
169 changes: 169 additions & 0 deletions app/features/scenarios/adjustments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
"""Pure deterministic adjustment engine for scenario simulation.

Every function here is a pure factor computation — no DB, no I/O, no mutation
of its inputs, and it NEVER raises on junk input (a negative price change, an
unknown promotion kind, a ``None`` lifecycle stage all return a sane factor).

DECISIONS LOCKED (PRP-26 #1): the baseline forecasters ignore exogenous
regressors, so a what-if cannot be answered by re-prediction. The MVP applies
these factors as a post-forecast multiplier on a baseline forecast. Each factor
is a documented, tunable constant so a reviewer can see and adjust the
heuristic; the tests assert direction and bounds, not exact magnitudes.
"""

from __future__ import annotations

import math
from datetime import date
from typing import TYPE_CHECKING, Literal

if TYPE_CHECKING:
from app.features.scenarios.schemas import ScenarioAssumptions

# Constant-elasticity price response: factor = (1 + change_pct) ** PRICE_ELASTICITY.
# A negative elasticity means a price cut (change_pct < 0) lifts demand and a
# price rise dampens it. Read by ``price_factor``.
PRICE_ELASTICITY: float = -1.2

# Multiplicative demand uplift per promotion kind (1.0 == no effect).
# ``promotion_factor`` falls back to 1.0 for any kind not listed here.
PROMOTION_UPLIFT_BY_KIND: dict[str, float] = {
    "pct_off": 1.25,
    "bogo": 1.40,
    "bundle": 1.15,
    "markdown": 1.30,
}

# Demand uplift applied on a holiday / event day (returned by
# ``holiday_factor`` when its flag is true).
HOLIDAY_UPLIFT: float = 1.30

# Demand multiplier per forced product lifecycle stage.
# ``lifecycle_factor`` falls back to 1.0 for ``None`` or an unlisted stage.
LIFECYCLE_FACTOR: dict[str, float] = {
    "launch": 1.2,
    "growth": 1.1,
    "maturity": 1.0,
    "decline": 0.85,
}

# Clamp band (lower, upper) — keeps a combined factor away from a zero /
# explosive forecast. Applied to the price factor on its own and again to the
# combined per-day factor.
FACTOR_BAND: tuple[float, float] = (0.1, 5.0)

# Relative band around on-hand stock within which coverage is "at_risk":
# 0.10 means demand between 90% and 110% of on-hand units.
COVERAGE_AT_RISK_BAND: float = 0.10

# Closed set of verdict labels returned by ``coverage_verdict``.
CoverageVerdict = Literal["covered", "at_risk", "stockout", "unknown"]


def clamp(value: float, lo: float, hi: float) -> float:
    """Restrict ``value`` to the inclusive ``[lo, hi]`` range.

    The upper bound is applied first, then the lower bound. Because every
    comparison against NaN is false, a NaN ``value`` comes back as ``hi``.
    """
    # Cap from above: keep ``value`` only when it is strictly below ``hi``.
    capped = value if value < hi else hi
    # Floor from below: keep the capped value only when strictly above ``lo``.
    return capped if capped > lo else lo


def price_factor(price_change_pct: float) -> float:
    """Return the demand multiplier for a relative price change.

    Constant-elasticity response: ``(1 + change) ** PRICE_ELASTICITY``,
    clamped into ``FACTOR_BAND``. A price cut (negative change) yields a
    factor > 1; a price rise yields < 1.

    Junk handling (never raises for a float input):
      * NaN carries no usable signal, so it yields the neutral factor ``1.0``.
        Without this guard NaN slips past the ``base <= 0.0`` test and the
        clamp turns it into the upper band, i.e. the maximum uplift.
      * A change of -100% or worse (a non-positive price) returns the upper
        band rather than a complex / NaN value.

    Args:
        price_change_pct: Relative price change as a fraction (``-0.10`` is a
            10% cut).

    Returns:
        The multiplicative demand factor, within ``FACTOR_BAND``.
    """
    if math.isnan(price_change_pct):
        return 1.0
    base = 1.0 + price_change_pct
    if base <= 0.0:
        # A negative base raised to a fractional power would be complex.
        return FACTOR_BAND[1]
    return clamp(base**PRICE_ELASTICITY, *FACTOR_BAND)


def promotion_factor(kind: str, active: bool) -> float:
    """Look up the demand multiplier for a promotion of ``kind``.

    An inactive promotion, or a ``kind`` missing from
    ``PROMOTION_UPLIFT_BY_KIND``, contributes the neutral factor ``1.0``.
    """
    return PROMOTION_UPLIFT_BY_KIND.get(kind, 1.0) if active else 1.0


def holiday_factor(is_holiday: bool) -> float:
    """Give ``HOLIDAY_UPLIFT`` for a holiday / event day, else the neutral ``1.0``."""
    if is_holiday:
        return HOLIDAY_UPLIFT
    return 1.0


def lifecycle_factor(stage: str | None) -> float:
    """Look up the demand multiplier for a product lifecycle ``stage``.

    ``None`` and any stage missing from ``LIFECYCLE_FACTOR`` both map to the
    neutral factor ``1.0``.
    """
    return 1.0 if stage is None else LIFECYCLE_FACTOR.get(stage, 1.0)


def _in_window(point_date: date, start: date, end: date) -> bool:
    """Report whether ``point_date`` lies in the inclusive window spanned by ``start`` and ``end``.

    The bounds may arrive in either order: a reversed window (``start`` after
    ``end``) is treated as the same span rather than as empty, so junk input
    never raises.
    """
    earliest = min(start, end)
    latest = max(start, end)
    return earliest <= point_date <= latest


def combined_daily_factor(point_date: date, assumptions: ScenarioAssumptions) -> float:
    """Combine every factor that applies on ``point_date`` into one clamped multiplier.

    Factors are gathered in a fixed order — price, promotion, holiday,
    lifecycle — multiplied together starting from ``1.0``, and the product is
    clamped into ``FACTOR_BAND``.

    Time-safety: each window / date test is keyed on ``point_date`` only, so
    an assumption whose window does not contain that date contributes nothing.
    The lifecycle factor has no window and applies to every date. With no
    assumptions set the result is exactly ``1.0``.
    """
    multipliers: list[float] = []

    price_assumption = assumptions.price
    if price_assumption is not None and _in_window(
        point_date, price_assumption.start_date, price_assumption.end_date
    ):
        multipliers.append(price_factor(price_assumption.change_pct))

    promo_assumption = assumptions.promotion
    if promo_assumption is not None and _in_window(
        point_date, promo_assumption.start_date, promo_assumption.end_date
    ):
        # Being inside the window is what makes the promotion active.
        multipliers.append(promotion_factor(promo_assumption.kind, active=True))

    holiday_assumption = assumptions.holiday
    if holiday_assumption is not None and point_date in holiday_assumption.dates:
        multipliers.append(holiday_factor(True))

    lifecycle_assumption = assumptions.lifecycle
    if lifecycle_assumption is not None:
        multipliers.append(lifecycle_factor(lifecycle_assumption.stage))

    # Fold left-to-right from 1.0 so the float product matches a running
    # ``factor *= ...`` accumulation.
    combined = 1.0
    for multiplier in multipliers:
        combined *= multiplier

    lower_bound, upper_bound = FACTOR_BAND
    return clamp(combined, lower_bound, upper_bound)


def apply_adjustment(baseline: list[float], factors: list[float]) -> list[float]:
"""Element-wise multiply ``baseline`` by ``factors``, flooring each at 0.0.

Returns a NEW list — the input ``baseline`` is never mutated (the leakage
spec depends on this). Raises ``ValueError`` on a length mismatch: that is a
caller-contract violation, not junk data.
"""
if len(baseline) != len(factors):
raise ValueError(
f"baseline and factors must be equal length: {len(baseline)} != {len(factors)}"
)
return [max(0.0, value * factor) for value, factor in zip(baseline, factors, strict=True)]


def coverage_verdict(scenario_total_units: float, on_hand_units: int | None) -> CoverageVerdict:
    """Label how well on-hand stock covers the projected scenario demand.

    * ``unknown``  — no inventory assumption was supplied (``None``).
    * With zero or negative stock: ``stockout`` if any demand is projected,
      otherwise ``at_risk``.
    * With positive stock: ``stockout`` when demand exceeds stock by more than
      ``COVERAGE_AT_RISK_BAND``, ``at_risk`` when demand is within that band
      on either side of stock, ``covered`` when it is below the band.

    Never raises.
    """
    if on_hand_units is None:
        return "unknown"

    if on_hand_units <= 0:
        if scenario_total_units > 0.0:
            return "stockout"
        return "at_risk"

    at_risk_ceiling = on_hand_units * (1.0 + COVERAGE_AT_RISK_BAND)
    at_risk_floor = on_hand_units * (1.0 - COVERAGE_AT_RISK_BAND)

    if scenario_total_units > at_risk_ceiling:
        verdict = "stockout"
    elif scenario_total_units >= at_risk_floor:
        verdict = "at_risk"
    else:
        verdict = "covered"
    return verdict
70 changes: 70 additions & 0 deletions app/features/scenarios/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Scenario plan ORM model.

A ``scenario_plan`` row persists a saved what-if analysis: the raw
``ScenarioAssumptions`` *and* the full ``ScenarioComparison`` snapshot, both as
JSONB. Storing the snapshot (PRP-26 decision #3) means a reloaded plan
re-renders without recomputation — and without the original model artifact
still having to exist on disk.

GOTCHA: SQLAlchemy reserves the declarative attribute name ``metadata``; the
JSONB columns are therefore named ``assumptions`` and ``comparison``.
"""

from __future__ import annotations

from typing import Any

from sqlalchemy import CheckConstraint, Index, Integer, String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column

from app.core.database import Base
from app.shared.models import TimestampMixin

# The only adjustment method the MVP produces — guarded by a CHECK constraint.
SCENARIO_METHOD_HEURISTIC = "heuristic"


class ScenarioPlan(TimestampMixin, Base):
    """A saved scenario plan — one persisted what-if analysis.

    Each row keeps both the inputs (``assumptions``) and the computed result
    (``comparison``) as JSONB, so a saved plan can be re-rendered from the row
    alone. Timestamp columns are presumably contributed by ``TimestampMixin``
    (the migration creates ``created_at`` / ``updated_at``) — confirm against
    the mixin.

    Attributes:
        id: Surrogate primary key.
        scenario_id: Unique external identifier (UUID hex, 32 chars).
        name: Human-readable plan name.
        store_id: Store the baseline model targets.
        product_id: Product the baseline model targets.
        run_id: Artifact key of the baseline model (model_{run_id}.joblib).
        horizon: Number of days simulated.
        assumptions: Raw ScenarioAssumptions as JSONB.
        comparison: Full ScenarioComparison snapshot as JSONB.
        method: Adjustment method — always 'heuristic' (CHECK-constrained).
    """

    __tablename__ = "scenario_plan"

    # Identity: internal integer key plus a unique, indexed external id.
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    scenario_id: Mapped[str] = mapped_column(String(32), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(200), nullable=False)
    # Plain indexed integers/strings — no foreign keys are declared here.
    store_id: Mapped[int] = mapped_column(Integer, index=True, nullable=False)
    product_id: Mapped[int] = mapped_column(Integer, index=True, nullable=False)
    run_id: Mapped[str] = mapped_column(String(32), index=True, nullable=False)
    horizon: Mapped[int] = mapped_column(Integer, nullable=False)

    # JSONB blobs — never named ``metadata`` (SQLAlchemy reserves it).
    assumptions: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)
    comparison: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)

    # Python-side default only; no server default is declared, so rows
    # inserted outside the ORM must supply ``method`` explicitly.
    method: Mapped[str] = mapped_column(
        String(20), nullable=False, default=SCENARIO_METHOD_HEURISTIC
    )

    __table_args__ = (
        # GIN indexes for JSONB containment queries on either blob.
        Index("ix_scenario_plan_assumptions_gin", "assumptions", postgresql_using="gin"),
        Index("ix_scenario_plan_comparison_gin", "comparison", postgresql_using="gin"),
        # Composite index for the common "plans for this store/product" query.
        Index("ix_scenario_plan_store_product", "store_id", "product_id"),
        # The MVP only ever produces heuristic comparisons.
        CheckConstraint("method IN ('heuristic')", name="ck_scenario_plan_method"),
    )
Loading