Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion app/features/featuresets/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,15 @@ class ComputeFeaturesRequest(BaseModel):

store_id: int = Field(..., ge=1, description="Store ID")
product_id: int = Field(..., ge=1, description="Product ID")
# NOTE: ``strict=False`` overrides the model-level ``strict=True`` for this
# field so FastAPI's ``validate_python`` (called on the JSON-parsed dict
# by ``fastapi._compat.v2:175``) accepts ISO date strings from JSON
# request bodies. Without this override, ``strict=True`` requires an
# actual ``datetime.date`` instance -- which JSON cannot carry --
# and every HTTP caller would 422.
cutoff_date: date_type = Field(
...,
strict=False,
description="Compute features up to this date (inclusive)",
)
lookback_days: int = Field(
Expand Down Expand Up @@ -463,6 +470,8 @@ class PreviewFeaturesRequest(BaseModel):

store_id: int = Field(..., ge=1)
product_id: int = Field(..., ge=1)
cutoff_date: date_type
# See ``ComputeFeaturesRequest.cutoff_date`` for the rationale on
# ``strict=False`` (FastAPI ``validate_python`` + JSON dates).
cutoff_date: date_type = Field(..., strict=False)
sample_rows: int = Field(default=10, ge=1, le=100)
config: FeatureSetConfig
174 changes: 174 additions & 0 deletions app/features/featuresets/tests/test_leakage.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,3 +834,177 @@ def test_event_on_cutoff_date_included_via_le_filter(self) -> None:
assert len(events) == 1, (
f"FILTER ERROR: expected 1 event after <= cutoff filter, got {len(events)}"
)


class TestPhase2CrossConfigLeakage:
"""Verify Phase 2 configs compose without future-data leakage (PRP-3.1E).

Even when each Phase 2 family's own leakage tests (PRP-3.1B/C/D) are
green individually, composing all three with Phase 1 (lag + lifecycle
+ replenishment + promotion) may surface a new leakage path if any
family's ``_compute_*`` method mutates ``df`` in place before the next
family reads it. This class is the strongest additive assertion --
feature rows at row ``i`` reference ONLY rows ``<= i - lag_days``.
"""

def test_all_phase2_configs_compose_no_future_leakage(
self,
sample_time_series: pd.DataFrame,
phase2_promotion_rows_df: pd.DataFrame,
) -> None:
"""CRITICAL: All four Phase 2 + Phase 1 configs ON -- no row ``i``
feature reads data ``> i - 1`` for the (store=1, product=1) series.

Strategy:
* lag_1 on sequential quantity 1..30 must equal ``i`` at row ``i``
(the canonical TestLagLeakage invariant).
* Phase 2 columns must be present in ``result.feature_columns``
(proves compose didn't accidentally skip them).
* Lifecycle ``days_since_launch_lag1`` at row 0 must be NaN
(no prior row to lag from).
* Promotion ``promo_markdown_active_lag1`` at row 0 must be NaN
(Int64 dtype -- no prior row).

The replenishment ``replenishment_count_w14_lag{N}`` column is
intentionally NOT checked at row 0 because its compute path fills
NaN -> 0 (see service.py ``_compute_replenishment_features`` --
``rolling_counts.fillna(0).astype("int64")``). A row-0-NaN check
would be a false positive against the documented contract.
"""
from app.features.featuresets.schemas import ReplenishmentConfig

# Lifecycle requires launch_date / discontinue_date columns on
# the input frame (service.py:489 silent-skips otherwise).
df = sample_time_series.copy()
df["launch_date"] = date(2023, 12, 25) # 7 days before the first sales row
df["discontinue_date"] = pd.NaT

config = FeatureSetConfig(
name="phase2-cross-config-leakage",
lag_config=LagConfig(lags=(1,)),
lifecycle_config=LifecycleConfig(lag_days=1),
replenishment_config=ReplenishmentConfig(lag_days=1, count_window_days=14),
promotion_config=PromotionConfig(kinds_to_track=("markdown",), lag_days=1),
)
service = FeatureEngineeringService(config)
# Auxiliary frames -- PRP-3.1C / 3.1D use private-attribute sidecars
# (mirrors the in-method ``getattr(self, "_promotion_rows_df", None)``
# / ``getattr(self, "_replenishment_events_df", None)`` reads in
# service.py).
service._promotion_rows_df = phase2_promotion_rows_df # type: ignore[attr-defined]
# Replenishment requires a non-None events frame; pass an empty
# one with the expected schema AND dtypes (service.py's
# ``merge_asof`` rejects ``object``-typed ``by=`` keys, which is
# what an unsized DataFrame defaults to). An empty events frame
# is the safe cross-config leakage probe -- it proves the JOIN
# path doesn't accidentally invent counts when no events exist.
empty_events = pd.DataFrame(
{
"store_id": pd.Series([], dtype="int64"),
"product_id": pd.Series([], dtype="int64"),
"event_date": pd.Series([], dtype="datetime64[ns]"),
}
)
service._replenishment_events_df = empty_events # type: ignore[attr-defined]
result = service.compute_features(df)

# 1. lag_1 invariant must survive Phase 2 compose.
for i in range(1, len(result.df)):
lag_value = result.df.iloc[i]["lag_1"]
assert lag_value == i, (
f"CROSS-CONFIG LEAKAGE at row {i}: lag_1={lag_value} != "
f"expected={i}. Phase 2 compose corrupted Phase 1 lag."
)

# 2. Phase 2 columns must be present (compose didn't skip).
for expected in (
"days_since_launch_lag1",
"replenishment_count_w14_lag1",
"promo_markdown_active_lag1",
):
assert expected in result.feature_columns, (
f"COMPOSE REGRESSION: column {expected!r} missing from "
f"feature_columns -- {result.feature_columns}"
)

# 3. Row 0 of NaN-friendly Phase 2 columns must be NaN.
# Lifecycle: row 0 has no prior row to lag from -> NaN.
assert pd.isna(result.df.iloc[0]["days_since_launch_lag1"]), (
"CROSS-CONFIG LEAKAGE: days_since_launch_lag1 row 0 is not NaN -- "
f"got {result.df.iloc[0]['days_since_launch_lag1']!r}"
)
# Promotion active: nullable Int64 preserves NaN at the start of
# the series (per service.py ``astype("Int64")``).
assert pd.isna(result.df.iloc[0]["promo_markdown_active_lag1"]), (
"CROSS-CONFIG LEAKAGE: promo_markdown_active_lag1 row 0 is not NaN -- "
f"got {result.df.iloc[0]['promo_markdown_active_lag1']!r}"
)

def test_phase2_compose_preserves_group_isolation(
self,
multi_series_time_series: pd.DataFrame,
) -> None:
"""CRITICAL: Multi-series + all Phase 2 configs -- per-series
``shift(lag_days)`` boundary must hold; no leakage between the
``(1, 1)`` and ``(2, 2)`` series.

Verifies that ``groupby([store_id, product_id]).shift(1)`` keeps
series isolated even when lifecycle + replenishment + promotion
joins are evaluated for the union of entities.
"""
from app.features.featuresets.schemas import ReplenishmentConfig

df = multi_series_time_series.copy()
# Two products with different launch dates so cross-series lifecycle
# contamination would be mathematically visible (mirrors
# TestLifecycleLeakage::test_lifecycle_group_isolation pattern).
launch_map = {1: date(2023, 12, 1), 2: date(2023, 12, 25)}
df["launch_date"] = df["product_id"].map(launch_map)
df["discontinue_date"] = pd.NaT

config = FeatureSetConfig(
name="phase2-group-iso",
entity_columns=("store_id", "product_id"),
lag_config=LagConfig(lags=(1,)),
lifecycle_config=LifecycleConfig(lag_days=1),
replenishment_config=ReplenishmentConfig(lag_days=1, count_window_days=14),
promotion_config=PromotionConfig(kinds_to_track=("markdown",), lag_days=1),
)
service = FeatureEngineeringService(config)
# Empty aux frames -- the leakage probe is about the GROUPBY
# boundary, not the JOIN payload.
service._promotion_rows_df = pd.DataFrame( # type: ignore[attr-defined]
columns=[
"product_id",
"store_id",
"kind",
"discount_pct",
"start_date",
"end_date",
]
)
service._replenishment_events_df = pd.DataFrame( # type: ignore[attr-defined]
{
"store_id": pd.Series([], dtype="int64"),
"product_id": pd.Series([], dtype="int64"),
"event_date": pd.Series([], dtype="datetime64[ns]"),
}
)
result = service.compute_features(df)

# First row of each series must have NaN lag_1 (per-group
# isolation -- the canonical TestGroupIsolationLeakage idiom).
for store_id in (1, 2):
for product_id in (1, 2):
series = result.df[
(result.df["store_id"] == store_id) & (result.df["product_id"] == product_id)
].reset_index(drop=True)
assert pd.isna(series.iloc[0]["lag_1"]), (
f"GROUP ISOLATION BREACH at ({store_id},{product_id}): "
"lag_1 first row should be NaN even with Phase 2 composed"
)
# Lifecycle: row 0 of each series must also be NaN.
assert pd.isna(series.iloc[0]["days_since_launch_lag1"]), (
f"GROUP ISOLATION BREACH at ({store_id},{product_id}): "
"days_since_launch_lag1 first row should be NaN per-series"
)
Loading