Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api.civicpatch.org/AI_CONTEXT.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ tests/
- Services are thin — they call DB or external APIs and return typed results
- Side effects (network calls, DB writes) live here, not in routers

## Database (dev)

The dev database is exposed on `127.0.0.1:6000`. Connect with:

```sh
psql postgres://civicpatch:development_password@127.0.0.1:6000/development_db
# or via mise:
mise run psql
```

## Migrations

- Migration files live in `database_operations/migrations/` and are named `NNN_description.up.sql` / `NNN_description.down.sql`
Expand Down
47 changes: 47 additions & 0 deletions api.civicpatch.org/src/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,53 @@ async def bulk_close_stale_prs(request_ids: List[str]):
)


async def get_requests_for_export(
    state: str,
    from_date: str | None,
    to_date: str | None,
) -> list[dict]:
    """Fetch requests with an open pull request for one US state, for CSV export.

    Args:
        state: Two-letter state code (case-insensitive); matched as a prefix of
            ``requests.jurisdiction_ocdid``.
        from_date: Optional inclusive lower bound on ``requests.created_at``.
        to_date: Optional inclusive upper bound on ``requests.created_at``.

    Returns:
        One dict per row with keys ``request_id`` (str), ``jurisdiction_ocdid``,
        ``created_at`` (ISO string or None), ``result_data`` (list) and
        ``review_json`` (dict), newest first.
    """
    state_prefix = f"ocd-jurisdiction/country:us/state:{state.lower()}%"
    params: list[str] = [state_prefix]
    date_clauses = ""
    if from_date:
        params.append(from_date)
        date_clauses += " AND r.created_at >= %s"  # plain string; no placeholders to format
    if to_date:
        params.append(to_date)
        date_clauses += " AND r.created_at <= %s"

    pool = await get_pool()
    async with pool.connection() as conn, conn.cursor() as cur:
        # Only date_clauses (built from fixed literals above) is interpolated into
        # the SQL; all user-supplied values travel through `params`.
        # NOTE(review): the JOIN yields one row per open PR — if a request can have
        # several open PRs this duplicates rows; confirm cardinality or add DISTINCT.
        await cur.execute(
            f"""
            SELECT r.id, r.jurisdiction_ocdid, r.created_at, r.result_data, r.review_json
            FROM requests r
            JOIN pull_requests pr ON pr.request_id = r.id
            WHERE r.jurisdiction_ocdid LIKE %s
              AND pr.status = 'open'
              {date_clauses}
            ORDER BY r.created_at DESC
            """,
            params,
        )
        # fetchall() replaces the manual fetchmany(200) accumulation loop — the
        # result was materialized into one list either way.
        rows = await cur.fetchall()
    return [
        {
            "request_id": str(r[0]),
            "jurisdiction_ocdid": r[1],
            "created_at": r[2].isoformat() if r[2] else None,
            "result_data": r[3] or [],
            "review_json": r[4] or {},
        }
        for r in rows
    ]


async def deactivate_jurisdictions_by_ocdids(ocdids: List[str]):
pool = await get_pool()
async with pool.connection() as conn:
Expand Down
14 changes: 10 additions & 4 deletions api.civicpatch.org/src/database/review_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,14 @@ async def navigate_to_entry(
# Frontier: check goal, then allocate the next card at this position
await cur.execute(
"""
SELECT COUNT(*) FILTER (WHERE status = 'resolved') AS resolved
SELECT COUNT(*) FILTER (
WHERE status = 'resolved'
AND (created_at AT TIME ZONE %s)::date = (NOW() AT TIME ZONE %s)::date
) AS resolved
FROM review_session_entries
WHERE review_session_id = %s
""",
(review_session_id,),
(STREAK_TIMEZONE, STREAK_TIMEZONE, review_session_id),
)
counts_row = await cur.fetchone()
if counts_row.resolved >= session_row.daily_goal:
Expand All @@ -233,11 +236,14 @@ async def navigate_to_entry(

await cur.execute(
"""
SELECT COUNT(*) FILTER (WHERE status = 'resolved') AS resolved_count
SELECT COUNT(*) FILTER (
WHERE status = 'resolved'
AND (created_at AT TIME ZONE %s)::date = (NOW() AT TIME ZONE %s)::date
) AS resolved_count
FROM review_session_entries
WHERE review_session_id = %s
""",
(review_session_id,),
(STREAK_TIMEZONE, STREAK_TIMEZONE, review_session_id),
)
counts = await cur.fetchone()

Expand Down
7 changes: 7 additions & 0 deletions api.civicpatch.org/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import routers.api.jurisdictions as api_jurisdictions_router
import routers.api.people as api_people_router
import routers.api.pull_requests as api_pull_requests_router
import routers.api.requests as api_requests_router
import routers.api.review_sessions as api_review_sessions_router
import routers.api.user as api_user_router
import services.github.pull_request_sync_service
Expand Down Expand Up @@ -205,6 +206,12 @@ async def home(request: Request, user: Identity = Depends(get_optional_user)):
dependencies=[Depends(require_route_access(RouteCategory.PUBLIC))],
)

# Mount the requests router (registration, results, CSV export) under
# /api/v1/requests; it builds its own auth from the shared api_key_header.
app.include_router(
    api_requests_router.get_router(api_key_header),
    prefix="/api/v1/requests",
    tags=["requests"],
)

app.include_router(
api_review_sessions_router.get_router(),
prefix="/api/v1/review-sessions",
Expand Down
224 changes: 208 additions & 16 deletions api.civicpatch.org/src/routers/api/requests.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,152 @@
import asyncio
import json
import csv
import io
import logging
import time
from datetime import date
from typing import Any, Optional
import database.database
import database.people
import database.requests
import services.github.github_api_service as github_service
import shared.utils.id_utils
from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from schemas.common import Identity, Role, RouteCategory
from utils.auth_utils import require_route_access

logger = logging.getLogger(__name__)

# Fields compared (after normalization) between the existing DB record and the
# PR record to decide whether an official counts as "changed" — see
# _changed_field_names. Dotted keys address the nested "office" dict.
_DIFF_FIELDS = [
    "name",
    "office.name",
    "office.division_ocdid",
    "phones",
    "emails",
    "urls",
    "start_date",
    "end_date",
]

# CSV column order for the export endpoint; matches exactly the keys emitted
# by _flatten_official.
CSV_FIELDNAMES = [
    "request_id",
    "jurisdiction_ocdid",
    "created_at",
    "review_issues",
    "diff_status",
    "changed_fields",
    "id",
    "name",
    "other_names",
    "office_name",
    "office_division_ocdid",
    "phones",
    "emails",
    "urls",
    "source_urls",
    "start_date",
    "end_date",
    "updated_at",
]


def _get_field(official: dict, key: str) -> str:
if key == "office.name":
val = (official.get("office") or {}).get("name", "")
elif key == "office.division_ocdid":
val = (official.get("office") or {}).get("division_ocdid", "")
else:
val = official.get(key, "")
if isinstance(val, list):
return " | ".join(str(v) for v in val)
return str(val or "")


def _normalize(val: str) -> str:
return val.lower().strip()


def _changed_field_names(existing: dict, pr: dict) -> list[str]:
    """List the _DIFF_FIELDS whose normalized values differ between the records."""
    changed: list[str] = []
    for field in _DIFF_FIELDS:
        before = _normalize(_get_field(existing, field))
        after = _normalize(_get_field(pr, field))
        if before != after:
            changed.append(field)
    return changed


def _flatten_official(
request_id: str,
jurisdiction_ocdid: str,
created_at: str | None,
review_issues: str,
official: dict,
diff_status: str,
changed_fields: list[str],
) -> dict:
office = official.get("office") or {}
return {
"request_id": request_id,
"jurisdiction_ocdid": jurisdiction_ocdid,
"created_at": created_at or "",
"review_issues": review_issues,
"diff_status": diff_status,
"changed_fields": " | ".join(changed_fields),
"id": official.get("id", ""),
"name": official.get("name", ""),
"other_names": " | ".join(official.get("other_names") or []),
"office_name": office.get("name", ""),
"office_division_ocdid": office.get("division_ocdid", ""),
"phones": " | ".join(official.get("phones") or []),
"emails": " | ".join(official.get("emails") or []),
"urls": " | ".join(official.get("urls") or []),
"source_urls": " | ".join(official.get("source_urls") or []),
"start_date": official.get("start_date") or "",
"end_date": official.get("end_date") or "",
"updated_at": official.get("updated_at") or "",
}


def _request_to_rows(
    request: dict,
    existing_people: list[dict],
    include_unchanged: bool,
) -> list[dict]:
    """Diff one export request against current DB people and flatten to CSV rows.

    Officials are keyed by ``id``. An id present only in the PR data is
    ``added``; only in the DB is ``removed``; in both, it is ``changed`` or
    ``unchanged`` per _changed_field_names. Records without an ``id`` are
    skipped. Unchanged rows are emitted only when *include_unchanged* is set.

    Args:
        request: Export row from get_requests_for_export (needs request_id,
            jurisdiction_ocdid, created_at, result_data, review_json).
        existing_people: Current officials from the DB for this jurisdiction.
        include_unchanged: Emit rows whose diff_status is "unchanged" too.

    Returns:
        Flat row dicts (see _flatten_official), sorted by official id.
    """
    review_issues = " | ".join((request["review_json"] or {}).get("issues") or [])
    result_data = request["result_data"] or []

    existing_map = {p["id"]: p for p in existing_people if p.get("id")}
    pr_map = {p["id"]: p for p in result_data if p.get("id")}

    rows = []
    # BUG FIX: iterate ids in sorted order — plain set iteration made the CSV
    # row order nondeterministic across runs. key=str guards mixed id types.
    for oid in sorted(set(existing_map) | set(pr_map), key=str):
        existing = existing_map.get(oid)
        pr = pr_map.get(oid)

        if not existing:
            diff_status = "added"
            official = pr
            changed = []
        elif not pr:
            diff_status = "removed"
            official = existing
            changed = []
        else:
            changed = _changed_field_names(existing, pr)
            diff_status = "changed" if changed else "unchanged"
            official = pr  # prefer the PR version when both sides exist

        if diff_status == "unchanged" and not include_unchanged:
            continue

        rows.append(_flatten_official(
            request["request_id"],
            request["jurisdiction_ocdid"],
            request["created_at"],
            review_issues,
            official,
            diff_status,
            changed,
        ))

    return rows


class CreateRegisterRequest(BaseModel):
request_id: str
Expand All @@ -27,20 +166,20 @@ def get_router(api_key_header):
description="Register a new request in the system.",
include_in_schema=False,
)
async def register_people_job_endpoint(
request: CreateRegisterRequest,
user: Identity = Depends(require_route_access(RouteCategory.TEAM_REQUIRED, [Role.MAINTAINERS])),
):
print(
f"Registering request: {request.request_id} by user {user.provider_user_id} from provider {user.provider}"
)
_response = await database.requests.register(
requested_by_user_id=user.id,
request_id=request.request_id,
request_type="people",
arguments_json=request.arguments,
)
return {"request_id": request.request_id, "status": "pending"}
async def register_people_job_endpoint(
    request: CreateRegisterRequest,
    user: Identity = Depends(require_route_access(RouteCategory.TEAM_REQUIRED, [Role.MAINTAINERS])),
):
    """Register a new people request and return it in "pending" status.

    Restricted to maintainers (TEAM_REQUIRED route access).
    """
    # Use the module logger with lazy %-args instead of print() so the event
    # reaches configured log handlers.
    logger.info(
        "Registering request: %s by user %s from provider %s",
        request.request_id,
        user.provider_user_id,
        user.provider,
    )
    _response = await database.requests.register(
        requested_by_user_id=user.id,
        request_id=request.request_id,
        request_type="people",
        arguments_json=request.arguments,
    )
    return {"request_id": request.request_id, "status": "pending"}

@router.post(
"/{request_id}/result",
Expand Down Expand Up @@ -78,4 +217,57 @@ async def post_job_result_endpoint(

return {"request_id": request_id, "errors": errors}

@router.get(
    "/export.csv",
    include_in_schema=False,
)
async def export_requests_csv(
    state: str = Query(..., description="Two-letter state code, e.g. 'tx'"),
    from_date: Optional[str] = Query(None),
    to_date: Optional[str] = Query(None),
    include_unchanged: bool = Query(False),
    user: Identity = Depends(require_route_access(RouteCategory.TEAM_REQUIRED, [Role.MAINTAINERS])),
):
    """Stream a CSV diff export of open-PR requests for one state.

    Rows are produced by _request_to_rows (diff of PR data vs. current DB
    people). Maintainers only. from_date/to_date bound requests.created_at.
    """
    requests_data = await database.database.get_requests_for_export(state, from_date, to_date)

    # Requests whose result_data was not stored in the DB: fetch the YAML
    # payload from the pull request instead.
    uncached = [r for r in requests_data if not r["result_data"]]
    if uncached:
        async def _fetch_result_data(r: dict) -> None:
            folder = shared.utils.id_utils.jurisdiction_ocdid_to_folder(r["jurisdiction_ocdid"])
            data = await github_service.get_pull_request_file_yaml(
                r["request_id"], r["jurisdiction_ocdid"], f"data/{folder}.yml"
            )
            r["result_data"] = data or []

        await asyncio.gather(*(_fetch_result_data(r) for r in uncached))

    # Current DB officials, fetched once per unique jurisdiction.
    unique_ocdids = list({r["jurisdiction_ocdid"] for r in requests_data})
    existing_by_ocdid: dict[str, list] = {}
    if unique_ocdids:
        results = await asyncio.gather(
            *[database.people.get_people_by_jurisdiction_ocdid(ocdid) for ocdid in unique_ocdids]
        )
        existing_by_ocdid = dict(zip(unique_ocdids, results))

    def _generate():
        # Reuse one buffer and one DictWriter; seek+truncate between yields
        # instead of allocating a fresh StringIO/DictWriter per row.
        buf = io.StringIO()
        writer = csv.DictWriter(buf, fieldnames=CSV_FIELDNAMES, lineterminator="\n")
        writer.writeheader()
        yield buf.getvalue()

        for request in requests_data:
            existing = existing_by_ocdid.get(request["jurisdiction_ocdid"], [])
            for row in _request_to_rows(request, existing, include_unchanged):
                buf.seek(0)
                buf.truncate()
                writer.writerow(row)
                yield buf.getvalue()

    filename = f"requests_export_{state}_{date.today().isoformat()}.csv"
    return StreamingResponse(
        _generate(),
        media_type="text/csv",
        # BUG FIX: the header previously hard-coded a placeholder instead of the
        # computed filename; quote it per RFC 6266.
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )

return router
Loading
Loading