Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
888 changes: 888 additions & 0 deletions PRPs/PRP-3-ingest-layer.md

Large diffs are not rendered by default.

40 changes: 39 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,14 @@ app/
├── core/ # Config, database, logging, middleware, exceptions
├── shared/ # Pagination, timestamps, error schemas
├── features/
│ └── data_platform/ # Store, product, calendar, sales tables
│ ├── data_platform/ # Store, product, calendar, sales tables
│ └── ingest/ # Batch upsert endpoints for sales data
└── main.py # FastAPI entry point

tests/ # Test fixtures and helpers
alembic/ # Database migrations
examples/
├── api/ # HTTP client examples
├── schema/ # Table documentation
└── queries/ # Example SQL queries
scripts/ # Utility scripts
Expand All @@ -112,6 +114,42 @@ The data platform includes 7 tables for retail demand forecasting:

See [examples/schema/README.md](examples/schema/README.md) for detailed schema documentation.

## API Endpoints

### Health Check

- `GET /health` - Returns `{"status": "ok"}` when the API is running

### Ingest

- `POST /ingest/sales-daily` - Batch upsert daily sales records

**Example Request:**
```bash
curl -X POST http://localhost:8123/ingest/sales-daily \
-H "Content-Type: application/json" \
-d '{
"records": [
{
"date": "2024-01-15",
"store_code": "S001",
"sku": "SKU-001",
"quantity": 10,
"unit_price": 9.99,
"total_amount": 99.90
}
]
}'
```

**Features:**
- Natural key resolution (`store_code` -> `store_id`, `sku` -> `product_id`)
- Idempotent upsert using PostgreSQL `ON CONFLICT DO UPDATE`
- Partial success handling (valid rows processed, invalid rows returned with errors)
- Error codes: `UNKNOWN_STORE`, `UNKNOWN_PRODUCT`, `UNKNOWN_DATE`

See [examples/api/ingest_sales_daily.http](examples/api/ingest_sales_daily.http) for more examples.

## API Documentation

Once the server is running:
Expand Down
4 changes: 4 additions & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class Settings(BaseSettings):
api_host: str = "0.0.0.0" # noqa: S104
api_port: int = 8123

# Ingest
ingest_batch_size: int = 1000
ingest_timeout_seconds: int = 60

@property
def is_development(self) -> bool:
"""Check if running in development mode."""
Expand Down
20 changes: 20 additions & 0 deletions app/features/ingest/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Ingest feature module for idempotent batch upserts."""

from app.features.ingest.routes import router
from app.features.ingest.schemas import (
IngestRowError,
SalesDailyIngestRequest,
SalesDailyIngestResponse,
SalesDailyIngestRow,
)
from app.features.ingest.service import KeyResolver, upsert_sales_daily_batch

# Public API of the ingest feature package: the router, the request/response
# schemas, and the service-layer helpers re-exported from the submodules above.
__all__ = [
"IngestRowError",
"KeyResolver",
"SalesDailyIngestRequest",
"SalesDailyIngestResponse",
"SalesDailyIngestRow",
"router",
"upsert_sales_daily_batch",
]
97 changes: 97 additions & 0 deletions app/features/ingest/routes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""Ingest API routes for batch upsert operations."""

import time

from fastapi import APIRouter, Depends, status
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.core.exceptions import DatabaseError
from app.core.logging import get_logger
from app.features.ingest.schemas import (
SalesDailyIngestRequest,
SalesDailyIngestResponse,
)
from app.features.ingest.service import KeyResolver, upsert_sales_daily_batch

# Module-level logger, named after this module.
logger = get_logger(__name__)

# Router for the ingest endpoints; every route below is served under /ingest.
router = APIRouter(prefix="/ingest", tags=["ingest"])


@router.post(
    "/sales-daily",
    response_model=SalesDailyIngestResponse,
    status_code=status.HTTP_200_OK,
    summary="Batch upsert daily sales records",
    description="""
Batch upsert daily sales records using natural keys.

Accepts sales records with store_code and sku (natural keys).
Resolves to internal IDs and performs idempotent upsert using
PostgreSQL ON CONFLICT DO UPDATE.

**Idempotency:** Running the same request twice will update existing
records rather than create duplicates. The grain (date, store_id, product_id)
is enforced by a unique constraint.

**Partial Success:** Invalid rows (unknown store_code, sku, or date) are
rejected while valid rows are processed. Response includes counts and
error details for rejected rows.
""",
)
async def ingest_sales_daily(
    request: SalesDailyIngestRequest,
    db: AsyncSession = Depends(get_db),
) -> SalesDailyIngestResponse:
    """Handle ``POST /ingest/sales-daily``.

    Hands the submitted records to ``upsert_sales_daily_batch`` together with
    a fresh ``KeyResolver``, times the call with ``time.perf_counter`` and
    reports the outcome.

    Args:
        request: Validated ingest payload carrying the list of sales records.
        db: Async database session supplied by the ``get_db`` dependency.

    Returns:
        A response holding the processed and rejected counts, the number of
        records received, per-row error details and the elapsed time in
        milliseconds (rounded to two decimals).

    Raises:
        DatabaseError: When a ``SQLAlchemyError`` escapes the batch upsert.
            The original exception is chained as the cause.
    """
    started_at = time.perf_counter()

    logger.info(
        "ingest.sales_daily.request_received",
        record_count=len(request.records),
    )

    try:
        outcome = await upsert_sales_daily_batch(db, request.records, KeyResolver())

        # Round once and reuse the value for both the log line and the response.
        elapsed_ms = round((time.perf_counter() - started_at) * 1000, 2)

        logger.info(
            "ingest.sales_daily.request_completed",
            processed=outcome.processed_count,
            rejected=outcome.rejected_count,
            duration_ms=elapsed_ms,
        )

        return SalesDailyIngestResponse(
            processed_count=outcome.processed_count,
            rejected_count=outcome.rejected_count,
            total_received=len(request.records),
            errors=outcome.errors,
            duration_ms=elapsed_ms,
        )
    except SQLAlchemyError as exc:
        # Log the full failure, then surface it as the application's own
        # database exception type.
        logger.error(
            "ingest.sales_daily.request_failed",
            error=str(exc),
            error_type=type(exc).__name__,
            exc_info=True,
        )
        raise DatabaseError(
            message="Failed to process sales daily ingest",
            details={"error": str(exc)},
        ) from exc
73 changes: 73 additions & 0 deletions app/features/ingest/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Pydantic schemas for ingest API."""

from datetime import date as date_type
from decimal import Decimal

from pydantic import BaseModel, Field


class SalesDailyIngestRow(BaseModel):
    """One sales record inside a daily-sales ingest payload.

    Rows are addressed by natural keys (``store_code``, ``sku``) rather than
    internal IDs; the service maps them to ``store_id`` and ``product_id``
    before the upsert.

    Note: ``total_amount`` is not checked against ``quantity * unit_price``.
    Source systems may apply discounts or rounding, so the value is trusted
    as sent.
    """

    date: date_type
    store_code: str = Field(
        ...,
        min_length=1,
        max_length=20,
        description="Store code (natural key)",
    )
    sku: str = Field(
        ...,
        min_length=1,
        max_length=50,
        description="Product SKU (natural key)",
    )
    quantity: int = Field(
        ...,
        ge=0,
        description="Units sold (non-negative)",
    )
    unit_price: Decimal = Field(
        ...,
        ge=0,
        decimal_places=2,
        description="Price per unit",
    )
    total_amount: Decimal = Field(
        ...,
        ge=0,
        decimal_places=2,
        description="Total sales amount (trusted as-is from source system)",
    )


class SalesDailyIngestRequest(BaseModel):
    """Payload accepted by POST /ingest/sales-daily.

    Carries between 1 and 10000 sales rows per request.
    """

    records: list[SalesDailyIngestRow] = Field(
        ..., min_length=1, max_length=10000, description="Sales records to upsert"
    )


class IngestRowError(BaseModel):
    """Describes why one row of an ingest request was rejected.

    Echoes the row's position and natural keys so the caller can locate the
    offending record, alongside a machine-readable code and a readable message.
    """

    row_index: int = Field(
        ...,
        description="0-based index of the failed row",
    )
    store_code: str = Field(
        ...,
        description="Store code from the row",
    )
    sku: str = Field(
        ...,
        description="SKU from the row",
    )
    date: date_type = Field(
        ...,
        description="Date from the row",
    )
    error_code: str = Field(
        ...,
        description="Machine-readable error code",
    )
    error_message: str = Field(
        ...,
        description="Human-readable error message",
    )


class SalesDailyIngestResponse(BaseModel):
    """Response body for POST /ingest/sales-daily.

    Note: Due to PostgreSQL ON CONFLICT semantics, we cannot distinguish
    inserts from updates without additional complexity. The `processed_count`
    field represents all rows successfully written (inserted or updated).
    """

    processed_count: int = Field(
        ..., ge=0, description="Number of rows successfully written (inserted or updated)"
    )
    rejected_count: int = Field(..., ge=0, description="Number of rows rejected")
    total_received: int = Field(..., ge=0, description="Total rows received in request")
    # IngestRowError is defined earlier in this module, so the annotation is
    # written directly instead of as a string forward reference. Defaults to a
    # new empty list per instance when no rows were rejected.
    errors: list[IngestRowError] = Field(  # pyright: ignore[reportUnknownVariableType]
        default_factory=list, description="Details of rejected rows"
    )
    duration_ms: float = Field(..., ge=0, description="Processing duration in milliseconds")
Loading