def _compute_promotion_features(
    self,
    df: pd.DataFrame,
    promotion_rows_df: pd.DataFrame,
) -> tuple[pd.DataFrame, list[str]]:
    """Compute lagged promotion features (active flag + intensity per kind).

    For every tracked promotion kind a per-row "today" indicator is built
    (is a promotion of that kind running on the row's date, and what is the
    largest discount among those running), and that indicator is then
    shifted by ``lag_days`` rows inside each entity group.

    CRITICAL (time safety): ``groupby(entity_cols).shift(lag_days)`` is the
    leakage gate. The feature at row D reads the indicator of the row
    ``lag_days`` positions earlier in the same entity series, so a promotion
    active on D itself never appears in ``*_lag{N}`` at D.
    NOTE(review): the shift moves by rows, not calendar days, so it equals
    "D - lag_days" only when each entity series has one row per day —
    confirm the caller guarantees a gap-free daily grain.

    Date-range semantics: ``start_date <= D <= end_date`` (both inclusive).
    Row dates and promotion bounds are both normalised to calendar days, so
    ``date``, ``datetime``, ``Timestamp`` and ISO-string bounds are all
    accepted. A promotion row with a missing bound never matches.

    Chain-wide promotions: rows whose ``store_id`` is NaN/None apply to every
    store of that product. They are matched by dropping the store condition,
    never via a NaN-key merge.

    Overlapping promotions of the same kind on the same day reduce via
    ``max`` over ``discount_pct`` for intensity (Decision §15-C); the active
    flag stays 0/1. Promotions with a null ``discount_pct`` set the active
    flag but contribute nothing to intensity.

    Args:
        df: Sales DataFrame containing ``store_id``, ``product_id``, the
            entity columns and the date column. It is copied, never mutated.
        promotion_rows_df: Promotion rows with columns
            ``[product_id, store_id, kind, discount_pct, start_date, end_date]``.
            ``store_id`` may be NaN (chain-wide); ``discount_pct`` may be NaN.

    Returns:
        Tuple of (copy of ``df`` sorted by entity columns then date, with the
        new columns appended; list of the new column names). Columns are
        ordered by sorted kind, active before intensity.

    Raises:
        RuntimeError: If the service has no ``promotion_config``.
    """
    config = self.config.promotion_config
    if config is None:
        raise RuntimeError("_compute_promotion_features called without promotion_config")

    def _to_day(values: "pd.Series[Any]") -> "pd.Series[Any]":
        # Round-trip through ``.dt.date`` so time-of-day and timezone are
        # dropped exactly like the calendar-date comparison requires, then
        # return datetime64 so comparisons are vectorised and type-uniform.
        return pd.to_datetime(pd.to_datetime(values).dt.date)

    lag = config.lag_days
    columns: list[str] = []

    # Defensive re-sort to match the caller invariant (entity, then date).
    result = df.copy().sort_values([*self.entity_cols, self.date_col])

    # Loop invariants, hoisted: row calendar days, match keys, group keys.
    row_day = _to_day(result[self.date_col])
    store_ids = result["store_id"]
    product_ids = result["product_id"]
    group_keys = [result[name] for name in self.entity_cols]
    n_rows = len(result)

    promo_fields = ["product_id", "store_id", "discount_pct", "start_date", "end_date"]

    # Deterministic column ordering: sorted kinds, active before intensity.
    for kind in sorted(config.kinds_to_track):
        kind_rows = promotion_rows_df.loc[promotion_rows_df["kind"] == kind, promo_fields]
        starts = _to_day(kind_rows["start_date"])
        ends = _to_day(kind_rows["end_date"])

        # Per-row daily indicators (D-day truth, BEFORE the lag shift).
        active_today = np.zeros(n_rows, dtype="int64")
        intensity_today = np.full(n_rows, np.nan, dtype="float64")

        for product_id, store_id, discount, start, end in zip(
            kind_rows["product_id"],
            kind_rows["store_id"],
            kind_rows["discount_pct"],
            starts,
            ends,
        ):
            hit = (product_ids == product_id) & (row_day >= start) & (row_day <= end)
            if pd.notna(store_id):
                # Store-specific promo; a null store_id means chain-wide and
                # therefore adds no store condition.
                hit = hit & (store_ids == store_id)
            # Missing comparison results (nullable dtypes) count as no match.
            hit_rows = hit.to_numpy(dtype=bool, na_value=False)

            active_today[hit_rows] = 1
            if pd.notna(discount):
                # Overlap reduction = max (Decision §15-C). ``fmax`` ignores
                # NaN, so the first discount replaces the NaN placeholder.
                intensity_today[hit_rows] = np.fmax(intensity_today[hit_rows], float(discount))

        # CRITICAL: groupby(entity_cols).shift(lag) — the leakage gate.
        if config.include_active:
            col = f"promo_{kind}_active_lag{lag}"
            shifted_active = (
                pd.Series(active_today, index=result.index)
                .groupby(group_keys, observed=True)
                .shift(lag)
            )
            # Nullable Int64 keeps the missing values the shift introduces at
            # the start of each series (Decision §15-D).
            result[col] = shifted_active.astype("Int64")
            columns.append(col)

        if config.include_intensity:
            col = f"promo_{kind}_intensity_lag{lag}"
            shifted_intensity = (
                pd.Series(intensity_today, index=result.index)
                .groupby(group_keys, observed=True)
                .shift(lag)
            )
            result[col] = shifted_intensity.astype("float64")
            columns.append(col)

    return result, columns
class TestPromotionLeakage:
    """Leakage guards for the promotion feature family (PRP-3.1D).

    These cases are LOAD-BEARING. A promotion running on day D must not be
    visible in ``promo_<kind>_active_lag1`` at D; it becomes visible at D+1.
    The inclusive date range (start_date <= D <= end_date) combined with the
    grouped ``shift(lag_days)`` is what the assertions below pin down.
    """

    def test_promotion_active_no_leakage_at_same_day(
        self,
        sample_time_series: pd.DataFrame,
        phase2_promotion_rows_df: pd.DataFrame,
    ) -> None:
        """CRITICAL: a promo running on day D is invisible to lag1 at D."""
        engine = FeatureEngineeringService(
            FeatureSetConfig(
                name="test",
                entity_columns=("store_id", "product_id"),
                promotion_config=PromotionConfig(
                    kinds_to_track=("markdown",),
                    include_active=True,
                    include_intensity=False,
                    lag_days=1,
                ),
            )
        )
        engine._promotion_rows_df = phase2_promotion_rows_df  # type: ignore[attr-defined]
        features = engine.compute_features(sample_time_series).df.reset_index(drop=True)
        row_days = pd.to_datetime(features["date"]).dt.date

        def lag1_on(day: date) -> object:
            # First row whose calendar date equals ``day``.
            return features.loc[row_days == day, "promo_markdown_active_lag1"].iloc[0]

        # Per the fixture's contract the markdown runs 2024-01-07 .. 2024-01-14,
        # so lag1 is expected to be 1 on 2024-01-08 .. 2024-01-15.

        # Jan 6 (day before start): reads Jan 5, inactive.
        assert lag1_on(date(2024, 1, 6)) == 0

        # Jan 7 (start day): reads Jan 6, inactive. The same-day leakage gate.
        assert lag1_on(date(2024, 1, 7)) == 0, (
            "LEAKAGE DETECTED: promo active on day D appeared in active_lag1 at day D"
        )

        # Jan 8 (day after start): reads Jan 7, active.
        assert lag1_on(date(2024, 1, 8)) == 1

        # Jan 15 (day after end): reads Jan 14, the last active day.
        assert lag1_on(date(2024, 1, 15)) == 1

        # Jan 16 (two days after end): reads Jan 15, inactive.
        assert lag1_on(date(2024, 1, 16)) == 0

    def test_promotion_boundary_end_date_at_cutoff(
        self,
        sample_time_series: pd.DataFrame,
    ) -> None:
        """A promo whose end_date is cutoff - 1 gives active_lag1 == 1 at cutoff."""
        cutoff = date(2024, 1, 15)
        single_promo = pd.DataFrame(
            {
                "product_id": [1],
                "store_id": [1],
                "kind": ["markdown"],
                "discount_pct": [0.20],
                "start_date": [date(2024, 1, 10)],
                "end_date": [date(2024, 1, 14)],  # one day before the cutoff
            }
        )
        engine = FeatureEngineeringService(
            FeatureSetConfig(
                name="test",
                entity_columns=("store_id", "product_id"),
                promotion_config=PromotionConfig(kinds_to_track=("markdown",), lag_days=1),
            )
        )
        engine._promotion_rows_df = single_promo  # type: ignore[attr-defined]
        outcome = engine.compute_features(sample_time_series, cutoff_date=cutoff)

        features = outcome.df.reset_index(drop=True)
        row_days = pd.to_datetime(features["date"]).dt.date
        # The cutoff row reads Jan 14, which is the inclusive end_date.
        cutoff_row = features.loc[row_days == cutoff].iloc[0]
        assert cutoff_row["promo_markdown_active_lag1"] == 1, (
            "Boundary leakage: end_date INCLUSIVE on the previous day failed"
        )

    def test_promotion_starts_on_cutoff_not_in_lag1(
        self,
        sample_time_series: pd.DataFrame,
    ) -> None:
        """A promo whose start_date is the cutoff is absent from lag1 at cutoff."""
        cutoff = date(2024, 1, 15)
        single_promo = pd.DataFrame(
            {
                "product_id": [1],
                "store_id": [1],
                "kind": ["markdown"],
                "discount_pct": [0.20],
                "start_date": [cutoff],  # begins on the cutoff day itself
                "end_date": [date(2024, 1, 25)],
            }
        )
        engine = FeatureEngineeringService(
            FeatureSetConfig(
                name="test",
                entity_columns=("store_id", "product_id"),
                promotion_config=PromotionConfig(kinds_to_track=("markdown",), lag_days=1),
            )
        )
        engine._promotion_rows_df = single_promo  # type: ignore[attr-defined]
        outcome = engine.compute_features(sample_time_series, cutoff_date=cutoff)

        features = outcome.df.reset_index(drop=True)
        row_days = pd.to_datetime(features["date"]).dt.date
        cutoff_row = features.loc[row_days == cutoff].iloc[0]
        # The cutoff row reads Jan 14, which precedes start_date.
        assert cutoff_row["promo_markdown_active_lag1"] == 0, (
            "Same-day leakage: promo starting on D appeared in active_lag1 at D"
        )

    def test_chain_wide_promo_does_not_bleed_across_products(
        self,
        multi_series_time_series: pd.DataFrame,
    ) -> None:
        """A chain-wide promo on product 1 leaves product 2 untouched."""
        chain_wide_promo = pd.DataFrame(
            {
                "product_id": [1],
                "store_id": [None],  # null store => chain-wide
                "kind": ["markdown"],
                "discount_pct": [0.30],
                "start_date": [date(2024, 1, 3)],
                "end_date": [date(2024, 1, 7)],
            }
        )
        engine = FeatureEngineeringService(
            FeatureSetConfig(
                name="test",
                entity_columns=("store_id", "product_id"),
                promotion_config=PromotionConfig(kinds_to_track=("markdown",), lag_days=1),
            )
        )
        engine._promotion_rows_df = chain_wide_promo  # type: ignore[attr-defined]
        features = engine.compute_features(multi_series_time_series).df

        # Product 1: lag1 activity on 2024-01-04 .. 2024-01-08, 5 days x 2 stores.
        product_one = features[features["product_id"] == 1]
        assert int(product_one["promo_markdown_active_lag1"].sum()) == 5 * 2
        # Product 2: nothing, because chain-wide is still product-scoped.
        product_two = features[features["product_id"] == 2]
        assert int(product_two["promo_markdown_active_lag1"].sum()) == 0


class TestPromotionFeatures:
    """Behavioural tests for promotion feature computation (PRP-3.1D)."""

    @staticmethod
    def _ten_day_single_series() -> pd.DataFrame:
        """Build ten consecutive days (from 2024-01-01) for store 1 / product 1."""
        quantities = list(range(1, 11))
        return pd.DataFrame(
            {
                "date": pd.date_range("2024-01-01", periods=10, freq="D"),
                "store_id": [1] * 10,
                "product_id": [1] * 10,
                "quantity": quantities,
                "unit_price": [10.0] * 10,
                "total_amount": [q * 10.0 for q in range(1, 11)],
            }
        )

    def test_single_kind_happy_path(
        self,
        sample_time_series: pd.DataFrame,
        phase2_promotion_rows_df: pd.DataFrame,
    ) -> None:
        """One tracked kind yields exactly its active and intensity columns."""
        engine = FeatureEngineeringService(
            FeatureSetConfig(
                name="test",
                promotion_config=PromotionConfig(kinds_to_track=("markdown",)),
            )
        )
        engine._promotion_rows_df = phase2_promotion_rows_df  # type: ignore[attr-defined]
        outcome = engine.compute_features(sample_time_series)

        assert "promo_markdown_active_lag1" in outcome.feature_columns
        assert "promo_markdown_intensity_lag1" in outcome.feature_columns
        # Determinism: exactly two columns, active listed before intensity.
        promo_names = [name for name in outcome.feature_columns if name.startswith("promo_")]
        assert promo_names == [
            "promo_markdown_active_lag1",
            "promo_markdown_intensity_lag1",
        ]

    def test_multi_kind_produces_all_columns_sorted(
        self,
        sample_time_series: pd.DataFrame,
        phase2_promotion_rows_df: pd.DataFrame,
    ) -> None:
        """Several tracked kinds come out in sorted-kind order."""
        # Kinds are deliberately given out of alphabetical order so the test
        # proves the method sorts them.
        unsorted_kinds = ("pct_off", "markdown")
        engine = FeatureEngineeringService(
            FeatureSetConfig(
                name="test",
                promotion_config=PromotionConfig(kinds_to_track=unsorted_kinds),
            )
        )
        engine._promotion_rows_df = phase2_promotion_rows_df  # type: ignore[attr-defined]
        outcome = engine.compute_features(sample_time_series)

        promo_names = [name for name in outcome.feature_columns if name.startswith("promo_")]
        # Decision §15-A: sorted by kind, then active before intensity.
        assert promo_names == [
            "promo_markdown_active_lag1",
            "promo_markdown_intensity_lag1",
            "promo_pct_off_active_lag1",
            "promo_pct_off_intensity_lag1",
        ]

    def test_chain_wide_promo_applies_to_all_stores(
        self,
        multi_series_time_series: pd.DataFrame,
    ) -> None:
        """A promo with a null store_id reaches every store of its product."""
        chain_wide_promo = pd.DataFrame(
            {
                "product_id": [1],
                "store_id": [None],
                "kind": ["pct_off"],
                "discount_pct": [0.10],
                "start_date": [date(2024, 1, 3)],
                "end_date": [date(2024, 1, 5)],
            }
        )
        engine = FeatureEngineeringService(
            FeatureSetConfig(
                name="test",
                promotion_config=PromotionConfig(kinds_to_track=("pct_off",)),
            )
        )
        engine._promotion_rows_df = chain_wide_promo  # type: ignore[attr-defined]
        features = engine.compute_features(multi_series_time_series).df

        # Every product-1 row on 2024-01-04 .. 2024-01-06 (lag1) is active.
        is_product_one = features["product_id"] == 1
        is_active = features["promo_pct_off_active_lag1"] == 1
        active_rows = features[is_product_one & is_active]
        # 2 stores x 3 lagged active days = 6 rows.
        assert len(active_rows) == 6

    def test_null_discount_pct_yields_nan_intensity_but_active_one(self) -> None:
        """A bogo promo without discount_pct: active is 1, intensity is NaN."""
        sales = self._ten_day_single_series()
        bogo_promo = pd.DataFrame(
            {
                "product_id": [1],
                "store_id": [1],
                "kind": ["bogo"],
                "discount_pct": [None],
                "start_date": [date(2024, 1, 3)],
                "end_date": [date(2024, 1, 5)],
            }
        )
        engine = FeatureEngineeringService(
            FeatureSetConfig(
                name="test",
                promotion_config=PromotionConfig(kinds_to_track=("bogo",)),
            )
        )
        engine._promotion_rows_df = bogo_promo  # type: ignore[attr-defined]
        outcome = engine.compute_features(sales)

        features = outcome.df.reset_index(drop=True)
        row_days = pd.to_datetime(features["date"]).dt.date
        # Jan 4 reads Jan 3, the promo's first day, so active is 1.
        jan_fourth = features.loc[row_days == date(2024, 1, 4)].iloc[0]
        assert jan_fourth["promo_bogo_active_lag1"] == 1
        assert pd.isna(jan_fourth["promo_bogo_intensity_lag1"])

    def test_overlapping_promos_intensity_uses_max(self) -> None:
        """Two overlapping markdowns on one (store, product, day) reduce via max."""
        sales = self._ten_day_single_series()
        overlapping = pd.DataFrame(
            {
                "product_id": [1, 1],
                "store_id": [1, 1],
                "kind": ["markdown", "markdown"],
                "discount_pct": [0.15, 0.25],  # where both run, max is 0.25
                "start_date": [date(2024, 1, 3), date(2024, 1, 4)],
                "end_date": [date(2024, 1, 6), date(2024, 1, 5)],
            }
        )
        engine = FeatureEngineeringService(
            FeatureSetConfig(
                name="test",
                promotion_config=PromotionConfig(kinds_to_track=("markdown",)),
            )
        )
        engine._promotion_rows_df = overlapping  # type: ignore[attr-defined]
        outcome = engine.compute_features(sales)

        features = outcome.df.reset_index(drop=True)
        row_days = pd.to_datetime(features["date"]).dt.date
        # Jan 5 reads Jan 4, when both promos run: max(0.15, 0.25) = 0.25.
        jan_fifth = features.loc[row_days == date(2024, 1, 5)].iloc[0]
        assert jan_fifth["promo_markdown_active_lag1"] == 1
        assert jan_fifth["promo_markdown_intensity_lag1"] == pytest.approx(0.25)

    def test_no_active_promo_yields_zero_active_and_nan_intensity(
        self,
        sample_time_series: pd.DataFrame,
    ) -> None:
        """With no promo rows: active is NaN then 0, intensity is all NaN."""
        no_promos = pd.DataFrame(
            columns=[
                "product_id",
                "store_id",
                "kind",
                "discount_pct",
                "start_date",
                "end_date",
            ]
        )
        engine = FeatureEngineeringService(
            FeatureSetConfig(
                name="test",
                promotion_config=PromotionConfig(kinds_to_track=("markdown",)),
            )
        )
        engine._promotion_rows_df = no_promos  # type: ignore[attr-defined]
        features = engine.compute_features(sample_time_series).df

        lagged_active = features["promo_markdown_active_lag1"]
        lagged_intensity = features["promo_markdown_intensity_lag1"]
        # The lag shift leaves the first row missing; every later row is 0.
        assert pd.isna(lagged_active.iloc[0])
        assert (lagged_active.iloc[1:] == 0).all()
        assert lagged_intensity.isna().all()

    def test_defensive_skip_when_rows_absent_via_orchestrator(
        self,
        sample_time_series: pd.DataFrame,
    ) -> None:
        """Without ``_promotion_rows_df`` the orchestrator uses an empty frame.

        The columns are still emitted (missing first row, then zeros) and no
        exception is raised.
        """
        engine = FeatureEngineeringService(
            FeatureSetConfig(
                name="test",
                promotion_config=PromotionConfig(kinds_to_track=("markdown",)),
            )
        )
        # _promotion_rows_df is intentionally left unset to hit the getattr fallback.
        outcome = engine.compute_features(sample_time_series)

        # The additive contract holds: both columns are present.
        assert "promo_markdown_active_lag1" in outcome.feature_columns
        assert "promo_markdown_intensity_lag1" in outcome.feature_columns
        # No active days, because the fallback frame has no rows.
        lagged_active = outcome.df["promo_markdown_active_lag1"]
        lagged_intensity = outcome.df["promo_markdown_intensity_lag1"]
        assert pd.isna(lagged_active.iloc[0])
        assert (lagged_active.iloc[1:] == 0).all()
        assert lagged_intensity.isna().all()

    def test_cutoff_alignment_drops_post_cutoff_rows(
        self,
        sample_time_series: pd.DataFrame,
    ) -> None:
        """cutoff_date filtering precedes promotion compute; output is bounded."""
        cutoff = date(2024, 1, 10)
        single_promo = pd.DataFrame(
            {
                "product_id": [1],
                "store_id": [1],
                "kind": ["markdown"],
                "discount_pct": [0.20],
                "start_date": [date(2024, 1, 5)],
                "end_date": [date(2024, 1, 9)],
            }
        )
        engine = FeatureEngineeringService(
            FeatureSetConfig(
                name="test",
                promotion_config=PromotionConfig(kinds_to_track=("markdown",)),
            )
        )
        engine._promotion_rows_df = single_promo  # type: ignore[attr-defined]
        outcome = engine.compute_features(sample_time_series, cutoff_date=cutoff)

        # Nothing later than the cutoff survives.
        row_days = pd.to_datetime(outcome.df["date"]).dt.date
        assert (row_days <= cutoff).all()
        # The cutoff row (Jan 10) reads Jan 9, the promo's last active day.
        cutoff_row = outcome.df.loc[row_days == cutoff].iloc[0]
        assert cutoff_row["promo_markdown_active_lag1"] == 1

    def test_active_column_dtype_is_nullable_int(
        self,
        sample_time_series: pd.DataFrame,
        phase2_promotion_rows_df: pd.DataFrame,
    ) -> None:
        """The active column is nullable Int64; intensity is plain float64."""
        engine = FeatureEngineeringService(
            FeatureSetConfig(
                name="test",
                promotion_config=PromotionConfig(kinds_to_track=("markdown",)),
            )
        )
        engine._promotion_rows_df = phase2_promotion_rows_df  # type: ignore[attr-defined]
        features = engine.compute_features(sample_time_series).df

        # Decision §15-D: Int64 nullable extension dtype for the active flag.
        assert str(features["promo_markdown_active_lag1"].dtype) == "Int64"
        # Intensity remains an ordinary float64 column.
        assert str(features["promo_markdown_intensity_lag1"].dtype) == "float64"