Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""add exogenous_signal and sales_returns tables

Revision ID: f7a8b9c0d123
Revises: d6e0f2g3h456
Create Date: 2026-05-11 12:00:00.000000

Phase 1 of the seeder realism extension. Additive only — creates two new
fact tables to support exogenous demand signals (weather / macro / events)
and synthetic returns volume. No existing rows are touched.

Downgrade drops both tables; any seeded rows are lost. This is acceptable
because the data is synthetic; do not run downgrade against an environment
that holds user-loaded data.
"""

from collections.abc import Sequence

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
# `revision` is this migration's own ID and `down_revision` is the migration it
# revises; both match the "Revision ID" / "Revises" lines in the module docstring.
revision: str = "f7a8b9c0d123"
down_revision: str | None = "d6e0f2g3h456"
# No branch labels and no cross-revision dependencies are declared here.
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
    """Create the ``exogenous_signal`` and ``sales_returns`` tables with their indexes.

    The migration is additive: each table is created first and its indexes are
    added afterwards, in a fixed order that ``downgrade`` reverses.
    """
    # Both tables carry the same pair of server-defaulted audit timestamps.
    timestamp_names = ("created_at", "updated_at")

    # ------------------------------------------------------------------
    # exogenous_signal
    # ------------------------------------------------------------------
    signal_table = "exogenous_signal"
    # A row is either chain-wide (no store) or store-scoped (store required).
    scope_rule = (
        "(is_global = true AND store_id IS NULL) OR "
        "(is_global = false AND store_id IS NOT NULL)"
    )
    op.create_table(
        signal_table,
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column("date", sa.Date(), nullable=False),
        sa.Column("signal_name", sa.String(length=50), nullable=False),
        sa.Column("store_id", sa.Integer(), nullable=True),
        sa.Column("is_global", sa.Boolean(), nullable=False),
        sa.Column("value", sa.Float(), nullable=False),
        # Timestamp columns are built anew for each table.
        *(
            sa.Column(
                stamp,
                sa.DateTime(timezone=True),
                server_default=sa.text("now()"),
                nullable=False,
            )
            for stamp in timestamp_names
        ),
        sa.CheckConstraint(scope_rule, name="ck_exogenous_signal_global_consistency"),
        sa.ForeignKeyConstraint(["date"], ["calendar.date"]),
        sa.ForeignKeyConstraint(["store_id"], ["store.id"]),
        sa.PrimaryKeyConstraint("id"),
    )

    # Single-column lookup indexes; names are passed through op.f().
    for index_name, column in (
        ("ix_exogenous_signal_date", "date"),
        ("ix_exogenous_signal_signal_name", "signal_name"),
        ("ix_exogenous_signal_store_id", "store_id"),
    ):
        op.create_index(op.f(index_name), signal_table, [column], unique=False)

    # Composite index on (signal_name, date).
    op.create_index(
        "ix_exogenous_signal_name_date",
        signal_table,
        ["signal_name", "date"],
        unique=False,
    )

    # Partial unique indexes: one covers global rows, the other per-store rows.
    for index_name, columns, predicate in (
        ("uq_exogenous_signal_global", ["date", "signal_name"], "is_global = true"),
        (
            "uq_exogenous_signal_per_store",
            ["date", "signal_name", "store_id"],
            "is_global = false",
        ),
    ):
        op.create_index(
            index_name,
            signal_table,
            columns,
            unique=True,
            postgresql_where=sa.text(predicate),
        )

    # ------------------------------------------------------------------
    # sales_returns
    # ------------------------------------------------------------------
    returns_table = "sales_returns"
    op.create_table(
        returns_table,
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column("date", sa.Date(), nullable=False),
        sa.Column("store_id", sa.Integer(), nullable=False),
        sa.Column("product_id", sa.Integer(), nullable=False),
        sa.Column("return_quantity", sa.Integer(), nullable=False),
        sa.Column("return_reason", sa.String(length=50), nullable=False),
        *(
            sa.Column(
                stamp,
                sa.DateTime(timezone=True),
                server_default=sa.text("now()"),
                nullable=False,
            )
            for stamp in timestamp_names
        ),
        sa.CheckConstraint("return_quantity >= 1", name="ck_sales_returns_quantity_positive"),
        sa.ForeignKeyConstraint(["date"], ["calendar.date"]),
        sa.ForeignKeyConstraint(["product_id"], ["product.id"]),
        sa.ForeignKeyConstraint(["store_id"], ["store.id"]),
        sa.PrimaryKeyConstraint("id"),
    )

    # Foreign-key lookup indexes; names are passed through op.f().
    for index_name, column in (
        ("ix_sales_returns_product_id", "product_id"),
        ("ix_sales_returns_store_id", "store_id"),
    ):
        op.create_index(op.f(index_name), returns_table, [column], unique=False)

    # Explicitly named indexes: the composite first, then the date index.
    for index_name, columns in (
        ("ix_sales_returns_store_product_date", ["store_id", "product_id", "date"]),
        ("ix_sales_returns_date", ["date"]),
    ):
        op.create_index(index_name, returns_table, columns, unique=False)


def downgrade() -> None:
    """Drop ``sales_returns`` and ``exogenous_signal`` together with their indexes.

    WARNING: every seeded Phase 1 row in these tables is lost. That is
    acceptable for synthetic data only; do not run this against an environment
    that holds user-loaded signals.
    """
    # One entry per table, in drop order:
    #   (table, plainly named indexes, indexes whose names go through op.f()).
    # Within each table the indexes are removed in the reverse of the order in
    # which upgrade() created them, and the table itself is dropped last.
    teardown_plan = (
        (
            "sales_returns",
            ("ix_sales_returns_date", "ix_sales_returns_store_product_date"),
            ("ix_sales_returns_store_id", "ix_sales_returns_product_id"),
        ),
        (
            "exogenous_signal",
            (
                "uq_exogenous_signal_per_store",
                "uq_exogenous_signal_global",
                "ix_exogenous_signal_name_date",
            ),
            (
                "ix_exogenous_signal_store_id",
                "ix_exogenous_signal_signal_name",
                "ix_exogenous_signal_date",
            ),
        ),
    )

    for table, plain_names, convention_names in teardown_plan:
        for index_name in plain_names:
            op.drop_index(index_name, table_name=table)
        for index_name in convention_names:
            op.drop_index(op.f(index_name), table_name=table)
        op.drop_table(table)
87 changes: 87 additions & 0 deletions app/features/data_platform/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
from decimal import Decimal

from sqlalchemy import (
BigInteger,
Boolean,
CheckConstraint,
Date,
Float,
ForeignKey,
Index,
Integer,
Expand Down Expand Up @@ -308,3 +310,88 @@ class InventorySnapshotDaily(TimestampMixin, Base):
CheckConstraint("on_hand_qty >= 0", name="ck_inventory_on_hand_positive"),
CheckConstraint("on_order_qty >= 0", name="ck_inventory_on_order_positive"),
)


class ExogenousSignal(TimestampMixin, Base):
    """One dated value of an external demand driver (weather, macro index, events).

    Every row has exactly one of two scopes:

    * chain-wide: ``is_global=True`` and ``store_id IS NULL``
    * per-store: ``is_global=False`` and ``store_id IS NOT NULL``

    ``ck_exogenous_signal_global_consistency`` rejects any other combination.
    Each scope has its own partial unique index, so a (date, signal) pair
    appears at most once globally and at most once per store; the design
    intent is that seeder re-runs stay idempotent.

    Attributes:
        id: Surrogate primary key.
        date: Day the value applies to (FK to ``calendar.date``).
        signal_name: Short identifier such as ``"weather_temp_c"`` or
            ``"macro_index"``.
        store_id: Owning store (FK to ``store.id``); NULL for chain-wide rows.
        is_global: True for chain-wide rows; mirrors ``store_id IS NULL``.
        value: Numeric reading of the signal on ``date``.
    """

    __tablename__ = "exogenous_signal"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    date: Mapped[datetime.date] = mapped_column(
        Date,
        ForeignKey("calendar.date"),
        index=True,
    )
    signal_name: Mapped[str] = mapped_column(String(50), index=True)
    store_id: Mapped[int | None] = mapped_column(
        Integer,
        ForeignKey("store.id"),
        nullable=True,
        index=True,
    )
    is_global: Mapped[bool] = mapped_column(Boolean, nullable=False)
    value: Mapped[float] = mapped_column(Float, nullable=False)

    __table_args__ = (
        # Composite index on (signal_name, date).
        Index("ix_exogenous_signal_name_date", "signal_name", "date"),
        # Uniqueness among chain-wide rows only.
        Index(
            "uq_exogenous_signal_global",
            "date",
            "signal_name",
            unique=True,
            postgresql_where="is_global = true",
        ),
        # Uniqueness among store-scoped rows only.
        Index(
            "uq_exogenous_signal_per_store",
            "date",
            "signal_name",
            "store_id",
            unique=True,
            postgresql_where="is_global = false",
        ),
        # Keeps is_global and store_id in agreement.
        CheckConstraint(
            "(is_global = true AND store_id IS NULL)"
            " OR "
            "(is_global = false AND store_id IS NOT NULL)",
            name="ck_exogenous_signal_global_consistency",
        ),
    )


class SalesReturn(TimestampMixin, Base):
    """A synthetic product-return event for one store, product and day.

    Returned units are kept apart from ``sales_daily.quantity`` rather than
    netted against it, so featuresets and forecasting can opt into returns as
    a separate signal.

    Attributes:
        id: Surrogate primary key.
        date: Day of the return (FK to ``calendar.date``).
        store_id: Store that took the return (FK to ``store.id``).
        product_id: Product returned (FK to ``product.id``).
        return_quantity: Units returned; the check constraint requires >= 1.
        return_reason: Short free-form reason, e.g. ``"defective"`` or
            ``"changed_mind"``.
    """

    __tablename__ = "sales_returns"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    # Indexed through the explicit ix_sales_returns_date entry below rather
    # than index=True on the column.
    date: Mapped[datetime.date] = mapped_column(
        Date,
        ForeignKey("calendar.date"),
    )
    store_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("store.id"),
        index=True,
    )
    product_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("product.id"),
        index=True,
    )
    return_quantity: Mapped[int] = mapped_column(Integer, nullable=False)
    return_reason: Mapped[str] = mapped_column(String(50), nullable=False)

    __table_args__ = (
        Index(
            "ix_sales_returns_store_product_date",
            "store_id",
            "product_id",
            "date",
        ),
        Index("ix_sales_returns_date", "date"),
        CheckConstraint(
            "return_quantity >= 1",
            name="ck_sales_returns_quantity_positive",
        ),
    )
68 changes: 67 additions & 1 deletion app/features/seeder/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
through the dashboard admin panel.
"""

from fastapi import APIRouter, Depends, HTTPException, status
from datetime import date

from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import get_settings
Expand Down Expand Up @@ -226,6 +228,70 @@ async def delete_data(
) from e


@router.get(
    "/exogenous",
    response_model=schemas.ExogenousSignalResponse,
    summary="Query exogenous signals",
    description=(
        "Return exogenous signal rows (Phase 1) for a given signal name and date "
        "window. Available signals: `weather_temp_c`, `macro_index`, `event_flag`."
    ),
)
async def query_exogenous(
    signal_name: str = Query(
        ...,
        description="Signal identifier (e.g. weather_temp_c, macro_index, event_flag)",
        min_length=1,
        max_length=50,
    ),
    start_date: date = Query(..., description="Window start (inclusive)"),
    end_date: date = Query(..., description="Window end (inclusive)"),
    store_id: int | None = Query(
        default=None,
        description="Optional store filter. Omit to include global + per-store rows.",
        ge=1,
    ),
    db: AsyncSession = Depends(get_db),
) -> schemas.ExogenousSignalResponse:
    """Look up ``exogenous_signal`` rows for one signal over a date window.

    The lookup is delegated to ``service.query_exogenous``, which is expected
    to apply the ordering and the row and date-range caps that keep the
    response bounded.

    Args:
        signal_name: Signal identifier, 1-50 characters.
        start_date: First day of the window (inclusive).
        end_date: Last day of the window (inclusive).
        store_id: Optional store filter (>= 1); ``None`` applies no filter.
        db: Async database session supplied by the ``get_db`` dependency.

    Returns:
        The response object produced by the service layer.

    Raises:
        HTTPException: 400 when the service raises ``ValueError`` (invalid or
            oversized date window); 500 for any other failure.
    """
    log_event = "seeder.exogenous.query_failed"

    try:
        response = await service.query_exogenous(
            db,
            signal_name=signal_name,
            start_date=start_date,
            end_date=end_date,
            store_id=store_id,
        )
    except ValueError as exc:
        # The service rejected the request; relay its message as a 400.
        logger.error(log_event, error=str(exc), error_type=type(exc).__name__)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
    except Exception as exc:
        # Anything else is unexpected: log with the traceback and report a 500.
        logger.error(
            log_event,
            error=str(exc),
            error_type=type(exc).__name__,
            exc_info=True,
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Exogenous query failed: {exc}",
        ) from exc

    return response


@router.post(
"/verify",
response_model=schemas.VerifyResult,
Expand Down
Loading