Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api.civicpatch.org/AI_CONTEXT.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ mise run psql
- Every migration must be wrapped in `BEGIN` / `COMMIT`
- Down migrations must exactly reverse the up migration — test that the round-trip is clean
- Create a new migration file whenever you add, rename, or drop a column, table, or index — never edit an existing migration
- **After every migration, update the Mermaid schema diagram in `README.md`** — the diagram must always reflect the current state of the database

## Background tasks

Expand Down
81 changes: 79 additions & 2 deletions api.civicpatch.org/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,83 @@ An hourly background task also reconciles DB state against GitHub's open-PR list

To register the webhook in GitHub, set the payload URL to `https://api.civicpatch.org/webhooks/github` and set the secret to match `GITHUB_WEBHOOK_SECRET`.

## Code standards
## Database schema

> **Keep this diagram in sync with migrations.** Whenever a migration adds, renames, or drops a column or table, update the chart below.

```mermaid
erDiagram
jurisdictions {
text jurisdiction_ocdid PK
text state
text status
jsonb data
text file_path
text git_commit
timestamp updated_at
}

requests {
uuid id PK
text status
text request_type
text jurisdiction_ocdid FK
jsonb arguments_json
jsonb result_data
jsonb review_json
timestamptz created_at
timestamptz updated_at
}

jobs {
int id PK
uuid request_id FK
text requested_by_provider
text requested_by_provider_user_id
int progress
text status
text server_source
text run_url
text pull_request_review_state_to_delete
timestamptz created_at
timestamptz updated_at
}

pull_requests {
uuid id PK
uuid request_id FK
int pr_number
text url
text status
text review_state
timestamptz closed_at
timestamptz merged_at
timestamptz created_at
timestamptz updated_at
}

people {
uuid id PK
text jurisdiction_ocdid FK
jsonb data
text file_path
text git_commit
text status
timestamp updated_at
}

jurisdictions ||--o{ requests : "jurisdiction_ocdid"
jurisdictions ||--o{ people : "jurisdiction_ocdid"
requests ||--o| jobs : "request_id"
requests ||--o| pull_requests : "request_id"
```

**Notes:**
- `requests.result_data` — array of scraped `Official` objects returned by the civicpatch pipeline
- `requests.review_json` — pipeline review output (`issues`, `warnings`, etc.)
- `people.data` — full `Official` JSONB blob; the canonical record for a jurisdiction's current officials
- `jurisdictions.data` — jurisdiction metadata (name, geoid, etc.)
- `jobs` and `pull_requests` each have a unique constraint on `request_id` (one-to-one with `requests`)
- `people` has no FK to `requests` — it is updated independently when a PR is merged
- `people` has no FK to `requests` — it is updated independently when a PR is merged

See [AI_CONTEXT.md](./AI_CONTEXT.md).
23 changes: 23 additions & 0 deletions api.civicpatch.org/src/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,29 @@ async def get_requests_for_export(
]


async def get_people_by_state(state: str) -> list[dict]:
state_prefix = f"ocd-jurisdiction/country:us/state:{state.lower()}%"
pool = await get_pool()
rows = []
async with pool.connection() as conn, conn.cursor() as cur:
await cur.execute(
"""
SELECT jurisdiction_ocdid, data
FROM people
WHERE jurisdiction_ocdid LIKE %s
AND status = 'current'
ORDER BY jurisdiction_ocdid
""",
(state_prefix,),
)
while True:
batch = await cur.fetchmany(200)
if not batch:
break
rows.extend(batch)
return [{"jurisdiction_ocdid": r[0], **r[1]} for r in rows]


async def deactivate_jurisdictions_by_ocdids(ocdids: List[str]):
pool = await get_pool()
async with pool.connection() as conn:
Expand Down
214 changes: 41 additions & 173 deletions api.civicpatch.org/src/routers/api/requests.py
Original file line number Diff line number Diff line change
@@ -1,157 +1,28 @@
import asyncio
import csv
import io
import logging
import re
from datetime import date
from typing import Any, Optional
import database.database
import database.people
import database.requests
import services.github.github_api_service as github_service
import shared.utils.id_utils
from fastapi import APIRouter, Depends, Query
import services.csv_service as csv_service
import services.requests_export_service as requests_export_service
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from schemas.common import Identity, Role, RouteCategory
from utils.auth_utils import require_route_access

logger = logging.getLogger(__name__)

_DIFF_FIELDS = [
"name",
"office.name",
"office.division_ocdid",
"phones",
"emails",
"urls",
"start_date",
"end_date",
]

CSV_FIELDNAMES = [
"request_id",
"jurisdiction_ocdid",
"created_at",
"review_issues",
"diff_status",
"changed_fields",
"id",
"name",
"other_names",
"office_name",
"office_division_ocdid",
"phones",
"emails",
"urls",
"source_urls",
"start_date",
"end_date",
"updated_at",
]


def _get_field(official: dict, key: str) -> str:
if key == "office.name":
val = (official.get("office") or {}).get("name", "")
elif key == "office.division_ocdid":
val = (official.get("office") or {}).get("division_ocdid", "")
else:
val = official.get(key, "")
if isinstance(val, list):
return " | ".join(str(v) for v in val)
return str(val or "")


def _normalize(val: str) -> str:
return val.lower().strip()


def _changed_field_names(existing: dict, pr: dict) -> list[str]:
return [k for k in _DIFF_FIELDS if _normalize(_get_field(existing, k)) != _normalize(_get_field(pr, k))]


def _flatten_official(
request_id: str,
jurisdiction_ocdid: str,
created_at: str | None,
review_issues: str,
official: dict,
diff_status: str,
changed_fields: list[str],
) -> dict:
office = official.get("office") or {}
return {
"request_id": request_id,
"jurisdiction_ocdid": jurisdiction_ocdid,
"created_at": created_at or "",
"review_issues": review_issues,
"diff_status": diff_status,
"changed_fields": " | ".join(changed_fields),
"id": official.get("id", ""),
"name": official.get("name", ""),
"other_names": " | ".join(official.get("other_names") or []),
"office_name": office.get("name", ""),
"office_division_ocdid": office.get("division_ocdid", ""),
"phones": " | ".join(official.get("phones") or []),
"emails": " | ".join(official.get("emails") or []),
"urls": " | ".join(official.get("urls") or []),
"source_urls": " | ".join(official.get("source_urls") or []),
"start_date": official.get("start_date") or "",
"end_date": official.get("end_date") or "",
"updated_at": official.get("updated_at") or "",
}


def _request_to_rows(
request: dict,
existing_people: list[dict],
include_unchanged: bool,
) -> list[dict]:
review_issues = " | ".join((request["review_json"] or {}).get("issues") or [])
result_data = request["result_data"] or []

existing_map = {p["id"]: p for p in existing_people if p.get("id")}
pr_map = {p["id"]: p for p in result_data if p.get("id")}
all_ids = set(existing_map) | set(pr_map)

rows = []
for oid in all_ids:
existing = existing_map.get(oid)
pr = pr_map.get(oid)

if not existing:
diff_status = "added"
official = pr
changed = []
elif not pr:
diff_status = "removed"
official = existing
changed = []
else:
changed = _changed_field_names(existing, pr)
diff_status = "changed" if changed else "unchanged"
official = pr

if diff_status == "unchanged" and not include_unchanged:
continue

rows.append(_flatten_official(
request["request_id"],
request["jurisdiction_ocdid"],
request["created_at"],
review_issues,
official,
diff_status,
changed,
))

return rows
_DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")


class CreateRegisterRequest(BaseModel):
request_id: str
arguments: dict


class PostResultRequest(BaseModel):
pull_request_url: Optional[str] = None
data: Optional[Any] = None
Expand All @@ -161,11 +32,11 @@ def get_router(api_key_header):
router = APIRouter()

@router.post(
"/register",
summary="Register a new request",
description="Register a new request in the system.",
include_in_schema=False,
)
"/register",
summary="Register a new request",
description="Register a new request in the system.",
include_in_schema=False,
)
async def register_people_job_endpoint(
request: CreateRegisterRequest,
user: Identity = Depends(require_route_access(RouteCategory.TEAM_REQUIRED, [Role.MAINTAINERS])),
Expand Down Expand Up @@ -228,46 +99,43 @@ async def export_requests_csv(
include_unchanged: bool = Query(False),
user: Identity = Depends(require_route_access(RouteCategory.TEAM_REQUIRED, [Role.MAINTAINERS])),
):
requests_data = await database.database.get_requests_for_export(state, from_date, to_date)

uncached = [r for r in requests_data if not r["result_data"]]
if uncached:
async def _fetch_result_data(r):
folder = shared.utils.id_utils.jurisdiction_ocdid_to_folder(r["jurisdiction_ocdid"])
data = await github_service.get_pull_request_file_yaml(
r["request_id"], r["jurisdiction_ocdid"], f"data/{folder}.yml"
)
r["result_data"] = data or []
if not re.fullmatch(r"[a-z]{2}", state.lower()):
raise HTTPException(status_code=400, detail="state must be a two-letter code, e.g. 'tx'")
state = state.lower()
if from_date and not _DATE_RE.match(from_date):
raise HTTPException(status_code=400, detail="from_date must be ISO format: YYYY-MM-DD")
if to_date and not _DATE_RE.match(to_date):
raise HTTPException(status_code=400, detail="to_date must be ISO format: YYYY-MM-DD")

await asyncio.gather(*[_fetch_result_data(r) for r in uncached])
requests_data, existing_by_ocdid = await requests_export_service.fetch_export_data(state, from_date, to_date)
rows = requests_export_service.get_export_rows(requests_data, existing_by_ocdid, include_unchanged)

unique_ocdids = list({r["jurisdiction_ocdid"] for r in requests_data})
existing_by_ocdid: dict[str, list] = {}
if unique_ocdids:
results = await asyncio.gather(
*[database.people.get_people_by_jurisdiction_ocdid(ocdid) for ocdid in unique_ocdids]
)
existing_by_ocdid = dict(zip(unique_ocdids, results))
filename = f"requests_export_{state}_{date.today().isoformat()}.csv"
return StreamingResponse(
csv_service.generate_csv(rows, requests_export_service.CSV_FIELDNAMES),
media_type="text/csv",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)

def _generate():
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=CSV_FIELDNAMES, lineterminator="\n")
writer.writeheader()
yield buf.getvalue()
@router.get(
"/people-export.csv",
include_in_schema=False,
)
async def export_people_csv(
state: str = Query(..., description="Two-letter state code, e.g. 'tx'"),
user: Identity = Depends(require_route_access(RouteCategory.TEAM_REQUIRED, [Role.MAINTAINERS])),
):
if not re.fullmatch(r"[a-z]{2}", state.lower()):
raise HTTPException(status_code=400, detail="state must be a two-letter code, e.g. 'tx'")
state = state.lower()

for request in requests_data:
existing = existing_by_ocdid.get(request["jurisdiction_ocdid"], [])
for row in _request_to_rows(request, existing, include_unchanged):
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=CSV_FIELDNAMES, lineterminator="\n")
writer.writerow(row)
yield buf.getvalue()
rows = await requests_export_service.fetch_people_export_rows(state)

filename = f"requests_export_{state}_{date.today().isoformat()}.csv"
filename = f"people_export_{state}_{date.today().isoformat()}.csv"
return StreamingResponse(
_generate(),
csv_service.generate_csv(rows, requests_export_service.PEOPLE_CSV_FIELDNAMES),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename={filename}"},
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)

return router
Loading
Loading