diff --git a/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py b/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py index 364d823..e3b60e8 100644 --- a/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py +++ b/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py @@ -6,6 +6,7 @@ from collections.abc import Callable from collections.abc import Iterable from typing import Any +from typing import Literal from aiokafka import AIOKafkaConsumer from aiokafka import AIOKafkaProducer @@ -23,14 +24,22 @@ logger = logging.getLogger("amgi-aiokafka.error") +AutoOffsetReset = Literal["earliest", "latest", "none"] + + def run( app: AMGIApplication, *topics: str, bootstrap_servers: str | list[str] = "localhost", group_id: str | None = None, + auto_offset_reset: AutoOffsetReset = "latest", ) -> None: server = Server( - app, *topics, bootstrap_servers=bootstrap_servers, group_id=group_id + app, + *topics, + bootstrap_servers=bootstrap_servers, + group_id=group_id, + auto_offset_reset=auto_offset_reset, ) server_serve(server) @@ -40,12 +49,14 @@ def _run_cli( topics: list[str], bootstrap_servers: list[str] | None = None, group_id: str | None = None, + auto_offset_reset: AutoOffsetReset = "latest", ) -> None: run( app, *topics, bootstrap_servers=bootstrap_servers or ["localhost"], group_id=group_id, + auto_offset_reset=auto_offset_reset, ) @@ -100,11 +111,13 @@ def __init__( *topics: str, bootstrap_servers: str | list[str], group_id: str | None, + auto_offset_reset: AutoOffsetReset = "latest", ) -> None: self._app = app self._topics = topics self._bootstrap_servers = bootstrap_servers self._group_id = group_id + self._auto_offset_reset = auto_offset_reset self._ackable_consumer = self._group_id is not None self._producer: AIOKafkaProducer | None = None self._producer_lock = Lock() @@ -116,6 +129,7 @@ async def serve(self) -> None: bootstrap_servers=self._bootstrap_servers, group_id=self._group_id, enable_auto_commit=False, + auto_offset_reset=self._auto_offset_reset, ) async with self._consumer: async with Lifespan(self._app) as state: diff --git a/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py b/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py index 8a55a6b..d2293df 100644 --- a/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py +++ b/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py @@ -4,6 +4,9 @@ import pytest from aiokafka import AIOKafkaConsumer from aiokafka import AIOKafkaProducer +from aiokafka.admin import AIOKafkaAdminClient +from aiokafka.admin import NewTopic +from aiokafka.errors import TopicAlreadyExistsError from amgi_aiokafka import _run_cli from amgi_aiokafka import run from amgi_aiokafka import Server @@ -24,19 +27,43 @@ def bootstrap_server(kafka_container: KafkaContainer) -> str: return kafka_container.get_bootstrap_server() # type: ignore +async def create_topic(bootstrap_server: str, topic: str) -> str: + admin = AIOKafkaAdminClient(bootstrap_servers=bootstrap_server) + await admin.start() + + try: + await admin.create_topics( + [NewTopic(name=topic, num_partitions=1, replication_factor=1)] + ) + except TopicAlreadyExistsError: # pragma: no cover + pass + finally: + await admin.close() + + return topic + + @pytest.fixture -def topic() -> str: - return f"receive-{uuid4()}" +async def receive_topic(bootstrap_server: str) -> str: + return await create_topic(bootstrap_server, f"receive-{uuid4()}") @pytest.fixture -async def app(bootstrap_server: str, topic: str) -> AsyncGenerator[MockApp, None]: +async def send_topic(bootstrap_server: str) -> str: + return await create_topic(bootstrap_server, f"send-{uuid4()}") + + +@pytest.fixture +async def app( + bootstrap_server: str, receive_topic: str +) -> AsyncGenerator[MockApp, None]: app = MockApp() server = Server( app, - topic, + receive_topic, bootstrap_servers=bootstrap_server, group_id=str(uuid4()), + auto_offset_reset="earliest", ) async with app.lifespan(server=server): @@ -44,14 +71,16 @@ async def app(bootstrap_server: str, topic: str) -> AsyncGenerator[MockApp, None @pytest.mark.integration -async def test_message(bootstrap_server: str, app: MockApp, topic: str) -> None: +async def test_message(bootstrap_server: str, app: MockApp, receive_topic: str) -> None: producer = AIOKafkaProducer(bootstrap_servers=bootstrap_server) await producer.start() - await producer.send_and_wait(topic, b"value", b"key", headers=[("test", b"test")]) + await producer.send_and_wait( + receive_topic, b"value", b"key", headers=[("test", b"test")] + ) async with app.call() as (scope, receive, send): assert scope == { - "address": topic, + "address": receive_topic, "amgi": {"spec_version": "1.0", "version": "1.0"}, "type": "message", "state": {}, @@ -61,7 +90,7 @@ async def test_message(bootstrap_server: str, app: MockApp, topic: str) -> None: assert message_receive["type"] == "message.receive" assert message_receive == { "headers": [(b"test", b"test")], - "id": f"{topic}:0:0", + "id": f"{receive_topic}:0:0", "more_messages": False, "payload": b"value", "bindings": {"kafka": {"key": b"key"}}, @@ -78,12 +107,13 @@ async def test_message(bootstrap_server: str, app: MockApp, topic: str) -> None: @pytest.mark.integration -async def test_message_send(bootstrap_server: str, app: MockApp, topic: str) -> None: +async def test_message_send( + bootstrap_server: str, app: MockApp, receive_topic: str, send_topic: str +) -> None: producer = AIOKafkaProducer(bootstrap_servers=bootstrap_server) await producer.start() - await producer.send_and_wait(topic, b"") - send_topic = f"send-{uuid4()}" + await producer.send_and_wait(receive_topic, b"") async with AIOKafkaConsumer( send_topic, bootstrap_servers=bootstrap_server @@ -108,13 +138,12 @@ async def test_message_send(bootstrap_server: str, app: MockApp, topic: str) -> @pytest.mark.integration async def test_message_send_kafka_key( - bootstrap_server: str, app: MockApp, topic: str + bootstrap_server: str, app: MockApp, receive_topic: str, send_topic: str ) -> None: producer = AIOKafkaProducer(bootstrap_servers=bootstrap_server) await producer.start() - await producer.send_and_wait(topic, b"") - send_topic = f"send-{uuid4()}" + await producer.send_and_wait(receive_topic, b"") async with AIOKafkaConsumer( send_topic, bootstrap_servers=bootstrap_server @@ -137,11 +166,11 @@ async def test_message_send_kafka_key( @pytest.mark.integration -async def test_lifespan(bootstrap_server: str, topic: str) -> None: +async def test_lifespan(bootstrap_server: str, receive_topic: str) -> None: app = MockApp() server = Server( app, - topic, + receive_topic, bootstrap_servers=bootstrap_server, group_id=None, ) @@ -152,12 +181,12 @@ async def test_lifespan(bootstrap_server: str, topic: str) -> None: async with app.lifespan({"item": state_item}, server): await producer.send_and_wait( - topic, + receive_topic, b"", ) async with app.call() as (scope, receive, send): assert scope == { - "address": topic, + "address": receive_topic, "amgi": {"spec_version": "1.0", "version": "1.0"}, "type": "message", "state": {"item": state_item}, @@ -165,10 +194,12 @@ async def test_lifespan(bootstrap_server: str, topic: str) -> None: @pytest.mark.integration -def test_run(bootstrap_server: str, topic: str) -> None: - assert_run_can_terminate(run, topic, bootstrap_servers=bootstrap_server) +def test_run(bootstrap_server: str, receive_topic: str) -> None: + assert_run_can_terminate(run, receive_topic, bootstrap_servers=bootstrap_server) @pytest.mark.integration -def test_run_cli(bootstrap_server: str, topic: str) -> None: - assert_run_can_terminate(_run_cli, [topic], bootstrap_servers=bootstrap_server) +def test_run_cli(bootstrap_server: str, receive_topic: str) -> None: + assert_run_can_terminate( + _run_cli, [receive_topic], bootstrap_servers=bootstrap_server + )