diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e4a534..dd305b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,11 +5,33 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.6.0] - 2026-04-10 + +### Added + +- **FHIR-to-OMOP Concept Resolver** (`client.fhir`): Translate FHIR coded values into OMOP standard concepts, CDM target tables, and optional Phoebe recommendations in a single API call. + - `resolve()`: Resolve a single FHIR `Coding` (system URI + code) or text-only input via semantic search fallback. Returns the standard concept, target CDM table, domain alignment check, and optional mapping quality signal. + - `resolve_batch()`: Batch-resolve up to 100 FHIR codings per request with inline per-item error reporting. Failed items do not fail the batch. + - `resolve_codeable_concept()`: Resolve a FHIR `CodeableConcept` with multiple codings. Automatically picks the best match per OHDSI vocabulary preference (SNOMED > RxNorm > LOINC > CVX > ICD-10). Falls back to the `text` field via semantic search when no coding resolves. +- New TypedDict types for FHIR resolver: `FhirResolveResult`, `FhirResolution`, `FhirBatchResult`, `FhirBatchSummary`, `FhirCodeableConceptResult`, `ResolvedConcept`, `RecommendedConceptOutput`. +- Both sync (`OMOPHub`) and async (`AsyncOMOPHub`) clients support FHIR resolver methods via `client.fhir.*`. + +### Changed + +- **Extracted shared response parsing** (`_request.py`): The duplicated JSON decode / error-handling / rate-limit-retry logic across `Request._parse_response`, `Request._parse_response_raw`, `AsyncRequest._parse_response`, and `AsyncRequest._parse_response_raw` (4 copies of ~50 lines each) is now a single `_parse_and_raise()` module-level function. All four methods delegate to it, eliminating the risk of divergence bugs. +- **Fixed `paginate_async` signature** (`_pagination.py`): The type hint now correctly declares `Callable[[int, int], Awaitable[tuple[...]]]` instead of `Callable[[int, int], tuple[...]]`, and the runtime `hasattr(__await__)` duck-typing hack has been replaced with a clean `await`. +- **`AsyncSearch.semantic_iter`** now delegates to `paginate_async` instead of manually reimplementing the pagination loop, matching the sync `semantic_iter` which already uses `paginate_sync`. + +### Fixed + +- Python prerequisite in CONTRIBUTING.md corrected from `3.9+` to `3.10+` (matching `pyproject.toml`). +- `__all__` in `types/__init__.py` sorted per RUF022. + ## [1.5.1] - 2026-04-08 ### Fixed -- **Rate-limit handling**: HTTP client now respects the `Retry-After` header on `429 Too Many Requests` responses and applies exponential backoff with jitter on retries. Previous versions retried only on `502/503/504` with a fixed `2^attempt * 0.5s` schedule and did not back off on `429` at all, so a client that hit the server's rate limit at high volume could burn through thousands of failed requests in a tight loop. The new behavior: +- **Rate-limit handling**: HTTP client now respects the `Retry-After` header on `429 Too Many Requests` responses and applies exponential backoff with jitter on retries. Previous versions retried only on `502/503/504` with a fixed `2^attempt * 0.5s` schedule and did not back off on `429` at all, so a client that hit the server's rate limit at high volume could burn through thousands of failed requests in a tight loop. The client now honors `Retry-After`, uses exponential backoff with jitter, respects the configured `max_retries`, and caps backoff at 30 seconds. - Updated `examples/search_concepts.py` to reflect current API. ## [1.5.0] - 2026-03-26 @@ -117,7 +139,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Full type hints and PEP 561 compliance - HTTP/2 support via httpx -[Unreleased]: https://github.com/omopHub/omophub-python/compare/v1.4.1...HEAD +[Unreleased]: https://github.com/omopHub/omophub-python/compare/v1.6.0...HEAD +[1.6.0]: https://github.com/omopHub/omophub-python/compare/v1.5.1...v1.6.0 +[1.5.1]: https://github.com/omopHub/omophub-python/compare/v1.5.0...v1.5.1 +[1.5.0]: https://github.com/omopHub/omophub-python/compare/v1.4.1...v1.5.0 [1.4.1]: https://github.com/omopHub/omophub-python/compare/v1.4.0...v1.4.1 [1.4.0]: https://github.com/omopHub/omophub-python/compare/v1.3.1...v1.4.0 [1.3.1]: https://github.com/omopHub/omophub-python/compare/v1.3.0...v1.3.1 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index dba1590..4957456 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -53,7 +53,7 @@ Feature requests are welcome! Please open an issue with: ### Prerequisites -- Python 3.9+ +- Python 3.10+ - pip ### Installation diff --git a/README.md b/README.md index 9cff18a..339bcae 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,46 @@ mappings = client.mappings.get_by_code("ICD10CM", "E11.9", target_vocabulary="SN ancestors = client.hierarchy.ancestors(201826, max_levels=3) ``` +## FHIR-to-OMOP Resolution + +Resolve FHIR coded values to OMOP standard concepts in one call: + +```python +# Single FHIR Coding → OMOP concept + CDM target table +result = client.fhir.resolve( + system="http://snomed.info/sct", + code="44054006", + resource_type="Condition", +) +print(result["resolution"]["target_table"]) # "condition_occurrence" +print(result["resolution"]["mapping_type"]) # "direct" + +# ICD-10-CM → traverses "Maps to" automatically +result = client.fhir.resolve( + system="http://hl7.org/fhir/sid/icd-10-cm", + code="E11.9", +) +print(result["resolution"]["standard_concept"]["vocabulary_id"]) # "SNOMED" + +# Batch resolve up to 100 codings +batch = client.fhir.resolve_batch([ + {"system": "http://snomed.info/sct", "code": "44054006"}, + {"system": "http://loinc.org", "code": "2339-0"}, + {"system": "http://www.nlm.nih.gov/research/umls/rxnorm", "code": "197696"}, +]) +print(f"Resolved {batch['summary']['resolved']}/{batch['summary']['total']}") + +# CodeableConcept with vocabulary preference (SNOMED wins over ICD-10) +result = client.fhir.resolve_codeable_concept( + coding=[ + {"system": "http://snomed.info/sct", "code": "44054006"}, + {"system": "http://hl7.org/fhir/sid/icd-10-cm", "code": "E11.9"}, + ], + resource_type="Condition", +) +print(result["best_match"]["resolution"]["source_concept"]["vocabulary_id"]) # "SNOMED" +``` + ## Semantic Search Use natural language queries to find concepts using neural embeddings: @@ -200,6 +240,7 @@ suggestions = client.concepts.suggest("diab", vocabulary_ids=["SNOMED"], page_si | `mappings` | Cross-vocabulary mappings | `get()`, `map()` | | `vocabularies` | Vocabulary metadata | `list()`, `get()`, `stats()` | | `domains` | Domain information | `list()`, `get()`, `concepts()` | +| `fhir` | FHIR-to-OMOP resolution | `resolve()`, `resolve_batch()`, `resolve_codeable_concept()` | ## Configuration diff --git a/src/omophub/_client.py b/src/omophub/_client.py index b9e1d0f..338e857 100644 --- a/src/omophub/_client.py +++ b/src/omophub/_client.py @@ -17,6 +17,7 @@ from ._request import AsyncRequest, Request from .resources.concepts import AsyncConcepts, Concepts from .resources.domains import AsyncDomains, Domains +from .resources.fhir import AsyncFhir, Fhir from .resources.hierarchy import AsyncHierarchy, Hierarchy from .resources.mappings import AsyncMappings, Mappings from .resources.relationships import AsyncRelationships, Relationships @@ -97,6 +98,14 @@ def __init__( self._mappings: Mappings | None = None self._vocabularies: Vocabularies | None = None self._domains: Domains | None = None + self._fhir: Fhir | None = None + + @property + def fhir(self) -> Fhir: + """Access the FHIR resolver resource.""" + if self._fhir is None: + self._fhir = Fhir(self._request) + return self._fhir @property def concepts(self) -> Concepts: @@ -228,6 +237,14 @@ def __init__( self._mappings: AsyncMappings | None = None self._vocabularies: AsyncVocabularies | None = None self._domains: AsyncDomains | None = None + self._fhir: AsyncFhir | None = None + + @property + def fhir(self) -> AsyncFhir: + """Access the FHIR resolver resource.""" + if self._fhir is None: + self._fhir = AsyncFhir(self._request) + return self._fhir @property def concepts(self) -> AsyncConcepts: diff --git a/src/omophub/_pagination.py b/src/omophub/_pagination.py index ba80bbb..a81aade 100644 --- a/src/omophub/_pagination.py +++ b/src/omophub/_pagination.py @@ -6,7 +6,7 @@ from urllib.parse import urlencode if TYPE_CHECKING: - from collections.abc import AsyncIterator, Callable, Iterator + from collections.abc import AsyncIterator, Awaitable, Callable, Iterator from ._types import PaginationMeta @@ -106,7 +106,7 @@ def paginate_sync( async def paginate_async( - fetch_page: Callable[[int, int], tuple[list[T], PaginationMeta | None]], + fetch_page: Callable[[int, int], Awaitable[tuple[list[T], PaginationMeta | None]]], page_size: int = DEFAULT_PAGE_SIZE, ) -> AsyncIterator[T]: """Create an async iterator that auto-paginates through results. @@ -121,12 +121,7 @@ async def paginate_async( page = 1 while True: - # Note: fetch_page should be an async function - result = fetch_page(page, page_size) - if hasattr(result, "__await__"): - items, meta = await result # type: ignore - else: - items, meta = result + items, meta = await fetch_page(page, page_size) for item in items: yield item diff --git a/src/omophub/_request.py b/src/omophub/_request.py index 61b3cf9..4649bc0 100644 --- a/src/omophub/_request.py +++ b/src/omophub/_request.py @@ -17,6 +17,66 @@ T = TypeVar("T") +def _parse_and_raise( + content: bytes, + status_code: int, + headers: Mapping[str, str], +) -> dict[str, Any]: + """Parse JSON response body and raise on HTTP errors. + + Shared by both sync and async request classes to avoid duplicating + the JSON-decode, error-extraction, and rate-limit-retry logic. + + Returns: + The parsed JSON dict (caller decides whether to unwrap ``data``). + + Raises: + OMOPHubError: On invalid JSON from a successful response. + APIError / RateLimitError / etc.: On HTTP error status codes. + """ + request_id = headers.get("X-Request-Id") or headers.get("x-request-id") + + try: + data = json.loads(content) if content else {} + except json.JSONDecodeError as exc: + if status_code >= 400: + raise_for_status( + status_code, + f"Request failed with status {status_code}", + request_id=request_id, + ) + raise OMOPHubError( + f"Invalid JSON response: {content[:200].decode(errors='replace')}" + ) from exc + + if status_code >= 400: + error_response: ErrorResponse = data # type: ignore[assignment] + error = error_response.get("error", {}) + message = error.get("message", f"Request failed with status {status_code}") + error_code = error.get("code") + details = error.get("details") + + retry_after = None + if status_code == 429: + retry_after_header = headers.get("Retry-After") or headers.get( + "retry-after" + ) + if retry_after_header: + with contextlib.suppress(ValueError): + retry_after = int(retry_after_header) + + raise_for_status( + status_code, + message, + request_id=request_id, + error_code=error_code, + details=details, + retry_after=retry_after, + ) + + return data + + class Request(Generic[T]): """Handles API request execution and response parsing.""" @@ -50,50 +110,8 @@ def _parse_response( status_code: int, headers: Mapping[str, str], ) -> T: - """Parse API response and handle errors.""" - request_id = headers.get("X-Request-Id") or headers.get("x-request-id") - - try: - data = json.loads(content) if content else {} - except json.JSONDecodeError as exc: - if status_code >= 400: - raise_for_status( - status_code, - f"Request failed with status {status_code}", - request_id=request_id, - ) - raise OMOPHubError( - f"Invalid JSON response: {content[:200].decode(errors='replace')}" - ) from exc - - # Handle error responses - if status_code >= 400: - error_response: ErrorResponse = data # type: ignore[assignment] - error = error_response.get("error", {}) - message = error.get("message", f"Request failed with status {status_code}") - error_code = error.get("code") - details = error.get("details") - - # Check for rate limit retry-after - retry_after = None - if status_code == 429: - retry_after_header = headers.get("Retry-After") or headers.get( - "retry-after" - ) - if retry_after_header: - with contextlib.suppress(ValueError): - retry_after = int(retry_after_header) - - raise_for_status( - status_code, - message, - request_id=request_id, - error_code=error_code, - details=details, - retry_after=retry_after, - ) - - # Return successful response data + """Parse API response, raise on errors, return the ``data`` field.""" + data = _parse_and_raise(content, status_code, headers) response: APIResponse = data # type: ignore[assignment] return response.get("data", data) @@ -103,55 +121,8 @@ def _parse_response_raw( status_code: int, headers: Mapping[str, str], ) -> dict[str, Any]: - """Parse API response and return full response dict with meta. - - Unlike _parse_response which extracts just the 'data' field, - this method returns the complete response including 'meta' for pagination. - """ - request_id = headers.get("X-Request-Id") or headers.get("x-request-id") - - try: - data = json.loads(content) if content else {} - except json.JSONDecodeError as exc: - if status_code >= 400: - raise_for_status( - status_code, - f"Request failed with status {status_code}", - request_id=request_id, - ) - raise OMOPHubError( - f"Invalid JSON response: {content[:200].decode(errors='replace')}" - ) from exc - - # Handle error responses - if status_code >= 400: - error_response: ErrorResponse = data # type: ignore[assignment] - error = error_response.get("error", {}) - message = error.get("message", f"Request failed with status {status_code}") - error_code = error.get("code") - details = error.get("details") - - # Check for rate limit retry-after - retry_after = None - if status_code == 429: - retry_after_header = headers.get("Retry-After") or headers.get( - "retry-after" - ) - if retry_after_header: - with contextlib.suppress(ValueError): - retry_after = int(retry_after_header) - - raise_for_status( - status_code, - message, - request_id=request_id, - error_code=error_code, - details=details, - retry_after=retry_after, - ) - - # Return full response dict (includes 'data' and 'meta') - return data + """Parse API response, raise on errors, return the full dict with ``meta``.""" + return _parse_and_raise(content, status_code, headers) def get( self, @@ -238,50 +209,8 @@ def _parse_response( status_code: int, headers: Mapping[str, str], ) -> T: - """Parse API response and handle errors.""" - request_id = headers.get("X-Request-Id") or headers.get("x-request-id") - - try: - data = json.loads(content) if content else {} - except json.JSONDecodeError as exc: - if status_code >= 400: - raise_for_status( - status_code, - f"Request failed with status {status_code}", - request_id=request_id, - ) - raise OMOPHubError( - f"Invalid JSON response: {content[:200].decode(errors='replace')}" - ) from exc - - # Handle error responses - if status_code >= 400: - error_response: ErrorResponse = data # type: ignore[assignment] - error = error_response.get("error", {}) - message = error.get("message", f"Request failed with status {status_code}") - error_code = error.get("code") - details = error.get("details") - - # Check for rate limit retry-after - retry_after = None - if status_code == 429: - retry_after_header = headers.get("Retry-After") or headers.get( - "retry-after" - ) - if retry_after_header: - with contextlib.suppress(ValueError): - retry_after = int(retry_after_header) - - raise_for_status( - status_code, - message, - request_id=request_id, - error_code=error_code, - details=details, - retry_after=retry_after, - ) - - # Return successful response data + """Parse API response, raise on errors, return the ``data`` field.""" + data = _parse_and_raise(content, status_code, headers) response: APIResponse = data # type: ignore[assignment] return response.get("data", data) @@ -291,55 +220,8 @@ def _parse_response_raw( status_code: int, headers: Mapping[str, str], ) -> dict[str, Any]: - """Parse API response and return full response dict with meta. - - Unlike _parse_response which extracts just the 'data' field, - this method returns the complete response including 'meta' for pagination. - """ - request_id = headers.get("X-Request-Id") or headers.get("x-request-id") - - try: - data = json.loads(content) if content else {} - except json.JSONDecodeError as exc: - if status_code >= 400: - raise_for_status( - status_code, - f"Request failed with status {status_code}", - request_id=request_id, - ) - raise OMOPHubError( - f"Invalid JSON response: {content[:200].decode(errors='replace')}" - ) from exc - - # Handle error responses - if status_code >= 400: - error_response: ErrorResponse = data # type: ignore[assignment] - error = error_response.get("error", {}) - message = error.get("message", f"Request failed with status {status_code}") - error_code = error.get("code") - details = error.get("details") - - # Check for rate limit retry-after - retry_after = None - if status_code == 429: - retry_after_header = headers.get("Retry-After") or headers.get( - "retry-after" - ) - if retry_after_header: - with contextlib.suppress(ValueError): - retry_after = int(retry_after_header) - - raise_for_status( - status_code, - message, - request_id=request_id, - error_code=error_code, - details=details, - retry_after=retry_after, - ) - - # Return full response dict (includes 'data' and 'meta') - return data + """Parse API response, raise on errors, return the full dict with ``meta``.""" + return _parse_and_raise(content, status_code, headers) async def get( self, diff --git a/src/omophub/resources/fhir.py b/src/omophub/resources/fhir.py new file mode 100644 index 0000000..5f8560e --- /dev/null +++ b/src/omophub/resources/fhir.py @@ -0,0 +1,272 @@ +"""FHIR Resolver resource implementation.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from .._request import AsyncRequest, Request + from ..types.fhir import ( + FhirBatchResult, + FhirCodeableConceptResult, + FhirResolveResult, + ) + + +def _build_resolve_body( + *, + system: str | None = None, + code: str | None = None, + display: str | None = None, + vocabulary_id: str | None = None, + resource_type: str | None = None, + include_recommendations: bool = False, + recommendations_limit: int = 5, + include_quality: bool = False, +) -> dict[str, Any]: + body: dict[str, Any] = {} + if system is not None: + body["system"] = system + if code is not None: + body["code"] = code + if display is not None: + body["display"] = display + if vocabulary_id is not None: + body["vocabulary_id"] = vocabulary_id + if resource_type is not None: + body["resource_type"] = resource_type + if include_recommendations: + body["include_recommendations"] = True + body["recommendations_limit"] = recommendations_limit + if include_quality: + body["include_quality"] = True + return body + + +class Fhir: + """Synchronous FHIR resolver resource. + + Provides access to the FHIR-to-OMOP Concept Resolver endpoints that + translate FHIR coded values into OMOP standard concepts, CDM target + tables, and optional Phoebe recommendations. + + Example: + >>> result = client.fhir.resolve( + ... system="http://snomed.info/sct", + ... code="44054006", + ... resource_type="Condition", + ... ) + >>> print(result["resolution"]["target_table"]) + "condition_occurrence" + """ + + def __init__(self, request: Request[Any]) -> None: + self._request = request + + def resolve( + self, + *, + system: str | None = None, + code: str | None = None, + display: str | None = None, + vocabulary_id: str | None = None, + resource_type: str | None = None, + include_recommendations: bool = False, + recommendations_limit: int = 5, + include_quality: bool = False, + ) -> FhirResolveResult: + """Resolve a single FHIR Coding to an OMOP standard concept. + + Provide at least one of (``system`` + ``code``), + (``vocabulary_id`` + ``code``), or ``display``. + + Args: + system: FHIR code system URI (e.g. ``http://snomed.info/sct``) + code: Code value from the FHIR Coding + display: Human-readable text (semantic search fallback) + vocabulary_id: Direct OMOP vocabulary_id, bypasses URI resolution + resource_type: FHIR resource type for domain alignment check + include_recommendations: Include Phoebe recommendations + recommendations_limit: Max recommendations to return (1-20) + include_quality: Include mapping quality signal + + Returns: + Resolution result with source concept, standard concept, + target CDM table, and optional enrichments. + """ + body = _build_resolve_body( + system=system, + code=code, + display=display, + vocabulary_id=vocabulary_id, + resource_type=resource_type, + include_recommendations=include_recommendations, + recommendations_limit=recommendations_limit, + include_quality=include_quality, + ) + return self._request.post("/fhir/resolve", json_data=body) + + def resolve_batch( + self, + codings: list[dict[str, str | None]], + *, + resource_type: str | None = None, + include_recommendations: bool = False, + recommendations_limit: int = 5, + include_quality: bool = False, + ) -> FhirBatchResult: + """Batch-resolve up to 100 FHIR Codings. + + Failed items are reported inline without failing the batch. + + Args: + codings: List of coding dicts, each with optional keys + ``system``, ``code``, ``display``, ``vocabulary_id``. + resource_type: FHIR resource type applied to all codings + include_recommendations: Include Phoebe recommendations + recommendations_limit: Max recommendations per item (1-20) + include_quality: Include mapping quality signal + + Returns: + Batch result with per-item results and a summary. + """ + body: dict[str, Any] = {"codings": codings} + if resource_type is not None: + body["resource_type"] = resource_type + if include_recommendations: + body["include_recommendations"] = True + body["recommendations_limit"] = recommendations_limit + if include_quality: + body["include_quality"] = True + return self._request.post("/fhir/resolve/batch", json_data=body) + + def resolve_codeable_concept( + self, + coding: list[dict[str, str]], + *, + text: str | None = None, + resource_type: str | None = None, + include_recommendations: bool = False, + recommendations_limit: int = 5, + include_quality: bool = False, + ) -> FhirCodeableConceptResult: + """Resolve a FHIR CodeableConcept with vocabulary preference. + + Picks the best match across multiple codings using the OHDSI + vocabulary preference order (SNOMED > RxNorm > LOINC > CVX > + ICD-10). Falls back to ``text`` via semantic search if no + coding resolves. + + Args: + coding: List of structured codings, each with ``system``, + ``code``, and optional ``display``. + text: CodeableConcept.text for semantic search fallback + resource_type: FHIR resource type for domain alignment + include_recommendations: Include Phoebe recommendations + recommendations_limit: Max recommendations (1-20) + include_quality: Include mapping quality signal + + Returns: + Result with ``best_match``, ``alternatives``, and + ``unresolved`` lists. + """ + body: dict[str, Any] = {"coding": coding} + if text is not None: + body["text"] = text + if resource_type is not None: + body["resource_type"] = resource_type + if include_recommendations: + body["include_recommendations"] = True + body["recommendations_limit"] = recommendations_limit + if include_quality: + body["include_quality"] = True + return self._request.post("/fhir/resolve/codeable-concept", json_data=body) + + +class AsyncFhir: + """Asynchronous FHIR resolver resource. + + Async counterpart of :class:`Fhir`. All methods are coroutines. + """ + + def __init__(self, request: AsyncRequest[Any]) -> None: + self._request = request + + async def resolve( + self, + *, + system: str | None = None, + code: str | None = None, + display: str | None = None, + vocabulary_id: str | None = None, + resource_type: str | None = None, + include_recommendations: bool = False, + recommendations_limit: int = 5, + include_quality: bool = False, + ) -> FhirResolveResult: + """Resolve a single FHIR Coding to an OMOP standard concept. + + See :meth:`Fhir.resolve` for full documentation. + """ + body = _build_resolve_body( + system=system, + code=code, + display=display, + vocabulary_id=vocabulary_id, + resource_type=resource_type, + include_recommendations=include_recommendations, + recommendations_limit=recommendations_limit, + include_quality=include_quality, + ) + return await self._request.post("/fhir/resolve", json_data=body) + + async def resolve_batch( + self, + codings: list[dict[str, str | None]], + *, + resource_type: str | None = None, + include_recommendations: bool = False, + recommendations_limit: int = 5, + include_quality: bool = False, + ) -> FhirBatchResult: + """Batch-resolve up to 100 FHIR Codings. + + See :meth:`Fhir.resolve_batch` for full documentation. + """ + body: dict[str, Any] = {"codings": codings} + if resource_type is not None: + body["resource_type"] = resource_type + if include_recommendations: + body["include_recommendations"] = True + body["recommendations_limit"] = recommendations_limit + if include_quality: + body["include_quality"] = True + return await self._request.post("/fhir/resolve/batch", json_data=body) + + async def resolve_codeable_concept( + self, + coding: list[dict[str, str]], + *, + text: str | None = None, + resource_type: str | None = None, + include_recommendations: bool = False, + recommendations_limit: int = 5, + include_quality: bool = False, + ) -> FhirCodeableConceptResult: + """Resolve a FHIR CodeableConcept with vocabulary preference. + + See :meth:`Fhir.resolve_codeable_concept` for full documentation. + """ + body: dict[str, Any] = {"coding": coding} + if text is not None: + body["text"] = text + if resource_type is not None: + body["resource_type"] = resource_type + if include_recommendations: + body["include_recommendations"] = True + body["recommendations_limit"] = recommendations_limit + if include_quality: + body["include_quality"] = True + return await self._request.post( + "/fhir/resolve/codeable-concept", json_data=body + ) diff --git a/src/omophub/resources/search.py b/src/omophub/resources/search.py index 3a03744..e63eefb 100644 --- a/src/omophub/resources/search.py +++ b/src/omophub/resources/search.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING, Any, Literal, TypedDict -from .._pagination import DEFAULT_PAGE_SIZE, paginate_sync +from .._pagination import DEFAULT_PAGE_SIZE, paginate_async, paginate_sync if TYPE_CHECKING: from collections.abc import AsyncIterator, Iterator @@ -670,13 +670,14 @@ async def semantic_iter( page_size: int = DEFAULT_PAGE_SIZE, ) -> AsyncIterator[SemanticSearchResult]: """Iterate through all semantic search results with auto-pagination.""" - page = 1 - while True: + async def fetch_page( + page: int, size: int + ) -> tuple[list[SemanticSearchResult], PaginationMeta | None]: params: dict[str, Any] = { "query": query, "page": page, - "page_size": page_size, + "page_size": size, } if vocabulary_ids: params["vocabulary_ids"] = ",".join(vocabulary_ids) @@ -694,18 +695,13 @@ async def semantic_iter( ) data = result.get("data", []) - results: list[SemanticSearchResult] = ( - data.get("results", data) if isinstance(data, dict) else data - ) - meta: PaginationMeta | None = result.get("meta", {}).get("pagination") - - for item in results: - yield item - - if meta is None or not meta.get("has_next", False): - break + results = data.get("results", data) if isinstance(data, dict) else data + meta = result.get("meta", {}).get("pagination") + return results, meta - page += 1 + item: SemanticSearchResult + async for item in paginate_async(fetch_page, page_size): + yield item async def bulk_basic( self, diff --git a/src/omophub/types/__init__.py b/src/omophub/types/__init__.py index 1ffe8ed..7b0842c 100644 --- a/src/omophub/types/__init__.py +++ b/src/omophub/types/__init__.py @@ -16,6 +16,15 @@ Synonym, ) from .domain import Domain, DomainCategory, DomainStats, DomainSummary +from .fhir import ( + FhirBatchResult, + FhirBatchSummary, + FhirCodeableConceptResult, + FhirResolution, + FhirResolveResult, + RecommendedConceptOutput, + ResolvedConcept, +) from .hierarchy import ( Ancestor, Descendant, @@ -82,6 +91,11 @@ "DomainSummary", "ErrorDetail", "ErrorResponse", + "FhirBatchResult", + "FhirBatchSummary", + "FhirCodeableConceptResult", + "FhirResolution", + "FhirResolveResult", "HierarchyPath", "HierarchySummary", "Mapping", @@ -91,10 +105,12 @@ "PaginationMeta", "PaginationParams", "QueryEnhancement", + "RecommendedConceptOutput", "RelatedConcept", "Relationship", "RelationshipSummary", "RelationshipType", + "ResolvedConcept", "ResponseMeta", "SearchFacet", "SearchFacets", diff --git a/src/omophub/types/fhir.py b/src/omophub/types/fhir.py new file mode 100644 index 0000000..26f60c6 --- /dev/null +++ b/src/omophub/types/fhir.py @@ -0,0 +1,80 @@ +"""FHIR Resolver type definitions.""" + +from __future__ import annotations + +from typing import Any, TypedDict + +from typing_extensions import NotRequired + + +class ResolvedConcept(TypedDict): + """Concept shape returned by the FHIR resolver.""" + + concept_id: int + concept_name: str + concept_code: str + vocabulary_id: str + domain_id: str + concept_class_id: str + standard_concept: str | None + + +class RecommendedConceptOutput(TypedDict): + """Phoebe recommendation returned when include_recommendations is true.""" + + concept_id: int + concept_name: str + vocabulary_id: str + domain_id: str + concept_class_id: str + standard_concept: str | None + relationship_id: str + + +class FhirResolution(TypedDict): + """The ``resolution`` block inside a single-resolve response.""" + + vocabulary_id: str | None + source_concept: ResolvedConcept + standard_concept: ResolvedConcept + mapping_type: str + target_table: str | None + domain_resource_alignment: str + relationship_id: NotRequired[str] + similarity_score: NotRequired[float] + alignment_note: NotRequired[str] + mapping_quality: NotRequired[str] + quality_note: NotRequired[str] + alternative_standard_concepts: NotRequired[list[ResolvedConcept]] + recommendations: NotRequired[list[RecommendedConceptOutput]] + + +class FhirResolveResult(TypedDict): + """Response from ``POST /v1/fhir/resolve``.""" + + input: dict[str, Any] + resolution: FhirResolution + + +class FhirBatchSummary(TypedDict): + """Summary block inside a batch-resolve response.""" + + total: int + resolved: int + failed: int + + +class FhirBatchResult(TypedDict): + """Response from ``POST /v1/fhir/resolve/batch``.""" + + results: list[dict[str, Any]] + summary: FhirBatchSummary + + +class FhirCodeableConceptResult(TypedDict): + """Response from ``POST /v1/fhir/resolve/codeable-concept``.""" + + input: dict[str, Any] + best_match: FhirResolveResult | None + alternatives: list[FhirResolveResult] + unresolved: list[dict[str, Any]] diff --git a/tests/integration/test_fhir.py b/tests/integration/test_fhir.py new file mode 100644 index 0000000..100d0e6 --- /dev/null +++ b/tests/integration/test_fhir.py @@ -0,0 +1,84 @@ +"""Integration tests for the FHIR resolver resource.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from omophub import OMOPHub + + +@pytest.mark.integration +class TestFhirIntegration: + """Integration tests for FHIR resolver against the production API.""" + + def test_resolve_snomed_live(self, integration_client: OMOPHub) -> None: + """Resolve SNOMED 44054006 (Type 2 diabetes) to OMOP concept.""" + result = integration_client.fhir.resolve( + system="http://snomed.info/sct", + code="44054006", + resource_type="Condition", + ) + + res = result["resolution"] + assert res["mapping_type"] == "direct" + assert res["target_table"] == "condition_occurrence" + assert res["standard_concept"]["vocabulary_id"] == "SNOMED" + assert res["standard_concept"]["standard_concept"] == "S" + assert res["domain_resource_alignment"] == "aligned" + + def test_resolve_icd10cm_live(self, integration_client: OMOPHub) -> None: + """Resolve ICD-10-CM E11.9 — non-standard, should traverse Maps to.""" + result = integration_client.fhir.resolve( + system="http://hl7.org/fhir/sid/icd-10-cm", + code="E11.9", + ) + + res = result["resolution"] + assert res["vocabulary_id"] == "ICD10CM" + assert res["source_concept"]["vocabulary_id"] == "ICD10CM" + assert res["standard_concept"]["standard_concept"] == "S" + assert res["target_table"] == "condition_occurrence" + + def test_resolve_batch_live(self, integration_client: OMOPHub) -> None: + """Batch resolve 3 mixed codings.""" + result = integration_client.fhir.resolve_batch( + [ + {"system": "http://snomed.info/sct", "code": "44054006"}, + {"system": "http://loinc.org", "code": "2339-0"}, + {"system": "http://www.nlm.nih.gov/research/umls/rxnorm", "code": "197696"}, + ] + ) + + assert result["summary"]["total"] == 3 + assert result["summary"]["resolved"] + result["summary"]["failed"] == 3 + assert len(result["results"]) == 3 + + def test_resolve_codeable_concept_live(self, integration_client: OMOPHub) -> None: + """CodeableConcept: SNOMED should win over ICD-10-CM.""" + result = integration_client.fhir.resolve_codeable_concept( + coding=[ + {"system": "http://snomed.info/sct", "code": "44054006"}, + {"system": "http://hl7.org/fhir/sid/icd-10-cm", "code": "E11.9"}, + ], + resource_type="Condition", + ) + + assert result["best_match"] is not None + best = result["best_match"]["resolution"] + assert best["source_concept"]["vocabulary_id"] == "SNOMED" + assert best["target_table"] == "condition_occurrence" + + def test_resolve_with_quality_live(self, integration_client: OMOPHub) -> None: + """Mapping quality signal is returned when requested.""" + result = integration_client.fhir.resolve( + system="http://snomed.info/sct", + code="44054006", + include_quality=True, + ) + + res = result["resolution"] + assert "mapping_quality" in res + assert res["mapping_quality"] in ("high", "medium", "low", "manual_review") diff --git a/tests/unit/resources/test_fhir.py b/tests/unit/resources/test_fhir.py new file mode 100644 index 0000000..17aa2dc --- /dev/null +++ b/tests/unit/resources/test_fhir.py @@ -0,0 +1,575 @@ +"""Tests for the FHIR resolver resource.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +import respx +from httpx import Response + +if TYPE_CHECKING: + from omophub import AsyncOMOPHub, OMOPHub + + +# -- Fixtures ---------------------------------------------------------------- + +SNOMED_RESOLVE_RESPONSE = { + "success": True, + "data": { + "input": { + "system": "http://snomed.info/sct", + "code": "44054006", + "resource_type": "Condition", + }, + "resolution": { + "vocabulary_id": "SNOMED", + "source_concept": { + "concept_id": 201826, + "concept_name": "Type 2 diabetes mellitus", + "concept_code": "44054006", + "vocabulary_id": "SNOMED", + "domain_id": "Condition", + "concept_class_id": "Clinical Finding", + "standard_concept": "S", + }, + "standard_concept": { + "concept_id": 201826, + "concept_name": "Type 2 diabetes mellitus", + "concept_code": "44054006", + "vocabulary_id": "SNOMED", + "domain_id": "Condition", + "concept_class_id": "Clinical Finding", + "standard_concept": "S", + }, + "mapping_type": "direct", + "target_table": "condition_occurrence", + "domain_resource_alignment": "aligned", + }, + }, + "meta": {"request_id": "test", "timestamp": "2026-04-10T00:00:00Z", "vocab_release": "2025.2"}, +} + +ICD10_MAPPED_RESPONSE = { + "success": True, + "data": { + "input": {"system": "http://hl7.org/fhir/sid/icd-10-cm", "code": "E11.9"}, + "resolution": { + "vocabulary_id": "ICD10CM", + "source_concept": { + "concept_id": 45576876, + "concept_name": "Type 2 diabetes mellitus without complications", + "concept_code": "E11.9", + "vocabulary_id": "ICD10CM", + "domain_id": "Condition", + "concept_class_id": "5-char billing code", + "standard_concept": None, + }, + "standard_concept": { + "concept_id": 201826, + "concept_name": "Type 2 diabetes mellitus", + "concept_code": "44054006", + "vocabulary_id": "SNOMED", + "domain_id": "Condition", + "concept_class_id": "Clinical Finding", + "standard_concept": "S", + }, + "mapping_type": "mapped", + "relationship_id": "Maps to", + "target_table": "condition_occurrence", + "domain_resource_alignment": "not_checked", + "mapping_quality": "high", + }, + }, +} + +BATCH_RESPONSE = { + "success": True, + "data": { + "results": [SNOMED_RESOLVE_RESPONSE["data"]], + "summary": {"total": 1, "resolved": 1, "failed": 0}, + }, +} + +CODEABLE_CONCEPT_RESPONSE = { + "success": True, + "data": { + "input": { + "coding": [ + {"system": "http://snomed.info/sct", "code": "44054006"}, + {"system": "http://hl7.org/fhir/sid/icd-10-cm", "code": "E11.9"}, + ], + "resource_type": "Condition", + }, + "best_match": SNOMED_RESOLVE_RESPONSE["data"], + "alternatives": [ICD10_MAPPED_RESPONSE["data"]], + "unresolved": [], + }, +} + + +# -- Sync tests -------------------------------------------------------------- + + +class TestFhirSync: + """Tests for the synchronous Fhir resource.""" + + @respx.mock + def test_resolve_snomed_direct(self, sync_client: OMOPHub, base_url: str) -> None: + """SNOMED direct resolution returns correct shape.""" + respx.post(f"{base_url}/fhir/resolve").mock( + return_value=Response(200, json=SNOMED_RESOLVE_RESPONSE) + ) + + result = sync_client.fhir.resolve( + system="http://snomed.info/sct", + code="44054006", + resource_type="Condition", + ) + + assert result["resolution"]["mapping_type"] == "direct" + assert result["resolution"]["target_table"] == "condition_occurrence" + assert result["resolution"]["standard_concept"]["concept_id"] == 201826 + + @respx.mock + def test_resolve_icd10_mapped(self, sync_client: OMOPHub, base_url: str) -> None: + """ICD-10-CM maps to a standard SNOMED concept.""" + respx.post(f"{base_url}/fhir/resolve").mock( + return_value=Response(200, json=ICD10_MAPPED_RESPONSE) + ) + + result = sync_client.fhir.resolve( + system="http://hl7.org/fhir/sid/icd-10-cm", + code="E11.9", + include_quality=True, + ) + + assert result["resolution"]["mapping_type"] == "mapped" + assert result["resolution"]["relationship_id"] == "Maps to" + assert result["resolution"]["mapping_quality"] == "high" + + @respx.mock + def test_resolve_text_only(self, sync_client: OMOPHub, base_url: str) -> None: + """Display-only input triggers semantic search fallback.""" + semantic_response = { + "success": True, + "data": { + "input": {"display": "Blood Sugar", "resource_type": "Observation"}, + "resolution": { + "vocabulary_id": None, + "source_concept": { + "concept_id": 3004501, + "concept_name": "Glucose [Mass/volume] in Blood", + "concept_code": "2339-0", + "vocabulary_id": "LOINC", + "domain_id": "Measurement", + "concept_class_id": "Lab Test", + "standard_concept": "S", + }, + "standard_concept": { + "concept_id": 3004501, + "concept_name": "Glucose [Mass/volume] in Blood", + "concept_code": "2339-0", + "vocabulary_id": "LOINC", + "domain_id": "Measurement", + "concept_class_id": "Lab Test", + "standard_concept": "S", + }, + "mapping_type": "semantic_match", + "similarity_score": 0.91, + "target_table": "measurement", + "domain_resource_alignment": "aligned", + }, + }, + } + respx.post(f"{base_url}/fhir/resolve").mock( + return_value=Response(200, json=semantic_response) + ) + + result = sync_client.fhir.resolve(display="Blood Sugar", resource_type="Observation") + + assert result["resolution"]["mapping_type"] == "semantic_match" + assert result["resolution"]["similarity_score"] == 0.91 + + @respx.mock + def test_resolve_with_recommendations(self, sync_client: OMOPHub, base_url: str) -> None: + """Recommendations are included when requested.""" + recs_response = {**SNOMED_RESOLVE_RESPONSE} + recs_response["data"] = { + **SNOMED_RESOLVE_RESPONSE["data"], + "resolution": { + **SNOMED_RESOLVE_RESPONSE["data"]["resolution"], + "recommendations": [ + { + "concept_id": 4193704, + "concept_name": "Hyperglycemia", + "vocabulary_id": "SNOMED", + "domain_id": "Condition", + "concept_class_id": "Clinical Finding", + "standard_concept": "S", + "relationship_id": "Has finding", + } + ], + }, + } + route = respx.post(f"{base_url}/fhir/resolve").mock( + return_value=Response(200, json=recs_response) + ) + + result = sync_client.fhir.resolve( + system="http://snomed.info/sct", + code="44054006", + include_recommendations=True, + recommendations_limit=3, + ) + + assert len(result["resolution"]["recommendations"]) == 1 + # Verify the request body included the flags + import json + + body = json.loads(route.calls[0].request.content) + assert body["include_recommendations"] is True + assert body["recommendations_limit"] == 3 + + @respx.mock + def test_resolve_unknown_system_400(self, sync_client: OMOPHub, base_url: str) -> None: + """Unknown URI raises an API error.""" + respx.post(f"{base_url}/fhir/resolve").mock( + return_value=Response( + 400, + json={ + "success": False, + "error": { + "code": "unknown_system", + "message": "Unknown FHIR code system URI", + "details": {"suggestion": "http://snomed.info/sct"}, + }, + }, + ) + ) + + with pytest.raises(Exception): + sync_client.fhir.resolve(system="http://snomed.info/sc", code="44054006") + + @respx.mock + def test_resolve_cpt4_403(self, sync_client: OMOPHub, base_url: str) -> None: + """CPT4 raises a 403 restricted error.""" + respx.post(f"{base_url}/fhir/resolve").mock( + return_value=Response( + 403, + json={ + "success": False, + "error": { + "code": "vocabulary_restricted", + "message": "CPT4 is excluded", + }, + }, + ) + ) + + with pytest.raises(Exception): + sync_client.fhir.resolve(system="http://www.ama-assn.org/go/cpt", code="99213") + + @respx.mock + def test_resolve_batch(self, sync_client: OMOPHub, base_url: str) -> None: + """Batch resolution returns results and summary.""" + respx.post(f"{base_url}/fhir/resolve/batch").mock( + return_value=Response(200, json=BATCH_RESPONSE) + ) + + result = sync_client.fhir.resolve_batch( + [{"system": "http://snomed.info/sct", "code": "44054006"}] + ) + + assert result["summary"]["total"] == 1 + assert result["summary"]["resolved"] == 1 + assert len(result["results"]) == 1 + + @respx.mock + def test_resolve_codeable_concept(self, sync_client: OMOPHub, base_url: str) -> None: + """CodeableConcept resolution returns best_match and alternatives.""" + respx.post(f"{base_url}/fhir/resolve/codeable-concept").mock( + return_value=Response(200, json=CODEABLE_CONCEPT_RESPONSE) + ) + + result = sync_client.fhir.resolve_codeable_concept( + coding=[ + {"system": "http://snomed.info/sct", "code": "44054006"}, + {"system": "http://hl7.org/fhir/sid/icd-10-cm", "code": "E11.9"}, + ], + resource_type="Condition", + ) + + assert result["best_match"] is not None + assert result["best_match"]["resolution"]["source_concept"]["vocabulary_id"] == "SNOMED" + assert len(result["alternatives"]) == 1 + + @respx.mock + def test_resolve_batch_with_all_options(self, sync_client: OMOPHub, base_url: str) -> None: + """Batch passes resource_type, include_recommendations, and include_quality.""" + route = respx.post(f"{base_url}/fhir/resolve/batch").mock( + return_value=Response(200, json=BATCH_RESPONSE) + ) + + sync_client.fhir.resolve_batch( + [{"system": "http://snomed.info/sct", "code": "44054006"}], + resource_type="Condition", + include_recommendations=True, + recommendations_limit=3, + include_quality=True, + ) + + import json + + body = json.loads(route.calls[0].request.content) + assert body["resource_type"] == "Condition" + assert body["include_recommendations"] is True + assert body["recommendations_limit"] == 3 + assert body["include_quality"] is True + + @respx.mock + def test_resolve_codeable_concept_with_all_options( + self, sync_client: OMOPHub, base_url: str + ) -> None: + """CodeableConcept passes text, resource_type, and enrichment flags.""" + route = respx.post(f"{base_url}/fhir/resolve/codeable-concept").mock( + return_value=Response(200, json=CODEABLE_CONCEPT_RESPONSE) + ) + + sync_client.fhir.resolve_codeable_concept( + coding=[{"system": "http://snomed.info/sct", "code": "44054006"}], + text="Type 2 diabetes", + resource_type="Condition", + include_recommendations=True, + recommendations_limit=5, + include_quality=True, + ) + + import json + + body = json.loads(route.calls[0].request.content) + assert body["text"] == "Type 2 diabetes" + assert body["resource_type"] == "Condition" + assert body["include_recommendations"] is True + assert body["include_quality"] is True + + @respx.mock + def test_resolve_codeable_concept_minimal(self, sync_client: OMOPHub, base_url: str) -> None: + """CodeableConcept with no optional flags (covers False branches).""" + route = respx.post(f"{base_url}/fhir/resolve/codeable-concept").mock( + return_value=Response(200, json=CODEABLE_CONCEPT_RESPONSE) + ) + + sync_client.fhir.resolve_codeable_concept( + coding=[{"system": "http://snomed.info/sct", "code": "44054006"}], + ) + + import json + + body = json.loads(route.calls[0].request.content) + assert "text" not in body + assert "resource_type" not in body + assert "include_recommendations" not in body + assert "include_quality" not in body + + @respx.mock + def test_resolve_sends_correct_body(self, sync_client: OMOPHub, base_url: str) -> None: + """Verify the POST body includes only non-None parameters.""" + route = respx.post(f"{base_url}/fhir/resolve").mock( + return_value=Response(200, json=SNOMED_RESOLVE_RESPONSE) + ) + + sync_client.fhir.resolve( + system="http://snomed.info/sct", + code="44054006", + resource_type="Condition", + ) + + import json + + body = json.loads(route.calls[0].request.content) + assert body == { + "system": "http://snomed.info/sct", + "code": "44054006", + "resource_type": "Condition", + } + # Ensure optional flags are NOT sent when they're default False + assert "include_recommendations" not in body + assert "include_quality" not in body + + +# -- Async tests ------------------------------------------------------------- + + +class TestFhirAsync: + """Tests for the asynchronous AsyncFhir resource.""" + + @respx.mock + @pytest.mark.anyio + async def test_async_resolve(self, async_client: AsyncOMOPHub, base_url: str) -> None: + """Async resolve returns the same shape as sync.""" + respx.post(f"{base_url}/fhir/resolve").mock( + return_value=Response(200, json=SNOMED_RESOLVE_RESPONSE) + ) + + result = await async_client.fhir.resolve( + system="http://snomed.info/sct", + code="44054006", + ) + + assert result["resolution"]["mapping_type"] == "direct" + assert result["resolution"]["target_table"] == "condition_occurrence" + + @respx.mock + @pytest.mark.anyio + async def test_async_resolve_batch(self, async_client: AsyncOMOPHub, base_url: str) -> None: + """Async batch resolve returns results and summary.""" + respx.post(f"{base_url}/fhir/resolve/batch").mock( + return_value=Response(200, json=BATCH_RESPONSE) + ) + + result = await async_client.fhir.resolve_batch( + [{"system": "http://snomed.info/sct", "code": "44054006"}], + resource_type="Condition", + include_quality=True, + ) + + assert result["summary"]["total"] == 1 + + @respx.mock + @pytest.mark.anyio + async def test_async_resolve_codeable_concept( + self, async_client: AsyncOMOPHub, base_url: str + ) -> None: + """Async codeable concept resolve returns best_match.""" + respx.post(f"{base_url}/fhir/resolve/codeable-concept").mock( + return_value=Response(200, json=CODEABLE_CONCEPT_RESPONSE) + ) + + result = await async_client.fhir.resolve_codeable_concept( + coding=[ + {"system": "http://snomed.info/sct", "code": "44054006"}, + {"system": "http://hl7.org/fhir/sid/icd-10-cm", "code": "E11.9"}, + ], + text="Type 2 diabetes", + resource_type="Condition", + include_recommendations=True, + include_quality=True, + ) + + assert result["best_match"] is not None + + @respx.mock + @pytest.mark.anyio + async def test_async_resolve_vocabulary_id_bypass( + self, async_client: AsyncOMOPHub, base_url: str + ) -> None: + """Async resolve with vocabulary_id exercises the bypass branch.""" + route = respx.post(f"{base_url}/fhir/resolve").mock( + return_value=Response(200, json=ICD10_MAPPED_RESPONSE) + ) + + result = await async_client.fhir.resolve( + vocabulary_id="ICD10CM", + code="E11.9", + include_recommendations=True, + recommendations_limit=3, + include_quality=True, + ) + + import json + + body = json.loads(route.calls[0].request.content) + assert body["vocabulary_id"] == "ICD10CM" + assert body["include_recommendations"] is True + assert body["recommendations_limit"] == 3 + assert body["include_quality"] is True + assert "resolution" in result + + @respx.mock + @pytest.mark.anyio + async def test_async_resolve_batch_all_flags( + self, async_client: AsyncOMOPHub, base_url: str + ) -> None: + """Async batch with include_recommendations exercises that branch.""" + route = respx.post(f"{base_url}/fhir/resolve/batch").mock( + return_value=Response(200, json=BATCH_RESPONSE) + ) + + await async_client.fhir.resolve_batch( + [{"system": "http://snomed.info/sct", "code": "44054006"}], + resource_type="Condition", + include_recommendations=True, + recommendations_limit=5, + include_quality=True, + ) + + import json + + body = json.loads(route.calls[0].request.content) + assert body["include_recommendations"] is True + assert body["recommendations_limit"] == 5 + assert body["include_quality"] is True + + @respx.mock + @pytest.mark.anyio + async def test_async_fhir_property_cached(self, async_client: AsyncOMOPHub, base_url: str) -> None: + """Accessing client.fhir twice returns the same cached instance.""" + fhir1 = async_client.fhir + fhir2 = async_client.fhir + assert fhir1 is fhir2 + + + @respx.mock + @pytest.mark.anyio + async def test_async_resolve_codeable_minimal( + self, async_client: AsyncOMOPHub, base_url: str + ) -> None: + """Async codeable concept with no optional flags (covers False branches).""" + route = respx.post(f"{base_url}/fhir/resolve/codeable-concept").mock( + return_value=Response(200, json=CODEABLE_CONCEPT_RESPONSE) + ) + + await async_client.fhir.resolve_codeable_concept( + coding=[{"system": "http://snomed.info/sct", "code": "44054006"}], + ) + + import json + + body = json.loads(route.calls[0].request.content) + assert "text" not in body + assert "resource_type" not in body + assert "include_recommendations" not in body + assert "include_quality" not in body + + @respx.mock + @pytest.mark.anyio + async def test_async_resolve_batch_minimal( + self, async_client: AsyncOMOPHub, base_url: str + ) -> None: + """Async batch with no optional flags (covers False branches).""" + route = respx.post(f"{base_url}/fhir/resolve/batch").mock( + return_value=Response(200, json=BATCH_RESPONSE) + ) + + await async_client.fhir.resolve_batch( + [{"system": "http://snomed.info/sct", "code": "44054006"}], + ) + + import json + + body = json.loads(route.calls[0].request.content) + assert "resource_type" not in body + assert "include_recommendations" not in body + assert "include_quality" not in body + + +class TestFhirPropertyCaching: + """Test lazy-property cache hit on both client types.""" + + @respx.mock + def test_sync_fhir_property_cached(self, sync_client: OMOPHub) -> None: + """Accessing client.fhir twice returns the same cached instance.""" + fhir1 = sync_client.fhir + fhir2 = sync_client.fhir + assert fhir1 is fhir2 diff --git a/tests/unit/test_pagination.py b/tests/unit/test_pagination.py index e213972..97413f6 100644 --- a/tests/unit/test_pagination.py +++ b/tests/unit/test_pagination.py @@ -199,12 +199,12 @@ async def fetch_page(page: int, page_size: int) -> tuple: assert result == items @pytest.mark.asyncio - async def test_sync_callable_fallback(self) -> None: - """Test paginate_async works with sync callables too.""" + async def test_async_callable_required(self) -> None: + """Test paginate_async requires an async callable (not sync).""" items = [{"id": 1}, {"id": 2}] - meta = {"page": 1, "has_next": False} + meta: dict[str, object] = {"page": 1, "has_next": False} - def fetch_page(page: int, page_size: int) -> tuple: + async def fetch_page(page: int, page_size: int) -> tuple: return items, meta result = [item async for item in paginate_async(fetch_page)]