From 292e8c67957488de981da27686bbd20f03040ed0 Mon Sep 17 00:00:00 2001 From: "Gabe@w7dev" Date: Mon, 26 Jan 2026 11:52:06 +0000 Subject: [PATCH 1/4] docs: add DAILY-FLOW and PHASE-FLOW documentation - DAILY-FLOW.md: Daily development workflow (branch strategy, PR flow, commands) - PHASE-FLOW.md: Phase completion workflow (lifecycle, snapshot, sync) Co-Authored-By: Claude Opus 4.5 --- docs/DAILY-FLOW.md | 175 ++++++++++++++++++++++++++++++++ docs/PHASE-FLOW.md | 243 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 418 insertions(+) create mode 100644 docs/DAILY-FLOW.md create mode 100644 docs/PHASE-FLOW.md diff --git a/docs/DAILY-FLOW.md b/docs/DAILY-FLOW.md new file mode 100644 index 00000000..3ceefe42 --- /dev/null +++ b/docs/DAILY-FLOW.md @@ -0,0 +1,175 @@ +# Daily Development Flow + +The daily development workflow for the ForecastLabAI project. + +--- + +## Branch Strategy + +``` +main (protected) <- Production releases only + │ + └── dev <- Integration branch + │ + └── feat/* <- Feature branches +``` + +--- + +## Daily Development Cycle + +### 1. Starting Work + +```bash +# Update the dev branch +git checkout dev +git pull origin dev + +# Create a feature branch +git checkout -b feat/prp-X-feature-name +``` + +### 2. During Development + +```bash +# Regular commits +git add +git commit -m "feat(module): description" + +# Local checks +uv run ruff check . +uv run ruff format . +uv run mypy app/ +uv run pyright app/ +uv run pytest -v +``` + +### 3. Creating a PR + +```bash +# Push feature branch +git push origin feat/prp-X-feature-name + +# Create a PR into dev +gh pr create --base dev --title "feat(module): description" --body "..." +``` + +### 4. CI Checks + +The PR must pass: +- [ ] Lint & Format +- [ ] Type Check (MyPy + Pyright) +- [ ] Test +- [ ] Migration Check +- [ ] Code Review (Sourcery, CodeRabbit) + +### 5. 
Merge to Dev + +```bash +# Merge after review +gh pr merge --squash --delete-branch +``` + +--- + +## Dev → Main Merge (Feature Complete) + +When a feature is fully complete: + +```bash +# Checkout dev +git checkout dev +git pull origin dev + +# Create a PR into main +gh pr create --base main --head dev --title "feat(module): merge feature to main" + +# Merge after CI checks pass +gh pr merge --squash +``` + +--- + +## Release Flow + +After a merge to the `main` branch, `release-please` automatically: +1. Creates a Release PR (version bump + CHANGELOG) +2. CI runs on the Release PR +3. After the merge, a GitHub Release + tag is created + +```bash +# Check the Release PR +gh pr list --label "autorelease: pending" + +# Trigger CI if needed +git checkout +git commit --allow-empty -m "chore: trigger CI" +git push + +# Merge release PR +gh pr merge --squash --delete-branch +``` + +--- + +## Commit Message Convention + +``` +(): + +Types: +- feat: New feature +- fix: Bug fix +- docs: Documentation +- refactor: Refactoring +- test: Adding/modifying tests +- chore: Build, CI, dependencies + +Scope (optional): +- data-platform +- ingest +- forecasting +- backtesting +- registry +- rag +- dashboard +``` + +--- + +## Quick Commands + +```bash +# Status check +git status && git log --oneline -5 + +# Lint + format +uv run ruff check --fix . && uv run ruff format . + +# Type check +uv run mypy app/ && uv run pyright app/ + +# Tests +uv run pytest -v +uv run pytest -v -m integration # requires DB + +# PR checks +gh pr checks + +# Watch workflow +gh run watch +``` + +--- + +## Next Phase: Ingest Layer (PRP-3) + +```bash +# Start +git checkout dev +git pull origin dev +git checkout -b feat/prp-3-ingest-layer + +# Development... 
+# PR → dev → main → release → phase-2 snapshot +``` diff --git a/docs/PHASE-FLOW.md b/docs/PHASE-FLOW.md new file mode 100644 index 00000000..e69365d3 --- /dev/null +++ b/docs/PHASE-FLOW.md @@ -0,0 +1,243 @@ +# Phase Completion Flow + +The phase completion workflow for the ForecastLabAI project. + +--- + +## Phase Lifecycle + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ PHASE LIFECYCLE │ +├─────────────────────────────────────────────────────────────────┤ +│ │ +│ 1. DEVELOPMENT (feat/* → dev) │ +│ └── Feature branches merged to dev │ +│ │ +│ 2. INTEGRATION (dev → main) │ +│ └── All features complete, merge dev to main │ +│ │ +│ 3. RELEASE (release-please) │ +│ └── Automated version bump + GitHub Release │ +│ │ +│ 4. DOCUMENTATION │ +│ └── Update PHASE-index.md, phase docs │ +│ │ +│ 5. SNAPSHOT (main → phase-N) │ +│ └── Create protected branch + audit tag │ +│ │ +│ 6. SYNC (phase-N → dev) │ +│ └── Sync dev with main for next phase │ +│ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Detailed Steps + +### 1. Feature Complete - Dev → Main + +```bash +# Verify that every feature is merged into dev +git checkout dev +git pull origin dev + +# Create a PR into main +gh pr create --base main --head dev \ + --title "feat(phase-N): complete Phase N implementation" \ + --body "## Summary +- Feature 1 +- Feature 2 +- Feature 3 + +## Test plan +- [x] All tests passing +- [x] Type checks green +- [x] Lint checks green" + +# CI check and merge +gh pr checks --watch +gh pr merge --squash +``` + +### 2. Release + +```bash +# Wait for the release-please PR +gh pr list --label "autorelease: pending" + +# Trigger CI if needed +git checkout +git commit --allow-empty -m "chore: trigger CI" +git push + +# Merge release PR (review required) +gh pr merge --squash --delete-branch + +# Verify the release +gh release view v0.X.Y +``` + +### 3. 
Updating the Documentation + +```bash +# Checkout main +git checkout main +git pull origin main + +# Create a docs branch +git checkout -b docs/phase-N-completed + +# Update the documentation: +# - docs/PHASE-index.md (status: Completed) +# - docs/PHASE/N-PHASE_NAME.md (status, release info) + +# PR and merge +gh pr create --base main --title "docs: mark Phase N as completed" +gh pr merge --squash --delete-branch +``` + +### 4. Phase Snapshot + +```bash +# Update main +git checkout main +git pull origin main + +# Create the phase branch FROM MAIN +git checkout -b phase-N +git push origin phase-N + +# Wait for the phase-snapshot.yml workflow +gh run list --workflow=phase-snapshot.yml --limit 1 +gh run watch + +# Verify the tag +git fetch --tags +git tag -l "phase-N-*" +``` + +### 5. Syncing Dev + +```bash +# Sync dev from the phase branch +git checkout dev +git reset --hard phase-N +git push origin dev --force + +# OR via a PR (safer) +gh pr create --base dev --head phase-N \ + --title "sync: update dev from phase-N" +``` + +--- + +## Phase Snapshot Workflow + +The `phase-snapshot.yml` workflow automatically performs: + +1. **Full Validation** + - Lint check + - Type check (MyPy + Pyright) + - Migration check + - All tests (unit + integration) + +2. 
**Create Audit Snapshot** + - Generate metadata JSON + - Collect validation results + - Create markdown report + - Upload audit artifact + - Create annotated tag: `phase-N-snapshot-YYYYMMDD-` + +--- + +## Branch Protection + +The `phase-*` branches are automatically protected: +- No force push +- No deletion +- Serves as audit trail + +--- + +## Checklist: Phase Completion + +```markdown +## Phase N Completion Checklist + +### Development +- [ ] All PRPs implemented +- [ ] All features merged to dev +- [ ] All tests passing + +### Integration +- [ ] Dev merged to main +- [ ] Release PR merged +- [ ] GitHub Release created (vX.Y.Z) + +### Documentation +- [ ] PHASE-index.md updated (status: Completed) +- [ ] docs/PHASE/N-*.md updated +- [ ] Version history entry added + +### Snapshot +- [ ] phase-N branch created from main +- [ ] phase-snapshot workflow passed +- [ ] Annotated tag created +- [ ] Audit artifact uploaded + +### Sync +- [ ] Dev synced with main/phase-N +- [ ] Ready for next phase +``` + +--- + +## Example: Phase 1 → Phase 2 Transition + +```bash +# Phase 1 closed, starting Phase 2 + +# 1. Verify that phase-1 is in order +git checkout phase-1 +git log --oneline -3 + +# 2. Dev is in sync +git checkout dev +git log --oneline -3 # Same as phase-1 + +# 3. Start Phase 2 +git checkout -b feat/prp-3-ingest-layer +# ... development ... 
+``` + +--- + +## Troubleshooting + +### Phase branch behind main +```bash +# Delete and recreate from main +git push origin --delete phase-N +git checkout main && git pull +git checkout -b phase-N +git push origin phase-N +``` + +### Dev diverged from main +```bash +# Reset dev to main +git checkout dev +git reset --hard origin/main +git push origin dev --force +``` + +### Deleting an old phase tag +```bash +# Delete the remote tag +git push origin --delete phase-N-snapshot-YYYYMMDD- + +# Delete the local tag +git tag -d phase-N-snapshot-YYYYMMDD- +``` From 0e15cb34587c744c41e20c554c82adf3ff27f853 Mon Sep 17 00:00:00 2001 From: Gabor Szabo <168316277+w7-mgfcode@users.noreply.github.com> Date: Mon, 26 Jan 2026 13:57:20 +0100 Subject: [PATCH 2/4] feat(ingest): implement idempotent batch upsert endpoint for sales_daily (#19) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * docs(prp): add PRP-3 for ingest layer implementation Comprehensive implementation plan for idempotent batch upsert endpoints: - POST /ingest/sales-daily with natural key resolution (store_code, sku) - PostgreSQL ON CONFLICT DO UPDATE for replay-safe ingestion - Partial success pattern with row-level error reporting - Configurable batch size and timeout settings - 13 ordered implementation tasks with validation gates Confidence score: 8/10 Co-Authored-By: Claude Opus 4.5 * feat(ingest): implement idempotent batch upsert endpoint for sales_daily Implements PRP-3 Ingest Layer with: - POST /ingest/sales-daily endpoint accepting natural keys (store_code, sku) - KeyResolver service for store_code → store_id, sku → product_id resolution - Calendar date FK validation (dates must exist in calendar table) - PostgreSQL ON CONFLICT DO UPDATE for replay-safe upserts - Partial success pattern: valid rows processed, invalid rows returned as errors - Configurable batch size and timeout via Settings - Structured logging with ingest.sales_daily.{action}_{state} events New 
files: - app/features/ingest/ - Complete vertical slice (schemas, service, routes, tests) - examples/api/ingest_sales_daily.http - HTTP client examples Test coverage: 42 new tests (18 schema + 15 service + 9 integration) All validation gates passing: ruff, mypy, pyright, pytest (101 tests) Co-Authored-By: Claude Opus 4.5 * fix(ingest): address Sourcery code review comments - Replace inserted_count/updated_count with processed_count (ON CONFLICT can't distinguish inserts vs updates without xmax check complexity) - Remove misleading updated_count field that was always 0 - Use result.rowcount instead of len(result.fetchall()) for efficiency - Change errors field from mutable default=[] to default_factory=list - Catch specific SQLAlchemyError instead of bare Exception - Wire db_session into FastAPI deps override in integration tests - Remove no-op validate_total_amount_consistency validator Co-Authored-By: Claude Opus 4.5 --------- Co-authored-by: Gabe@w7dev Co-authored-by: Claude Opus 4.5 --- PRPs/PRP-3-ingest-layer.md | 888 ++++++++++++++++++++++ app/core/config.py | 4 + app/features/ingest/__init__.py | 20 + app/features/ingest/routes.py | 97 +++ app/features/ingest/schemas.py | 73 ++ app/features/ingest/service.py | 234 ++++++ app/features/ingest/tests/__init__.py | 1 + app/features/ingest/tests/conftest.py | 112 +++ app/features/ingest/tests/test_routes.py | 373 +++++++++ app/features/ingest/tests/test_schemas.py | 285 +++++++ app/features/ingest/tests/test_service.py | 321 ++++++++ app/main.py | 2 + examples/api/ingest_sales_daily.http | 312 ++++++++ pyproject.toml | 7 +- 14 files changed, 2726 insertions(+), 3 deletions(-) create mode 100644 PRPs/PRP-3-ingest-layer.md create mode 100644 app/features/ingest/__init__.py create mode 100644 app/features/ingest/routes.py create mode 100644 app/features/ingest/schemas.py create mode 100644 app/features/ingest/service.py create mode 100644 app/features/ingest/tests/__init__.py create mode 100644 
app/features/ingest/tests/conftest.py create mode 100644 app/features/ingest/tests/test_routes.py create mode 100644 app/features/ingest/tests/test_schemas.py create mode 100644 app/features/ingest/tests/test_service.py create mode 100644 examples/api/ingest_sales_daily.http diff --git a/PRPs/PRP-3-ingest-layer.md b/PRPs/PRP-3-ingest-layer.md new file mode 100644 index 00000000..1d24ab78 --- /dev/null +++ b/PRPs/PRP-3-ingest-layer.md @@ -0,0 +1,888 @@ +# PRP-3: Ingest Layer — Idempotent Batch Upserts + +## Goal + +Implement typed, idempotent batch upsert endpoints for the ForecastLabAI data platform. The primary endpoint `POST /ingest/sales-daily` accepts sales records with natural keys (`store_code`, `sku`) and performs replay-safe upserts using PostgreSQL's `ON CONFLICT DO UPDATE`. + +**End State:** A production-ready ingest layer with: +- `POST /ingest/sales-daily` — batch upsert endpoint accepting natural keys +- Key resolution service (store_code → store_id, sku → product_id) +- Configurable batch sizing and timeouts via Settings +- Comprehensive error handling with row-level validation +- Structured logging with inserted/updated counts and duration metrics +- All validation gates passing (ruff, mypy, pyright, pytest) + +--- + +## Why + +- **Foundation for ForecastOps**: Training and backtesting (INITIAL-4 through INITIAL-6) require populated sales data +- **Replay-Safe Ingestion**: `ON CONFLICT DO UPDATE` enables re-running pipelines without duplicates +- **External System Compatibility**: Real-world systems send natural keys (store_code, sku), not internal database IDs +- **Data Quality**: Row-level validation prevents bad data while allowing valid rows to succeed +- **Operational Visibility**: Structured logging enables monitoring ingestion health and performance + +--- + +## What + +### Success Criteria + +- [ ] `POST /ingest/sales-daily` accepts batch of sales records with `store_code` and `sku` +- [ ] Service resolves natural keys to internal IDs via 
lookup (with basic caching) +- [ ] Unknown store_code/sku rejects individual row, continues processing valid rows +- [ ] `ON CONFLICT (date, store_id, product_id) DO UPDATE` ensures idempotency +- [ ] Response includes `inserted_count`, `updated_count`, `rejected_count`, `errors[]` +- [ ] Batch size configurable via `INGEST_BATCH_SIZE` setting (default: 1000) +- [ ] Request timeout configurable via `INGEST_TIMEOUT_SECONDS` setting (default: 60) +- [ ] All logs follow `ingest.{component}.{action}_{state}` naming convention +- [ ] Unit tests for schemas, service logic, key resolution +- [ ] Integration tests verify upsert idempotency and constraint enforcement +- [ ] Example files: `examples/api/ingest_sales_daily.http` + +--- + +## All Needed Context + +### Documentation & References + +```yaml +# MUST READ - Critical for implementation +- url: https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#insert-on-conflict-upsert + why: SQLAlchemy 2.0 PostgreSQL-specific INSERT...ON CONFLICT syntax + critical: Use `from sqlalchemy.dialects.postgresql import insert` for pg_insert + +- url: https://docs.sqlalchemy.org/en/20/orm/queryguide/dml.html + why: ORM-enabled INSERT/UPDATE/DELETE operations with SQLAlchemy 2.0 + critical: Use `insert(Model).values()` not `session.add()` for bulk upserts + +- url: https://docs.pydantic.dev/latest/concepts/validators/ + why: Pydantic v2 field validators and model validators + critical: Use `@field_validator` and `@model_validator` decorators + +- url: https://fastapi.tiangolo.com/tutorial/request-files/ + why: FastAPI request body handling patterns + critical: Use `List[Model]` in request body for batch operations + +- url: https://www.dbvis.com/thetable/postgresql-upsert-insert-on-conflict-guide/ + why: PostgreSQL UPSERT best practices and performance considerations + critical: conflict_target must match unique constraint exactly + +- url: 
https://overflow.no/blog/2025/1/5/using-staging-tables-for-faster-bulk-upserts-with-python-and-postgresql/ + why: Performance optimization for large batch upserts + critical: For >10k rows, staging table approach is faster (YAGNI for now) + +# Internal codebase files - MUST reference these patterns +- file: app/features/data_platform/models.py + why: SalesDaily model with UniqueConstraint("date", "store_id", "product_id") + +- file: app/features/data_platform/schemas.py + why: Existing Pydantic schema patterns (Base/Create/Read pattern) + +- file: app/core/database.py + why: AsyncSession dependency pattern (get_db) + +- file: app/core/exceptions.py + why: ForecastLabError, ValidationError, DatabaseError patterns + +- file: app/core/health.py + why: Router structure and endpoint patterns + +- file: app/core/logging.py + why: get_logger() and request_id_ctx patterns + +- file: docs/validation/logging-standard.md + why: Event naming convention: `{domain}.{component}.{action}_{state}` + +- file: CLAUDE.md + why: Type safety requirements, vertical slice architecture, KISS/YAGNI principles +``` + +### Current Codebase Tree + +```bash +app/ +├── __init__.py +├── main.py # FastAPI entry, router registration +├── core/ +│ ├── config.py # Pydantic Settings (add ingest config here) +│ ├── database.py # AsyncSession, get_db() +│ ├── exceptions.py # ForecastLabError hierarchy +│ ├── health.py # Router pattern example +│ ├── logging.py # Structured logging, get_logger() +│ └── middleware.py # RequestIdMiddleware +├── shared/ +│ ├── models.py # TimestampMixin +│ ├── schemas.py # ErrorResponse, PaginationParams +│ └── utils.py # Utilities +└── features/ + └── data_platform/ + ├── models.py # Store, Product, Calendar, SalesDaily (with grain constraint) + ├── schemas.py # StoreRead, ProductRead, SalesDailyCreate, etc. 
+ └── tests/ # Model and constraint tests +``` + +### Desired Codebase Tree (files to be added) + +```bash +app/ +├── core/ +│ └── config.py # MODIFY: Add ingest_batch_size, ingest_timeout_seconds +└── features/ + └── ingest/ # NEW: Ingest vertical slice + ├── __init__.py # Module exports + ├── schemas.py # IngestSalesDailyRequest, IngestSalesDailyResponse, IngestRowError + ├── service.py # KeyResolver, upsert_sales_daily_batch() + ├── routes.py # POST /ingest/sales-daily endpoint + └── tests/ + ├── __init__.py + ├── conftest.py # Feature-specific fixtures + ├── test_schemas.py # Schema validation tests + ├── test_service.py # Service logic tests (with mocked DB) + └── test_routes.py # Integration tests for API endpoint + +examples/ +└── api/ + └── ingest_sales_daily.http # NEW: HTTP client example +``` + +### Known Gotchas & Library Quirks + +```python +# CRITICAL: PostgreSQL-specific INSERT for ON CONFLICT support +# ❌ WRONG: from sqlalchemy import insert +# ✅ CORRECT: from sqlalchemy.dialects.postgresql import insert as pg_insert + +# CRITICAL: ON CONFLICT index_elements must match UniqueConstraint EXACTLY +# The constraint is: UniqueConstraint("date", "store_id", "product_id", name="uq_sales_daily_grain") +# ✅ CORRECT: +stmt = pg_insert(SalesDaily).values(data).on_conflict_do_update( + index_elements=["date", "store_id", "product_id"], # Must be column names, not constraint name + set_={...} +) + +# CRITICAL: Use stmt.excluded for UPDATE values +# ✅ CORRECT: +stmt = pg_insert(SalesDaily).values(data).on_conflict_do_update( + index_elements=["date", "store_id", "product_id"], + set_={ + "quantity": stmt.excluded.quantity, + "unit_price": stmt.excluded.unit_price, + "total_amount": stmt.excluded.total_amount, + "updated_at": func.now(), + } +) + +# CRITICAL: Batch execution with executemany semantics +# For bulk upserts, execute each statement individually or use RETURNING +# asyncpg has 32767 parameter limit - batch accordingly + +# CRITICAL: Decimal precision 
for monetary values +# ❌ WRONG: total_amount: float +# ✅ CORRECT: total_amount: Decimal = Field(..., decimal_places=2) + +# CRITICAL: Foreign key lookups must happen BEFORE upsert +# The ingest payload has store_code/sku, but SalesDaily needs store_id/product_id +# Resolve these first, reject rows with unknown codes + +# CRITICAL: Calendar FK constraint +# SalesDaily.date has FK to calendar.date - calendar entry MUST exist +# Either auto-create calendar entries or reject if missing + +# CRITICAL: Error handling - partial success pattern +# Don't fail entire batch on one bad row +# Return inserted/updated/rejected counts with error details +``` + +--- + +## Implementation Blueprint + +### Data Models and Structure + +#### Request/Response Schemas (app/features/ingest/schemas.py) + +```python +"""Pydantic schemas for ingest API.""" + +from datetime import date +from decimal import Decimal + +from pydantic import BaseModel, Field, field_validator, model_validator + + +class SalesDailyIngestRow(BaseModel): + """Single row in sales daily ingest payload. + + Uses natural keys (store_code, sku) instead of internal IDs. + Service resolves these to store_id, product_id before upsert. 
+ """ + + date: date + store_code: str = Field(..., min_length=1, max_length=20, description="Store code (natural key)") + sku: str = Field(..., min_length=1, max_length=50, description="Product SKU (natural key)") + quantity: int = Field(..., ge=0, description="Units sold (non-negative)") + unit_price: Decimal = Field(..., ge=0, description="Price per unit") + total_amount: Decimal = Field(..., ge=0, description="Total sales amount") + + @model_validator(mode="after") + def validate_total_amount_consistency(self) -> "SalesDailyIngestRow": + """Warn if total_amount doesn't match quantity * unit_price.""" + expected = self.quantity * self.unit_price + if abs(self.total_amount - expected) > Decimal("0.01"): + # Allow through but could log warning + pass + return self + + +class SalesDailyIngestRequest(BaseModel): + """Request body for POST /ingest/sales-daily.""" + + records: list[SalesDailyIngestRow] = Field( + ..., + min_length=1, + max_length=10000, # Configurable max batch size + description="Sales records to upsert" + ) + + +class IngestRowError(BaseModel): + """Error detail for a single rejected row.""" + + row_index: int = Field(..., description="0-based index of the failed row") + store_code: str = Field(..., description="Store code from the row") + sku: str = Field(..., description="SKU from the row") + date: date = Field(..., description="Date from the row") + error_code: str = Field(..., description="Machine-readable error code") + error_message: str = Field(..., description="Human-readable error message") + + +class SalesDailyIngestResponse(BaseModel): + """Response body for POST /ingest/sales-daily.""" + + inserted_count: int = Field(..., ge=0, description="Number of new rows inserted") + updated_count: int = Field(..., ge=0, description="Number of existing rows updated") + rejected_count: int = Field(..., ge=0, description="Number of rows rejected") + total_processed: int = Field(..., ge=0, description="Total rows processed") + errors: 
list[IngestRowError] = Field(default_factory=list, description="Details of rejected rows") + duration_ms: float = Field(..., ge=0, description="Processing duration in milliseconds") +``` + +#### Settings Extension (app/core/config.py) + +```python +# Add to Settings class: +# Ingest configuration +ingest_batch_size: int = Field(default=1000, ge=1, le=10000, description="Max rows per upsert batch") +ingest_timeout_seconds: int = Field(default=60, ge=1, le=300, description="Request timeout for ingest") +``` + +### Tasks (Ordered Implementation) + +```yaml +Task 1: Create ingest feature directory structure + FILES: + - app/features/ingest/__init__.py + - app/features/ingest/schemas.py + - app/features/ingest/service.py + - app/features/ingest/routes.py + - app/features/ingest/tests/__init__.py + - app/features/ingest/tests/conftest.py + VALIDATION: + - ls -la app/features/ingest/ + +Task 2: Add ingest configuration to Settings + MODIFY: app/core/config.py + ADD: + - ingest_batch_size: int = 1000 + - ingest_timeout_seconds: int = 60 + VALIDATION: + - uv run python -c "from app.core.config import get_settings; s = get_settings(); print(f'batch_size={s.ingest_batch_size}')" + +Task 3: Implement ingest schemas + FILE: app/features/ingest/schemas.py + IMPLEMENT: + - SalesDailyIngestRow (natural keys: store_code, sku) + - SalesDailyIngestRequest (list of rows with validation) + - IngestRowError (error details for rejected rows) + - SalesDailyIngestResponse (counts + errors + duration) + VALIDATION: + - uv run mypy app/features/ingest/schemas.py + - uv run pyright app/features/ingest/schemas.py + +Task 4: Implement KeyResolver service + FILE: app/features/ingest/service.py + IMPLEMENT: + - KeyResolver class with resolve_store_codes() and resolve_skus() methods + - Uses simple dict lookup from DB (can optimize with caching later) + - Returns mapping: {store_code: store_id} and {sku: product_id} + PSEUDOCODE: + ```python + class KeyResolver: + async def resolve_store_codes( + 
self, db: AsyncSession, codes: set[str] + ) -> dict[str, int]: + """Resolve store codes to IDs. Returns {code: id} for found stores.""" + stmt = select(Store.code, Store.id).where(Store.code.in_(codes)) + result = await db.execute(stmt) + return {row.code: row.id for row in result} + + async def resolve_skus( + self, db: AsyncSession, skus: set[str] + ) -> dict[str, int]: + """Resolve SKUs to product IDs. Returns {sku: id} for found products.""" + stmt = select(Product.sku, Product.id).where(Product.sku.in_(skus)) + result = await db.execute(stmt) + return {row.sku: row.id for row in result} + ``` + VALIDATION: + - uv run mypy app/features/ingest/service.py + +Task 5: Implement upsert_sales_daily_batch service function + FILE: app/features/ingest/service.py (append) + IMPLEMENT: + - upsert_sales_daily_batch(db, records) -> UpsertResult + - Uses pg_insert with on_conflict_do_update + - Handles partial success (collects errors for invalid rows) + - Tracks inserted vs updated counts + PSEUDOCODE: + ```python + @dataclass + class UpsertResult: + inserted_count: int + updated_count: int + rejected_count: int + errors: list[IngestRowError] + + async def upsert_sales_daily_batch( + db: AsyncSession, + records: list[SalesDailyIngestRow], + key_resolver: KeyResolver, + ) -> UpsertResult: + """Upsert sales daily records with key resolution and partial success.""" + logger = get_logger(__name__) + logger.info("ingest.sales_daily.upsert_started", batch_size=len(records)) + + # 1. Extract unique codes and SKUs + store_codes = {r.store_code for r in records} + skus = {r.sku for r in records} + + # 2. Resolve keys + store_map = await key_resolver.resolve_store_codes(db, store_codes) + product_map = await key_resolver.resolve_skus(db, skus) + + # 3. 
Validate and prepare rows + valid_rows = [] + errors = [] + for idx, record in enumerate(records): + store_id = store_map.get(record.store_code) + product_id = product_map.get(record.sku) + + if store_id is None: + errors.append(IngestRowError( + row_index=idx, + store_code=record.store_code, + sku=record.sku, + date=record.date, + error_code="UNKNOWN_STORE", + error_message=f"Store code '{record.store_code}' not found", + )) + continue + + if product_id is None: + errors.append(IngestRowError( + row_index=idx, + store_code=record.store_code, + sku=record.sku, + date=record.date, + error_code="UNKNOWN_PRODUCT", + error_message=f"SKU '{record.sku}' not found", + )) + continue + + valid_rows.append({ + "date": record.date, + "store_id": store_id, + "product_id": product_id, + "quantity": record.quantity, + "unit_price": record.unit_price, + "total_amount": record.total_amount, + }) + + # 4. Perform upsert for valid rows + inserted = 0 + updated = 0 + + if valid_rows: + # Use PostgreSQL INSERT...ON CONFLICT + stmt = pg_insert(SalesDaily).values(valid_rows) + stmt = stmt.on_conflict_do_update( + index_elements=["date", "store_id", "product_id"], + set_={ + "quantity": stmt.excluded.quantity, + "unit_price": stmt.excluded.unit_price, + "total_amount": stmt.excluded.total_amount, + "updated_at": func.now(), + } + ) + # Note: Counting inserted vs updated requires RETURNING + checking xmax + # Simplified: count all as "processed" for now + await db.execute(stmt) + await db.commit() + # For accurate counts, would need separate logic + inserted = len(valid_rows) # Simplified + + logger.info( + "ingest.sales_daily.upsert_completed", + inserted=inserted, + updated=updated, + rejected=len(errors), + ) + + return UpsertResult( + inserted_count=inserted, + updated_count=updated, + rejected_count=len(errors), + errors=errors, + ) + ``` + VALIDATION: + - uv run mypy app/features/ingest/service.py + - uv run pyright app/features/ingest/service.py + +Task 6: Implement ingest routes + 
FILE: app/features/ingest/routes.py + IMPLEMENT: + - Router with tag "ingest" + - POST /ingest/sales-daily endpoint + - Uses get_db() dependency + - Returns SalesDailyIngestResponse + PSEUDOCODE: + ```python + router = APIRouter(prefix="/ingest", tags=["ingest"]) + + @router.post( + "/sales-daily", + response_model=SalesDailyIngestResponse, + status_code=status.HTTP_200_OK, + ) + async def ingest_sales_daily( + request: SalesDailyIngestRequest, + db: AsyncSession = Depends(get_db), + ) -> SalesDailyIngestResponse: + """Batch upsert daily sales records. + + Accepts sales records with natural keys (store_code, sku). + Resolves to internal IDs and performs idempotent upsert. + + Returns counts of inserted, updated, and rejected rows. + Rejected rows include error details for debugging. + """ + logger = get_logger(__name__) + start_time = time.time() + + logger.info("ingest.sales_daily.request_received", record_count=len(request.records)) + + try: + key_resolver = KeyResolver() + result = await upsert_sales_daily_batch(db, request.records, key_resolver) + + duration_ms = (time.time() - start_time) * 1000 + + return SalesDailyIngestResponse( + inserted_count=result.inserted_count, + updated_count=result.updated_count, + rejected_count=result.rejected_count, + total_processed=len(request.records), + errors=result.errors, + duration_ms=round(duration_ms, 2), + ) + except Exception as e: + logger.error( + "ingest.sales_daily.request_failed", + error=str(e), + error_type=type(e).__name__, + exc_info=True, + ) + raise DatabaseError( + message="Failed to process sales daily ingest", + details={"error": str(e)}, + ) + ``` + VALIDATION: + - uv run mypy app/features/ingest/routes.py + - uv run pyright app/features/ingest/routes.py + +Task 7: Register ingest router in main.py + MODIFY: app/main.py + ADD: + - from app.features.ingest.routes import router as ingest_router + - app.include_router(ingest_router) + VALIDATION: + - uv run python -c "from app.main import app; 
print([r.path for r in app.routes])" + +Task 8: Create unit tests for schemas + FILE: app/features/ingest/tests/test_schemas.py + IMPLEMENT: + - Test SalesDailyIngestRow validation (valid inputs) + - Test SalesDailyIngestRow rejection (negative quantity, etc.) + - Test SalesDailyIngestRequest with empty list (should fail) + - Test SalesDailyIngestResponse serialization + VALIDATION: + - uv run pytest app/features/ingest/tests/test_schemas.py -v + +Task 9: Create unit tests for service + FILE: app/features/ingest/tests/test_service.py + IMPLEMENT: + - Test KeyResolver with mocked DB session + - Test upsert_sales_daily_batch with valid records + - Test partial success with unknown store_code + - Test partial success with unknown sku + VALIDATION: + - uv run pytest app/features/ingest/tests/test_service.py -v + +Task 10: Create integration tests for routes + FILE: app/features/ingest/tests/test_routes.py + IMPLEMENT: + - Test POST /ingest/sales-daily with valid payload + - Test idempotency (same payload twice = updates, not duplicates) + - Test partial success response with mixed valid/invalid rows + - Test empty records list returns 422 + REQUIRES: + - Running PostgreSQL (docker-compose up -d) + - Sample store, product, calendar data + VALIDATION: + - docker-compose up -d + - uv run alembic upgrade head + - uv run pytest app/features/ingest/tests/test_routes.py -v -m integration + - docker-compose down + +Task 11: Create HTTP example file + FILE: examples/api/ingest_sales_daily.http + CONTENT: + ```http + ### Ingest Sales Daily - Happy Path + POST {{API_BASE_URL}}/ingest/sales-daily + Content-Type: application/json + + { + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 10, + "unit_price": 9.99, + "total_amount": 99.90 + }, + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-002", + "quantity": 5, + "unit_price": 19.99, + "total_amount": 99.95 + } + ] + } + + ### Expected Response: + # { + # 
"inserted_count": 2, + # "updated_count": 0, + # "rejected_count": 0, + # "total_processed": 2, + # "errors": [], + # "duration_ms": 45.23 + # } + + ### Ingest Sales Daily - Replay (Idempotent) + # Running the same request again should update, not duplicate + POST {{API_BASE_URL}}/ingest/sales-daily + Content-Type: application/json + + { + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 15, + "unit_price": 9.99, + "total_amount": 149.85 + } + ] + } + + ### Expected Response: + # { + # "inserted_count": 0, + # "updated_count": 1, + # "rejected_count": 0, + # "total_processed": 1, + # "errors": [], + # "duration_ms": 32.15 + # } + + ### Ingest Sales Daily - Partial Success + POST {{API_BASE_URL}}/ingest/sales-daily + Content-Type: application/json + + { + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 10, + "unit_price": 9.99, + "total_amount": 99.90 + }, + { + "date": "2024-01-15", + "store_code": "UNKNOWN", + "sku": "SKU-001", + "quantity": 5, + "unit_price": 9.99, + "total_amount": 49.95 + } + ] + } + + ### Expected Response: + # { + # "inserted_count": 1, + # "updated_count": 0, + # "rejected_count": 1, + # "total_processed": 2, + # "errors": [ + # { + # "row_index": 1, + # "store_code": "UNKNOWN", + # "sku": "SKU-001", + # "date": "2024-01-15", + # "error_code": "UNKNOWN_STORE", + # "error_message": "Store code 'UNKNOWN' not found" + # } + # ], + # "duration_ms": 38.45 + # } + ``` + VALIDATION: + - ls -la examples/api/ingest_sales_daily.http + +Task 12: Create feature-specific test fixtures + FILE: app/features/ingest/tests/conftest.py + IMPLEMENT: + - sample_ingest_row fixture + - sample_ingest_request fixture + - mock_key_resolver fixture for unit tests + VALIDATION: + - uv run pytest app/features/ingest/tests/ --collect-only + +Task 13: Final validation - Run all quality gates + COMMANDS: + - uv run ruff check app/features/ingest/ --fix + - uv run ruff 
format app/features/ingest/ + - uv run mypy app/features/ingest/ + - uv run pyright app/features/ingest/ + - uv run pytest app/features/ingest/tests/ -v + - docker-compose up -d && sleep 5 + - uv run alembic upgrade head + - uv run pytest app/features/ingest/tests/ -v -m integration + - docker-compose down +``` + +### Integration Points + +```yaml +DATABASE: + - No new migrations required (uses existing SalesDaily, Store, Product tables) + - Uses existing grain constraint: uq_sales_daily_grain(date, store_id, product_id) + +CONFIG: + - MODIFY: app/core/config.py + - ADD: INGEST_BATCH_SIZE (default: 1000) + - ADD: INGEST_TIMEOUT_SECONDS (default: 60) + +ROUTES: + - MODIFY: app/main.py + - ADD: app.include_router(ingest_router) + - ENDPOINT: POST /ingest/sales-daily + +DEPENDENCIES: + - Store table must have stores with matching codes + - Product table must have products with matching SKUs + - Calendar table must have entries for dates in payload (FK constraint) +``` + +--- + +## Validation Loop + +### Level 1: Syntax & Style + +```bash +# Run FIRST - fix any errors before proceeding +uv run ruff check app/features/ingest/ --fix +uv run ruff format app/features/ingest/ + +# Expected: No errors +``` + +### Level 2: Type Checking + +```bash +# Run SECOND - type safety is non-negotiable +uv run mypy app/features/ingest/ +uv run pyright app/features/ingest/ + +# Expected: 0 errors, 0 warnings +``` + +### Level 3: Unit Tests + +```bash +# Run THIRD - verify schemas and service logic +uv run pytest app/features/ingest/tests/test_schemas.py -v +uv run pytest app/features/ingest/tests/test_service.py -v + +# Expected: All tests pass +``` + +### Level 4: Integration Tests + +```bash +# Run FOURTH - verify API and database behavior +docker-compose up -d +sleep 5 +uv run alembic upgrade head + +# Seed test data (stores, products, calendar) +uv run python examples/seed_demo_data.py + +# Run integration tests +uv run pytest app/features/ingest/tests/test_routes.py -v -m 
integration + +docker-compose down + +# Expected: All tests pass +``` + +### Level 5: Manual API Test + +```bash +# Start API server +uv run uvicorn app.main:app --reload --port 8123 + +# In another terminal, test endpoint +curl -X POST http://localhost:8123/ingest/sales-daily \ + -H "Content-Type: application/json" \ + -d '{ + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 10, + "unit_price": 9.99, + "total_amount": 99.90 + } + ] + }' + +# Expected: {"inserted_count":1,"updated_count":0,"rejected_count":0,...} +``` + +--- + +## Final Validation Checklist + +- [ ] `uv run ruff check app/features/ingest/` passes with no errors +- [ ] `uv run ruff format --check app/features/ingest/` passes +- [ ] `uv run mypy app/features/ingest/` passes with 0 errors +- [ ] `uv run pyright app/features/ingest/` passes with 0 errors +- [ ] `uv run pytest app/features/ingest/tests/test_schemas.py -v` all tests pass +- [ ] `uv run pytest app/features/ingest/tests/test_service.py -v` all tests pass +- [ ] `uv run pytest app/features/ingest/tests/test_routes.py -v -m integration` all tests pass +- [ ] POST /ingest/sales-daily returns correct response structure +- [ ] Replay same payload = updated_count > 0, no duplicates in DB +- [ ] Unknown store_code returns UNKNOWN_STORE error for that row +- [ ] Unknown sku returns UNKNOWN_PRODUCT error for that row +- [ ] Valid rows processed despite invalid rows in same batch +- [ ] Logs follow `ingest.{component}.{action}_{state}` naming convention +- [ ] Duration tracked in response and logs +- [ ] `examples/api/ingest_sales_daily.http` created with examples + +--- + +## Anti-Patterns to Avoid + +- ❌ **Don't** fail entire batch on one bad row — use partial success pattern +- ❌ **Don't** use `session.add()` for bulk inserts — use `pg_insert().values()` +- ❌ **Don't** resolve keys one-by-one — batch lookup with `WHERE code IN (...)` +- ❌ **Don't** skip calendar FK validation — dates must exist in 
calendar table +- ❌ **Don't** use float for money — use `Decimal` with explicit precision +- ❌ **Don't** hardcode batch sizes — make configurable via Settings +- ❌ **Don't** catch generic `Exception` and hide errors — log and re-raise appropriately +- ❌ **Don't** skip type hints — strict mypy/pyright enforcement +- ❌ **Don't** use sync DB operations — all database calls must be async + +--- + +## Confidence Score: 8/10 + +**Rationale:** + +- (+) Clear endpoint contract with natural key → ID resolution +- (+) Idempotent upsert pattern well-documented for PostgreSQL +- (+) Partial success pattern handles mixed valid/invalid rows +- (+) Follows existing codebase patterns (vertical slice, schemas, logging) +- (+) Comprehensive test strategy (unit + integration) +- (+) Type-safe throughout with Pydantic v2 and SQLAlchemy 2.0 +- (+) Configurable batch size and timeout +- (-) Calendar FK constraint requires calendar entries to exist (may need seeding) +- (-) Accurate inserted vs updated count requires additional logic (xmax trick or two-phase) +- (-) Large batch performance (>10k rows) may need staging table optimization (YAGNI for now) + +**Recommended Approach:** + +1. Execute tasks 1-3 (directory structure, config, schemas) +2. Run type checkers after each file +3. Execute tasks 4-6 (service, routes) +4. Run unit tests +5. Execute task 7 (register router) +6. Execute tasks 8-10 (all tests) +7. Execute tasks 11-12 (examples, fixtures) +8. 
Run full validation loop + +--- + +## Version + +- **PRP Version:** 1.0 +- **Target INITIAL:** INITIAL-3.md (Ingest Layer) +- **Created:** 2026-01-26 +- **Author:** Claude Code + +--- + +## References + +### SQLAlchemy 2.0 +- [PostgreSQL Dialect - INSERT ON CONFLICT](https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#insert-on-conflict-upsert) +- [ORM-Enabled INSERT/UPDATE/DELETE](https://docs.sqlalchemy.org/en/20/orm/queryguide/dml.html) + +### PostgreSQL +- [INSERT Statement with ON CONFLICT](https://www.postgresql.org/docs/current/sql-insert.html) +- [PostgreSQL Upsert Guide](https://www.dbvis.com/thetable/postgresql-upsert-insert-on-conflict-guide/) + +### FastAPI + asyncpg +- [FastAPI SQLAlchemy asyncpg Example](https://github.com/grillazz/fastapi-sqlalchemy-asyncpg) +- [Building High-Performance Async APIs](https://leapcell.io/blog/building-high-performance-async-apis-with-fastapi-sqlalchemy-2-0-and-asyncpg) + +### Pydantic v2 +- [Field Validators](https://docs.pydantic.dev/latest/concepts/validators/) + +### Performance Optimization (Future) +- [Staging Tables for Faster Bulk Upserts](https://overflow.no/blog/2025/1/5/using-staging-tables-for-faster-bulk-upserts-with-python-and-postgresql/) diff --git a/app/core/config.py b/app/core/config.py index 3ea7d9ad..95b04106 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -31,6 +31,10 @@ class Settings(BaseSettings): api_host: str = "0.0.0.0" # noqa: S104 api_port: int = 8123 + # Ingest + ingest_batch_size: int = 1000 + ingest_timeout_seconds: int = 60 + @property def is_development(self) -> bool: """Check if running in development mode.""" diff --git a/app/features/ingest/__init__.py b/app/features/ingest/__init__.py new file mode 100644 index 00000000..c960fd85 --- /dev/null +++ b/app/features/ingest/__init__.py @@ -0,0 +1,20 @@ +"""Ingest feature module for idempotent batch upserts.""" + +from app.features.ingest.routes import router +from app.features.ingest.schemas import ( + IngestRowError, 
+ SalesDailyIngestRequest, + SalesDailyIngestResponse, + SalesDailyIngestRow, +) +from app.features.ingest.service import KeyResolver, upsert_sales_daily_batch + +__all__ = [ + "IngestRowError", + "KeyResolver", + "SalesDailyIngestRequest", + "SalesDailyIngestResponse", + "SalesDailyIngestRow", + "router", + "upsert_sales_daily_batch", +] diff --git a/app/features/ingest/routes.py b/app/features/ingest/routes.py new file mode 100644 index 00000000..8a59b6a8 --- /dev/null +++ b/app/features/ingest/routes.py @@ -0,0 +1,97 @@ +"""Ingest API routes for batch upsert operations.""" + +import time + +from fastapi import APIRouter, Depends, status +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.database import get_db +from app.core.exceptions import DatabaseError +from app.core.logging import get_logger +from app.features.ingest.schemas import ( + SalesDailyIngestRequest, + SalesDailyIngestResponse, +) +from app.features.ingest.service import KeyResolver, upsert_sales_daily_batch + +logger = get_logger(__name__) + +router = APIRouter(prefix="/ingest", tags=["ingest"]) + + +@router.post( + "/sales-daily", + response_model=SalesDailyIngestResponse, + status_code=status.HTTP_200_OK, + summary="Batch upsert daily sales records", + description=""" +Batch upsert daily sales records using natural keys. + +Accepts sales records with store_code and sku (natural keys). +Resolves to internal IDs and performs idempotent upsert using +PostgreSQL ON CONFLICT DO UPDATE. + +**Idempotency:** Running the same request twice will update existing +records rather than create duplicates. The grain (date, store_id, product_id) +is enforced by a unique constraint. + +**Partial Success:** Invalid rows (unknown store_code, sku, or date) are +rejected while valid rows are processed. Response includes counts and +error details for rejected rows. 
+""", +) +async def ingest_sales_daily( + request: SalesDailyIngestRequest, + db: AsyncSession = Depends(get_db), +) -> SalesDailyIngestResponse: + """Batch upsert daily sales records. + + Args: + request: Sales ingest request with list of records. + db: Async database session from dependency. + + Returns: + Response with processed, rejected counts and error details. + + Raises: + DatabaseError: If database operation fails unexpectedly. + """ + start_time = time.perf_counter() + + logger.info( + "ingest.sales_daily.request_received", + record_count=len(request.records), + ) + + try: + key_resolver = KeyResolver() + result = await upsert_sales_daily_batch(db, request.records, key_resolver) + + duration_ms = (time.perf_counter() - start_time) * 1000 + + logger.info( + "ingest.sales_daily.request_completed", + processed=result.processed_count, + rejected=result.rejected_count, + duration_ms=round(duration_ms, 2), + ) + + return SalesDailyIngestResponse( + processed_count=result.processed_count, + rejected_count=result.rejected_count, + total_received=len(request.records), + errors=result.errors, + duration_ms=round(duration_ms, 2), + ) + except SQLAlchemyError as e: + logger.error( + "ingest.sales_daily.request_failed", + error=str(e), + error_type=type(e).__name__, + exc_info=True, + ) + raise DatabaseError( + message="Failed to process sales daily ingest", + details={"error": str(e)}, + ) from e diff --git a/app/features/ingest/schemas.py b/app/features/ingest/schemas.py new file mode 100644 index 00000000..7657f75f --- /dev/null +++ b/app/features/ingest/schemas.py @@ -0,0 +1,73 @@ +"""Pydantic schemas for ingest API.""" + +from datetime import date as date_type +from decimal import Decimal + +from pydantic import BaseModel, Field + + +class SalesDailyIngestRow(BaseModel): + """Single row in sales daily ingest payload. + + Uses natural keys (store_code, sku) instead of internal IDs. + Service resolves these to store_id, product_id before upsert. 
+ + Note: total_amount is accepted as-is without validation against + quantity * unit_price, as source systems may apply discounts or + rounding that we trust. + """ + + date: date_type + store_code: str = Field( + ..., min_length=1, max_length=20, description="Store code (natural key)" + ) + sku: str = Field(..., min_length=1, max_length=50, description="Product SKU (natural key)") + quantity: int = Field(..., ge=0, description="Units sold (non-negative)") + unit_price: Decimal = Field(..., ge=0, decimal_places=2, description="Price per unit") + total_amount: Decimal = Field( + ..., + ge=0, + decimal_places=2, + description="Total sales amount (trusted as-is from source system)", + ) + + +class SalesDailyIngestRequest(BaseModel): + """Request body for POST /ingest/sales-daily.""" + + records: list[SalesDailyIngestRow] = Field( + ..., + min_length=1, + max_length=10000, + description="Sales records to upsert", + ) + + +class IngestRowError(BaseModel): + """Error detail for a single rejected row.""" + + row_index: int = Field(..., description="0-based index of the failed row") + store_code: str = Field(..., description="Store code from the row") + sku: str = Field(..., description="SKU from the row") + date: date_type = Field(..., description="Date from the row") + error_code: str = Field(..., description="Machine-readable error code") + error_message: str = Field(..., description="Human-readable error message") + + +class SalesDailyIngestResponse(BaseModel): + """Response body for POST /ingest/sales-daily. + + Note: Due to PostgreSQL ON CONFLICT semantics, we cannot distinguish + inserts from updates without additional complexity. The `processed_count` + field represents all rows successfully written (inserted or updated). 
+ """ + + processed_count: int = Field( + ..., ge=0, description="Number of rows successfully written (inserted or updated)" + ) + rejected_count: int = Field(..., ge=0, description="Number of rows rejected") + total_received: int = Field(..., ge=0, description="Total rows received in request") + errors: "list[IngestRowError]" = Field( # pyright: ignore[reportUnknownVariableType] + default_factory=list, description="Details of rejected rows" + ) + duration_ms: float = Field(..., ge=0, description="Processing duration in milliseconds") diff --git a/app/features/ingest/service.py b/app/features/ingest/service.py new file mode 100644 index 00000000..d182dfa4 --- /dev/null +++ b/app/features/ingest/service.py @@ -0,0 +1,234 @@ +"""Ingest service with key resolution and batch upsert logic.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import date as date_type +from typing import Any, Protocol, runtime_checkable + +from sqlalchemy import func, select +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.logging import get_logger +from app.features.data_platform.models import Calendar, Product, SalesDaily, Store +from app.features.ingest.schemas import IngestRowError, SalesDailyIngestRow + +logger = get_logger(__name__) + + +@runtime_checkable +class KeyResolverProtocol(Protocol): + """Protocol for key resolution services.""" + + async def resolve_store_codes(self, db: AsyncSession, codes: set[str]) -> dict[str, int]: + """Resolve store codes to store IDs.""" + ... + + async def resolve_skus(self, db: AsyncSession, skus: set[str]) -> dict[str, int]: + """Resolve SKUs to product IDs.""" + ... + + async def resolve_dates(self, db: AsyncSession, dates: set[date_type]) -> set[date_type]: + """Check which dates exist in the calendar table.""" + ... 
+ + +class KeyResolver: + """Resolves natural keys (store_code, sku) to internal database IDs.""" + + async def resolve_store_codes(self, db: AsyncSession, codes: set[str]) -> dict[str, int]: + """Resolve store codes to store IDs. + + Args: + db: Async database session. + codes: Set of store codes to resolve. + + Returns: + Dictionary mapping store_code -> store_id for found stores. + """ + if not codes: + return {} + + stmt = select(Store.code, Store.id).where(Store.code.in_(codes)) + result = await db.execute(stmt) + return {row.code: row.id for row in result} + + async def resolve_skus(self, db: AsyncSession, skus: set[str]) -> dict[str, int]: + """Resolve SKUs to product IDs. + + Args: + db: Async database session. + skus: Set of SKUs to resolve. + + Returns: + Dictionary mapping sku -> product_id for found products. + """ + if not skus: + return {} + + stmt = select(Product.sku, Product.id).where(Product.sku.in_(skus)) + result = await db.execute(stmt) + return {row.sku: row.id for row in result} + + async def resolve_dates(self, db: AsyncSession, dates: set[date_type]) -> set[date_type]: + """Check which dates exist in the calendar table. + + Args: + db: Async database session. + dates: Set of dates to check. + + Returns: + Set of dates that exist in the calendar table. + """ + if not dates: + return set() + + stmt = select(Calendar.date).where(Calendar.date.in_(dates)) + result = await db.execute(stmt) + return {row.date for row in result} + + +@dataclass +class UpsertResult: + """Result of batch upsert operation. + + Note: Due to PostgreSQL ON CONFLICT semantics, we cannot distinguish + inserts from updates without additional complexity (xmax check). + The processed_count represents all rows successfully written. 
+ """ + + processed_count: int = 0 + rejected_count: int = 0 + errors: list[IngestRowError] = field( # pyright: ignore[reportUnknownVariableType] + default_factory=list + ) + + +async def upsert_sales_daily_batch( + db: AsyncSession, + records: list[SalesDailyIngestRow], + key_resolver: KeyResolverProtocol, +) -> UpsertResult: + """Upsert sales daily records with key resolution and partial success. + + Resolves natural keys (store_code, sku) to internal IDs, validates + calendar dates exist, then performs idempotent upsert using + PostgreSQL's ON CONFLICT DO UPDATE. + + Args: + db: Async database session. + records: List of sales records with natural keys. + key_resolver: KeyResolver instance for ID lookups. + + Returns: + UpsertResult with counts and error details. + """ + logger.info("ingest.sales_daily.upsert_started", batch_size=len(records)) + + # Extract unique codes, SKUs, and dates + store_codes = {r.store_code for r in records} + skus = {r.sku for r in records} + dates = {r.date for r in records} + + # Resolve all keys in batch + store_map = await key_resolver.resolve_store_codes(db, store_codes) + product_map = await key_resolver.resolve_skus(db, skus) + valid_dates = await key_resolver.resolve_dates(db, dates) + + # Validate and prepare rows + valid_rows: list[dict[str, Any]] = [] + errors: list[IngestRowError] = [] + + for idx, record in enumerate(records): + store_id = store_map.get(record.store_code) + product_id = product_map.get(record.sku) + date_exists = record.date in valid_dates + + # Check for unknown store + if store_id is None: + errors.append( + IngestRowError( + row_index=idx, + store_code=record.store_code, + sku=record.sku, + date=record.date, + error_code="UNKNOWN_STORE", + error_message=f"Store code '{record.store_code}' not found", + ) + ) + continue + + # Check for unknown product + if product_id is None: + errors.append( + IngestRowError( + row_index=idx, + store_code=record.store_code, + sku=record.sku, + date=record.date, + 
error_code="UNKNOWN_PRODUCT", + error_message=f"SKU '{record.sku}' not found", + ) + ) + continue + + # Check for missing calendar date + if not date_exists: + errors.append( + IngestRowError( + row_index=idx, + store_code=record.store_code, + sku=record.sku, + date=record.date, + error_code="UNKNOWN_DATE", + error_message=f"Date '{record.date}' not found in calendar", + ) + ) + continue + + valid_rows.append( + { + "date": record.date, + "store_id": store_id, + "product_id": product_id, + "quantity": record.quantity, + "unit_price": record.unit_price, + "total_amount": record.total_amount, + } + ) + + # Perform upsert for valid rows + processed = 0 + + if valid_rows: + # Use PostgreSQL INSERT...ON CONFLICT DO UPDATE + insert_stmt = pg_insert(SalesDaily).values(valid_rows) + upsert_stmt = insert_stmt.on_conflict_do_update( + index_elements=["date", "store_id", "product_id"], + set_={ + "quantity": insert_stmt.excluded.quantity, + "unit_price": insert_stmt.excluded.unit_price, + "total_amount": insert_stmt.excluded.total_amount, + "updated_at": func.now(), + }, + ) + + # Execute and get row count (more efficient than fetchall) + # Note: rowcount exists on CursorResult but SQLAlchemy's generic Result type + # doesn't expose it in its type stubs (available at runtime for DML operations) + cursor_result = await db.execute(upsert_stmt) + processed = int(cursor_result.rowcount) if cursor_result.rowcount else 0 # type: ignore[attr-defined] # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType] + + logger.info( + "ingest.sales_daily.upsert_completed", + processed=processed, + rejected=len(errors), + total_valid=len(valid_rows), + ) + + return UpsertResult( + processed_count=processed, + rejected_count=len(errors), + errors=errors, + ) diff --git a/app/features/ingest/tests/__init__.py b/app/features/ingest/tests/__init__.py new file mode 100644 index 00000000..7b260e4f --- /dev/null +++ b/app/features/ingest/tests/__init__.py @@ 
-0,0 +1 @@ +"""Ingest feature tests.""" diff --git a/app/features/ingest/tests/conftest.py b/app/features/ingest/tests/conftest.py new file mode 100644 index 00000000..aeedfddc --- /dev/null +++ b/app/features/ingest/tests/conftest.py @@ -0,0 +1,112 @@ +"""Feature-specific test fixtures for ingest module.""" + +from datetime import date +from decimal import Decimal +from typing import Any + +import pytest + +from app.features.ingest.schemas import SalesDailyIngestRequest, SalesDailyIngestRow + + +class MockKeyResolver: + """Mock KeyResolver for testing with predefined mappings.""" + + def __init__( + self, + store_map: dict[str, int] | None = None, + product_map: dict[str, int] | None = None, + valid_dates: set[date] | None = None, + ) -> None: + self._store_map = store_map or {"S001": 1, "S002": 2} + self._product_map = product_map or {"SKU-001": 101, "SKU-002": 102, "SKU-003": 103} + self._valid_dates = valid_dates or {date(2024, 1, 15), date(2024, 1, 16)} + + async def resolve_store_codes(self, db: Any, codes: set[str]) -> dict[str, int]: + return {code: self._store_map[code] for code in codes if code in self._store_map} + + async def resolve_skus(self, db: Any, skus: set[str]) -> dict[str, int]: + return {sku: self._product_map[sku] for sku in skus if sku in self._product_map} + + async def resolve_dates(self, db: Any, dates: set[date]) -> set[date]: + return {d for d in dates if d in self._valid_dates} + + +@pytest.fixture +def sample_ingest_row() -> SalesDailyIngestRow: + """Create a sample valid ingest row.""" + return SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ) + + +@pytest.fixture +def sample_ingest_request(sample_ingest_row) -> SalesDailyIngestRequest: + """Create a sample valid ingest request.""" + return SalesDailyIngestRequest( + records=[ + sample_ingest_row, + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + 
sku="SKU-002", + quantity=5, + unit_price=Decimal("19.99"), + total_amount=Decimal("99.95"), + ), + ] + ) + + +@pytest.fixture +def mock_key_resolver() -> MockKeyResolver: + """Create a mock KeyResolver with predefined mappings.""" + return MockKeyResolver() + + +@pytest.fixture +def sample_ingest_rows_mixed() -> list[SalesDailyIngestRow]: + """Create a list of ingest rows with mixed valid/invalid data.""" + return [ + # Valid row + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ), + # Unknown store + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="UNKNOWN", + sku="SKU-001", + quantity=5, + unit_price=Decimal("9.99"), + total_amount=Decimal("49.95"), + ), + # Unknown product + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="UNKNOWN-SKU", + quantity=3, + unit_price=Decimal("5.00"), + total_amount=Decimal("15.00"), + ), + # Another valid row + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S002", + sku="SKU-002", + quantity=7, + unit_price=Decimal("15.00"), + total_amount=Decimal("105.00"), + ), + ] diff --git a/app/features/ingest/tests/test_routes.py b/app/features/ingest/tests/test_routes.py new file mode 100644 index 00000000..6facf362 --- /dev/null +++ b/app/features/ingest/tests/test_routes.py @@ -0,0 +1,373 @@ +"""Integration tests for ingest API routes. + +These tests require a running PostgreSQL database (docker-compose up -d). 
+""" + +from datetime import date +from decimal import Decimal + +import pytest +from httpx import ASGITransport, AsyncClient +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine + +from app.core.config import get_settings +from app.core.database import Base +from app.features.data_platform.models import Calendar, Product, SalesDaily, Store +from app.main import app + + +@pytest.fixture +async def db_session(): + """Create async database session for integration tests. + + Creates all tables, provides a session, and cleans up after. + Requires PostgreSQL to be running (docker-compose up -d). + """ + settings = get_settings() + engine = create_async_engine(settings.database_url, echo=False) + + # Create tables + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + # Create session + async_session_maker = async_sessionmaker( + engine, + class_=AsyncSession, + expire_on_commit=False, + ) + + async with async_session_maker() as session: + try: + yield session + finally: + await session.rollback() + + # Cleanup: drop all tables + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + + await engine.dispose() + + +@pytest.fixture +async def seed_data(db_session: AsyncSession): + """Seed test data for ingest integration tests.""" + # Create stores + stores = [ + Store(code="S001", name="Store 1", region="North", city="City A", store_type="supermarket"), + Store(code="S002", name="Store 2", region="South", city="City B", store_type="express"), + ] + db_session.add_all(stores) + + # Create products + products = [ + Product( + sku="SKU-001", + name="Product 1", + category="Category A", + brand="Brand A", + base_price=Decimal("9.99"), + base_cost=Decimal("5.00"), + ), + Product( + sku="SKU-002", + name="Product 2", + category="Category A", + brand="Brand B", + base_price=Decimal("19.99"), + base_cost=Decimal("10.00"), + ), + ] + 
db_session.add_all(products) + + # Create calendar entries + calendars = [ + Calendar( + date=date(2024, 1, 15), + day_of_week=0, + month=1, + quarter=1, + year=2024, + is_holiday=False, + ), + Calendar( + date=date(2024, 1, 16), + day_of_week=1, + month=1, + quarter=1, + year=2024, + is_holiday=False, + ), + ] + db_session.add_all(calendars) + + await db_session.commit() + + return {"stores": stores, "products": products, "calendars": calendars} + + +@pytest.fixture +async def client(db_session: AsyncSession): + """Create async test client with shared database session. + + Overrides the get_db dependency to use the test session, + ensuring test data is visible to the API. + """ + from app.core.database import get_db + + async def override_get_db(): + yield db_session + + app.dependency_overrides[get_db] = override_get_db + + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + yield ac + + # Clean up override after test + app.dependency_overrides.clear() + + +@pytest.mark.integration +class TestIngestSalesDaily: + """Integration tests for POST /ingest/sales-daily endpoint.""" + + @pytest.mark.asyncio + async def test_ingest_valid_records(self, client, db_session, seed_data): + """Test ingesting valid sales records.""" + payload = { + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 10, + "unit_price": "9.99", + "total_amount": "99.90", + }, + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-002", + "quantity": 5, + "unit_price": "19.99", + "total_amount": "99.95", + }, + ] + } + + response = await client.post("/ingest/sales-daily", json=payload) + + assert response.status_code == 200 + data = response.json() + assert data["processed_count"] == 2 + assert data["rejected_count"] == 0 + assert data["total_received"] == 2 + assert data["errors"] == [] + assert data["duration_ms"] >= 0 + + # Verify data in database + result = await 
db_session.execute(select(SalesDaily)) + sales_records = result.scalars().all() + assert len(sales_records) == 2 + + @pytest.mark.asyncio + async def test_ingest_idempotency(self, client, db_session, seed_data): + """Test that running same ingest twice updates rather than duplicates.""" + payload = { + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 10, + "unit_price": "9.99", + "total_amount": "99.90", + }, + ] + } + + # First ingest + response1 = await client.post("/ingest/sales-daily", json=payload) + assert response1.status_code == 200 + data1 = response1.json() + assert data1["processed_count"] == 1 + + # Verify one record exists + result = await db_session.execute(select(SalesDaily)) + records_after_first = result.scalars().all() + assert len(records_after_first) == 1 + assert records_after_first[0].quantity == 10 + + # Second ingest with updated quantity + payload["records"][0]["quantity"] = 15 + payload["records"][0]["total_amount"] = "149.85" + + response2 = await client.post("/ingest/sales-daily", json=payload) + assert response2.status_code == 200 + + # Verify still only one record (updated, not duplicated) + db_session.expire_all() # Synchronous method + result = await db_session.execute(select(SalesDaily)) + records_after_second = result.scalars().all() + assert len(records_after_second) == 1 + assert records_after_second[0].quantity == 15 + + @pytest.mark.asyncio + async def test_ingest_partial_success(self, client, db_session, seed_data): + """Test partial success with mixed valid/invalid rows.""" + payload = { + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 10, + "unit_price": "9.99", + "total_amount": "99.90", + }, + { + "date": "2024-01-15", + "store_code": "UNKNOWN", # Invalid store + "sku": "SKU-001", + "quantity": 5, + "unit_price": "9.99", + "total_amount": "49.95", + }, + ] + } + + response = await client.post("/ingest/sales-daily", 
json=payload) + + assert response.status_code == 200 + data = response.json() + assert data["processed_count"] == 1 + assert data["rejected_count"] == 1 + assert data["total_received"] == 2 + assert len(data["errors"]) == 1 + assert data["errors"][0]["error_code"] == "UNKNOWN_STORE" + assert data["errors"][0]["row_index"] == 1 + + @pytest.mark.asyncio + async def test_ingest_unknown_store(self, client, db_session, seed_data): + """Test that unknown store code returns error.""" + payload = { + "records": [ + { + "date": "2024-01-15", + "store_code": "NONEXISTENT", + "sku": "SKU-001", + "quantity": 10, + "unit_price": "9.99", + "total_amount": "99.90", + }, + ] + } + + response = await client.post("/ingest/sales-daily", json=payload) + + assert response.status_code == 200 + data = response.json() + assert data["processed_count"] == 0 + assert data["rejected_count"] == 1 + assert data["errors"][0]["error_code"] == "UNKNOWN_STORE" + + @pytest.mark.asyncio + async def test_ingest_unknown_product(self, client, db_session, seed_data): + """Test that unknown SKU returns error.""" + payload = { + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "NONEXISTENT-SKU", + "quantity": 10, + "unit_price": "9.99", + "total_amount": "99.90", + }, + ] + } + + response = await client.post("/ingest/sales-daily", json=payload) + + assert response.status_code == 200 + data = response.json() + assert data["processed_count"] == 0 + assert data["rejected_count"] == 1 + assert data["errors"][0]["error_code"] == "UNKNOWN_PRODUCT" + + @pytest.mark.asyncio + async def test_ingest_unknown_date(self, client, db_session, seed_data): + """Test that date not in calendar returns error.""" + payload = { + "records": [ + { + "date": "2024-12-31", # Not in seeded calendar + "store_code": "S001", + "sku": "SKU-001", + "quantity": 10, + "unit_price": "9.99", + "total_amount": "99.90", + }, + ] + } + + response = await client.post("/ingest/sales-daily", json=payload) + + assert 
response.status_code == 200 + data = response.json() + assert data["processed_count"] == 0 + assert data["rejected_count"] == 1 + assert data["errors"][0]["error_code"] == "UNKNOWN_DATE" + + @pytest.mark.asyncio + async def test_ingest_empty_records_rejected(self, client, db_session, seed_data): + """Test that empty records list returns 422.""" + payload: dict[str, list[dict[str, str]]] = {"records": []} + + response = await client.post("/ingest/sales-daily", json=payload) + + assert response.status_code == 422 # Validation error + + @pytest.mark.asyncio + async def test_ingest_negative_quantity_rejected(self, client, db_session, seed_data): + """Test that negative quantity returns 422.""" + payload = { + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": -5, + "unit_price": "9.99", + "total_amount": "99.90", + }, + ] + } + + response = await client.post("/ingest/sales-daily", json=payload) + + assert response.status_code == 422 # Validation error + + @pytest.mark.asyncio + async def test_ingest_response_has_request_id(self, client, db_session, seed_data): + """Test that response has X-Request-ID header.""" + payload = { + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 10, + "unit_price": "9.99", + "total_amount": "99.90", + }, + ] + } + + response = await client.post("/ingest/sales-daily", json=payload) + + assert response.status_code == 200 + assert "x-request-id" in response.headers diff --git a/app/features/ingest/tests/test_schemas.py b/app/features/ingest/tests/test_schemas.py new file mode 100644 index 00000000..d09d7a3c --- /dev/null +++ b/app/features/ingest/tests/test_schemas.py @@ -0,0 +1,285 @@ +"""Unit tests for ingest schemas.""" + +from datetime import date +from decimal import Decimal + +import pytest +from pydantic import ValidationError + +from app.features.ingest.schemas import ( + IngestRowError, + SalesDailyIngestRequest, + 
SalesDailyIngestResponse, + SalesDailyIngestRow, +) + + +class TestSalesDailyIngestRow: + """Tests for SalesDailyIngestRow schema.""" + + def test_valid_row(self): + """Test valid row creation.""" + row = SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ) + assert row.date == date(2024, 1, 15) + assert row.store_code == "S001" + assert row.sku == "SKU-001" + assert row.quantity == 10 + assert row.unit_price == Decimal("9.99") + assert row.total_amount == Decimal("99.90") + + def test_negative_quantity_rejected(self): + """Test that negative quantity is rejected.""" + with pytest.raises(ValidationError) as exc_info: + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-001", + quantity=-5, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ) + assert "quantity" in str(exc_info.value) + + def test_negative_unit_price_rejected(self): + """Test that negative unit price is rejected.""" + with pytest.raises(ValidationError) as exc_info: + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-001", + quantity=10, + unit_price=Decimal("-9.99"), + total_amount=Decimal("99.90"), + ) + assert "unit_price" in str(exc_info.value) + + def test_negative_total_amount_rejected(self): + """Test that negative total amount is rejected.""" + with pytest.raises(ValidationError) as exc_info: + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("-99.90"), + ) + assert "total_amount" in str(exc_info.value) + + def test_empty_store_code_rejected(self): + """Test that empty store code is rejected.""" + with pytest.raises(ValidationError) as exc_info: + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="", + sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ) 
+ assert "store_code" in str(exc_info.value) + + def test_empty_sku_rejected(self): + """Test that empty SKU is rejected.""" + with pytest.raises(ValidationError) as exc_info: + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ) + assert "sku" in str(exc_info.value) + + def test_store_code_max_length(self): + """Test that store code exceeding max length is rejected.""" + with pytest.raises(ValidationError) as exc_info: + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S" * 21, # max is 20 + sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ) + assert "store_code" in str(exc_info.value) + + def test_sku_max_length(self): + """Test that SKU exceeding max length is rejected.""" + with pytest.raises(ValidationError) as exc_info: + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU" * 20, # max is 50 + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ) + assert "sku" in str(exc_info.value) + + def test_zero_quantity_allowed(self): + """Test that zero quantity is allowed.""" + row = SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-001", + quantity=0, + unit_price=Decimal("9.99"), + total_amount=Decimal("0.00"), + ) + assert row.quantity == 0 + + def test_total_amount_consistency_validation_passes(self): + """Test that total amount consistency validation passes for matching values.""" + row = SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ) + # Exact match: 10 * 9.99 == 99.90, so the consistency check passes + assert row.total_amount == Decimal("99.90") + + def test_total_amount_allows_mismatch_within_tolerance(self): + """Test that small mismatches in total amount are allowed.""" + # Expected: 10 * 9.99 = 99.90, actual: 99.91
(within 0.01 tolerance) + row = SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.91"), + ) + assert row.total_amount == Decimal("99.91") + + +class TestSalesDailyIngestRequest: + """Tests for SalesDailyIngestRequest schema.""" + + def test_valid_request_single_record(self): + """Test valid request with single record.""" + request = SalesDailyIngestRequest( + records=[ + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ) + ] + ) + assert len(request.records) == 1 + + def test_valid_request_multiple_records(self): + """Test valid request with multiple records.""" + request = SalesDailyIngestRequest( + records=[ + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ), + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-002", + quantity=5, + unit_price=Decimal("19.99"), + total_amount=Decimal("99.95"), + ), + ] + ) + assert len(request.records) == 2 + + def test_empty_records_rejected(self): + """Test that empty records list is rejected.""" + with pytest.raises(ValidationError) as exc_info: + SalesDailyIngestRequest(records=[]) + assert "records" in str(exc_info.value) + + +class TestIngestRowError: + """Tests for IngestRowError schema.""" + + def test_error_serialization(self): + """Test that error can be serialized.""" + error = IngestRowError( + row_index=1, + store_code="UNKNOWN", + sku="SKU-001", + date=date(2024, 1, 15), + error_code="UNKNOWN_STORE", + error_message="Store code 'UNKNOWN' not found", + ) + data = error.model_dump() + assert data["row_index"] == 1 + assert data["error_code"] == "UNKNOWN_STORE" + assert "UNKNOWN" in data["error_message"] + + +class TestSalesDailyIngestResponse: + """Tests 
for SalesDailyIngestResponse schema.""" + + def test_response_serialization(self): + """Test that response can be serialized.""" + response = SalesDailyIngestResponse( + processed_count=7, + rejected_count=1, + total_received=8, + errors=[ + IngestRowError( + row_index=7, + store_code="BAD", + sku="SKU-001", + date=date(2024, 1, 15), + error_code="UNKNOWN_STORE", + error_message="Store code 'BAD' not found", + ) + ], + duration_ms=45.23, + ) + data = response.model_dump() + assert data["processed_count"] == 7 + assert data["rejected_count"] == 1 + assert data["total_received"] == 8 + assert len(data["errors"]) == 1 + assert data["duration_ms"] == 45.23 + + def test_response_with_no_errors(self): + """Test response with no errors.""" + response = SalesDailyIngestResponse( + processed_count=10, + rejected_count=0, + total_received=10, + errors=[], + duration_ms=30.0, + ) + assert response.rejected_count == 0 + assert len(response.errors) == 0 + + def test_response_counts_non_negative(self): + """Test that negative counts are rejected.""" + with pytest.raises(ValidationError): + SalesDailyIngestResponse( + processed_count=-1, + rejected_count=0, + total_received=0, + errors=[], + duration_ms=0.0, + ) diff --git a/app/features/ingest/tests/test_service.py b/app/features/ingest/tests/test_service.py new file mode 100644 index 00000000..9af36511 --- /dev/null +++ b/app/features/ingest/tests/test_service.py @@ -0,0 +1,321 @@ +"""Unit tests for ingest service.""" + +from datetime import date +from decimal import Decimal +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from app.features.ingest.schemas import SalesDailyIngestRow +from app.features.ingest.service import KeyResolver, UpsertResult, upsert_sales_daily_batch + + +class TestKeyResolver: + """Tests for KeyResolver class.""" + + @pytest.mark.asyncio + async def test_resolve_store_codes_returns_mapping(self): + """Test that resolve_store_codes returns correct mapping.""" + # Setup mock session + 
mock_session = AsyncMock() + mock_result = MagicMock() + mock_result.__iter__ = lambda self: iter( + [ + MagicMock(code="S001", id=1), + MagicMock(code="S002", id=2), + ] + ) + mock_session.execute.return_value = mock_result + + resolver = KeyResolver() + result = await resolver.resolve_store_codes(mock_session, {"S001", "S002"}) + + assert result == {"S001": 1, "S002": 2} + mock_session.execute.assert_called_once() + + @pytest.mark.asyncio + async def test_resolve_store_codes_empty_set(self): + """Test that resolve_store_codes handles empty set.""" + mock_session = AsyncMock() + resolver = KeyResolver() + result = await resolver.resolve_store_codes(mock_session, set()) + + assert result == {} + mock_session.execute.assert_not_called() + + @pytest.mark.asyncio + async def test_resolve_skus_returns_mapping(self): + """Test that resolve_skus returns correct mapping.""" + mock_session = AsyncMock() + mock_result = MagicMock() + mock_result.__iter__ = lambda self: iter( + [ + MagicMock(sku="SKU-001", id=101), + MagicMock(sku="SKU-002", id=102), + ] + ) + mock_session.execute.return_value = mock_result + + resolver = KeyResolver() + result = await resolver.resolve_skus(mock_session, {"SKU-001", "SKU-002"}) + + assert result == {"SKU-001": 101, "SKU-002": 102} + + @pytest.mark.asyncio + async def test_resolve_skus_empty_set(self): + """Test that resolve_skus handles empty set.""" + mock_session = AsyncMock() + resolver = KeyResolver() + result = await resolver.resolve_skus(mock_session, set()) + + assert result == {} + mock_session.execute.assert_not_called() + + @pytest.mark.asyncio + async def test_resolve_dates_returns_set(self): + """Test that resolve_dates returns set of valid dates.""" + mock_session = AsyncMock() + mock_result = MagicMock() + mock_result.__iter__ = lambda self: iter( + [ + MagicMock(date=date(2024, 1, 15)), + MagicMock(date=date(2024, 1, 16)), + ] + ) + mock_session.execute.return_value = mock_result + + resolver = KeyResolver() + result = await 
resolver.resolve_dates(mock_session, {date(2024, 1, 15), date(2024, 1, 16)}) + + assert result == {date(2024, 1, 15), date(2024, 1, 16)} + + @pytest.mark.asyncio + async def test_resolve_dates_empty_set(self): + """Test that resolve_dates handles empty set.""" + mock_session = AsyncMock() + resolver = KeyResolver() + result = await resolver.resolve_dates(mock_session, set()) + + assert result == set() + mock_session.execute.assert_not_called() + + +class TestUpsertResult: + """Tests for UpsertResult dataclass.""" + + def test_default_values(self): + """Test UpsertResult default values.""" + result = UpsertResult() + assert result.processed_count == 0 + assert result.rejected_count == 0 + assert result.errors == [] + + def test_with_values(self): + """Test UpsertResult with values.""" + result = UpsertResult( + processed_count=5, + rejected_count=1, + errors=[], + ) + assert result.processed_count == 5 + assert result.rejected_count == 1 + + +class TestUpsertSalesDailyBatch: + """Tests for upsert_sales_daily_batch function.""" + + @pytest.mark.asyncio + async def test_all_valid_rows_processed(self, mock_key_resolver): + """Test that all valid rows are processed.""" + mock_session = AsyncMock() + mock_execute_result = MagicMock() + mock_execute_result.rowcount = 2 + mock_session.execute.return_value = mock_execute_result + + records = [ + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ), + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S002", + sku="SKU-002", + quantity=5, + unit_price=Decimal("19.99"), + total_amount=Decimal("99.95"), + ), + ] + + result = await upsert_sales_daily_batch(mock_session, records, mock_key_resolver) + + assert result.processed_count == 2 + assert result.rejected_count == 0 + assert len(result.errors) == 0 + + @pytest.mark.asyncio + async def test_unknown_store_rejected(self, mock_key_resolver): + """Test 
that rows with unknown store codes are rejected.""" + mock_session = AsyncMock() + mock_execute_result = MagicMock() + mock_execute_result.rowcount = 1 + mock_session.execute.return_value = mock_execute_result + + records = [ + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ), + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="UNKNOWN_STORE", # Not in mock resolver + sku="SKU-001", + quantity=5, + unit_price=Decimal("9.99"), + total_amount=Decimal("49.95"), + ), + ] + + result = await upsert_sales_daily_batch(mock_session, records, mock_key_resolver) + + assert result.processed_count == 1 + assert result.rejected_count == 1 + assert len(result.errors) == 1 + assert result.errors[0].error_code == "UNKNOWN_STORE" + assert result.errors[0].row_index == 1 + + @pytest.mark.asyncio + async def test_unknown_product_rejected(self, mock_key_resolver): + """Test that rows with unknown SKUs are rejected.""" + mock_session = AsyncMock() + mock_execute_result = MagicMock() + mock_execute_result.rowcount = 1 + mock_session.execute.return_value = mock_execute_result + + records = [ + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ), + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="S001", + sku="UNKNOWN_SKU", # Not in mock resolver + quantity=5, + unit_price=Decimal("9.99"), + total_amount=Decimal("49.95"), + ), + ] + + result = await upsert_sales_daily_batch(mock_session, records, mock_key_resolver) + + assert result.processed_count == 1 + assert result.rejected_count == 1 + assert len(result.errors) == 1 + assert result.errors[0].error_code == "UNKNOWN_PRODUCT" + assert result.errors[0].row_index == 1 + + @pytest.mark.asyncio + async def test_unknown_date_rejected(self, mock_key_resolver): + """Test that rows with unknown 
dates are rejected.""" + mock_session = AsyncMock() + mock_execute_result = MagicMock() + mock_execute_result.rowcount = 1 + mock_session.execute.return_value = mock_execute_result + + records = [ + SalesDailyIngestRow( + date=date(2024, 1, 15), # In mock resolver + store_code="S001", + sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ), + SalesDailyIngestRow( + date=date(2024, 12, 31), # Not in mock resolver + store_code="S001", + sku="SKU-001", + quantity=5, + unit_price=Decimal("9.99"), + total_amount=Decimal("49.95"), + ), + ] + + result = await upsert_sales_daily_batch(mock_session, records, mock_key_resolver) + + assert result.processed_count == 1 + assert result.rejected_count == 1 + assert len(result.errors) == 1 + assert result.errors[0].error_code == "UNKNOWN_DATE" + assert result.errors[0].row_index == 1 + + @pytest.mark.asyncio + async def test_partial_success_mixed_rows(self, sample_ingest_rows_mixed, mock_key_resolver): + """Test partial success with mixed valid/invalid rows.""" + mock_session = AsyncMock() + mock_execute_result = MagicMock() + # 2 valid rows will be processed + mock_execute_result.rowcount = 2 + mock_session.execute.return_value = mock_execute_result + + result = await upsert_sales_daily_batch( + mock_session, sample_ingest_rows_mixed, mock_key_resolver + ) + + # 2 valid rows (S001/SKU-001 and S002/SKU-002) + # 2 invalid rows (UNKNOWN store, UNKNOWN-SKU product) + assert result.processed_count == 2 + assert result.rejected_count == 2 + assert len(result.errors) == 2 + + # Verify error codes + error_codes = {e.error_code for e in result.errors} + assert "UNKNOWN_STORE" in error_codes + assert "UNKNOWN_PRODUCT" in error_codes + + @pytest.mark.asyncio + async def test_all_rows_rejected(self, mock_key_resolver): + """Test that all rows can be rejected.""" + mock_session = AsyncMock() + + records = [ + SalesDailyIngestRow( + date=date(2024, 1, 15), + store_code="INVALID_STORE", + 
sku="SKU-001", + quantity=10, + unit_price=Decimal("9.99"), + total_amount=Decimal("99.90"), + ), + ] + + result = await upsert_sales_daily_batch(mock_session, records, mock_key_resolver) + + assert result.processed_count == 0 + assert result.rejected_count == 1 + # DB execute should not be called since no valid rows + # (only key resolution queries, no insert) + + @pytest.mark.asyncio + async def test_empty_records_handled(self, mock_key_resolver): + """Test that empty records list is handled.""" + mock_session = AsyncMock() + + result = await upsert_sales_daily_batch(mock_session, [], mock_key_resolver) + + assert result.processed_count == 0 + assert result.rejected_count == 0 + assert result.errors == [] diff --git a/app/main.py b/app/main.py index 0da757cf..5b7837c7 100644 --- a/app/main.py +++ b/app/main.py @@ -10,6 +10,7 @@ from app.core.health import router as health_router from app.core.logging import configure_logging, get_logger from app.core.middleware import RequestIdMiddleware +from app.features.ingest.routes import router as ingest_router logger = get_logger(__name__) @@ -66,6 +67,7 @@ def create_app() -> FastAPI: # Routers app.include_router(health_router) + app.include_router(ingest_router) return app diff --git a/examples/api/ingest_sales_daily.http b/examples/api/ingest_sales_daily.http new file mode 100644 index 00000000..c4c2ff95 --- /dev/null +++ b/examples/api/ingest_sales_daily.http @@ -0,0 +1,312 @@ +### ForecastLabAI - Ingest Sales Daily API Examples +### Using VS Code REST Client or JetBrains HTTP Client +### +### Prerequisites: +### 1. Start PostgreSQL: docker-compose up -d +### 2. Run migrations: uv run alembic upgrade head +### 3. Seed demo data: uv run python examples/seed_demo_data.py +### 4. 
Start API: uv run uvicorn app.main:app --reload --port 8123 + +@API_BASE_URL = http://localhost:8123 + + +### ============================================================================ +### Ingest Sales Daily - Happy Path +### ============================================================================ +### Ingest multiple valid sales records +POST {{API_BASE_URL}}/ingest/sales-daily +Content-Type: application/json + +{ + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 10, + "unit_price": 9.99, + "total_amount": 99.90 + }, + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-002", + "quantity": 5, + "unit_price": 19.99, + "total_amount": 99.95 + } + ] +} + +### Expected Response: +# HTTP/1.1 200 OK +# { +# "processed_count": 2, +# "rejected_count": 0, +# "total_received": 2, +# "errors": [], +# "duration_ms": 45.23 +# } + + +### ============================================================================ +### Ingest Sales Daily - Idempotent Replay +### ============================================================================ +### Re-sending a record with the same date, store_code, and sku should update the existing row, not duplicate it +### Notice the updated quantity (15 instead of 10) +POST {{API_BASE_URL}}/ingest/sales-daily +Content-Type: application/json + +{ + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 15, + "unit_price": 9.99, + "total_amount": 149.85 + } + ] +} + +### Expected Response: +# HTTP/1.1 200 OK +# { +# "processed_count": 1, +# "rejected_count": 0, +# "total_received": 1, +# "errors": [], +# "duration_ms": 32.15 +# } +# Note: The row is updated (not duplicated) due to ON CONFLICT DO UPDATE + + +### ============================================================================ +### Ingest Sales Daily - Partial Success +### ============================================================================ +### Mix of valid and invalid
rows - valid rows succeed, invalid rows return errors +POST {{API_BASE_URL}}/ingest/sales-daily +Content-Type: application/json + +{ + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 10, + "unit_price": 9.99, + "total_amount": 99.90 + }, + { + "date": "2024-01-15", + "store_code": "UNKNOWN_STORE", + "sku": "SKU-001", + "quantity": 5, + "unit_price": 9.99, + "total_amount": 49.95 + } + ] +} + +### Expected Response: +# HTTP/1.1 200 OK +# { +# "processed_count": 1, +# "rejected_count": 1, +# "total_received": 2, +# "errors": [ +# { +# "row_index": 1, +# "store_code": "UNKNOWN_STORE", +# "sku": "SKU-001", +# "date": "2024-01-15", +# "error_code": "UNKNOWN_STORE", +# "error_message": "Store code 'UNKNOWN_STORE' not found" +# } +# ], +# "duration_ms": 38.45 +# } + + +### ============================================================================ +### Ingest Sales Daily - Unknown Product +### ============================================================================ +### Row with unknown SKU is rejected +POST {{API_BASE_URL}}/ingest/sales-daily +Content-Type: application/json + +{ + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "NONEXISTENT-SKU", + "quantity": 10, + "unit_price": 9.99, + "total_amount": 99.90 + } + ] +} + +### Expected Response: +# HTTP/1.1 200 OK +# { +# "processed_count": 0, +# "rejected_count": 1, +# "total_received": 1, +# "errors": [ +# { +# "row_index": 0, +# "store_code": "S001", +# "sku": "NONEXISTENT-SKU", +# "date": "2024-01-15", +# "error_code": "UNKNOWN_PRODUCT", +# "error_message": "SKU 'NONEXISTENT-SKU' not found" +# } +# ], +# "duration_ms": 25.10 +# } + + +### ============================================================================ +### Ingest Sales Daily - Unknown Date +### ============================================================================ +### Row with date not in calendar is rejected +POST
{{API_BASE_URL}}/ingest/sales-daily +Content-Type: application/json + +{ + "records": [ + { + "date": "2099-12-31", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 10, + "unit_price": 9.99, + "total_amount": 99.90 + } + ] +} + +### Expected Response: +# HTTP/1.1 200 OK +# { +# "processed_count": 0, +# "rejected_count": 1, +# "total_received": 1, +# "errors": [ +# { +# "row_index": 0, +# "store_code": "S001", +# "sku": "SKU-001", +# "date": "2099-12-31", +# "error_code": "UNKNOWN_DATE", +# "error_message": "Date '2099-12-31' not found in calendar" +# } +# ], +# "duration_ms": 20.50 +# } + + +### ============================================================================ +### Ingest Sales Daily - Validation Error +### ============================================================================ +### Empty records list returns 422 Validation Error +POST {{API_BASE_URL}}/ingest/sales-daily +Content-Type: application/json + +{ + "records": [] +} + +### Expected Response: +# HTTP/1.1 422 Unprocessable Entity +# { +# "detail": [ +# { +# "type": "too_short", +# "loc": ["body", "records"], +# "msg": "List should have at least 1 item after validation, not 0", +# ...
+# } +# ] +# } + + +### ============================================================================ +### Ingest Sales Daily - Large Batch +### ============================================================================ +### Example with multiple stores and products +POST {{API_BASE_URL}}/ingest/sales-daily +Content-Type: application/json + +{ + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 100, + "unit_price": 9.99, + "total_amount": 999.00 + }, + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-002", + "quantity": 50, + "unit_price": 19.99, + "total_amount": 999.50 + }, + { + "date": "2024-01-15", + "store_code": "S002", + "sku": "SKU-001", + "quantity": 75, + "unit_price": 9.99, + "total_amount": 749.25 + }, + { + "date": "2024-01-15", + "store_code": "S002", + "sku": "SKU-002", + "quantity": 25, + "unit_price": 19.99, + "total_amount": 499.75 + }, + { + "date": "2024-01-16", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 110, + "unit_price": 9.99, + "total_amount": 1098.90 + }, + { + "date": "2024-01-16", + "store_code": "S001", + "sku": "SKU-002", + "quantity": 55, + "unit_price": 19.99, + "total_amount": 1099.45 + } + ] +} + +### Expected Response: +# HTTP/1.1 200 OK +# { +# "processed_count": 6, +# "rejected_count": 0, +# "total_received": 6, +# "errors": [], +# "duration_ms": 85.50 +# } diff --git a/pyproject.toml b/pyproject.toml index d92c0023..e256ff40 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,13 +77,14 @@ ignore = [ ] [tool.ruff.lint.per-file-ignores] -"tests/**/*.py" = ["S101", "ANN", "ARG001", "D", "E731"] -"test_*.py" = ["S101", "ANN", "ARG001", "D", "E731"] -"**/tests/**/*.py" = ["S101", "ANN", "ARG001", "D", "E731"] +"tests/**/*.py" = ["S101", "ANN", "ARG001", "ARG002", "ARG005", "D", "E731"] +"test_*.py" = ["S101", "ANN", "ARG001", "ARG002", "ARG005", "D", "E731"] +"**/tests/**/*.py" = ["S101", "ANN", "ARG001", "ARG002", "ARG005",
"D", "E731"] "__init__.py" = ["F401"] "scripts/**/*.py" = ["T201", "ANN", "S101"] "examples/**/*.py" = ["T201", "ANN"] "app/core/health.py" = ["B008"] +"app/features/*/routes.py" = ["B008"] "alembic/env.py" = ["ANN"] [tool.ruff.format] From 3249bf61387501c38a7455479457ef6cfe778323 Mon Sep 17 00:00:00 2001 From: Gabor Szabo <168316277+w7-mgfcode@users.noreply.github.com> Date: Mon, 26 Jan 2026 14:04:12 +0100 Subject: [PATCH 3/4] docs: add Phase 2 (Ingest Layer) documentation (#20) - Update README.md with API Endpoints section and ingest examples - Mark Phase 2 as completed in PHASE-index.md - Add comprehensive Phase 2 documentation (2-INGEST_LAYER.md) Co-authored-by: Gabe@w7dev Co-authored-by: Claude Opus 4.5 --- README.md | 40 +++- docs/PHASE-index.md | 50 ++++- docs/PHASE/2-INGEST_LAYER.md | 417 +++++++++++++++++++++++++++++++++++ 3 files changed, 502 insertions(+), 5 deletions(-) create mode 100644 docs/PHASE/2-INGEST_LAYER.md diff --git a/README.md b/README.md index 5fa53b06..68e22415 100644 --- a/README.md +++ b/README.md @@ -92,12 +92,14 @@ app/ ├── core/ # Config, database, logging, middleware, exceptions ├── shared/ # Pagination, timestamps, error schemas ├── features/ -│ └── data_platform/ # Store, product, calendar, sales tables +│ ├── data_platform/ # Store, product, calendar, sales tables +│ └── ingest/ # Batch upsert endpoints for sales data └── main.py # FastAPI entry point tests/ # Test fixtures and helpers alembic/ # Database migrations examples/ +├── api/ # HTTP client examples ├── schema/ # Table documentation └── queries/ # Example SQL queries scripts/ # Utility scripts @@ -112,6 +114,42 @@ The data platform includes 7 tables for retail demand forecasting: See [examples/schema/README.md](examples/schema/README.md) for detailed schema documentation. 
+## API Endpoints + +### Health Check + +- `GET /health` - Returns `{"status": "ok"}` when the API is running + +### Ingest + +- `POST /ingest/sales-daily` - Batch upsert daily sales records + +**Example Request:** +```bash +curl -X POST http://localhost:8123/ingest/sales-daily \ + -H "Content-Type: application/json" \ + -d '{ + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 10, + "unit_price": 9.99, + "total_amount": 99.90 + } + ] + }' +``` + +**Features:** +- Natural key resolution (`store_code` -> `store_id`, `sku` -> `product_id`) +- Idempotent upsert using PostgreSQL `ON CONFLICT DO UPDATE` +- Partial success handling (valid rows processed, invalid rows returned with errors) +- Error codes: `UNKNOWN_STORE`, `UNKNOWN_PRODUCT`, `UNKNOWN_DATE` + +See [examples/api/ingest_sales_daily.http](examples/api/ingest_sales_daily.http) for more examples. + ## API Documentation Once the server is running: diff --git a/docs/PHASE-index.md b/docs/PHASE-index.md index cdc60a05..e1bf3bcc 100644 --- a/docs/PHASE-index.md +++ b/docs/PHASE-index.md @@ -10,7 +10,7 @@ This document indexes all implementation phases of the ForecastLabAI project. |-------|------|--------|-----|---------------| | 0 | Project Foundation | Completed | PRP-0, PRP-1 | [0-INIT_PHASE.md](./PHASE/0-INIT_PHASE.md) | | 1 | Data Platform | Completed | PRP-2 | [1-DATA_PLATFORM.md](./PHASE/1-DATA_PLATFORM.md) | -| 2 | Ingest Layer | Pending | PRP-3 | - | +| 2 | Ingest Layer | Completed | PRP-3 | [2-INGEST_LAYER.md](./PHASE/2-INGEST_LAYER.md) | | 3 | Feature Engineering | Pending | PRP-4 | - | | 4 | Forecasting | Pending | PRP-5 | - | | 5 | Backtesting | Pending | PRP-6 | - | @@ -81,13 +81,54 @@ This document indexes all implementation phases of the ForecastLabAI project. 
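The idempotent upsert listed under the README's Features can be illustrated with a minimal sketch. The project targets PostgreSQL and its service code is not shown in this part of the patch, so the example uses the standard-library `sqlite3` module only to stay self-contained (SQLite 3.24+ accepts an `ON CONFLICT ... DO UPDATE` clause of the same shape). The simplified table, the choice of `(date, store_id, product_id)` as the conflict key, and the `upsert_sale` helper are assumptions for illustration, not the project's implementation.

```python
import sqlite3

# Hypothetical, simplified table: the real sales_daily schema is not shown here.
DDL = """
CREATE TABLE sales_daily (
    date TEXT NOT NULL,
    store_id INTEGER NOT NULL,
    product_id INTEGER NOT NULL,
    quantity INTEGER NOT NULL,
    UNIQUE (date, store_id, product_id)
)
"""

# Assumed conflict key: one row per (date, store, product).
UPSERT = """
INSERT INTO sales_daily (date, store_id, product_id, quantity)
VALUES (?, ?, ?, ?)
ON CONFLICT (date, store_id, product_id)
DO UPDATE SET quantity = excluded.quantity
"""


def upsert_sale(conn, day, store_id, product_id, quantity):
    """Insert a row, or overwrite quantity if the key already exists."""
    conn.execute(UPSERT, (day, store_id, product_id, quantity))


conn = sqlite3.connect(":memory:")
conn.execute(DDL)
upsert_sale(conn, "2024-01-15", 1, 101, 10)
upsert_sale(conn, "2024-01-15", 1, 101, 15)  # replay with a new quantity
rows = conn.execute("SELECT quantity FROM sales_daily").fetchall()
```

After the second call the table still holds a single row carrying the latest quantity, which is the behavior `test_ingest_idempotency` checks through the API.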
- Pyright: 0 errors - Pytest: 43 tests passed (32 unit + 11 integration) +### [Phase 2: Ingest Layer](./PHASE/2-INGEST_LAYER.md) + +**Date Completed**: 2026-01-26 + +**Summary**: Idempotent batch upsert endpoint for sales data ingestion with: +- `POST /ingest/sales-daily` endpoint for batch upserts +- Natural key resolution (store_code -> store_id, sku -> product_id) +- PostgreSQL `ON CONFLICT DO UPDATE` for replay-safe idempotency +- Partial success handling (valid rows processed, invalid rows returned with errors) +- Calendar date validation (FK constraint enforcement) +- Structured logging with duration metrics + +**Key Deliverables**: +- `app/features/ingest/routes.py` - POST /ingest/sales-daily endpoint +- `app/features/ingest/schemas.py` - Request/response Pydantic schemas +- `app/features/ingest/service.py` - KeyResolver and upsert_sales_daily_batch logic +- `app/core/config.py` - Added ingest_batch_size, ingest_timeout_seconds settings +- `examples/api/ingest_sales_daily.http` - HTTP client examples + +**Error Codes**: +- `UNKNOWN_STORE` - Store code not found in database +- `UNKNOWN_PRODUCT` - SKU not found in database +- `UNKNOWN_DATE` - Date not found in calendar table + +**API Response Schema**: +```json +{ + "processed_count": 10, + "rejected_count": 2, + "total_received": 12, + "errors": [ + { + "row_index": 5, + "store_code": "INVALID", + "sku": "SKU-001", + "date": "2024-01-15", + "error_code": "UNKNOWN_STORE", + "error_message": "Store code 'INVALID' not found" + } + ], + "duration_ms": 45.23 +} +``` + --- ## Pending Phases -### Phase 2: Ingest Layer -Idempotent upsert endpoints for sales_daily and sales_txn data. - ### Phase 3: Feature Engineering Time-safe feature computation with lag, rolling, and exogenous features. 
@@ -148,3 +189,4 @@ Each phase document (`docs/PHASE/X-PHASE_NAME.md`) contains: | 2026-01-26 | 0 | Initial project foundation completed | | 2026-01-26 | 0 | Added CI/CD infrastructure (5 GitHub Actions workflows) | | 2026-01-26 | 1 | Data Platform schema and migrations completed (v0.1.3) | +| 2026-01-26 | 2 | Ingest Layer with POST /ingest/sales-daily endpoint completed | diff --git a/docs/PHASE/2-INGEST_LAYER.md b/docs/PHASE/2-INGEST_LAYER.md new file mode 100644 index 00000000..0fe27e5c --- /dev/null +++ b/docs/PHASE/2-INGEST_LAYER.md @@ -0,0 +1,417 @@ +# Phase 2: Ingest Layer + +**Status**: Completed +**PRP Reference**: `PRPs/PRP-3-ingest-layer.md` +**Date Completed**: 2026-01-26 +**Branch**: `feat/prp-3-ingest-layer` + +--- + +## Executive Summary + +Phase 2 implements the ingest layer for ForecastLabAI - a typed, idempotent batch upsert endpoint for sales data. The primary endpoint `POST /ingest/sales-daily` accepts sales records with natural keys (`store_code`, `sku`) and performs replay-safe upserts using PostgreSQL's `ON CONFLICT DO UPDATE`. + +--- + +## Objectives + +### Primary Goals +1. Create batch upsert endpoint for sales_daily data +2. Implement natural key resolution (store_code -> store_id, sku -> product_id) +3. Enforce idempotency using PostgreSQL ON CONFLICT DO UPDATE +4. Support partial success (valid rows processed, invalid rows returned with errors) +5. Validate calendar dates exist (FK constraint) +6. Provide structured logging with duration metrics +7. Add configurable batch size and timeout settings + +### Design Principles Applied +- **KISS**: Simple key resolution without premature caching optimization +- **YAGNI**: Single endpoint for sales_daily, no staging table optimization yet +- **Partial Success**: Don't fail entire batch on one bad row +- **Type Safety**: Strict Pydantic v2 schemas with Decimal for monetary values + +--- + +## Deliverables + +### 1. 
API Endpoint + +#### POST /ingest/sales-daily + +Batch upsert daily sales records using natural keys. + +**Request Body:** +```json +{ + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 10, + "unit_price": 9.99, + "total_amount": 99.90 + } + ] +} +``` + +**Response Body:** +```json +{ + "processed_count": 10, + "rejected_count": 2, + "total_received": 12, + "errors": [ + { + "row_index": 5, + "store_code": "INVALID", + "sku": "SKU-001", + "date": "2024-01-15", + "error_code": "UNKNOWN_STORE", + "error_message": "Store code 'INVALID' not found" + } + ], + "duration_ms": 45.23 +} +``` + +**Validation Rules:** +- `records` array: 1-10,000 items +- `store_code`: 1-20 characters +- `sku`: 1-50 characters +- `quantity`: non-negative integer +- `unit_price`: non-negative decimal (2 decimal places) +- `total_amount`: non-negative decimal (2 decimal places) + +**Error Codes:** +| Code | Description | +|------|-------------| +| `UNKNOWN_STORE` | Store code not found in database | +| `UNKNOWN_PRODUCT` | SKU not found in database | +| `UNKNOWN_DATE` | Date not found in calendar table | + +--- + +### 2. 
Pydantic Schemas (`app/features/ingest/schemas.py`) + +```python +class SalesDailyIngestRow(BaseModel): + """Single row in sales daily ingest payload.""" + date: date_type + store_code: str = Field(..., min_length=1, max_length=20) + sku: str = Field(..., min_length=1, max_length=50) + quantity: int = Field(..., ge=0) + unit_price: Decimal = Field(..., ge=0, decimal_places=2) + total_amount: Decimal = Field(..., ge=0, decimal_places=2) + +class SalesDailyIngestRequest(BaseModel): + """Request body for POST /ingest/sales-daily.""" + records: list[SalesDailyIngestRow] = Field( + ..., min_length=1, max_length=10000 + ) + +class IngestRowError(BaseModel): + """Error detail for a single rejected row.""" + row_index: int + store_code: str + sku: str + date: date_type + error_code: str + error_message: str + +class SalesDailyIngestResponse(BaseModel): + """Response body for POST /ingest/sales-daily.""" + processed_count: int = Field(..., ge=0) + rejected_count: int = Field(..., ge=0) + total_received: int = Field(..., ge=0) + errors: list[IngestRowError] = Field(default_factory=list) + duration_ms: float = Field(..., ge=0) +``` + +--- + +### 3. 
Service Layer (`app/features/ingest/service.py`) + +#### KeyResolver Class + +Resolves natural keys to internal database IDs: + +```python +class KeyResolver: + async def resolve_store_codes( + self, db: AsyncSession, codes: set[str] + ) -> dict[str, int]: + """Resolve store codes to store IDs.""" + stmt = select(Store.code, Store.id).where(Store.code.in_(codes)) + result = await db.execute(stmt) + return {row.code: row.id for row in result} + + async def resolve_skus( + self, db: AsyncSession, skus: set[str] + ) -> dict[str, int]: + """Resolve SKUs to product IDs.""" + stmt = select(Product.sku, Product.id).where(Product.sku.in_(skus)) + result = await db.execute(stmt) + return {row.sku: row.id for row in result} + + async def resolve_dates( + self, db: AsyncSession, dates: set[date_type] + ) -> set[date_type]: + """Check which dates exist in the calendar table.""" + stmt = select(Calendar.date).where(Calendar.date.in_(dates)) + result = await db.execute(stmt) + return {row.date for row in result} +``` + +#### upsert_sales_daily_batch Function + +Performs idempotent upsert with partial success handling: + +```python +async def upsert_sales_daily_batch( + db: AsyncSession, + records: list[SalesDailyIngestRow], + key_resolver: KeyResolverProtocol, +) -> UpsertResult: + """Upsert sales daily records with key resolution and partial success.""" + # 1. Extract unique codes, SKUs, and dates + # 2. Resolve all keys in batch + # 3. Validate and prepare rows (collect errors for invalid) + # 4. Perform upsert using pg_insert().on_conflict_do_update() + # 5. 
Return UpsertResult with counts and errors +``` + +**PostgreSQL Upsert Pattern:** +```python +insert_stmt = pg_insert(SalesDaily).values(valid_rows) +upsert_stmt = insert_stmt.on_conflict_do_update( + index_elements=["date", "store_id", "product_id"], + set_={ + "quantity": insert_stmt.excluded.quantity, + "unit_price": insert_stmt.excluded.unit_price, + "total_amount": insert_stmt.excluded.total_amount, + "updated_at": func.now(), + }, +) +await db.execute(upsert_stmt) +``` + +--- + +### 4. Configuration (`app/core/config.py`) + +Added ingest-specific settings: + +```python +class Settings(BaseSettings): + # ... existing settings ... + + # Ingest + ingest_batch_size: int = 1000 + ingest_timeout_seconds: int = 60 +``` + +| Setting | Default | Description | +|---------|---------|-------------| +| `INGEST_BATCH_SIZE` | 1000 | Max rows per upsert batch | +| `INGEST_TIMEOUT_SECONDS` | 60 | Request timeout for ingest | + +--- + +### 5. Logging Events + +Following the `{domain}.{component}.{action}_{state}` naming convention: + +| Event | Level | Context | +|-------|-------|---------| +| `ingest.sales_daily.request_received` | INFO | record_count | +| `ingest.sales_daily.upsert_started` | INFO | batch_size | +| `ingest.sales_daily.upsert_completed` | INFO | processed, rejected, total_valid | +| `ingest.sales_daily.request_completed` | INFO | processed, rejected, duration_ms | +| `ingest.sales_daily.request_failed` | ERROR | error, error_type, exc_info | + +--- + +## Directory Structure + +``` +app/features/ingest/ +├── __init__.py +├── routes.py # POST /ingest/sales-daily endpoint +├── schemas.py # Pydantic request/response schemas +├── service.py # KeyResolver + upsert_sales_daily_batch +└── tests/ + ├── __init__.py + ├── conftest.py # Feature-specific fixtures + ├── test_schemas.py # Schema validation tests + ├── test_service.py # Service logic tests + └── test_routes.py # Integration tests + +examples/api/ +└── ingest_sales_daily.http # HTTP client examples +``` + 
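As a rough illustration of steps 1-3 of `upsert_sales_daily_batch` from the Service Layer section, the sketch below splits a batch into rows ready for the upsert and per-row errors, using key maps that have already been resolved. It is not the project's implementation: `Row`, `RowError`, and `partition_rows` are hypothetical names, the check order (store, then product, then date) and the wording of the product and date messages are assumptions, and the database write itself is left out.

```python
from dataclasses import dataclass
from datetime import date as date_type
from decimal import Decimal


@dataclass(frozen=True)
class Row:
    """Hypothetical stand-in for SalesDailyIngestRow (plain dataclass, no Pydantic)."""
    date: date_type
    store_code: str
    sku: str
    quantity: int
    unit_price: Decimal
    total_amount: Decimal


@dataclass(frozen=True)
class RowError:
    """Same fields as the documented per-row error object."""
    row_index: int
    store_code: str
    sku: str
    date: date_type
    error_code: str
    error_message: str


def partition_rows(rows, store_ids, product_ids, known_dates):
    """Split rows into upsert-ready dicts and errors.

    store_ids / product_ids map natural keys to internal IDs; known_dates is
    the set of dates present in the calendar table. One bad row never blocks
    the rest of the batch (partial success).
    """
    valid, errors = [], []
    for index, row in enumerate(rows):
        # Assumed check order: store, then product, then date.
        if row.store_code not in store_ids:
            code, message = "UNKNOWN_STORE", f"Store code '{row.store_code}' not found"
        elif row.sku not in product_ids:
            code, message = "UNKNOWN_PRODUCT", f"SKU '{row.sku}' not found"  # assumed wording
        elif row.date not in known_dates:
            code, message = "UNKNOWN_DATE", f"Date '{row.date}' not found in calendar"  # assumed wording
        else:
            valid.append({
                "date": row.date,
                "store_id": store_ids[row.store_code],
                "product_id": product_ids[row.sku],
                "quantity": row.quantity,
                "unit_price": row.unit_price,
                "total_amount": row.total_amount,
            })
            continue
        errors.append(RowError(index, row.store_code, row.sku, row.date, code, message))
    return valid, errors


# Demo: the second row uses an unknown store code and is rejected on its own.
demo_rows = [
    Row(date_type(2024, 1, 15), "S001", "SKU-001", 10, Decimal("9.99"), Decimal("99.90")),
    Row(date_type(2024, 1, 15), "INVALID", "SKU-001", 10, Decimal("9.99"), Decimal("99.90")),
]
demo_valid, demo_errors = partition_rows(
    demo_rows, {"S001": 1}, {"SKU-001": 7}, {date_type(2024, 1, 15)}
)
```

The dictionaries in `demo_valid` carry the same column names as the `ON CONFLICT` statement shown earlier, so they could in principle be passed to `pg_insert(SalesDaily).values(...)`; whether the real service builds its rows this way is not confirmed by this document.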
+--- + +## Examples + +### Example: Happy Path Ingest + +```bash +curl -X POST http://localhost:8123/ingest/sales-daily \ + -H "Content-Type: application/json" \ + -d '{ + "records": [ + { + "date": "2024-01-15", + "store_code": "S001", + "sku": "SKU-001", + "quantity": 10, + "unit_price": 9.99, + "total_amount": 99.90 + } + ] + }' +``` + +Response: +```json +{ + "processed_count": 1, + "rejected_count": 0, + "total_received": 1, + "errors": [], + "duration_ms": 32.15 +} +``` + +### Example: Idempotent Replay + +Running the same request twice updates (not duplicates) due to ON CONFLICT: + +```bash +# First request: inserts row +# Second request: updates row (same grain) +# Result: 1 row in database, not 2 +``` + +### Example: Partial Success + +```json +{ + "records": [ + {"date": "2024-01-15", "store_code": "S001", "sku": "SKU-001", ...}, + {"date": "2024-01-15", "store_code": "INVALID", "sku": "SKU-001", ...} + ] +} +``` + +Response: +```json +{ + "processed_count": 1, + "rejected_count": 1, + "total_received": 2, + "errors": [ + { + "row_index": 1, + "store_code": "INVALID", + "sku": "SKU-001", + "date": "2024-01-15", + "error_code": "UNKNOWN_STORE", + "error_message": "Store code 'INVALID' not found" + } + ], + "duration_ms": 38.45 +} +``` + +--- + +## Key Design Decisions + +### 1. Natural Keys in Request + +**Decision**: Accept `store_code` and `sku` instead of internal IDs. + +**Rationale**: External systems (POS, ERP) don't know internal database IDs. Natural keys provide a stable interface that doesn't require ID lookups before ingestion. + +### 2. Batch Key Resolution + +**Decision**: Resolve all store codes, SKUs, and dates in three batch queries upfront. + +**Rationale**: Avoids N+1 queries. For 1000 records with 10 unique stores and 50 unique SKUs, this is 3 queries instead of potentially 2000. + +### 3. Calendar FK Validation + +**Decision**: Reject rows with dates not in the calendar table. 
+ +**Rationale**: The `sales_daily.date` column has a FK to `calendar.date`. Rather than auto-create calendar entries (which could mask data issues), we reject and report the error. + +### 4. Partial Success Pattern + +**Decision**: Process valid rows even when some rows fail validation. + +**Rationale**: In real-world batch processing, failing an entire 10,000 row batch because of one bad row is unacceptable. Report errors but process what's valid. + +### 5. No Insert/Update Count Distinction + +**Decision**: Report `processed_count` without distinguishing inserts from updates. + +**Rationale**: PostgreSQL's ON CONFLICT doesn't easily distinguish inserts from updates without additional complexity (checking xmax). The important metric is "rows successfully written." + +--- + +## Integration Points + +```yaml +DATABASE: + - No new migrations required + - Uses existing SalesDaily, Store, Product, Calendar tables + - Relies on grain constraint: uq_sales_daily_grain(date, store_id, product_id) + +CONFIG: + - app/core/config.py: Added INGEST_BATCH_SIZE, INGEST_TIMEOUT_SECONDS + +ROUTES: + - app/main.py: Registered ingest_router + - Endpoint: POST /ingest/sales-daily + +DEPENDENCIES: + - Store table must have stores with matching codes + - Product table must have products with matching SKUs + - Calendar table must have entries for dates in payload +``` + +--- + +## Next Phase Preparation + +Phase 2 provides the foundation for: + +1. **Phase 3 (Feature Engineering)**: With sales data ingested, compute time-safe features (lags, rolling windows) +2. **Phase 4 (Forecasting)**: Train models on ingested sales_daily data +3. **Phase 5 (Backtesting)**: Ingest historical data for backtesting experiments +4. **Future Ingest Endpoints**: Same pattern can be extended for price_history, promotion, inventory_snapshot_daily + +--- + +## Lessons Learned + +1. 
**PostgreSQL Dialect Import**: Must use `from sqlalchemy.dialects.postgresql import insert as pg_insert` for ON CONFLICT support, not generic SQLAlchemy insert. + +2. **Index Elements vs Constraint Name**: `index_elements=["date", "store_id", "product_id"]` takes column names, not the constraint name; to target the constraint by name, `on_conflict_do_update()` has a separate `constraint=` argument. + +3. **Rowcount Type Stubs**: SQLAlchemy's generic Result type doesn't expose `rowcount` in type stubs, but it's available at runtime for DML operations. Required type ignore comment. + +4. **Decimal for Money**: Always use `Decimal` with explicit `decimal_places` for monetary values to avoid floating point precision issues. + +--- + +## References + +- [PRP-3: Ingest Layer](../../PRPs/PRP-3-ingest-layer.md) +- [Phase 1: Data Platform](./1-DATA_PLATFORM.md) - Schema foundation +- [SQLAlchemy PostgreSQL INSERT ON CONFLICT](https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#insert-on-conflict-upsert) +- [HTTP Client Examples](../../examples/api/ingest_sales_daily.http) From 3fb1b06b584b7f0e39019de49d68ebc456ec02a7 Mon Sep 17 00:00:00 2001 From: "Gabe@w7dev" Date: Mon, 26 Jan 2026 13:17:06 +0000 Subject: [PATCH 4/4] fix(docs): address CodeRabbit review comments - Change "floating point" to "floating-point" (compound adjective) - Add blank lines around Error Codes table (MD058) - Add language identifier to directory structure code fence (MD040) - Update HTTP examples to use processed_count/total_received schema Co-Authored-By: Claude Opus 4.5 --- docs/PHASE/2-INGEST_LAYER.md | 5 +++-- examples/api/ingest_sales_daily.http | 30 +++++++++++----------------- 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/docs/PHASE/2-INGEST_LAYER.md b/docs/PHASE/2-INGEST_LAYER.md index 0fe27e5c..8a3d5809 100644 --- a/docs/PHASE/2-INGEST_LAYER.md +++ b/docs/PHASE/2-INGEST_LAYER.md @@ -85,6 +85,7 @@ Batch upsert daily sales records using natural keys. 
- `total_amount`: non-negative decimal (2 decimal places) **Error Codes:** + | Code | Description | |------|-------------| | `UNKNOWN_STORE` | Store code not found in database | @@ -235,7 +236,7 @@ Following the `{domain}.{component}.{action}_{state}` naming convention: ## Directory Structure -``` +```text app/features/ingest/ ├── __init__.py ├── routes.py # POST /ingest/sales-daily endpoint @@ -405,7 +406,7 @@ Phase 2 provides the foundation for: 3. **Rowcount Type Stubs**: SQLAlchemy's generic Result type doesn't expose `rowcount` in type stubs, but it's available at runtime for DML operations. Required type ignore comment. -4. **Decimal for Money**: Always use `Decimal` with explicit `decimal_places` for monetary values to avoid floating point precision issues. +4. **Decimal for Money**: Always use `Decimal` with explicit `decimal_places` for monetary values to avoid floating-point precision issues. --- diff --git a/examples/api/ingest_sales_daily.http b/examples/api/ingest_sales_daily.http index c4c2ff95..db94542e 100644 --- a/examples/api/ingest_sales_daily.http +++ b/examples/api/ingest_sales_daily.http @@ -41,10 +41,9 @@ Content-Type: application/json ### Expected Response: # HTTP/1.1 200 OK # { -# "inserted_count": 2, -# "updated_count": 0, +# "processed_count": 2, # "rejected_count": 0, -# "total_processed": 2, +# "total_received": 2, # "errors": [], # "duration_ms": 45.23 # } @@ -74,10 +73,9 @@ Content-Type: application/json ### Expected Response: # HTTP/1.1 200 OK # { -# "inserted_count": 1, -# "updated_count": 0, +# "processed_count": 1, # "rejected_count": 0, -# "total_processed": 1, +# "total_received": 1, # "errors": [], # "duration_ms": 32.15 # } @@ -115,10 +113,9 @@ Content-Type: application/json ### Expected Response: # HTTP/1.1 200 OK # { -# "inserted_count": 1, -# "updated_count": 0, +# "processed_count": 1, # "rejected_count": 1, -# "total_processed": 2, +# "total_received": 2, # "errors": [ # { # "row_index": 1, @@ -156,10 +153,9 @@ 
Content-Type: application/json ### Expected Response: # HTTP/1.1 200 OK # { -# "inserted_count": 0, -# "updated_count": 0, +# "processed_count": 0, # "rejected_count": 1, -# "total_processed": 1, +# "total_received": 1, # "errors": [ # { # "row_index": 0, @@ -197,10 +193,9 @@ Content-Type: application/json ### Expected Response: # HTTP/1.1 200 OK # { -# "inserted_count": 0, -# "updated_count": 0, +# "processed_count": 0, # "rejected_count": 1, -# "total_processed": 1, +# "total_received": 1, # "errors": [ # { # "row_index": 0, @@ -303,10 +298,9 @@ Content-Type: application/json ### Expected Response: # HTTP/1.1 200 OK # { -# "inserted_count": 6, -# "updated_count": 0, +# "processed_count": 6, # "rejected_count": 0, -# "total_processed": 6, +# "total_received": 6, # "errors": [], # "duration_ms": 85.50 # }
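Every expected response in these examples satisfies `processed_count + rejected_count == total_received`, with one `errors` entry per rejected row. A client might check this before trusting a response. The sketch below assumes both relationships hold in general, which the examples suggest but the source does not state as a guarantee; `check_ingest_response` is a hypothetical helper, not part of the project.

```python
def check_ingest_response(body: dict) -> list[str]:
    """Return a list of count mismatches; an empty list means the counts agree.

    Assumes the documented response fields: processed_count, rejected_count,
    total_received, and errors.
    """
    problems: list[str] = []
    processed = body["processed_count"]
    rejected = body["rejected_count"]
    total = body["total_received"]
    errors = body.get("errors", [])

    # Assumed invariant: every received row is either processed or rejected.
    if processed + rejected != total:
        problems.append(
            f"processed_count + rejected_count = {processed + rejected}, "
            f"but total_received = {total}"
        )
    # Assumed invariant: one error entry per rejected row.
    if len(errors) != rejected:
        problems.append(
            f"rejected_count = {rejected}, but errors has {len(errors)} entries"
        )
    return problems


# Partial-success response shaped like the documented example.
partial_success = {
    "processed_count": 1,
    "rejected_count": 1,
    "total_received": 2,
    "errors": [
        {
            "row_index": 1,
            "store_code": "INVALID",
            "sku": "SKU-001",
            "date": "2024-01-15",
            "error_code": "UNKNOWN_STORE",
            "error_message": "Store code 'INVALID' not found",
        }
    ],
    "duration_ms": 38.45,
}
problems_found = check_ingest_response(partial_success)
```

For the partial-success body above, both checks pass and `problems_found` is empty.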