def _compute_replenishment_features(
    self,
    df: pd.DataFrame,
    events_df: pd.DataFrame | None = None,
) -> tuple[pd.DataFrame, list[str]]:
    """Compute replenishment-event features (PRP-3.1C).

    CRITICAL: All replenishment features are lagged to prevent leakage.
    The events DataFrame must be pre-filtered to event_date <= cutoff_date
    by the caller (the loader does this SQL-side; tests do it explicitly).

    Produced columns (when matching flags are set):
      * days_since_last_replenishment_lag{N}: float64 -- gap (days) between
        a sales row's date and the most recent event on or before that date
        for the SAME (store_id, product_id), then shifted ``lag_days`` rows
        within the entity. NaN when no such event exists.
      * replenishment_count_w{W}_lag{N}: int64 -- per-entity
        ``shift(1).rolling(W).sum()`` of the per-row event count, so the
        current row's own events are never included. 0 where there is
        nothing to count.

    NOTE: the count window is measured in sales ROWS per entity, so it only
    equals W calendar days when each entity has one row per day. Events are
    left-merged onto sales rows by exact date, so an event falling on a date
    with no sales row for that entity is not counted.

    Rows are ordered by (entity, date) internally and the results are written
    back by row position, so ``df`` may carry any index and any row order.

    Args:
        df: Sales-shape DataFrame containing ``self.date_col``, the entity
            columns, ``store_id`` and ``product_id``.
        events_df: ReplenishmentEvent rows with columns
            [store_id, product_id, event_date]. Extra columns are ignored.
            An empty frame (including an untyped, object-dtype one) is
            accepted. REQUIRED for this method; pass None and it raises.

    Returns:
        Tuple of (copy of df with new columns appended, list of new column
        names).

    Raises:
        RuntimeError: If replenishment_config is None, or events_df is None.
    """
    config = self.config.replenishment_config
    if config is None:
        raise RuntimeError(
            "_compute_replenishment_features called without replenishment_config"
        )
    if events_df is None:
        raise RuntimeError(
            "_compute_replenishment_features requires events_df "
            "(load via FeatureDataLoader.load_replenishment_events or "
            "inject in tests)"
        )

    event_keys = ["store_id", "product_id"]
    entity_cols = list(self.entity_cols)
    sales_dt_col = "_sales_dt_internal"
    row_pos_col = "_row_pos_internal"
    last_event_col = "_last_event_dt"

    result = df.copy()
    columns: list[str] = []

    # Slim positional working frame: key columns, a normalized date and the
    # row's position in ``df``. Keeping the helpers off ``result`` means
    # nothing has to be dropped before returning.
    key_cols = list(dict.fromkeys([*entity_cols, *event_keys]))
    work = result[key_cols].reset_index(drop=True)
    # Force ``datetime64[ns]`` on both sides so merge_asof sees matching
    # dtypes regardless of how the caller built the input frames.
    work[sales_dt_col] = (
        pd.to_datetime(result[self.date_col]).astype("datetime64[ns]").to_numpy()
    )
    work[row_pos_col] = range(len(work))

    def _to_input_order(values: pd.Series, positions: pd.Series):
        # Values were computed in (entity, date) order; put them back in the
        # caller's row order. The caller assigns the array positionally, so
        # the index labels of ``df`` never take part in alignment.
        reordered = pd.Series(values.to_numpy(), index=positions.to_numpy())
        return reordered.sort_index().to_numpy()

    # Normalize events: select needed cols, coerce dtypes, sort by date
    # (merge_asof requires the right-side key sorted).
    events = events_df.loc[:, [*event_keys, "event_date"]].copy()
    events["event_date"] = pd.to_datetime(events["event_date"]).astype("datetime64[ns]")
    for key in event_keys:
        # merge_asof rejects ``by`` keys whose dtypes differ between sides;
        # an empty frame built with ``pd.DataFrame(columns=[...])`` is
        # object-typed, so align the event keys with the sales keys.
        if events[key].dtype != work[key].dtype:
            events[key] = events[key].astype(work[key].dtype)
    events = events.sort_values(["event_date", *event_keys]).reset_index(drop=True)

    # --- Feature 1: days_since_last_replenishment_lag{N} -----------------
    if config.include_days_since_last:
        col_name = f"days_since_last_replenishment_lag{config.lag_days}"
        if events.empty:
            # No events at all: every gap is missing, no asof join needed.
            result[col_name] = pd.Series(
                float("nan"), index=result.index, dtype="float64"
            )
        else:
            # merge_asof requires the left side sorted by the on-key.
            by_date = work.sort_values(sales_dt_col, kind="stable")
            with_last = pd.merge_asof(
                by_date,
                events.rename(columns={"event_date": last_event_col}),
                left_on=sales_dt_col,
                right_on=last_event_col,
                by=event_keys,
                direction="backward",
                allow_exact_matches=True,
            )
            # Canonical (entity, date) order before the per-entity shift.
            with_last = with_last.sort_values(
                [*entity_cols, sales_dt_col], kind="stable"
            )

            # Cast to float64 so NaN survives (numpy int can't hold missing).
            days_since = (
                with_last[sales_dt_col] - with_last[last_event_col]
            ).dt.days.astype("float64")
            lagged = days_since.groupby(
                [with_last[c] for c in entity_cols],
                observed=True,
            ).shift(config.lag_days)
            result[col_name] = _to_input_order(lagged, with_last[row_pos_col])
        columns.append(col_name)

    # --- Feature 2: replenishment_count_w{W}_lag{N} -----------------------
    if config.include_count_window:
        window = config.count_window_days

        if events.empty:
            merged = work.assign(_event_count=0)
        else:
            # Aggregate events to per-(entity, date) counts then left-merge
            # onto sales. Multiple events on the same date are summed.
            event_counts = (
                events.assign(_one=1)
                .groupby([*event_keys, "event_date"], observed=True)["_one"]
                .sum()
                .reset_index()
                .rename(columns={"_one": "_event_count", "event_date": sales_dt_col})
            )
            merged = work.merge(
                event_counts,
                on=[*event_keys, sales_dt_col],
                how="left",
            )
        merged["_event_count"] = merged["_event_count"].fillna(0).astype("int64")
        merged = merged.sort_values([*entity_cols, sales_dt_col], kind="stable")

        # CRITICAL: shift(1).rolling(W).sum() per entity --
        # NEVER rolling(W).sum().shift(1).
        def _shift_rolling_count(x: pd.Series, w: int = window) -> pd.Series:
            return x.shift(1).rolling(window=w, min_periods=1).sum()

        rolling_counts = merged.groupby(entity_cols, observed=True)[
            "_event_count"
        ].transform(_shift_rolling_count)

        # For lag_days > 1, layer an extra shift on the already-shifted
        # rolling result. Preserves the canonical shift(1).rolling(W)
        # safety boundary (PRP-3.1C §15 Decision C).
        if config.lag_days > 1:
            rolling_counts = rolling_counts.groupby(
                [merged[c] for c in entity_cols],
                observed=True,
            ).shift(config.lag_days - 1)

        col_name = f"replenishment_count_w{window}_lag{config.lag_days}"
        result[col_name] = _to_input_order(
            rolling_counts.fillna(0).astype("int64"),
            merged[row_pos_col],
        )
        columns.append(col_name)

    return result, columns


async def load_replenishment_events(
    self,
    db: AsyncSession,
    store_ids: list[int],
    product_ids: list[int],
    cutoff_date: date_type,
) -> pd.DataFrame:
    """Load replenishment events for the given entities up to cutoff_date.

    CRITICAL: SQL-side filter ``date <= cutoff_date`` enforces time-safety
    BEFORE any pandas code sees the rows (PRP-3.1C decisions log §2).

    Args:
        db: Async database session.
        store_ids: Store IDs to include.
        product_ids: Product IDs to include.
        cutoff_date: Maximum event date (inclusive).

    Returns:
        DataFrame with columns
        ``[store_id, product_id, event_date, lead_time_days, ordered_qty,
        received_qty]``. The DB column ``date`` is renamed to ``event_date``
        for clarity at the compute boundary. When no rows match, the frame is
        empty but its key and date columns are typed (int64 / datetime64[ns]).
    """
    from app.features.data_platform.models import ReplenishmentEvent

    columns = [
        "store_id",
        "product_id",
        "event_date",
        "lead_time_days",
        "ordered_qty",
        "received_qty",
    ]

    stmt = (
        select(
            ReplenishmentEvent.store_id,
            ReplenishmentEvent.product_id,
            ReplenishmentEvent.date,
            ReplenishmentEvent.lead_time_days,
            ReplenishmentEvent.ordered_qty,
            ReplenishmentEvent.received_qty,
        )
        .where(
            ReplenishmentEvent.store_id.in_(store_ids),
            ReplenishmentEvent.product_id.in_(product_ids),
            ReplenishmentEvent.date <= cutoff_date,
        )
        .order_by(
            ReplenishmentEvent.store_id,
            ReplenishmentEvent.product_id,
            ReplenishmentEvent.date,
        )
    )

    result = await db.execute(stmt)
    rows = result.all()

    if not rows:
        # A bare ``pd.DataFrame(columns=...)`` is object-typed throughout,
        # which pd.merge_asof rejects as a ``by`` key against integer sales
        # keys. Type the join columns so a zero-event series stays usable.
        # The ids are annotated ``int`` in this signature; the remaining
        # columns keep the default dtype because their DB types are not
        # visible from here.
        return pd.DataFrame(columns=columns).astype(
            {
                "store_id": "int64",
                "product_id": "int64",
                "event_date": "datetime64[ns]",
            }
        )

    return pd.DataFrame(
        [
            {
                "store_id": row.store_id,
                "product_id": row.product_id,
                "event_date": row.date,  # rename at the boundary
                "lead_time_days": row.lead_time_days,
                "ordered_qty": row.ordered_qty,
                "received_qty": row.received_qty,
            }
            for row in rows
        ]
    )
class TestReplenishmentLeakage:
    """CRITICAL: Replenishment features (PRP-3.1C) must never use future events.

    These tests assert (a) per-entity ``shift(N)`` invariance on days-since,
    (b) ``shift(1).rolling(W).sum()`` ordering on the rolling count -- NEVER
    ``rolling(W).sum().shift(1)``, and (c) cross-series isolation between
    ``(store_id, product_id)`` pairs.
    """

    DAYS_COL = "days_since_last_replenishment_lag1"
    COUNT_COL = "replenishment_count_w7_lag1"

    @staticmethod
    def _daily_sales(
        periods: int,
        store_ids: tuple[int, ...] = (1,),
        quantity: list[int] | None = None,
    ) -> pd.DataFrame:
        """Build one sales row per store per day from 2024-01-01 (product_id=1)."""
        days = list(pd.date_range("2024-01-01", periods=periods, freq="D"))
        total = periods * len(store_ids)
        return pd.DataFrame(
            {
                "date": days * len(store_ids),
                "store_id": [sid for sid in store_ids for _ in range(periods)],
                "product_id": [1] * total,
                "quantity": list(range(1, total + 1)) if quantity is None else quantity,
            }
        )

    @staticmethod
    def _january_events(*days: int) -> pd.DataFrame:
        """Build events for (store=1, product=1) on the given days of January 2024."""
        return pd.DataFrame(
            {
                "store_id": [1] * len(days),
                "product_id": [1] * len(days),
                "event_date": [date(2024, 1, day) for day in days],
            }
        )

    @staticmethod
    def _service(
        events: pd.DataFrame,
        *,
        days_since: bool = True,
        count_window: bool = True,
        entity_columns: tuple[str, ...] | None = None,
    ) -> FeatureEngineeringService:
        """Return a lag-1 / W=7 service with ``events`` attached as the sidecar."""
        from app.features.featuresets.schemas import ReplenishmentConfig

        replenishment = ReplenishmentConfig(
            include_days_since_last=days_since,
            include_count_window=count_window,
            lag_days=1,
            count_window_days=7,
        )
        if entity_columns is None:
            config = FeatureSetConfig(name="test", replenishment_config=replenishment)
        else:
            config = FeatureSetConfig(
                name="test",
                entity_columns=entity_columns,
                replenishment_config=replenishment,
            )
        service = FeatureEngineeringService(config)
        service._replenishment_events_df = events  # type: ignore[attr-defined]
        return service

    def test_days_since_uses_only_past_events(self) -> None:
        """``days_since_last_replenishment_lag1`` must never look at today's event.

        With sales on dates 2024-01-01..2024-01-07 and a single event on
        2024-01-03, the per-row gap-to-last-event (BEFORE the lag) is
        [NaN, NaN, 0, 1, 2, 3, 4]. After the ``groupby.shift(1)``, lag-1
        produces [NaN, NaN, NaN, 0, 1, 2, 3] -- so on the event date (row 2)
        the column is NaN (the event itself is invisible at lag-1), and the
        post-event rows see strictly past gaps.

        A regression to ``shift(0)`` would set row 2 to 0; advancing the
        result one row too early would set row 3 to 1. Either breaks the
        equalities below.
        """
        service = self._service(self._january_events(3), count_window=False)
        result = service.compute_features(self._daily_sales(7))

        values = result.df[self.DAYS_COL].tolist()
        # Rows 0-1 precede any event; row 2 is the event date, whose own gap
        # (0) only becomes visible one row later.
        for i in (0, 1, 2):
            assert pd.isna(values[i]), (
                f"LEAKAGE DETECTED at row {i}: days_since={values[i]} "
                "(expected NaN -- lag-1 must NOT see today's event)."
            )
        assert values[3] == pytest.approx(0.0), (
            f"LEAKAGE DETECTED at row 3: days_since={values[3]} (expected 0.0). "
            "lag-1 at 2024-01-04 should see 2024-01-03's gap of 0 days."
        )
        for i, expected in ((4, 1.0), (5, 2.0), (6, 3.0)):
            assert values[i] == pytest.approx(expected), (
                f"LEAKAGE DETECTED at row {i}: days_since={values[i]} (expected {expected})."
            )

    def test_count_window_uses_shift_then_rolling(self) -> None:
        """CRITICAL: ``shift(1).rolling(W).sum()`` MUST be the order.

        Events on 2024-01-01, 2024-01-03, 2024-01-05 with W=7 (the smallest
        window the config schema allows; ``ReplenishmentConfig`` bounds
        ``count_window_days`` at ``ge=7``). Sales rows on every date
        2024-01-01..2024-01-07.

        Per-row event_count = [1, 0, 1, 0, 1, 0, 0]. The correct
        ``shift(1).rolling(7, min_periods=1).sum()`` gives NaN at position 0
        (filled to 0) and then the running total of all prior rows:
        [0, 1, 1, 2, 2, 3, 3].

        A ``rolling(7).sum()`` with NO leading shift would include today's
        event at every step, giving [1, 1, 2, 2, 3, 3, 3]; the two differ at
        several rows (row 0: 1 vs 0, row 2: 2 vs 1), so one equality check
        catches a missing ``shift(1)``.
        """
        service = self._service(self._january_events(1, 3, 5), days_since=False)
        result = service.compute_features(self._daily_sales(7))

        counts = result.df[self.COUNT_COL].tolist()
        expected = [0, 1, 1, 2, 2, 3, 3]
        assert counts == expected, (
            f"LEAKAGE DETECTED: count column = {counts}, expected {expected}. "
            "shift(1).rolling(W).sum() order may be reversed -- today's event "
            "is leaking into the rolling window."
        )

    def test_cross_series_isolation(self) -> None:
        """CRITICAL: events on (store=1, product=1) must NEVER affect store=2.

        Two stores x one product; events only on (1, 1). Store=2 must see
        all-zero counts and all-NaN days-since across every row.
        """
        sales = self._daily_sales(5, store_ids=(1, 2), quantity=list(range(10)))
        service = self._service(
            self._january_events(2, 4),
            entity_columns=("store_id", "product_id"),
        )
        result = service.compute_features(sales)

        store_2 = result.df[result.df["store_id"] == 2]
        # store=2 has no events anywhere -> count must be 0 across every row.
        store_2_counts = store_2[self.COUNT_COL].tolist()
        assert store_2_counts == [0, 0, 0, 0, 0], (
            f"CROSS-SERIES LEAKAGE DETECTED: store=2 counts = {store_2_counts}, "
            "expected all 0 (events were only on store=1). The event-count "
            "merge may be missing the ['store_id', 'product_id'] keys."
        )
        # store=2 has no observed prior event -> days_since all NaN.
        for i, v in enumerate(store_2[self.DAYS_COL].tolist()):
            assert pd.isna(v), (
                f"CROSS-SERIES LEAKAGE DETECTED: store=2 row {i} days_since={v}, "
                "expected NaN. Events from store=1 may have bled into store=2."
            )

    def test_event_on_cutoff_date_included_via_le_filter(self) -> None:
        """Event ON ``cutoff_date`` must be visible (``date <= cutoff_date``).

        Sales 2024-01-01..2024-01-07; cutoff=2024-01-05. The event on
        2024-01-05 (the cutoff itself) survives the ``<=`` filter while the
        2024-01-07 event does not. Mirrors the SQL-side ``date <= cutoff``
        predicate (PRP-3.1C decisions log §2).

        Two passes: with the cutoff applied, the lag-1 column on the cutoff
        row must still be NaN (today's event is invisible). Without the
        cutoff, the admitted event must show up on 2024-01-06 / 2024-01-07,
        which proves the feature code actually consumes it.
        """
        cutoff = date(2024, 1, 5)
        # Pre-filter events to event_date <= cutoff (mirrors loader's SQL).
        all_events = self._january_events(5, 7)
        events = all_events[all_events["event_date"] <= cutoff].reset_index(drop=True)
        assert len(events) == 1, (
            f"FILTER ERROR: expected 1 event after <= cutoff filter, got {len(events)}"
        )

        service = self._service(events)
        result = service.compute_features(self._daily_sales(7), cutoff_date=cutoff)

        # compute_features drops rows after cutoff -> 5 rows remain.
        assert len(result.df) == 5, (
            f"CUTOFF VIOLATION: expected 5 rows after cutoff, got {len(result.df)}"
        )
        # On the cutoff row (2024-01-05), lag-1 cannot see today's event, and
        # on 2024-01-04 there was no prior event -> NaN.
        last_row = result.df.iloc[-1]
        assert pd.isna(last_row[self.DAYS_COL]), (
            f"LEAKAGE DETECTED at cutoff row: days_since={last_row[self.DAYS_COL]} "
            "(expected NaN -- lag-1 must NOT include today's event)."
        )

        # Second pass over the full sales range with the SAME filtered events:
        # per-row event_count = [0,0,0,0,1,0,0] -> shift(1).rolling(7).sum()
        # -> [0,0,0,0,0,1,1]; per-row gap = [NaN x4, 0, 1, 2] -> shift(1)
        # -> [NaN x5, 0, 1].
        extended = self._service(events).compute_features(self._daily_sales(7))
        counts = extended.df[self.COUNT_COL].tolist()
        assert counts == [0, 0, 0, 0, 0, 1, 1], (
            f"ON-CUTOFF EVENT MISSING: count column = {counts}, expected "
            "[0, 0, 0, 0, 0, 1, 1]."
        )
        days = extended.df[self.DAYS_COL].tolist()
        assert days[5] == pytest.approx(0.0)
        assert days[6] == pytest.approx(1.0)


class TestReplenishmentFeatures:
    """Unit tests for replenishment-event features (PRP-3.1C).

    These cases exercise the happy path, zero-event entities, single-event
    entities, cutoff-boundary alignment, and dtype contracts. Time-safety is
    covered separately in ``TestReplenishmentLeakage``.
    """

    DAYS_COL = "days_since_last_replenishment_lag1"
    COUNT_COL = "replenishment_count_w7_lag1"

    @staticmethod
    def _daily_sales(periods: int) -> pd.DataFrame:
        """Build one sales row per day from 2024-01-01 for (store=1, product=1)."""
        return pd.DataFrame(
            {
                "date": pd.date_range("2024-01-01", periods=periods, freq="D"),
                "store_id": [1] * periods,
                "product_id": [1] * periods,
                "quantity": list(range(1, periods + 1)),
            }
        )

    @staticmethod
    def _january_events(*days: int) -> pd.DataFrame:
        """Build events for (store=1, product=1) on the given days of January 2024."""
        return pd.DataFrame(
            {
                "store_id": [1] * len(days),
                "product_id": [1] * len(days),
                "event_date": [date(2024, 1, day) for day in days],
            }
        )

    @staticmethod
    def _service(events: pd.DataFrame) -> FeatureEngineeringService:
        """Return a service with both features on (lag 1, W=7) and ``events`` attached."""
        from app.features.featuresets.schemas import ReplenishmentConfig

        config = FeatureSetConfig(
            name="test",
            replenishment_config=ReplenishmentConfig(
                include_days_since_last=True,
                include_count_window=True,
                lag_days=1,
                count_window_days=7,
            ),
        )
        service = FeatureEngineeringService(config)
        service._replenishment_events_df = events  # type: ignore[attr-defined]
        return service

    def test_happy_path_three_events(self) -> None:
        """Three events on (1,1) produce the expected trailing-7 rolling counts."""
        service = self._service(self._january_events(2, 5, 8))
        result = service.compute_features(self._daily_sales(10))

        # Both columns should be in feature_columns.
        assert self.DAYS_COL in result.feature_columns
        assert self.COUNT_COL in result.feature_columns

        # Per-row event_count = [0,1,0,0,1,0,0,1,0,0]; shifted then rolling:
        # shift(1) = [NaN,0,1,0,0,1,0,0,1,0]; rolling(7,min_periods=1).sum():
        #   pos 7 window=[0,1,0,0,1,0,0] -> 2
        #   pos 8 window=[1,0,0,1,0,0,1] -> 3
        #   pos 9 window=[0,0,1,0,0,1,0] -> 2 (the day-2 event has rolled out
        #   of the trailing-7 window, as ``count_window_days=7`` requires).
        # fillna 0 -> [0,0,1,1,1,2,2,2,3,2].
        counts = result.df[self.COUNT_COL].tolist()
        assert counts == [0, 0, 1, 1, 1, 2, 2, 2, 3, 2]

    def test_zero_events_entity(self) -> None:
        """An entity with zero events must have count=0 and days-since=NaN.

        Dtype contracts: count is int64 (with 0 fill), days-since is float64
        (NaN for no-prior-event). PRP-3.1C §15 Decision B.
        """
        # Zero events: a DataFrame with named, typed columns but no rows.
        events = pd.DataFrame(
            {
                "store_id": pd.Series([], dtype="int64"),
                "product_id": pd.Series([], dtype="int64"),
                "event_date": pd.Series([], dtype="datetime64[ns]"),
            }
        )
        result = self._service(events).compute_features(self._daily_sales(5))

        assert result.df[self.COUNT_COL].tolist() == [0, 0, 0, 0, 0]
        assert result.df[self.COUNT_COL].dtype == "int64"
        assert result.df[self.DAYS_COL].isna().all()
        assert result.df[self.DAYS_COL].dtype == "float64"

    def test_single_event_entity(self) -> None:
        """One event on day 3; days-since and count cross the boundary cleanly."""
        service = self._service(self._january_events(3))
        result = service.compute_features(self._daily_sales(5))

        # event_count = [0,0,1,0,0]; shift(1)=[NaN,0,0,1,0]; rolling sum =
        # [NaN,0,0,1,1] -> fillna 0 -> [0,0,0,1,1].
        assert result.df[self.COUNT_COL].tolist() == [0, 0, 0, 1, 1]
        # Per-row days-since = [NaN, NaN, 0, 1, 2]; shift(1) =
        # [NaN, NaN, NaN, 0, 1].
        values = result.df[self.DAYS_COL].tolist()
        for i in (0, 1, 2):
            assert pd.isna(values[i]), f"row {i} should be NaN, got {values[i]}"
        assert values[3] == pytest.approx(0.0)
        assert values[4] == pytest.approx(1.0)

    def test_cutoff_excludes_post_events(self) -> None:
        """Events AFTER cutoff (filtered by caller) must not influence features.

        Mirrors the loader's SQL-side ``date <= cutoff_date`` predicate.
        """
        cutoff = date(2024, 1, 4)
        # One pre-cutoff and one post-cutoff event, then the same time-safety
        # filter the loader applies SQL-side.
        all_events = self._january_events(2, 5)
        events = all_events[all_events["event_date"] <= cutoff].reset_index(drop=True)
        assert len(events) == 1  # post-cutoff event filtered out

        service = self._service(events)
        result = service.compute_features(self._daily_sales(5), cutoff_date=cutoff)

        # 4 rows survive the cutoff (01..04).
        assert len(result.df) == 4
        # event_count = [0,1,0,0] (post-cutoff filtered); shift(1) =
        # [NaN,0,1,0]; rolling = [NaN,0,1,1] -> [0,0,1,1].
        assert result.df[self.COUNT_COL].tolist() == [0, 0, 1, 1]

    def test_dtypes_are_int64_and_float64(self) -> None:
        """Column dtype contracts: count=int64, days-since=float64.

        Verifies PRP-3.1C §15 Decision B on a single entity whose rows fall
        both before and after its only event, so the NaN days-since rows and
        the zero-filled count rows are both present.
        """
        service = self._service(self._january_events(2))
        result = service.compute_features(self._daily_sales(4))

        assert result.df[self.COUNT_COL].dtype == "int64"
        assert result.df[self.DAYS_COL].dtype == "float64"