Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,022 changes: 1,022 additions & 0 deletions PRPs/PRP-21-explorer-runs-jobs-interactivity.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Portfolio-grade end-to-end retail demand forecasting system.
- **Serving Layer**: Typed FastAPI endpoints (Pydantic v2 validation)
- **Model Registry**: Run configs, metrics, artifacts, and data windows for reproducibility
- **Dashboard**: React 19 + Vite + Tailwind CSS 4 + shadcn/ui for data exploration and model management
- **Explorer**: Click-through store & product detail pages with date-scoped KPIs and revenue charts; sortable, CSV-exportable tables with column-visibility toggles; revenue bar/line charts and URL-shareable cross-filtering on the Sales page
- **Explorer**: Click-through detail pages for stores, products, model runs, and jobs; run-vs-run comparison and SHA-256 artifact integrity verification; server-side sortable, CSV-exportable tables with column-visibility toggles and URL-shareable filter/sort/page state across every Explorer page; date-scoped KPIs, revenue bar/line charts, and cross-filtering on the Sales page
- **RAG Knowledge Base**: Postgres pgvector embeddings + evidence-grounded answers with citations
- **Agentic Layer**: PydanticAI agents for autonomous experimentation and evidence-grounded Q&A with human-in-the-loop approval
- **Data Seeder (The Forge)**: Reproducible synthetic data generator with realistic time-series patterns, scenario presets, and retail effects
Expand Down
16 changes: 16 additions & 0 deletions app/features/jobs/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,17 @@ async def create_job(
- `job_type`: Filter by job type (train, predict, backtest)
- `status`: Filter by status (pending, running, completed, failed, cancelled)

**Sorting**:
- `sort_by`: Allow-listed column (created_at, completed_at, job_type, status).
Unknown values fall back to the default order.
- `sort_order`: `asc` or `desc` (default: `asc`).

**Example Use Cases**:
1. List all jobs: `GET /jobs`
2. List failed jobs: `GET /jobs?status=failed`
3. List train jobs: `GET /jobs?job_type=train`
4. Paginate: `GET /jobs?page=2&page_size=10`
5. Sort by status: `GET /jobs?sort_by=status&sort_order=desc`
""",
)
async def list_jobs(
Expand All @@ -172,6 +178,12 @@ async def list_jobs(
page_size: int = Query(20, ge=1, le=100, description="Jobs per page (max 100)"),
job_type: JobType | None = Query(None, description="Filter by job type"),
status: JobStatus | None = Query(None, description="Filter by status"),
sort_by: str | None = Query(
None,
description="Sort column: created_at|completed_at|job_type|status. "
"Unknown values use the default order (created_at desc).",
),
sort_order: str = Query("asc", pattern="^(asc|desc)$", description="Sort direction."),
) -> JobListResponse:
"""List jobs with pagination and filtering.

Expand All @@ -181,6 +193,8 @@ async def list_jobs(
page_size: Number of jobs per page.
job_type: Filter by job type (optional).
status: Filter by status (optional).
sort_by: Allow-listed sort column; unknown values use the default order.
sort_order: Sort direction ("asc" or "desc").

Returns:
Paginated list of jobs.
Expand All @@ -192,6 +206,8 @@ async def list_jobs(
page_size=page_size,
job_type=job_type,
status=status,
sort_by=sort_by,
sort_order=sort_order,
)


Expand Down
28 changes: 27 additions & 1 deletion app/features/jobs/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import InstrumentedAttribute

from app.core.config import get_settings
from app.core.logging import get_logger
Expand Down Expand Up @@ -46,6 +47,17 @@
# most meaningful single number; change this constant to pick a different one.
_STABILITY_METRIC: str = "wape"

# Allow-listed sort columns for the job list endpoint. sort_by is user input —
# it MUST resolve through this map to a real mapped column; an unknown key
# falls back to the default order (never an error, never raw SQL). JSONB
# columns (params, result) are intentionally excluded — not meaningfully sortable.
_JOB_SORT_COLUMNS: dict[str, InstrumentedAttribute[Any]] = {
"created_at": Job.created_at,
"completed_at": Job.completed_at,
"job_type": Job.job_type,
"status": Job.status,
}


def _finite(value: float) -> float:
"""Coerce NaN/inf to 0.0 so a job result stays JSON/JSONB-safe.
Expand Down Expand Up @@ -208,6 +220,8 @@ async def list_jobs(
page_size: int = 20,
job_type: JobType | None = None,
status: JobStatus | None = None,
sort_by: str | None = None,
sort_order: str = "asc",
) -> JobListResponse:
"""List jobs with pagination and filtering.

Expand All @@ -217,6 +231,10 @@ async def list_jobs(
page_size: Number of jobs per page.
job_type: Filter by job type (optional).
status: Filter by status (optional).
sort_by: Allow-listed sort column (created_at, completed_at,
job_type, status). Unknown values fall back to the default
order (created_at desc).
sort_order: Sort direction ("asc" or "desc").

Returns:
Paginated list of jobs.
Expand All @@ -235,9 +253,17 @@ async def list_jobs(
count_result = await db.execute(count_stmt)
total = count_result.scalar_one()

# Apply ordering: allow-listed sort column, else the default
# (created_at desc — UNCHANGED, keeps existing callers/tests green).
sort_column = _JOB_SORT_COLUMNS.get(sort_by) if sort_by else None
if sort_column is not None:
order_by = sort_column.desc() if sort_order == "desc" else sort_column.asc()
else:
order_by = Job.created_at.desc()

# Apply pagination
offset = (page - 1) * page_size
stmt = stmt.order_by(Job.created_at.desc()).offset(offset).limit(page_size)
stmt = stmt.order_by(order_by).offset(offset).limit(page_size)

# Execute query
result = await db.execute(stmt)
Expand Down
108 changes: 107 additions & 1 deletion app/features/jobs/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,120 @@
"""Test fixtures for jobs module."""

import uuid
from collections.abc import AsyncGenerator
from datetime import UTC, datetime

import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from app.features.jobs.models import JobStatus, JobType
from app.core.config import get_settings
from app.core.database import get_db
from app.features.jobs.models import Job, JobStatus, JobType
from app.features.jobs.schemas import (
JobCreate,
JobResponse,
)
from app.main import app

# =============================================================================
# Database Fixtures for Integration Tests
# =============================================================================


@pytest.fixture
async def db_session() -> AsyncGenerator[AsyncSession, None]:
"""Create async database session for integration tests.

Provides a session and cleans up test data (jobs whose job_id starts
with "test"). Requires PostgreSQL to be running (docker-compose up -d).
"""
settings = get_settings()
engine = create_async_engine(settings.database_url, echo=False)

async_session_maker = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)

async with async_session_maker() as session:
try:
yield session
finally:
# Job has no model_type column for a cleanup key — key on the
# "test" job_id prefix every fixture/test uses.
await session.execute(delete(Job).where(Job.job_id.like("test%")))
await session.commit()

await engine.dispose()


@pytest.fixture
async def client(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]:
"""Create test client with database dependency override."""

async def override_get_db() -> AsyncGenerator[AsyncSession, None]:
try:
yield db_session
await db_session.commit()
except Exception:
await db_session.rollback()
raise

app.dependency_overrides[get_db] = override_get_db

async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as ac:
yield ac

app.dependency_overrides.clear()


@pytest.fixture
async def sample_jobs_multi(db_session: AsyncSession) -> list[Job]:
"""Insert three jobs with distinct job_type / status / created_at.

Drives the list-endpoint sort tests. Every job_id starts with "test"
so the db_session cleanup removes them. created_at is set explicitly
(overriding the server_default) so created_at sorting is deterministic.
"""
jobs = [
Job(
job_id=f"test{uuid.uuid4().hex[:28]}",
job_type=JobType.TRAIN.value,
status=JobStatus.PENDING.value,
params={},
created_at=datetime(2024, 1, 1, tzinfo=UTC),
),
Job(
job_id=f"test{uuid.uuid4().hex[:28]}",
job_type=JobType.PREDICT.value,
status=JobStatus.RUNNING.value,
params={},
created_at=datetime(2024, 1, 2, tzinfo=UTC),
),
Job(
job_id=f"test{uuid.uuid4().hex[:28]}",
job_type=JobType.BACKTEST.value,
status=JobStatus.COMPLETED.value,
params={},
created_at=datetime(2024, 1, 3, tzinfo=UTC),
),
]
db_session.add_all(jobs)
await db_session.commit()
for job in jobs:
await db_session.refresh(job)
return jobs


# =============================================================================
# Unit Test Fixtures
# =============================================================================


@pytest.fixture
Expand Down
115 changes: 115 additions & 0 deletions app/features/jobs/tests/test_routes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""Integration tests for jobs API routes.

These tests require PostgreSQL to be running (docker-compose up -d).
Run with: pytest app/features/jobs/tests/ -v -m integration
"""

import uuid
from typing import Any

import pytest
from httpx import AsyncClient

from app.features.jobs.models import Job

pytestmark = pytest.mark.integration


class TestListJobsEndpoint:
"""Tests for GET /jobs endpoint."""

async def test_list_jobs_ok(self, client: AsyncClient) -> None:
"""GET /jobs returns 200 with the paginated envelope."""
response = await client.get("/jobs")
assert response.status_code == 200
data = response.json()
assert "jobs" in data
assert data["page"] == 1
assert data["page_size"] == 20

async def test_list_jobs_returns_seeded_rows(
self, client: AsyncClient, sample_jobs_multi: list[Job]
) -> None:
"""Seeded jobs appear in the listing."""
response = await client.get("/jobs?page_size=100")
assert response.status_code == 200
data = response.json()
assert data["total"] >= 3
listed_ids = {j["job_id"] for j in data["jobs"]}
assert {job.job_id for job in sample_jobs_multi} <= listed_ids


class TestListJobsSortEndpoint:
"""Tests for sort_by / sort_order on GET /jobs."""

@staticmethod
def _test_job_types(payload: dict[str, Any]) -> list[str]:
"""Job types of the test-prefixed jobs, in response order."""
return [j["job_type"] for j in payload["jobs"] if str(j["job_id"]).startswith("test")]

async def test_sort_by_job_type_asc(
self, client: AsyncClient, sample_jobs_multi: list[Job]
) -> None:
"""sort_by=job_type&sort_order=asc orders jobs ascending."""
response = await client.get("/jobs?sort_by=job_type&sort_order=asc&page_size=100")
assert response.status_code == 200
assert self._test_job_types(response.json()) == ["backtest", "predict", "train"]

async def test_sort_by_job_type_desc(
self, client: AsyncClient, sample_jobs_multi: list[Job]
) -> None:
"""sort_by=job_type&sort_order=desc orders jobs descending."""
response = await client.get("/jobs?sort_by=job_type&sort_order=desc&page_size=100")
assert response.status_code == 200
assert self._test_job_types(response.json()) == ["train", "predict", "backtest"]

async def test_sort_by_status_asc(
self, client: AsyncClient, sample_jobs_multi: list[Job]
) -> None:
"""sort_by=status&sort_order=asc orders jobs by status value."""
response = await client.get("/jobs?sort_by=status&sort_order=asc&page_size=100")
assert response.status_code == 200
# status asc: completed < pending < running -> backtest, train, predict
assert self._test_job_types(response.json()) == ["backtest", "train", "predict"]

async def test_sort_by_created_at_desc(
self, client: AsyncClient, sample_jobs_multi: list[Job]
) -> None:
"""sort_by=created_at&sort_order=desc returns newest first."""
response = await client.get("/jobs?sort_by=created_at&sort_order=desc&page_size=100")
assert response.status_code == 200
# created_at 2024-01-03 > 01-02 > 01-01 -> backtest, predict, train
assert self._test_job_types(response.json()) == ["backtest", "predict", "train"]

async def test_unknown_sort_by_falls_back_to_default(
self, client: AsyncClient, sample_jobs_multi: list[Job]
) -> None:
"""An unknown sort_by uses the default order, never errors."""
default = await client.get("/jobs?page_size=100")
unknown = await client.get("/jobs?sort_by=params&page_size=100")
assert default.status_code == 200
assert unknown.status_code == 200
default_ids = [j["job_id"] for j in default.json()["jobs"]]
unknown_ids = [j["job_id"] for j in unknown.json()["jobs"]]
assert unknown_ids == default_ids

async def test_invalid_sort_order_rejected(self, client: AsyncClient) -> None:
"""sort_order outside {asc,desc} is rejected with 422 via the Query regex."""
response = await client.get("/jobs?sort_order=sideways")
assert response.status_code == 422


class TestGetJobEndpoint:
"""Tests for GET /jobs/{job_id} endpoint."""

async def test_get_job_success(self, client: AsyncClient, sample_jobs_multi: list[Job]) -> None:
"""GET /jobs/{job_id} returns the job."""
job_id = sample_jobs_multi[0].job_id
response = await client.get(f"/jobs/{job_id}")
assert response.status_code == 200
assert response.json()["job_id"] == job_id

async def test_get_job_not_found(self, client: AsyncClient) -> None:
"""GET /jobs/{job_id} returns 404 for an unknown job."""
response = await client.get(f"/jobs/test{uuid.uuid4().hex[:28]}")
assert response.status_code == 404
Loading