Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ AGENT_SESSION_TTL_MINUTES=120
AGENT_MAX_SESSIONS_PER_USER=5

# Human-in-the-loop actions (JSON array format required for safe parsing)
AGENT_REQUIRE_APPROVAL=["create_alias","archive_run"]
AGENT_REQUIRE_APPROVAL=["create_alias","archive_run","save_scenario"]
AGENT_APPROVAL_TIMEOUT_MINUTES=60

# Streaming
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Portfolio-grade end-to-end retail demand forecasting system.
- **Dashboard**: React 19 + Vite + Tailwind CSS 4 + shadcn/ui for data exploration and model management
- **Explorer**: Click-through detail pages for stores, products, model runs, and jobs; run-vs-run comparison and SHA-256 artifact integrity verification; server-side sortable, CSV-exportable tables with column-visibility toggles and URL-shareable filter/sort/page state across every Explorer page; date-scoped KPIs, revenue bar/line charts, and cross-filtering on the Sales page
- **Demand Planner**: `/visualize/demand` — every completed forecast rolled into a multi-SKU table (tomorrow / next-week / next-month demand + inventory requirement), with a lead-time selector and a single-SKU drill-in; the Forecast and Backtest pages run jobs in-page, export CSV, toggle a prediction-interval band, and cross-link to runs/jobs
- **What-If Planner**: `/visualize/planner` — take an existing forecast, apply deterministic price / promotion / holiday / inventory / lifecycle assumptions, and see the baseline-vs-scenario demand and revenue impact (clearly labelled heuristic); save, reload, and delete named scenario plans
- **What-If Planner**: `/visualize/planner` — take an existing forecast, apply price / promotion / holiday / inventory / lifecycle assumptions, and see the baseline-vs-scenario demand and revenue impact; a regression baseline genuinely re-forecasts through the assumptions (`method="model_exogenous"`), any other baseline applies a clearly-labelled deterministic heuristic; save, tag, reload, clone, and delete named scenario plans, and rank 2-5 saved plans side by side in a multi-scenario comparison. The experiment chat agent can also propose a scenario and — behind the human-in-the-loop approval gate — save it for you
- **RAG Knowledge Base**: Postgres pgvector embeddings + evidence-grounded answers with citations
- **Agentic Layer**: PydanticAI agents for autonomous experimentation and evidence-grounded Q&A with human-in-the-loop approval
- **Data Seeder (The Forge)**: Reproducible synthetic data generator with realistic time-series patterns, scenario presets, and retail effects
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""add scenario provenance columns

Revision ID: 7e8f9748581e
Revises: bb8c4587ef1d
Create Date: 2026-05-19 10:47:09.829097

PRP-27 Phase D — adds provenance + approval-audit columns to ``scenario_plan``
so an agent-proposed plan records who/what created it and the human approval
decision that released it. ``source`` server-defaults to ``'user'`` so every
pre-existing row stays valid. Forward-only.
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision: str = '7e8f9748581e'
down_revision: Union[str, None] = 'bb8c4587ef1d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Add the source + approval-audit columns, their CHECKs and an index."""
op.add_column(
'scenario_plan',
sa.Column(
'source',
sa.String(length=16),
nullable=False,
server_default=sa.text("'user'"),
),
)
op.add_column(
'scenario_plan',
sa.Column('agent_session_id', sa.String(length=32), nullable=True),
)
op.add_column(
'scenario_plan',
sa.Column('approved_by', sa.String(length=120), nullable=True),
)
op.add_column(
'scenario_plan',
sa.Column('approved_at', sa.DateTime(timezone=True), nullable=True),
)
op.add_column(
'scenario_plan',
sa.Column('approval_decision', sa.String(length=16), nullable=True),
)
op.create_check_constraint(
'ck_scenario_plan_source',
'scenario_plan',
"source IN ('user', 'agent')",
)
op.create_check_constraint(
'ck_scenario_plan_approval_decision',
'scenario_plan',
"approval_decision IS NULL OR approval_decision IN ('approved', 'rejected')",
)
op.create_index('ix_scenario_plan_source', 'scenario_plan', ['source'], unique=False)


def downgrade() -> None:
"""Drop the index, the two CHECKs and the provenance columns."""
op.drop_index('ix_scenario_plan_source', table_name='scenario_plan')
op.drop_constraint('ck_scenario_plan_approval_decision', 'scenario_plan', type_='check')
op.drop_constraint('ck_scenario_plan_source', 'scenario_plan', type_='check')
op.drop_column('scenario_plan', 'approval_decision')
op.drop_column('scenario_plan', 'approved_at')
op.drop_column('scenario_plan', 'approved_by')
op.drop_column('scenario_plan', 'agent_session_id')
op.drop_column('scenario_plan', 'source')
52 changes: 52 additions & 0 deletions alembic/versions/bb8c4587ef1d_add_scenario_library_columns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""add scenario library columns

Revision ID: bb8c4587ef1d
Revises: e47f5739d7d0
Create Date: 2026-05-19 10:26:58.473203

PRP-27 Phase C — adds the scenario-library columns to ``scenario_plan``:
``tags`` (a JSONB string array, queryable via a GIN index) and ``cloned_from``
(the ``scenario_id`` a plan was cloned from, nullable). Forward-only.
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision: str = 'bb8c4587ef1d'
down_revision: Union[str, None] = 'e47f5739d7d0'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Add the tags and cloned_from columns plus a GIN index on tags."""
op.add_column(
'scenario_plan',
sa.Column(
'tags',
postgresql.JSONB(astext_type=sa.Text()),
nullable=False,
server_default=sa.text("'[]'::jsonb"),
),
)
op.add_column(
'scenario_plan',
sa.Column('cloned_from', sa.String(length=32), nullable=True),
)
op.create_index(
'ix_scenario_plan_tags_gin',
'scenario_plan',
['tags'],
unique=False,
postgresql_using='gin',
)


def downgrade() -> None:
"""Drop the GIN index and the scenario-library columns."""
op.drop_index('ix_scenario_plan_tags_gin', table_name='scenario_plan', postgresql_using='gin')
op.drop_column('scenario_plan', 'cloned_from')
op.drop_column('scenario_plan', 'tags')
43 changes: 43 additions & 0 deletions alembic/versions/e47f5739d7d0_widen_scenario_method_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""widen scenario method check

Revision ID: e47f5739d7d0
Revises: 43e35957a248
Create Date: 2026-05-19 10:06:15.179816

PRP-27 Phase B — widens the ``scenario_plan.method`` CHECK constraint so a
model-driven simulation can persist ``method='model_exogenous'`` alongside the
MVP's ``'heuristic'``. Forward-only: never edits the merged migration that
created the table.
"""
from typing import Sequence, Union

from alembic import op

# revision identifiers, used by Alembic.
revision: str = 'e47f5739d7d0'
down_revision: Union[str, None] = '43e35957a248'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

_CONSTRAINT = "ck_scenario_plan_method"
_TABLE = "scenario_plan"


def upgrade() -> None:
"""Allow method IN ('heuristic', 'model_exogenous')."""
op.drop_constraint(_CONSTRAINT, _TABLE, type_="check")
op.create_check_constraint(
_CONSTRAINT,
_TABLE,
"method IN ('heuristic', 'model_exogenous')",
)


def downgrade() -> None:
"""Revert to method IN ('heuristic') only."""
op.drop_constraint(_CONSTRAINT, _TABLE, type_="check")
op.create_check_constraint(
_CONSTRAINT,
_TABLE,
"method IN ('heuristic')",
)
5 changes: 4 additions & 1 deletion app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ class Settings(BaseSettings):
agent_retry_delay_seconds: float = 1.0

# Human-in-the-Loop Configuration
agent_require_approval: list[str] = ["create_alias", "archive_run"]
# ``save_scenario`` (PRP-27 Phase D) lets the experiment agent persist a
# scenario_plan row — a deliberate mutation-surface widening, so it is
# gated here exactly like create_alias / archive_run.
agent_require_approval: list[str] = ["create_alias", "archive_run", "save_scenario"]
agent_approval_timeout_minutes: int = 60

# Session Configuration
Expand Down
2 changes: 2 additions & 0 deletions app/features/agents/agents/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ def requires_approval(action_name: str) -> bool:
- Use tool_compare_runs to analyze differences between registered runs
- Use tool_create_alias to deploy successful models (requires approval)
- Use tool_archive_run to clean up old experiments (requires approval)
- Use tool_propose_scenario to draft a candidate what-if scenario (read-only)
- Use tool_save_scenario to persist an approved scenario plan (requires approval)
"""

SAFETY_INSTRUCTIONS = """
Expand Down
107 changes: 107 additions & 0 deletions app/features/agents/agents/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
get_run,
list_runs,
)
from app.features.scenarios.agent_tools import propose_scenario, save_scenario
from app.features.scenarios.schemas import SaveScenarioRequest

logger = structlog.get_logger()

Expand Down Expand Up @@ -373,6 +375,111 @@ async def tool_archive_run(

return await archive_run(db=ctx.deps.db, run_id=run_id)

@agent.tool
@recoverable
async def tool_propose_scenario(
ctx: RunContext[AgentDeps],
store_id: int,
product_id: int,
horizon: int = 14,
objective: str = "",
) -> dict[str, Any]:
"""Propose a candidate what-if scenario for a store / product.

READ-ONLY: this drafts a candidate scenario; it persists nothing. To
save the proposal, call tool_save_scenario (which requires approval).

Args:
store_id: Store the proposed scenario targets.
product_id: Product the proposed scenario targets.
horizon: Number of days the proposed scenario should span (default 14).
objective: Free-text planning objective — keywords like 'promotion'
steer the proposal toward a promotion instead of a price cut.

Returns:
A candidate scenario with assumptions and a recommendation.
"""
ctx.deps.increment_tool_calls()
logger.info(
"agents.experiment.tool_propose_scenario",
session_id=ctx.deps.session_id,
store_id=store_id,
product_id=product_id,
)
return await propose_scenario(
db=ctx.deps.db,
store_id=store_id,
product_id=product_id,
horizon=horizon,
objective=objective,
)

@agent.tool
@recoverable
async def tool_save_scenario(
ctx: RunContext[AgentDeps],
name: str,
run_id: str,
store_id: int,
product_id: int,
horizon: int,
assumptions: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Persist a proposed what-if scenario as a saved scenario plan.

REQUIRES HUMAN APPROVAL. This action writes a scenario_plan row.

Use this only after tool_propose_scenario, passing back its candidate
assumptions. The plan is persisted with agent provenance and the
approval audit trail.

Args:
name: Human-readable name for the saved plan.
run_id: Artifact key of the baseline model.
store_id: Store the scenario targets.
product_id: Product the scenario targets.
horizon: Number of days to simulate.
assumptions: The candidate assumptions dict from tool_propose_scenario.

Returns:
The saved plan details, or an approval request.
"""
ctx.deps.increment_tool_calls()
logger.info(
"agents.experiment.tool_save_scenario",
session_id=ctx.deps.session_id,
store_id=store_id,
product_id=product_id,
requires_approval=requires_approval("save_scenario"),
)

arguments: dict[str, Any] = {
"name": name,
"run_id": run_id,
"store_id": store_id,
"product_id": product_id,
"horizon": horizon,
"assumptions": assumptions or {},
"source": "agent",
"agent_session_id": ctx.deps.session_id,
}

# Check if approval is required — mirrors tool_create_alias exactly.
if requires_approval("save_scenario"):
return {
"status": "approval_required",
"action": "save_scenario",
"arguments": arguments,
"message": "This action requires human approval. Please approve to proceed.",
}

request = SaveScenarioRequest.model_validate(arguments)
return await save_scenario(
db=ctx.deps.db,
request=request,
agent_session_id=ctx.deps.session_id,
)

return agent


Expand Down
14 changes: 13 additions & 1 deletion app/features/agents/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,8 @@ async def _execute_pending_action(
ValueError: If action_type is not recognized.
"""
from app.features.agents.tools.registry_tools import archive_run, create_alias
from app.features.scenarios.agent_tools import save_scenario
from app.features.scenarios.schemas import SaveScenarioRequest

if action_type == "create_alias":
alias_name = arguments.get("alias_name", "")
Expand All @@ -886,7 +888,17 @@ async def _execute_pending_action(
if result is None:
raise ValueError(f"Run not found: {run_id}")
return result
elif action_type == "save_scenario":
# The HITL gate has released the agent's save_scenario call — persist
# the scenario_plan row now, stamped with the approved audit trail.
request = SaveScenarioRequest.model_validate(arguments)
return await save_scenario(
db=db,
request=request,
agent_session_id=arguments.get("agent_session_id"),
)
else:
raise ValueError(
f"Unknown action type: {action_type}. Supported actions: create_alias, archive_run"
f"Unknown action type: {action_type}. Supported actions: "
"create_alias, archive_run, save_scenario"
)
Loading