From bd28d8d49cf8964bb31b03583b57589064b42725 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 4 Feb 2026 17:24:36 +0000 Subject: [PATCH 1/2] refactor(amgi-aiokafka): eagerly start producer and await send completion --- .../src/amgi_aiokafka/__init__.py | 22 ++++--------------- 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py b/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py index 6c6ebbd..8f2a7e4 100644 --- a/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py +++ b/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py @@ -1,7 +1,6 @@ import asyncio import logging import sys -from asyncio import Lock from collections import deque from collections.abc import Awaitable from collections.abc import Callable @@ -119,18 +118,17 @@ async def __call__(self, event: AMGISendEvent) -> None: class MessageSend: def __init__(self, bootstrap_servers: str | list[str]) -> None: self._bootstrap_servers = bootstrap_servers - self._producer = None - self._producer_lock = Lock() async def __aenter__(self) -> Self: + self._producer = AIOKafkaProducer(bootstrap_servers=self._bootstrap_servers) + await self._producer.start() return self async def __call__(self, event: MessageSendEvent) -> None: - producer = await self._get_producer() encoded_headers = [(key.decode(), value) for key, value in event["headers"]] key = event.get("bindings", {}).get("kafka", {}).get("key") - await producer.send( + await self._producer.send_and_wait( event["address"], headers=encoded_headers, value=event.get("payload"), @@ -143,19 +141,7 @@ async def __aexit__( exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: - if self._producer is not None: - await self._producer.stop() - - async def _get_producer(self) -> AIOKafkaProducer: - if self._producer is None: - async with self._producer_lock: - if self._producer is None: - producer = AIOKafkaProducer( - bootstrap_servers=self._bootstrap_servers - ) - await 
producer.start() - self._producer = producer - return self._producer + await self._producer.stop() class Server: From a04c29b732f8c9379f682f8c2f5aa16e01746f9f Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Thu, 29 Jan 2026 20:06:31 +0000 Subject: [PATCH 2/2] feat: add kafka event source mapping support --- .../amgi-kafka-event-source-mapping/LICENSE | 21 + .../amgi-kafka-event-source-mapping/README.md | 96 ++++ .../pyproject.toml | 49 ++ .../__init__.py | 298 ++++++++++++ .../amgi_kafka_event_source_mapping/py.typed | 0 .../__init__.py | 0 ...test_kafka_event_source_mapping_handler.py | 436 ++++++++++++++++++ ...vent_source_mapping_handler_integration.py | 139 ++++++ tox.ini | 16 +- uv.lock | 36 ++ 10 files changed, 1089 insertions(+), 2 deletions(-) create mode 100644 packages/amgi-kafka-event-source-mapping/LICENSE create mode 100644 packages/amgi-kafka-event-source-mapping/README.md create mode 100644 packages/amgi-kafka-event-source-mapping/pyproject.toml create mode 100644 packages/amgi-kafka-event-source-mapping/src/amgi_kafka_event_source_mapping/__init__.py create mode 100644 packages/amgi-kafka-event-source-mapping/src/amgi_kafka_event_source_mapping/py.typed create mode 100644 packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/__init__.py create mode 100644 packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler.py create mode 100644 packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler_integration.py diff --git a/packages/amgi-kafka-event-source-mapping/LICENSE b/packages/amgi-kafka-event-source-mapping/LICENSE new file mode 100644 index 0000000..b0a6eef --- /dev/null +++ b/packages/amgi-kafka-event-source-mapping/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 Jack Burridge + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and 
associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/packages/amgi-kafka-event-source-mapping/README.md b/packages/amgi-kafka-event-source-mapping/README.md new file mode 100644 index 0000000..a1ad9f3 --- /dev/null +++ b/packages/amgi-kafka-event-source-mapping/README.md @@ -0,0 +1,96 @@ +# amgi-kafka-event-source-mapping + +amgi-kafka-event-source-mapping is an adaptor for [AMGI](https://amgi.readthedocs.io/en/latest/) applications to run in +a Kafka event source mapped environment. + +## Installation + +```bash +pip install amgi-kafka-event-source-mapping==0.32.0 +``` + +## Example + +This example uses [AsyncFast](https://pypi.org/project/asyncfast/): + +```python +from dataclasses import dataclass + +from amgi_kafka_event_source_mapping import KafkaEventSourceMappingHandler +from asyncfast import AsyncFast + +app = AsyncFast() + + +@dataclass +class Order: + item_ids: list[str] + + +@app.channel("orders") +async def orders(order: Order) -> None: + # Makes an order + ... 
+ + +handler = KafkaEventSourceMappingHandler(app) +``` + +## What it does + +- Converts Kafka batch events into AMGI `message.receive` events +- Uses the Kafka topic name as the AMGI message address +- Supports partial batch failures so only failed records are reported +- Sends outbound messages to Kafka using an async producer +- Outbound messages are sent via the same Kafka broker (bootstrap servers) that the records were received from +- Optionally manages application startup and shutdown via AMGI lifespan + +## Record handling + +- Record values and keys are passed to your app as bytes +- Kafka record headers become AMGI headers +- Records are only acknowledged when your app emits `message.ack` +- Records for which your app emits `message.nack`, or which are never acknowledged, are treated as failures + +## Nack handling + +By default, records that are negatively acknowledged or never acknowledged are logged: + +```python +handler = KafkaEventSourceMappingHandler(app, on_nack="log") +``` + +To fail the invocation when any record is nacked, configure the handler to raise an error instead: + +```python +handler = KafkaEventSourceMappingHandler(app, on_nack="error") +``` + +This is useful when running in environments where a failed invocation should trigger a retry or an alert. + +When using this mode, handlers **must be idempotent**. Kafka event source mappings may re-deliver records after +failures, restarts, or rebalances, and your application logic should be safe to execute more than once for the same +record. + +## Lifespan + +Lifespan support is enabled by default. + +- Startup runs once per Lambda execution environment +- Shutdown is attempted when the environment is terminated + +Shutdown handling relies on `signal.SIGTERM`, which is supported by Python 3.12 and later Lambda runtimes.
+ +To use fully stateless, per-invocation behavior, disable lifespan: + +```python +handler = KafkaEventSourceMappingHandler(app, lifespan=False) +``` + +## Contact + +For questions or suggestions, please contact [jack.burridge@mail.com](mailto:jack.burridge@mail.com). + +## License + +Copyright 2026 AMGI diff --git a/packages/amgi-kafka-event-source-mapping/pyproject.toml b/packages/amgi-kafka-event-source-mapping/pyproject.toml new file mode 100644 index 0000000..a303ab5 --- /dev/null +++ b/packages/amgi-kafka-event-source-mapping/pyproject.toml @@ -0,0 +1,49 @@ +[build-system] +build-backend = "uv_build" +requires = [ + "uv-build>=0.9.26,<0.10.0", +] + +[project] +name = "amgi-kafka-event-source-mapping" +version = "0.32.0" +description = "Kafka event source mapping adaptor for AMGI applications" +readme = "README.md" +license = "MIT" +license-files = [ + "LICENSE", +] +authors = [ + { name = "jack.burridge", email = "jack.burridge@mail.com" }, +] +requires-python = ">=3.10" +classifiers = [ + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", +] +dependencies = [ + "amgi-aiokafka==0.32.0", + "typing-extensions>=4.15.0; python_full_version<'3.11'", + +] + +[dependency-groups] +dev = [ + "pytest>=8.4.1", + "pytest-asyncio>=1.3.0", + "pytest-cov>=7.0.0", + "pytest-timeout>=2.4.0", + "test-utils", + "testcontainers[kafka]>=4.13.0", + +] + +[tool.uv.sources.amgi-aiokafka] +workspace = true + +[tool.uv.sources.test-utils] +workspace = true diff --git a/packages/amgi-kafka-event-source-mapping/src/amgi_kafka_event_source_mapping/__init__.py b/packages/amgi-kafka-event-source-mapping/src/amgi_kafka_event_source_mapping/__init__.py new file mode 100644 index 0000000..c3cf6f5 --- /dev/null +++ 
b/packages/amgi-kafka-event-source-mapping/src/amgi_kafka_event_source_mapping/__init__.py
@@ -0,0 +1,298 @@
+"""Run AMGI applications from Kafka event source mapping batch events."""
+import asyncio
+import base64
+import itertools
+import logging
+import signal
+import sys
+from asyncio import Task
+from collections import deque
+from collections.abc import Awaitable
+from collections.abc import Callable
+from collections.abc import Iterable
+from collections.abc import Sequence
+from contextlib import AsyncExitStack
+from dataclasses import dataclass
+from types import TracebackType
+from typing import Any
+from typing import AsyncContextManager
+from typing import Literal
+
+from amgi_aiokafka import MessageSend as AioKafkaMessageSend
+from amgi_common import Lifespan
+from amgi_types import AMGIApplication
+from amgi_types import AMGISendEvent
+from amgi_types import MessageReceiveEvent
+from amgi_types import MessageScope
+from amgi_types import MessageSendEvent
+
+# typing-extensions is only a declared dependency on Python < 3.11, so every
+# name that typing provides from 3.11 onwards is taken from typing there.
+if sys.version_info >= (3, 11):
+    from typing import NotRequired
+    from typing import Self
+    from typing import TypedDict
+else:
+    from typing_extensions import NotRequired
+    from typing_extensions import Self
+    from typing_extensions import TypedDict
+
+
+_logger = logging.getLogger("amgi-kafka-event-source-mapping.error")
+
+
+# Callable that publishes one outbound ``message.send`` event.
+_MessageSendT = Callable[[MessageSendEvent], Awaitable[None]]
+# Async context manager that yields such a callable when entered.
+_MessageSendManagerT = AsyncContextManager[_MessageSendT]
+
+
+# Record headers as delivered in the event: a list of dicts mapping a header
+# name to the header value expressed as a list of integer byte values.
+_RecordHeaders = list[dict[str, list[int]]]
+
+
+@dataclass
+class _RecordNack:
+    """Describes one record that has not been acknowledged."""
+
+    # Coordinates of the record inside Kafka.
+    topic: str
+    partition: int
+    offset: int
+    # Human-readable reason the record counts as failed.
+    message: str
+
+    def __str__(self) -> str:
+        return f"Failed to process record topic={self.topic}, partition={self.partition}, offset={self.offset}, message={self.message}"
+
+
+class _KafkaRecord(TypedDict):
+    """Single record of a Kafka event source mapping batch."""
+
+    topic: str
+    partition: int
+    offset: int
+    timestamp: int
+    timestampType: str
+    # ``key`` and ``value`` are base64 text; this module decodes them to bytes.
+    key: NotRequired[str | None]
+    value: NotRequired[str | None]
+    headers: NotRequired[_RecordHeaders]
+
+
+class _KafkaEventSourceMapping(TypedDict):
+    """Batch event passed to the handler."""
+
+    eventSource: str
+    eventSourceArn: NotRequired[str]
+    # Comma-separated broker list; also used for outbound messages.
+    bootstrapServers: str
+    # Lists of records, one list per key of the mapping.
+    records: dict[str, list[_KafkaRecord]]
+
+
+class
_Send:
+    """AMGI ``send`` callable for one batch of records.
+
+    ``record_nacks`` holds one entry per record that is still unacknowledged;
+    whatever remains once the application returns is reported as failed.
+    """
+
+    def __init__(
+        self, record_nacks: dict[str, _RecordNack], message_send: _MessageSendT
+    ) -> None:
+        self._message_send = message_send
+        self.record_nacks = record_nacks
+
+    async def __call__(self, event: AMGISendEvent) -> None:
+        """Apply an ack/nack to the pending records or forward a send event."""
+        if event["type"] == "message.ack":
+            # Tolerate duplicate or unknown ids instead of raising KeyError
+            # inside the application's own send() call.
+            self.record_nacks.pop(event["id"], None)
+        elif event["type"] == "message.nack":
+            nack = self.record_nacks.get(event["id"])
+            # A nack for a record that is no longer pending has nothing to
+            # annotate, so it is ignored.
+            if nack is not None:
+                nack.message = event["message"]
+        elif event["type"] == "message.send":
+            await self._message_send(event)
+
+
+def _encode_record_headers(
+    headers: _RecordHeaders,
+) -> Iterable[tuple[bytes, bytes]]:
+    """Yield ``(name, value)`` byte pairs for every header entry."""
+    for header in headers:
+        for header_name, header_value in header.items():
+            yield header_name.encode(), bytes(header_value)
+
+
+def _record_id(message: _KafkaRecord) -> str:
+    """Return the ``topic:partition:offset`` id used for ack/nack events."""
+    topic = message["topic"]
+    partition = message["partition"]
+    offset = message["offset"]
+
+    return f"{topic}:{partition}:{offset}"
+
+
+class _Receive:
+    """AMGI ``receive`` callable that hands out the records one at a time."""
+
+    def __init__(self, records: Iterable[_KafkaRecord]) -> None:
+        self._deque = deque(records)
+
+    async def __call__(self) -> MessageReceiveEvent:
+        """Pop the next record and convert it to a ``message.receive`` event."""
+        message = self._deque.popleft()
+        headers = message.get("headers", [])
+        encoded_headers = list(_encode_record_headers(headers))
+
+        value = message.get("value")
+        key = message.get("key")
+
+        record_id = _record_id(message)
+
+        return {
+            "type": "message.receive",
+            "id": record_id,
+            "headers": encoded_headers,
+            # Key and value are base64 text in the event; absent stays None.
+            "payload": None if value is None else base64.b64decode(value),
+            "bindings": {
+                "kafka": {"key": None if key is None else base64.b64decode(key)}
+            },
+            "more_messages": len(self._deque) != 0,
+        }
+
+
+class _MessageSender:
+    """Creates and caches one Kafka sender per bootstrap-servers string."""
+
+    def __init__(self) -> None:
+        # Tasks rather than senders are cached so that concurrent callers
+        # share a single in-flight creation.
+        self._send_tasks: dict[str, Task[_MessageSendT]] = {}
+        # Owns every sender that was entered, so they are exited together.
+        self._async_exit_stack = AsyncExitStack()
+
+    async def __aenter__(self) -> Self:
+        await self._async_exit_stack.__aenter__()
+        return self
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        await
self._async_exit_stack.__aexit__(exc_type, exc_val, exc_tb)
+
+    async def _create_sender(self, bootstrap_servers: list[str]) -> _MessageSendT:
+        """Enter a new Kafka sender on the exit stack and return it."""
+        return await self._async_exit_stack.enter_async_context(
+            AioKafkaMessageSend(bootstrap_servers=bootstrap_servers)
+        )
+
+    async def get_message_send(self, bootstrap_servers: str) -> _MessageSendT:
+        """Return the sender for ``bootstrap_servers``, creating it on first use.
+
+        ``bootstrap_servers`` is the comma-separated string from the event; it
+        is the cache key and is split into a list for the producer.
+        """
+        task = self._send_tasks.get(bootstrap_servers)
+        if task is None:
+            task = asyncio.create_task(
+                self._create_sender(bootstrap_servers.split(","))
+            )
+            self._send_tasks[bootstrap_servers] = task
+
+        try:
+            sender = await task
+            return sender
+        except Exception:
+            # Forget the failed task so a later call can try again; the
+            # identity check avoids evicting a newer replacement.
+            if self._send_tasks.get(bootstrap_servers) is task:
+                del self._send_tasks[bootstrap_servers]
+            raise
+
+
+class _MessageSendWrapper:
+    """Adapts a caller-supplied sender manager to the ``_MessageSender`` API."""
+
+    def __init__(self, message_send_manager: _MessageSendManagerT) -> None:
+        self._message_send_manager = message_send_manager
+
+    async def __aenter__(self) -> Self:
+        self._message_send = await self._message_send_manager.__aenter__()
+        return self
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        await self._message_send_manager.__aexit__(exc_type, exc_val, exc_tb)
+
+    async def get_message_send(self, bootstrap_servers: str) -> _MessageSendT:
+        """Return the wrapped sender; ``bootstrap_servers`` is ignored."""
+        return self._message_send
+
+
+def _partition_records_topic(records: list[_KafkaRecord]) -> str:
+    """Return the single topic shared by all ``records``.
+
+    Raises:
+        ValueError: If ``records`` is empty or spans more than one topic.
+    """
+    topics = {record["topic"] for record in records}
+    if len(topics) != 1:
+        # Raised explicitly: an ``assert`` would be stripped under ``python -O``.
+        raise ValueError(f"Mixed topics: {topics}")
+    return next(iter(topics))
+
+
+class NackError(Exception):
+    """Raised when records were not acknowledged and ``on_nack="error"``."""
+
+    def __init__(self, nacks: Sequence[_RecordNack]):
+        self._nacks = nacks
+
+    def __str__(self) -> str:
+        # One line per failed record.
+        return "\n".join(str(nack) for nack in self._nacks)
+
+
+class KafkaEventSourceMappingHandler:
+    """Callable handler that feeds Kafka batch events to an AMGI application."""
+
+    def __init__(
+        self,
+        app: AMGIApplication,
+        lifespan: bool = True,
+        on_nack: Literal["log", "error"] = "log",
+        message_send: _MessageSendManagerT | None = None,
+    ) -> None:
+        self._app = app
+        self._on_nack = on_nack
+
self._lifespan = lifespan
+        # One dedicated loop is reused by every invocation of this handler.
+        self._loop = asyncio.new_event_loop()
+        self._message_send_manager = (
+            _MessageSender()
+            if message_send is None
+            else _MessageSendWrapper(message_send)
+        )
+        # Set once the manager above has been entered (first invocation).
+        self._message_sender: _MessageSender | _MessageSendWrapper | None = None
+        # Set once the application's lifespan startup has completed.
+        self._lifespan_context: Lifespan | None = None
+        self._state: dict[str, Any] = {}
+        self._client_instantiated = False
+
+        # NOTE(review): asyncio dispatches this callback from inside the loop,
+        # i.e. only while the loop is running - confirm that a SIGTERM received
+        # between invocations still leads to a shutdown.
+        try:
+            self._loop.add_signal_handler(signal.SIGTERM, self._sigterm_handler)
+        except (NotImplementedError, RuntimeError):
+            # NotImplementedError: the loop has no signal support (Windows).
+            # RuntimeError: asyncio cannot install handlers outside the main
+            # thread.
+            pass
+
+    def __call__(self, event: _KafkaEventSourceMapping, context: Any) -> None:
+        """Process one batch event synchronously; ``context`` is unused."""
+        return self._loop.run_until_complete(self._call(event))
+
+    async def _call(self, event: _KafkaEventSourceMapping) -> None:
+        """Run the application over every record list in ``event``.
+
+        Raises:
+            NackError: If ``on_nack="error"`` and any record was left
+                unacknowledged.
+        """
+        if not self._lifespan_context and self._lifespan:
+            lifespan_context = Lifespan(self._app, self._state)
+            await lifespan_context.__aenter__()
+            # Stored only after startup succeeded: a failed startup is retried
+            # by the next invocation, and shutdown never exits a context that
+            # was not entered.
+            self._lifespan_context = lifespan_context
+        if self._message_sender is None:
+            self._message_sender = await self._message_send_manager.__aenter__()
+
+        # Each record list is handled as its own concurrent application call.
+        record_nacks = await asyncio.gather(
+            *(
+                self._call_source_batch(
+                    event["bootstrapServers"],
+                    _partition_records_topic(records),
+                    records,
+                    self._message_sender,
+                )
+                for records in event["records"].values()
+            )
+        )
+
+        all_nacks = tuple(itertools.chain.from_iterable(record_nacks))
+        if self._on_nack == "error" and all_nacks:
+            raise NackError(all_nacks)
+        for nack in all_nacks:
+            _logger.error(str(nack))
+
+    async def _call_source_batch(
+        self,
+        bootstrap_servers: str,
+        topic: str,
+        records: Iterable[_KafkaRecord],
+        message_sender: _MessageSender | _MessageSendWrapper,
+    ) -> Iterable[_RecordNack]:
+        """Call the application for one record list and return what it left unacked.
+
+        ``records`` is iterated more than once, so it must be re-iterable
+        (the handler passes lists).
+        """
+        scope: MessageScope = {
+            "type": "message",
+            "amgi": {"version": "1.0", "spec_version": "1.0"},
+            "address": topic,
+            # Copy so one call cannot mutate the state seen by another.
+            "state": self._state.copy(),
+            "extensions": {"message.ack.out_of_order": {}},
+        }
+        message_send = await message_sender.get_message_send(bootstrap_servers)
+        # Every record starts out as failed; acks remove entries.
+        record_nacks = {
+            _record_id(record): _RecordNack(
+
record["topic"],
+                record["partition"],
+                record["offset"],
+                "Ack not received",
+            )
+            for record in records
+        }
+        send = _Send(record_nacks, message_send)
+        await self._app(scope, _Receive(records), send)
+
+        # Whatever was not acked by the application is reported as failed.
+        return send.record_nacks.values()
+
+    def _sigterm_handler(self) -> None:
+        """Shut down the application and senders in response to SIGTERM."""
+        if self._loop.is_running():
+            # Called from inside the loop (how asyncio dispatches signal
+            # callbacks): run_until_complete() would raise RuntimeError here,
+            # so schedule the shutdown instead. The reference keeps the task
+            # from being garbage-collected before it finishes.
+            self._shutdown_task = self._loop.create_task(self._shutdown())
+        else:
+            self._loop.run_until_complete(self._shutdown())
+
+    async def _shutdown(self) -> None:
+        """Exit the lifespan context and the message-send manager, once each."""
+        # Detach both first so a repeated shutdown is a no-op.
+        lifespan_context, self._lifespan_context = self._lifespan_context, None
+        message_sender, self._message_sender = self._message_sender, None
+        try:
+            if lifespan_context:
+                await lifespan_context.__aexit__(None, None, None)
+        finally:
+            # Close the producers even when the application's shutdown fails.
+            if message_sender:
+                await self._message_send_manager.__aexit__(None, None, None)
diff --git a/packages/amgi-kafka-event-source-mapping/src/amgi_kafka_event_source_mapping/py.typed b/packages/amgi-kafka-event-source-mapping/src/amgi_kafka_event_source_mapping/py.typed
new file mode 100644
index 0000000..e69de29
diff --git a/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/__init__.py b/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler.py b/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler.py
new file mode 100644
index 0000000..cb6df7c
--- /dev/null
+++ b/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler.py
@@ -0,0 +1,436 @@
+import asyncio
+import logging
+from queue import Queue
+from unittest.mock import AsyncMock
+from unittest.mock import Mock
+from uuid import uuid4
+
+import pytest
+from amgi_kafka_event_source_mapping import KafkaEventSourceMappingHandler
+from amgi_kafka_event_source_mapping import NackError
+from amgi_types import AMGIReceiveCallable
+from amgi_types import AMGISendCallable
+from amgi_types import Scope
+from test_utils import MockApp
+
+
+async def test_kafka_event_source_mapping_handler_records() ->
None: + app = MockApp() + kafka_event_source_mapping_handler = KafkaEventSourceMappingHandler( + app, lifespan=False, message_send=AsyncMock() + ) + + call_task = asyncio.get_running_loop().create_task( + kafka_event_source_mapping_handler._call( + { + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": { + "mytopic-0": [ + { + "topic": "mytopic", + "partition": 0, + "offset": 15, + "timestamp": 1545084650987, + "timestampType": "CREATE_TIME", + "key": "a2V5", + "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "headers": [ + { + "headerKey": [ + 104, + 101, + 97, + 100, + 101, + 114, + 86, + 97, + 108, + 117, + 101, + ] + } + ], + }, + ] + }, + }, + ) + ) + async with app.call() as (scope, receive, send): + assert scope == { + "type": "message", + "amgi": {"version": "1.0", "spec_version": "1.0"}, + "address": "mytopic", + "state": {}, + "extensions": {"message.ack.out_of_order": {}}, + } + + assert await receive() == { + "bindings": {"kafka": {"key": b"key"}}, + "headers": [(b"headerKey", b"headerValue")], + "id": "mytopic:0:15", + "more_messages": False, + "payload": b"Hello, this is a test.", + "type": "message.receive", + } + await send( + { + "type": "message.ack", + "id": "mytopic:0:15", + } + ) + + await call_task + + +async def test_kafka_event_source_mapping_handler_error_nack() -> None: + app = MockApp() + kafka_event_source_mapping_handler = KafkaEventSourceMappingHandler( + app, on_nack="error", lifespan=False, message_send=AsyncMock() + ) + + call_task = asyncio.get_running_loop().create_task( + kafka_event_source_mapping_handler._call( + { + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + 
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": { + "mytopic-0": [ + { + "topic": "mytopic", + "partition": 0, + "offset": 15, + "timestamp": 1545084650987, + "timestampType": "CREATE_TIME", + "key": "a2V5", + "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "headers": [ + { + "headerKey": [ + 104, + 101, + 97, + 100, + 101, + 114, + 86, + 97, + 108, + 117, + 101, + ] + } + ], + }, + ] + }, + }, + ) + ) + async with app.call() as (scope, receive, send): + assert scope == { + "type": "message", + "amgi": {"version": "1.0", "spec_version": "1.0"}, + "address": "mytopic", + "state": {}, + "extensions": {"message.ack.out_of_order": {}}, + } + + assert await receive() == { + "bindings": {"kafka": {"key": b"key"}}, + "headers": [(b"headerKey", b"headerValue")], + "id": "mytopic:0:15", + "more_messages": False, + "payload": b"Hello, this is a test.", + "type": "message.receive", + } + await send( + { + "type": "message.nack", + "id": "mytopic:0:15", + "message": "Failed to process record", + } + ) + + with pytest.raises(NackError, match="Failed to process record"): + await call_task + + +async def test_kafka_event_source_mapping_handler_log_nack( + caplog: pytest.LogCaptureFixture, +) -> None: + app = MockApp() + kafka_event_source_mapping_handler = KafkaEventSourceMappingHandler( + app, on_nack="log", lifespan=False, message_send=AsyncMock() + ) + + call_task = asyncio.get_running_loop().create_task( + kafka_event_source_mapping_handler._call( + { + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": { + "mytopic-0": [ + { + "topic": "mytopic", + "partition": 0, + "offset": 15, + "timestamp": 
1545084650987, + "timestampType": "CREATE_TIME", + "key": "a2V5", + "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "headers": [ + { + "headerKey": [ + 104, + 101, + 97, + 100, + 101, + 114, + 86, + 97, + 108, + 117, + 101, + ] + } + ], + }, + ] + }, + }, + ) + ) + with caplog.at_level(logging.ERROR, logger="amgi-kafka-event-source-mapping.error"): + async with app.call() as (scope, receive, send): + assert scope == { + "type": "message", + "amgi": {"version": "1.0", "spec_version": "1.0"}, + "address": "mytopic", + "state": {}, + "extensions": {"message.ack.out_of_order": {}}, + } + + assert await receive() == { + "bindings": {"kafka": {"key": b"key"}}, + "headers": [(b"headerKey", b"headerValue")], + "id": "mytopic:0:15", + "more_messages": False, + "payload": b"Hello, this is a test.", + "type": "message.receive", + } + await send( + { + "type": "message.nack", + "id": "mytopic:0:15", + "message": "Failed to process record", + } + ) + + await call_task + assert any( + "Failed to process record" in record.message for record in caplog.records + ) + + +async def test_lifespan() -> None: + app = MockApp() + kafka_event_source_mapping_handler = KafkaEventSourceMappingHandler( + app, message_send=AsyncMock() + ) + + loop = asyncio.get_event_loop() + state_item = uuid4() + + lifespan_task = loop.create_task( + kafka_event_source_mapping_handler._call( + { + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": {}, + }, + ) + ) + async with app.lifespan({"item": state_item}): + await lifespan_task + + call_task = asyncio.get_running_loop().create_task( + kafka_event_source_mapping_handler._call( + { + "eventSource": "aws:kafka", + "eventSourceArn": 
"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": { + "mytopic-0": [ + { + "topic": "mytopic", + "partition": 0, + "offset": 15, + "timestamp": 1545084650987, + "timestampType": "CREATE_TIME", + "key": "a2V5", + "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "headers": [ + { + "headerKey": [ + 104, + 101, + 97, + 100, + 101, + 114, + 86, + 97, + 108, + 117, + 101, + ] + } + ], + }, + ] + }, + }, + ) + ) + async with app.call() as (scope, receive, send): + assert scope == { + "type": "message", + "amgi": {"version": "1.0", "spec_version": "1.0"}, + "address": "mytopic", + "state": {"item": state_item}, + "extensions": {"message.ack.out_of_order": {}}, + } + + await call_task + shutdown_task = loop.create_task(kafka_event_source_mapping_handler._shutdown()) + + await shutdown_task + + +def test_lifespan_and_shutdown() -> None: + queue = Queue[Exception | None]() + + async def _app( + scope: Scope, receive: AMGIReceiveCallable, send: AMGISendCallable + ) -> None: + try: + assert scope["type"] == "lifespan" + lifespan_startup = await receive() + assert lifespan_startup == {"type": "lifespan.startup"} + await send( + { + "type": "lifespan.startup.complete", + } + ) + lifespan_shutdown = await receive() + assert lifespan_shutdown == {"type": "lifespan.shutdown"} + await send( + { + "type": "lifespan.shutdown.complete", + } + ) + queue.put(None) + except Exception as e: # pragma: no cover + queue.put(e) + raise + + kafka_event_source_mapping_handler = KafkaEventSourceMappingHandler(_app) + + kafka_event_source_mapping_handler( + { + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": 
"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": {}, + }, + Mock(), + ) + + kafka_event_source_mapping_handler._sigterm_handler() + + exception = queue.get() + assert exception is None + + +async def test_kafka_event_source_mapping_handler_message_send() -> None: + app = MockApp() + mock_message_send = AsyncMock() + kafka_event_source_mapping_handler = KafkaEventSourceMappingHandler( + app, lifespan=False, message_send=mock_message_send + ) + + call_task = asyncio.get_running_loop().create_task( + kafka_event_source_mapping_handler._call( + { + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": { + "mytopic-0": [ + { + "topic": "mytopic", + "partition": 0, + "offset": 15, + "timestamp": 1545084650987, + "timestampType": "CREATE_TIME", + "key": "a2V5", + "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "headers": [ + { + "headerKey": [ + 104, + 101, + 97, + 100, + 101, + 114, + 86, + 97, + 108, + 117, + 101, + ] + } + ], + }, + ] + }, + }, + ) + ) + async with app.call() as (scope, receive, send): + assert scope == { + "type": "message", + "amgi": {"version": "1.0", "spec_version": "1.0"}, + "address": "mytopic", + "state": {}, + "extensions": {"message.ack.out_of_order": {}}, + } + + await send( + { + "type": "message.send", + "address": "test", + "headers": [(b"test", b"test")], + "payload": b"test", + } + ) + + await call_task + + mock_message_send.__aenter__.return_value.assert_awaited_once_with( + { + "type": "message.send", + "address": "test", + "headers": [(b"test", b"test")], + "payload": b"test", + } + ) diff --git 
a/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler_integration.py b/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler_integration.py new file mode 100644 index 0000000..b751a31 --- /dev/null +++ b/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler_integration.py @@ -0,0 +1,139 @@ +import asyncio +from typing import AsyncGenerator +from unittest.mock import Mock +from uuid import uuid4 + +import pytest +from aiokafka import AIOKafkaConsumer +from aiokafka.admin import AIOKafkaAdminClient +from aiokafka.admin import NewTopic +from aiokafka.errors import KafkaConnectionError +from aiokafka.errors import TopicAlreadyExistsError +from amgi_kafka_event_source_mapping import KafkaEventSourceMappingHandler +from test_utils import MockApp +from testcontainers.kafka import KafkaContainer + + +@pytest.fixture(scope="module") +async def kafka_container() -> AsyncGenerator[KafkaContainer, None]: + with KafkaContainer() as kafka_container: + yield kafka_container + + +@pytest.fixture +def bootstrap_server(kafka_container: KafkaContainer) -> str: + return kafka_container.get_bootstrap_server() # type: ignore + + +async def create_topic(bootstrap_server: str, topic: str) -> str: + admin = AIOKafkaAdminClient(bootstrap_servers=bootstrap_server) + await admin.start() + + try: + await admin.create_topics( + [NewTopic(name=topic, num_partitions=1, replication_factor=1)] + ) + except TopicAlreadyExistsError: # pragma: no cover + pass + finally: + await admin.close() + + return topic + + +@pytest.fixture +async def send_topic(bootstrap_server: str) -> str: + return await create_topic(bootstrap_server, f"send-{uuid4()}") + + +@pytest.mark.integration +async def test_kafka_event_source_mapping_handler_message_send( + bootstrap_server: str, send_topic: str +) -> None: + app = MockApp() 
+ kafka_event_source_mapping_handler = KafkaEventSourceMappingHandler( + app, lifespan=False + ) + + call_task = asyncio.get_running_loop().create_task( + kafka_event_source_mapping_handler._call( + { + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": bootstrap_server, + "records": { + "mytopic-0": [ + { + "topic": "mytopic", + "partition": 0, + "offset": 15, + "timestamp": 1545084650987, + "timestampType": "CREATE_TIME", + "key": None, + "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "headers": [], + }, + ] + }, + }, + ) + ) + async with AIOKafkaConsumer( + send_topic, bootstrap_servers=bootstrap_server, auto_offset_reset="earliest" + ) as consumer: + async with app.call() as (scope, receive, send): + assert scope == { + "type": "message", + "amgi": {"version": "1.0", "spec_version": "1.0"}, + "address": "mytopic", + "state": {}, + "extensions": {"message.ack.out_of_order": {}}, + } + + await send( + { + "type": "message.send", + "address": send_topic, + "headers": [(b"test", b"test1")], + "payload": b"test1", + } + ) + + await call_task + + message = await consumer.getone() + assert message.topic == send_topic + assert message.value == b"test1" + assert message.headers == (("test", b"test1"),) + + +@pytest.mark.integration +def test_kafka_event_source_mapping_handler_message_send_error() -> None: + app = MockApp() + kafka_event_source_mapping_handler = KafkaEventSourceMappingHandler( + app, lifespan=False + ) + + with pytest.raises(KafkaConnectionError): + kafka_event_source_mapping_handler( + { + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": str(uuid4()), + "records": { + "mytopic-0": [ + { + "topic": "mytopic", + "partition": 0, + "offset": 15, + "timestamp": 1545084650987, + "timestampType": "CREATE_TIME", + "key": 
None, + "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "headers": [], + }, + ] + }, + }, + Mock(), + ) diff --git a/tox.ini b/tox.ini index 7fb0861..3383355 100644 --- a/tox.ini +++ b/tox.ini @@ -5,9 +5,9 @@ env_list = py314-asyncfast-pydantic212 py313-asyncfast-pydantic2{8-12} clean - py3{10-14}-{amgi-aiobotocore, amgi-aiokafka, amgi-common, amgi-paho-mqtt, amgi-redis, amgi-sqs-event-source-mapping, asyncfast-cli} + py3{10-14}-{amgi-aiobotocore, amgi-aiokafka, amgi-common, amgi-kafka-event-source-mapping, amgi-paho-mqtt, amgi-redis, amgi-sqs-event-source-mapping, asyncfast-cli} py3{10-12}-asyncfast-pydantic2{0-12} - py3{10-14}-{amgi-aiobotocore, amgi-aiokafka, amgi-common, amgi-paho-mqtt, amgi-redis, amgi-sqs-event-source-mapping, amgi-types, asyncfast, asyncfast-cli}-import + py3{10-14}-{amgi-aiobotocore, amgi-aiokafka, amgi-common, amgi-kafka-event-source-mapping, amgi-paho-mqtt, amgi-redis, amgi-sqs-event-source-mapping, amgi-types, asyncfast, asyncfast-cli}-import [testenv] runner = uv-venv-lock-runner @@ -65,6 +65,11 @@ commands = {[testenv]commands} packages/amgi-common uv_sync_flags = --package=amgi-common +[testenv:py3{10-14}-amgi-kafka-event-source-mapping] +commands = + {[testenv]commands} packages/amgi-kafka-event-source-mapping +uv_sync_flags = --package=amgi-kafka-event-source-mapping + [testenv:py3{10-14}-amgi-paho-mqtt] commands = {[testenv]commands} packages/amgi-paho-mqtt @@ -101,6 +106,13 @@ uv_sync_flags = --package=amgi-common --no-dev +[testenv:py3{10-14}-amgi-kafka-event-source-mapping-import] +commands = + python -c "import amgi_kafka_event_source_mapping" +uv_sync_flags = + --package=amgi-kafka-event-source-mapping + --no-dev + [testenv:py3{10-14}-amgi-paho-mqtt-import] commands = python -c "import amgi_paho_mqtt" diff --git a/uv.lock b/uv.lock index 3573265..e234e3d 100644 --- a/uv.lock +++ b/uv.lock @@ -13,6 +13,7 @@ members = [ "amgi-aiobotocore", "amgi-aiokafka", "amgi-common", + "amgi-kafka-event-source-mapping", "amgi-paho-mqtt", 
"amgi-redis", "amgi-sqs-event-source-mapping", @@ -412,6 +413,41 @@ dev = [ { name = "test-utils", editable = "packages/test-utils" }, ] +[[package]] +name = "amgi-kafka-event-source-mapping" +version = "0.32.0" +source = { editable = "packages/amgi-kafka-event-source-mapping" } +dependencies = [ + { name = "amgi-aiokafka" }, + { name = "typing-extensions", marker = "python_full_version < '3.11'" }, +] + +[package.dev-dependencies] +dev = [ + { name = "pytest" }, + { name = "pytest-asyncio" }, + { name = "pytest-cov" }, + { name = "pytest-timeout" }, + { name = "test-utils" }, + { name = "testcontainers" }, +] + +[package.metadata] +requires-dist = [ + { name = "amgi-aiokafka", editable = "packages/amgi-aiokafka" }, + { name = "typing-extensions", marker = "python_full_version < '3.11'", specifier = ">=4.15.0" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "pytest", specifier = ">=8.4.1" }, + { name = "pytest-asyncio", specifier = ">=1.3.0" }, + { name = "pytest-cov", specifier = ">=7.0.0" }, + { name = "pytest-timeout", specifier = ">=2.4.0" }, + { name = "test-utils", editable = "packages/test-utils" }, + { name = "testcontainers", extras = ["kafka"], specifier = ">=4.13.0" }, +] + [[package]] name = "amgi-paho-mqtt" version = "0.32.0"