diff --git a/README.md b/README.md index 1499b51..7604bca 100644 --- a/README.md +++ b/README.md @@ -115,21 +115,21 @@ At its core, AMGI defines a minimal, low-level callable interface, similar in sp ```python async def app(scope, receive, send): - message = await receive() - - await send( - { - "type": "message.ack", - "id": message["id"], - } - ) + try: + # Do some message handling here! + await send( + { + "type": "message.ack", + } + ) + except Exception as e: + await send({"type": "message.nack", "message": str(e)}) ``` AMGI provides: - A standard application callable - A structured message scope -- A receive mechanism - A send mechanism Frameworks like AsyncFast can be built on top of this interface. diff --git a/docs/extensions.rst b/docs/extensions.rst index 0d6bf69..5400838 100644 --- a/docs/extensions.rst +++ b/docs/extensions.rst @@ -2,28 +2,5 @@ Extensions ############ -The ASGI specification provides for server-specific extensions to be used outside of the core ASGI specification. This -document specifies some common extensions. - -****************************** - Acknowledgement Out Of Order -****************************** - -This is sent by the server to indicate that acknowledgements can be sent out of order. - -.. code:: - - "scope": { - ... - "extensions": { - "message.ack.out_of_order": {} - } - } - -When this extension is present, the application MAY send ``message.ack`` and ``message.nack`` events for received -messages in any order. The server MUST accept out-of-order acknowledgements and MUST NOT treat them as a protocol error. - -When this extension is absent, applications SHOULD assume that acknowledgement ordering is constrained by the server and -MAY be required to follow message delivery order. - -This extension does not change the semantics of message delivery or batching, and only affects acknowledgement ordering. +The ASGI specification provides for server-specific extensions to be used outside of the core ASGI specification. There +are currently no extensions, but it is left here for future use. \ No newline at end of file diff --git a/docs/specifications/lifespan.py b/docs/specifications/lifespan.py new file mode 100644 index 0000000..0b22caa --- /dev/null +++ b/docs/specifications/lifespan.py @@ -0,0 +1,16 @@ +# mypy: ignore-errors + + +async def app(scope, receive, send): + if scope["type"] == "lifespan": + while True: + message = await receive() + if message["type"] == "lifespan.startup": + ... # Do some startup here! + await send({"type": "lifespan.startup.complete"}) + elif message["type"] == "lifespan.shutdown": + ... # Do some shutdown here! + await send({"type": "lifespan.shutdown.complete"}) + return + else: + pass # Handle other types diff --git a/docs/specifications/lifespan.rst b/docs/specifications/lifespan.rst index 38b7343..fbdb0cd 100644 --- a/docs/specifications/lifespan.rst +++ b/docs/specifications/lifespan.rst @@ -9,22 +9,9 @@ coordinate resource allocation, background tasks, and graceful shutdown. A simple implementation would be: -.. code:: python - - async def app(scope, receive, send): - if scope["type"] == "lifespan": - while True: - message = await receive() - if message["type"] == "lifespan.startup": - ... # Do some startup here! - await send({"type": "lifespan.startup.complete"}) - elif message["type"] == "lifespan.shutdown": - ... # Do some shutdown here! - await send({"type": "lifespan.shutdown.complete"}) - return - else: - pass # Handle other types +.. literalinclude:: lifespan.py + :lines: 4- ********** Lifespan diff --git a/docs/specifications/message.py b/docs/specifications/message.py new file mode 100644 index 0000000..389d67a --- /dev/null +++ b/docs/specifications/message.py @@ -0,0 +1,24 @@ +# mypy: ignore-errors + + +async def app(scope, receive, send): + if scope["type"] == "message": + try: + headers = scope["headers"] + payload = scope.get("payload") + bindings = scope.get("bindings", {}) + ... # Do some message handling here! + await send( + { + "type": "message.ack", + } + ) + except Exception as e: + await send( + { + "type": "message.nack", + "message": str(e), + } + ) + else: + pass # Handle other types diff --git a/docs/specifications/message.rst b/docs/specifications/message.rst index 7c4b418..e06d163 100644 --- a/docs/specifications/message.rst +++ b/docs/specifications/message.rst @@ -9,60 +9,20 @@ agnosticism. A simple implementation would be: -.. code:: python - - async def app(scope, receive, send): - if scope["type"] == "message": - more_messages = True - while more_messages: - message = await receive() - message_id = message["id"] - try: - headers = message["headers"] - payload = message.get("payload") - bindings = message.get("bindings", {}) - ... # Do some message handling here! - await send( - { - "type": "message.ack", - "id": message_id, - } - ) - except Exception as e: - await send( - { - "type": "message.nack", - "id": message_id, - "message": str(e), - } - ) - more_messages = message.get("more_messages") - else: - pass # Handle other types - +.. literalinclude:: message.py + :lines: 4- ********* Message ********* -A message batch has a single message scope. Your application will be called once per batch. For protocols that do not -support batched consumption a batch of one message should be sent to the application. +A message has a single message scope. Your application will be called once per message. The message scope information passed in scope contains: .. typeddict:: amgi_types.MessageScope :type: scope -******************************************** - Receive message - :py:func:`receive` event -******************************************** - -Sent to the application to indicate an incoming message in the batch. - -Keys: - -.. typeddict:: amgi_types.MessageReceiveEvent - ********************************************** Response message ack - :py:func:`send` event ********************************************** diff --git a/packages/amgi-aiobotocore/src/amgi_aiobotocore/sqs.py b/packages/amgi-aiobotocore/src/amgi_aiobotocore/sqs.py index 7fb85dc..6c36b83 100644 --- a/packages/amgi-aiobotocore/src/amgi_aiobotocore/sqs.py +++ b/packages/amgi-aiobotocore/src/amgi_aiobotocore/sqs.py @@ -1,6 +1,5 @@ import asyncio import sys -from collections import deque from collections.abc import Awaitable from collections.abc import Callable from collections.abc import Generator @@ -17,8 +16,8 @@ from amgi_common import server_serve from amgi_common import Stoppable from amgi_types import AMGIApplication +from amgi_types import AMGIReceiveEvent from amgi_types import AMGISendEvent -from amgi_types import MessageReceiveEvent from amgi_types import MessageScope from amgi_types import MessageSendEvent @@ -82,22 +81,8 @@ def _encode_message_attributes( yield name.encode(), encoded_value -class _Receive: - def __init__(self, messages: Iterable[Any]) -> None: - self._deque = deque(messages) - - async def __call__(self) -> MessageReceiveEvent: - message = self._deque.popleft() - encoded_headers = list( - _encode_message_attributes(message.get("MessageAttributes", {})) - ) - return { - "type": "message.receive", - "id": message["ReceiptHandle"], - "headers": encoded_headers, - "payload": message["Body"].encode(), - "more_messages": len(self._deque) != 0, - } +async def _receive() -> AMGIReceiveEvent: + raise RuntimeError("Receive should not be called") async def _get_queue_url(client: Any, queue_name: str) -> str: @@ -264,10 +249,12 @@ async def __aexit__( class _Send: def __init__( self, + receipt_handle: str, delete_batcher: _DeleteBatcher, queue_url: str, message_send: _MessageSendT, ) -> None: + self._receipt_handle = receipt_handle self._queue_url = queue_url self._delete_batcher = delete_batcher self._message_send = message_send @@ -276,7 +263,7 @@ async def __call__(self, event: AMGISendEvent) -> None: if event["type"] == "message.ack": await self._delete_batcher.delete_message( self._queue_url, - event["id"], + self._receipt_handle, ) if event["type"] == "message.send": await self._message_send(event) @@ -357,19 +344,46 @@ async def _queue_loop( MessageAttributeNames=["All"], ): messages = messages_response.get("Messages", ()) - if messages: - scope: MessageScope = { - "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, - "address": queue_name, - "state": state.copy(), - "extensions": {"message.ack.out_of_order": {}}, - } - await self._app( - scope, - _Receive(messages), - _Send(delete_batcher, queue_url, message_send), + await asyncio.gather( + *( + self._call_message( + message, + queue_url, + queue_name, + delete_batcher, + message_send, + state, + ) + for message in messages ) + ) + + async def _call_message( + self, + message: Any, + queue_url: str, + queue_name: str, + delete_batcher: _DeleteBatcher, + message_send: _MessageSendT, + state: dict[str, Any], + ) -> None: + encoded_headers = list( + _encode_message_attributes(message.get("MessageAttributes", {})) + ) + + scope: MessageScope = { + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": queue_name, + "headers": encoded_headers, + "payload": message["Body"].encode(), + "state": state.copy(), + } + await self._app( + scope, + _receive, + _Send(message["ReceiptHandle"], delete_batcher, queue_url, message_send), + ) def stop(self) -> None: self._stoppable.stop() diff --git a/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py b/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py index 8123179..33d59bb 100644 --- a/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py +++ b/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py @@ -1,6 +1,7 @@ from collections.abc import AsyncGenerator from typing import Any from typing import cast +from typing import Generator from uuid import uuid4 import pytest @@ -21,7 +22,7 @@ def __eq__(self, other: Any) -> bool: @pytest.fixture(scope="module") -async def localstack_container() -> AsyncGenerator[LocalStackContainer, None]: +def localstack_container() -> Generator[LocalStackContainer, None, None]: with LocalStackContainer(image="ghcr.io/asyncfast/localstack:4.9.2").with_services( "sqs" ) as localstack_container: @@ -87,25 +88,15 @@ async def test_message( async with app.call() as (scope, receive, send): assert scope == { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": queue_name, - "state": {}, - "extensions": {"message.ack.out_of_order": {}}, - } - message_receive = await receive() - - assert message_receive["type"] == "message.receive" - assert message_receive == { - "type": "message.receive", - "id": _StrMatcher(), "headers": [(b"string-value", b"string"), (b"bytes-value", b"bytes")], "payload": b"value", - "more_messages": False, + "state": {}, } message_ack_event: MessageAckEvent = { "type": "message.ack", - "id": message_receive["id"], } await send(message_ack_event) @@ -131,25 +122,15 @@ async def test_message_nack( async with app.call() as (scope, receive, send): assert scope == { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": queue_name, - "state": {}, - "extensions": {"message.ack.out_of_order": {}}, - } - message_receive = await receive() - - assert message_receive["type"] == "message.receive" - assert message_receive == { - "type": "message.receive", - "id": _StrMatcher(), "headers": [(b"string-value", b"string"), (b"bytes-value", b"bytes")], "payload": b"value", - "more_messages": False, + "state": {}, } message_ack_event: MessageNackEvent = { "type": "message.nack", - "id": message_receive["id"], "message": "", } await send(message_ack_event) @@ -295,10 +276,11 @@ async def test_lifespan( async with app.call() as (scope, receive, send): assert scope == { "address": queue_name, - "amgi": {"spec_version": "1.0", "version": "1.0"}, + "headers": [(b"string-value", b"string"), (b"bytes-value", b"bytes")], + "payload": b"value", + "amgi": {"version": "2.0", "spec_version": "2.0"}, "type": "message", "state": {"item": state_item}, - "extensions": {"message.ack.out_of_order": {}}, } @@ -324,3 +306,20 @@ def test_run_cli(queue_name: str, localstack_container: LocalStackContainer) -> aws_access_key_id="testcontainers-localstack", aws_secret_access_key="testcontainers-localstack", ) + + +@pytest.mark.integration +async def test_message_receive_not_callable( + app: MockApp, queue_url: str, queue_name: str, sqs_client: Any +) -> None: + sqs_client.send_message( + QueueUrl=queue_url, + MessageBody="value", + MessageAttributes={ + "string-value": {"StringValue": "string", "DataType": "StringValue"}, + "bytes-value": {"BinaryValue": b"bytes", "DataType": "BinaryValue"}, + }, + ) + async with app.call() as (scope, receive, send): + with pytest.raises(RuntimeError, match="Receive should not be called"): + await receive() diff --git a/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py b/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py index 8f2a7e4..789cfac 100644 --- a/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py +++ b/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py @@ -1,10 +1,8 @@ import asyncio import logging import sys -from collections import deque from collections.abc import Awaitable from collections.abc import Callable -from collections.abc import Iterable from types import TracebackType from typing import Any from typing import AsyncContextManager @@ -18,8 +16,8 @@ from amgi_common import server_serve from amgi_common import Stoppable from amgi_types import AMGIApplication +from amgi_types import AMGIReceiveEvent from amgi_types import AMGISendEvent -from amgi_types import MessageReceiveEvent from amgi_types import MessageScope from amgi_types import MessageSendEvent @@ -73,44 +71,33 @@ def _run_cli( ) -class _Receive: - def __init__( - self, - records: Iterable[ConsumerRecord], - ) -> None: - self._deque = deque(records) - - async def __call__(self) -> MessageReceiveEvent: - record = self._deque.popleft() - encoded_headers = [(key.encode(), value) for key, value in record.headers] - - return { - "type": "message.receive", - "id": f"{record.topic}:{record.partition}:{record.offset}", - "headers": encoded_headers, - "payload": record.value, - "bindings": {"kafka": {"key": record.key}}, - "more_messages": len(self._deque) != 0, - } +async def _receive() -> AMGIReceiveEvent: + raise RuntimeError("Receive should not be called") class _Send: def __init__( self, consumer: AIOKafkaConsumer, - message_receive_ids: dict[str, dict[TopicPartition, int]], + record: ConsumerRecord, ackable_consumer: bool, message_send: _MessageSendT, ) -> None: self._consumer = consumer self._message_send = message_send - self._message_receive_ids = message_receive_ids + self._record = record self._ackable_consumer = ackable_consumer + self.acked = False async def __call__(self, event: AMGISendEvent) -> None: - if event["type"] == "message.ack" and self._ackable_consumer: - offsets = self._message_receive_ids.pop(event["id"]) - await self._consumer.commit(offsets) + if event["type"] == "message.ack": + self.acked = True + if self._ackable_consumer: + topic_partition = TopicPartition( + self._record.topic, self._record.partition + ) + offset = self._record.offset + 1 + await self._consumer.commit({topic_partition: offset}) if event["type"] == "message.send": await self._message_send(event) @@ -199,31 +186,40 @@ async def _handle_partition_records( message_send: _MessageSendT, state: dict[str, Any], ) -> None: - if records: - scope: MessageScope = { - "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, - "address": topic_partition.topic, - "state": state.copy(), - } - - message_receive_ids = { - f"{record.topic}:{record.partition}:{record.offset}": { - TopicPartition(record.topic, record.partition): record.offset + 1 - } - for record in records - } - - await self._app( - scope, - _Receive(records), - _Send( - self._consumer, - message_receive_ids, - self._ackable_consumer, - message_send, - ), - ) + for record in records: + if not await self._handle_record(record, message_send, state): + break + + async def _handle_record( + self, + record: ConsumerRecord, + message_send: _MessageSendT, + state: dict[str, Any], + ) -> bool: + encoded_headers = [(key.encode(), value) for key, value in record.headers] + + scope: MessageScope = { + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": record.topic, + "headers": encoded_headers, + "payload": record.value, + "bindings": {"kafka": {"key": record.key}}, + "state": state.copy(), + } + + send = _Send( + self._consumer, + record, + self._ackable_consumer, + message_send, + ) + await self._app( + scope, + _receive, + send, + ) + return send.acked def stop(self) -> None: self._stoppable.stop() diff --git a/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py b/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py index dc147bc..e279739 100644 --- a/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py +++ b/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py @@ -1,4 +1,5 @@ from collections.abc import AsyncGenerator +from typing import Generator from uuid import uuid4 import pytest @@ -17,7 +18,7 @@ @pytest.fixture(scope="module") -async def kafka_container() -> AsyncGenerator[KafkaContainer, None]: +def kafka_container() -> Generator[KafkaContainer, None, None]: with KafkaContainer(image="ghcr.io/asyncfast/cp-kafka:7.6.0") as kafka_container: yield kafka_container @@ -81,25 +82,16 @@ async def test_message(bootstrap_server: str, app: MockApp, receive_topic: str) async with app.call() as (scope, receive, send): assert scope == { "address": receive_topic, - "amgi": {"spec_version": "1.0", "version": "1.0"}, - "type": "message", - "state": {}, - } - - message_receive = await receive() - assert message_receive["type"] == "message.receive" - assert message_receive == { + "bindings": {"kafka": {"key": b"key"}}, "headers": [(b"test", b"test")], - "id": f"{receive_topic}:0:0", - "more_messages": False, "payload": b"value", - "bindings": {"kafka": {"key": b"key"}}, - "type": "message.receive", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "type": "message", + "state": {}, } message_ack_event: MessageAckEvent = { "type": "message.ack", - "id": message_receive["id"], } await send(message_ack_event) @@ -187,7 +179,10 @@ async def test_lifespan(bootstrap_server: str, receive_topic: str) -> None: async with app.call() as (scope, receive, send): assert scope == { "address": receive_topic, - "amgi": {"spec_version": "1.0", "version": "1.0"}, + "bindings": {"kafka": {"key": None}}, + "headers": [], + "payload": b"", + "amgi": {"version": "2.0", "spec_version": "2.0"}, "type": "message", "state": {"item": state_item}, } @@ -203,3 +198,16 @@ def test_run_cli(bootstrap_server: str, receive_topic: str) -> None: assert_run_can_terminate( _run_cli, [receive_topic], bootstrap_servers=bootstrap_server ) + + +@pytest.mark.integration +async def test_message_receive_not_callable( + bootstrap_server: str, app: MockApp, receive_topic: str, send_topic: str +) -> None: + producer = AIOKafkaProducer(bootstrap_servers=bootstrap_server) + await producer.start() + + await producer.send_and_wait(receive_topic, b"") + async with app.call() as (scope, receive, send): + with pytest.raises(RuntimeError, match="Receive should not be called"): + await receive() diff --git a/packages/amgi-common/src/amgi_common/__init__.py b/packages/amgi-common/src/amgi_common/__init__.py index e98f49e..7c8f66a 100644 --- a/packages/amgi-common/src/amgi_common/__init__.py +++ b/packages/amgi-common/src/amgi_common/__init__.py @@ -82,7 +82,7 @@ async def __aenter__(self) -> dict[str, Any]: async def _main(self) -> None: scope: LifespanScope = { "type": "lifespan", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "state": self._state, } try: diff --git a/packages/amgi-common/tests_amgi_common/test_lifespan.py b/packages/amgi-common/tests_amgi_common/test_lifespan.py index 9a77fe8..a7889be 100644 --- a/packages/amgi-common/tests_amgi_common/test_lifespan.py +++ b/packages/amgi-common/tests_amgi_common/test_lifespan.py @@ -25,7 +25,7 @@ async def test_lifespan() -> None: async with app.call() as (scope, receive, send): assert scope == { - "amgi": {"spec_version": "1.0", "version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "state": {}, "type": "lifespan", } @@ -57,7 +57,7 @@ async def test_lifespan_should_use_supplied_state() -> None: async with app.call() as (scope, receive, send): assert scope == { - "amgi": {"spec_version": "1.0", "version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "state": {}, "type": "lifespan", } diff --git a/packages/amgi-kafka-event-source-mapping/src/amgi_kafka_event_source_mapping/__init__.py b/packages/amgi-kafka-event-source-mapping/src/amgi_kafka_event_source_mapping/__init__.py index c3cf6f5..2d616f1 100644 --- a/packages/amgi-kafka-event-source-mapping/src/amgi_kafka_event_source_mapping/__init__.py +++ b/packages/amgi-kafka-event-source-mapping/src/amgi_kafka_event_source_mapping/__init__.py @@ -5,7 +5,6 @@ import signal import sys from asyncio import Task -from collections import deque from collections.abc import Awaitable from collections.abc import Callable from collections.abc import Iterable @@ -20,8 +19,8 @@ from amgi_aiokafka import MessageSend as AioKafkaMessageSend from amgi_common import Lifespan from amgi_types import AMGIApplication +from amgi_types import AMGIReceiveEvent from amgi_types import AMGISendEvent -from amgi_types import MessageReceiveEvent from amgi_types import MessageScope from amgi_types import MessageSendEvent from typing_extensions import NotRequired @@ -73,17 +72,16 @@ class _KafkaEventSourceMapping(TypedDict): class _Send: - def __init__( - self, record_nacks: dict[str, _RecordNack], message_send: _MessageSendT - ) -> None: + def __init__(self, record_nack: _RecordNack, message_send: _MessageSendT) -> None: self._message_send = message_send - self.record_nacks = record_nacks + self.record_nack: _RecordNack | None = record_nack async def __call__(self, event: AMGISendEvent) -> None: if event["type"] == "message.ack": - self.record_nacks.pop(event["id"]) + self.record_nack = None if event["type"] == "message.nack": - self.record_nacks[event["id"]].message = event["message"] + assert self.record_nack is not None + self.record_nack.message = event["message"] if event["type"] == "message.send": await self._message_send(event) @@ -104,30 +102,8 @@ def _record_id(message: _KafkaRecord) -> str: return f"{topic}:{partition}:{offset}" -class _Receive: - def __init__(self, records: Iterable[_KafkaRecord]) -> None: - self._deque = deque(records) - - async def __call__(self) -> MessageReceiveEvent: - message = self._deque.popleft() - headers = message.get("headers", []) - encoded_headers = list(_encode_record_headers(headers)) - - value = message.get("value") - key = message.get("key") - - record_id = _record_id(message) - - return { - "type": "message.receive", - "id": record_id, - "headers": encoded_headers, - "payload": None if value is None else base64.b64decode(value), - "bindings": { - "kafka": {"key": None if key is None else base64.b64decode(key)} - }, - "more_messages": len(self._deque) != 0, - } +async def _receive() -> AMGIReceiveEvent: + raise RuntimeError("Receive should not be called") class _MessageSender: @@ -266,27 +242,54 @@ async def _call_source_batch( records: Iterable[_KafkaRecord], message_sender: _MessageSender | _MessageSendWrapper, ) -> Iterable[_RecordNack]: + record_nacks = await asyncio.gather( + *( + self._call_record(record, bootstrap_servers, topic, message_sender) + for record in records + ) + ) + + return [record_nack for record_nack in record_nacks if record_nack is not None] + + async def _call_record( + self, + record: _KafkaRecord, + bootstrap_servers: str, + topic: str, + message_sender: _MessageSender | _MessageSendWrapper, + ) -> _RecordNack | None: + + headers = record.get("headers", []) + encoded_headers = list(_encode_record_headers(headers)) + + value = record.get("value") + key = record.get("key") + scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": topic, + "headers": encoded_headers, + "payload": None if value is None else base64.b64decode(value), + "bindings": { + "kafka": {"key": None if key is None else base64.b64decode(key)} + }, "state": self._state.copy(), - "extensions": {"message.ack.out_of_order": {}}, } message_send = await message_sender.get_message_send(bootstrap_servers) - record_nacks = { - _record_id(record): _RecordNack( + + send = _Send( + _RecordNack( record["topic"], record["partition"], record["offset"], "Ack not received", - ) - for record in records - } - send = _Send(record_nacks, message_send) - await self._app(scope, _Receive(records), send) + ), + message_send, + ) + await self._app(scope, _receive, send) - return send.record_nacks.values() + return send.record_nack def _sigterm_handler(self) -> None: self._loop.run_until_complete(self._shutdown()) diff --git a/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler.py b/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler.py index cb6df7c..5f147da 100644 --- a/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler.py +++ b/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler.py @@ -62,24 +62,17 @@ async def test_kafka_event_source_mapping_handler_records() -> None: async with app.call() as (scope, receive, send): assert scope == { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "mytopic", - "state": {}, - "extensions": {"message.ack.out_of_order": {}}, - } - - assert await receive() == { "bindings": {"kafka": {"key": b"key"}}, "headers": [(b"headerKey", b"headerValue")], - "id": "mytopic:0:15", - "more_messages": False, "payload": b"Hello, this is a test.", - "type": "message.receive", + "state": {}, } + await send( { "type": "message.ack", - "id": "mytopic:0:15", } ) @@ -134,24 +127,17 @@ async def test_kafka_event_source_mapping_handler_error_nack() -> None: async with app.call() as (scope, receive, send): assert scope == { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "mytopic", - "state": {}, - "extensions": {"message.ack.out_of_order": {}}, - } - - assert await receive() == { "bindings": {"kafka": {"key": b"key"}}, "headers": [(b"headerKey", b"headerValue")], - "id": "mytopic:0:15", - "more_messages": False, "payload": b"Hello, this is a test.", - "type": "message.receive", + "state": {}, } + await send( { "type": "message.nack", - "id": "mytopic:0:15", "message": "Failed to process record", } ) @@ -211,24 +197,17 @@ async def test_kafka_event_source_mapping_handler_log_nack( async with app.call() as (scope, receive, send): assert scope == { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "mytopic", - "state": {}, - "extensions": {"message.ack.out_of_order": {}}, - } - - assert await receive() == { "bindings": {"kafka": {"key": b"key"}}, "headers": [(b"headerKey", b"headerValue")], - "id": "mytopic:0:15", - "more_messages": False, "payload": b"Hello, this is a test.", - "type": "message.receive", + "state": {}, } + await send( { "type": "message.nack", - "id": "mytopic:0:15", "message": "Failed to process record", } ) @@ -303,10 +282,12 @@ async def test_lifespan() -> None: async with app.call() as (scope, receive, send): assert scope == { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "mytopic", + "bindings": {"kafka": {"key": b"key"}}, + "headers": [(b"headerKey", b"headerValue")], + "payload": b"Hello, this is a test.", "state": {"item": state_item}, - "extensions": {"message.ack.out_of_order": {}}, } await call_task @@ -409,10 +390,12 @@ async def test_kafka_event_source_mapping_handler_message_send() -> None: async with app.call() as (scope, receive, send): assert scope == { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "mytopic", + "bindings": {"kafka": {"key": b"key"}}, + "headers": [(b"headerKey", b"headerValue")], + "payload": b"Hello, this is a test.", "state": {}, - "extensions": {"message.ack.out_of_order": {}}, } await send( @@ -434,3 +417,55 @@ async def test_kafka_event_source_mapping_handler_message_send() -> None: "payload": b"test", } ) + + +async def test_kafka_event_source_mapping_receive_not_callable() -> None: + app = MockApp() + kafka_event_source_mapping_handler = KafkaEventSourceMappingHandler( + app, lifespan=False, message_send=AsyncMock() + ) + + call_task = asyncio.get_running_loop().create_task( + kafka_event_source_mapping_handler._call( + { + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": { + "mytopic-0": [ + { + "topic": "mytopic", + "partition": 0, + "offset": 15, + "timestamp": 1545084650987, + "timestampType": "CREATE_TIME", + "key": "a2V5", + "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "headers": [ + { + "headerKey": [ + 104, + 101, + 97, + 100, + 101, + 114, + 86, + 97, + 108, + 117, + 101, + ] + } + ], + }, + ] + }, + }, + ) + ) + async with app.call() as (scope, receive, send): + with pytest.raises(RuntimeError, match="Receive should not be called"): + await receive() + + await call_task diff --git a/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler_integration.py b/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler_integration.py index f004a7f..78ea71b 100644 --- a/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler_integration.py +++ b/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler_integration.py @@ -1,5 +1,5 @@ import asyncio -from typing import AsyncGenerator +from typing import Generator from unittest.mock import Mock from uuid import uuid4 @@ -15,7 +15,7 @@ @pytest.fixture(scope="module") -async def kafka_container() -> AsyncGenerator[KafkaContainer, None]: +def kafka_container() -> Generator[KafkaContainer, None, None]: with KafkaContainer(image="ghcr.io/asyncfast/cp-kafka:7.6.0") as kafka_container: yield kafka_container @@ -84,10 +84,12 @@ async def test_kafka_event_source_mapping_handler_message_send( async with app.call() as (scope, receive, send): assert scope == { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "mytopic", + "bindings": {"kafka": {"key": None}}, + "headers": [], + "payload": b"Hello, this is a test.", "state": {}, - "extensions": {"message.ack.out_of_order": {}}, } await send( diff --git a/packages/amgi-paho-mqtt/src/amgi_paho_mqtt/__init__.py b/packages/amgi-paho-mqtt/src/amgi_paho_mqtt/__init__.py index ea85127..34b7274 100644 --- a/packages/amgi-paho-mqtt/src/amgi_paho_mqtt/__init__.py +++ b/packages/amgi-paho-mqtt/src/amgi_paho_mqtt/__init__.py @@ -13,8 +13,8 @@ from amgi_common import Lifespan from amgi_common import server_serve from amgi_types import AMGIApplication +from amgi_types import AMGIReceiveEvent from amgi_types import AMGISendEvent -from amgi_types import MessageReceiveEvent from amgi_types import MessageScope from paho.mqtt.client import Client from paho.mqtt.client import ConnectFlags @@ -46,17 +46,8 @@ class PublishError(OSError): """Raised when publishing fails.""" -class _Receive: - def __init__(self, message: MQTTMessage) -> None: - self._message = message - - async def __call__(self) -> MessageReceiveEvent: - return { - "type": "message.receive", - "id": str(self._message.mid), - "headers": [], - "payload": self._message.payload, - } +async def _receive() -> AMGIReceiveEvent: + raise RuntimeError("Receive should not be called") class Server: @@ -116,11 +107,13 @@ def _on_message(self, client: Client, userdata: Any, message: MQTTMessage) -> No async def _handle_message(self, message: MQTTMessage) -> None: scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": message.topic, + "headers": [], + "payload": message.payload, "state": self._state.copy(), } - await self._app(scope, _Receive(message), self._send) + await self._app(scope, _receive, self._send) def _on_disconnect( self, diff --git a/packages/amgi-paho-mqtt/tests_amgi_paho_mqtt/test_mqtt_message_integration.py b/packages/amgi-paho-mqtt/tests_amgi_paho_mqtt/test_mqtt_message_integration.py index b9f6ca4..1576e52 100644 --- a/packages/amgi-paho-mqtt/tests_amgi_paho_mqtt/test_mqtt_message_integration.py +++ b/packages/amgi-paho-mqtt/tests_amgi_paho_mqtt/test_mqtt_message_integration.py @@ -3,6 +3,7 @@ from pathlib import Path from threading import Event from typing import Any +from typing import Generator from uuid import uuid4 import pytest @@ -24,7 +25,7 @@ def topic() -> str: @pytest.fixture(scope="module") -async def mosquitto_container() -> AsyncGenerator[MosquittoContainer, None]: +def mosquitto_container() -> Generator[MosquittoContainer, None, None]: mosquitto_container = MosquittoContainer( image="ghcr.io/asyncfast/eclipse-mosquitto:2.0.22" ).with_volume_mapping( @@ -64,17 +65,11 @@ async def test_message( async with app.call() as (scope, receive, send): assert scope == { "address": topic, - "amgi": {"spec_version": "1.0", "version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "type": "message", - "state": {}, - } - - message_receive = await receive() - assert message_receive == { "headers": [], - "id": "0", "payload": b"test", - "type": "message.receive", + "state": {}, } @@ -149,7 +144,9 @@ async def test_lifespan(topic: str, mosquitto_container: MosquittoContainer) -> async with app.call() as (scope, receive, send): assert scope == { "address": topic, - "amgi": {"spec_version": "1.0", "version": "1.0"}, + "headers": [], + "payload": b"", + "amgi": {"version": "2.0", "spec_version": "2.0"}, "type": "message", "state": {"item": state_item}, } @@ -182,3 +179,14 @@ def test_run(topic: str, mosquitto_container: MosquittoContainer) -> None: host=mosquitto_container.get_container_host_ip(), port=mosquitto_container.get_exposed_port(mosquitto_container.MQTT_PORT), ) + + +@pytest.mark.integration +async def test_message_receive_not_callable( + app: MockApp, topic: str, mosquitto_container: MosquittoContainer +) -> None: + mosquitto_container.publish_message(topic, "test") + + async with app.call() as (scope, receive, send): + with pytest.raises(RuntimeError, match="Receive should not be called"): + await receive() diff --git a/packages/amgi-redis/src/amgi_redis/__init__.py b/packages/amgi-redis/src/amgi_redis/__init__.py index d9ec3a5..12ccf7f 100644 --- a/packages/amgi-redis/src/amgi_redis/__init__.py +++ b/packages/amgi-redis/src/amgi_redis/__init__.py @@ -11,8 +11,8 @@ from amgi_common import server_serve from amgi_common import Stoppable from amgi_types import AMGIApplication +from amgi_types import AMGIReceiveEvent from amgi_types import AMGISendEvent -from amgi_types import MessageReceiveEvent from amgi_types import MessageScope from amgi_types import MessageSendEvent from redis.asyncio import from_url @@ -44,18 +44,8 @@ def _run_cli( run(app, *channels, url=url) -class _Receive: - def __init__(self, message: dict[str, Any]) -> None: - self._message = message - - async def __call__(self) -> MessageReceiveEvent: - return { - "type": "message.receive", - "id": "", - "headers": [], - "payload": self._message["data"], - "more_messages": False, - } +async def _receive() -> AMGIReceiveEvent: + raise RuntimeError("Receive should not be called") class _Send: @@ -131,11 +121,13 @@ async def _handle_message( ) -> None: scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": message["channel"].decode(), + "headers": [], + "payload": message["data"], "state": state.copy(), } - await self._app(scope, _Receive(message), _Send(message_send)) + await self._app(scope, _receive, _Send(message_send)) def stop(self) -> None: self._stoppable.stop() diff --git a/packages/amgi-redis/tests_amgi_redis/test_redis_message_integration.py b/packages/amgi-redis/tests_amgi_redis/test_redis_message_integration.py index bd6071a..a835b47 100644 --- a/packages/amgi-redis/tests_amgi_redis/test_redis_message_integration.py +++ b/packages/amgi-redis/tests_amgi_redis/test_redis_message_integration.py @@ -1,5 +1,6 @@ from collections.abc import AsyncGenerator from typing import Any +from typing import Generator from uuid import uuid4 import pytest @@ -13,7 +14,7 @@ @pytest.fixture(scope="module") -async def redis_container() -> AsyncGenerator[AsyncRedisContainer, None]: +def redis_container() -> Generator[AsyncRedisContainer, None, None]: with AsyncRedisContainer(image="ghcr.io/asyncfast/redis:8.2.2") as redis_container: yield redis_container @@ -48,18 +49,11 @@ async def test_message( async with app.call() as (scope, receive, send): assert scope == { "address": channel, - "amgi": {"spec_version": "1.0", "version": "1.0"}, - "type": "message", - "state": {}, - } - - message_receive = await receive() - assert message_receive == { + "amgi": {"version": "2.0", "spec_version": "2.0"}, "headers": [], - "id": "", - "more_messages": False, "payload": b"value", - "type": "message.receive", + "state": {}, + "type": "message", } @@ -120,9 +114,11 @@ async def test_lifespan(redis_container: AsyncRedisContainer, channel: str) -> N async with app.call() as (scope, receive, send): assert scope == { "address": channel, - "amgi": {"spec_version": "1.0", "version": "1.0"}, - "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "headers": [], + "payload": b"", "state": {"item": state_item}, + "type": "message", } @@ -140,3 +136,16 @@ def test_run_cli(redis_container: AsyncRedisContainer, channel: str) -> None: port = redis_container.get_exposed_port(redis_container.port) assert_run_can_terminate(_run_cli, [channel], url=f"redis://{host}:{port}") + + +@pytest.mark.integration +async def test_message_receive_not_callable( + app: MockApp, channel: str, redis_container: AsyncRedisContainer +) -> None: + client = await redis_container.get_async_client() + + await client.publish(channel, "") + + async with app.call() as (scope, receive, send): + with pytest.raises(RuntimeError, match="Receive should not be called"): + await receive() diff --git a/packages/amgi-sqs-event-source-mapping/src/amgi_sqs_event_source_mapping/__init__.py b/packages/amgi-sqs-event-source-mapping/src/amgi_sqs_event_source_mapping/__init__.py index d9fd2cc..bf6f6f6 100644 --- a/packages/amgi-sqs-event-source-mapping/src/amgi_sqs_event_source_mapping/__init__.py +++ b/packages/amgi-sqs-event-source-mapping/src/amgi_sqs_event_source_mapping/__init__.py @@ -7,7 +7,6 @@ import sys import warnings from collections import defaultdict -from collections import deque from collections.abc import Awaitable from collections.abc import Callable from collections.abc import Generator @@ -25,8 +24,8 @@ from amgi_common import OperationBatcher from amgi_common import OperationCacher from amgi_types import AMGIApplication +from amgi_types import AMGIReceiveEvent from amgi_types import AMGISendEvent -from amgi_types import MessageReceiveEvent from amgi_types import MessageScope from amgi_types import MessageSendEvent @@ -91,22 +90,8 @@ def _encode_message_attributes( yield name.encode(), encoded_value -class _Receive: - def __init__(self, records: Iterable[_Record]) -> None: - self._deque = deque(records) - - async def __call__(self) -> MessageReceiveEvent: - message = self._deque.popleft() - encoded_headers = list( - _encode_message_attributes(message.get("messageAttributes", {})) - ) - return { - "type": "message.receive", - "id": message["messageId"], - "headers": encoded_headers, - "payload": message["body"].encode(), - "more_messages": len(self._deque) != 0, - } +async def _receive() -> AMGIReceiveEvent: + raise RuntimeError("Receive should not be called") class _QueueUrlCache: @@ -251,13 +236,13 @@ def _send_batcher(self) -> _SendBatcher: class _Send: - def __init__(self, message_ids: set[str], message_send: _MessageSendT) -> None: - self.message_ids = message_ids + def __init__(self, message_id: str, message_send: _MessageSendT) -> None: + self.message_id: str | None = message_id self._message_send = message_send async def __call__(self, event: AMGISendEvent) -> None: if event["type"] == "message.ack": - self.message_ids.discard(event["id"]) + self.message_id = None if event["type"] == "message.send": await self._message_send(event) @@ -337,20 +322,35 @@ async def _call_source_batch( message_send: _MessageSendT, ) -> Iterable[str]: event_source_arn_match = EVENT_SOURCE_ARN_PATTERN.match(event_source_arn) - message_ids = {record["messageId"] for record in records} if event_source_arn_match is None: - return message_ids + return {record["messageId"] for record in records} + + failures = await asyncio.gather( + *( + self._call_record(event_source_arn_match["queue"], record, message_send) + for record in records + ) + ) + return {failure for failure in failures if failure is not None} + + async def _call_record( + self, queue: str, record: _Record, message_send: _MessageSendT + ) -> str | None: + encoded_headers = list( + _encode_message_attributes(record.get("messageAttributes", {})) + ) scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, - "address": event_source_arn_match["queue"], + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": queue, + "headers": encoded_headers, + "payload": record["body"].encode(), "state": self._state.copy(), - "extensions": {"message.ack.out_of_order": {}}, } - records_send = _Send(message_ids, message_send) - await self._app(scope, _Receive(records), records_send) - return records_send.message_ids + records_send = _Send(record["messageId"], message_send) + await self._app(scope, _receive, records_send) + return records_send.message_id def _sigterm_handler(self) -> None: self._loop.run_until_complete(self._shutdown()) diff --git a/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_event_source_mapping_handler.py b/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_event_source_mapping_handler.py index 4a2f562..c808de3 100644 --- a/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_event_source_mapping_handler.py +++ b/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_event_source_mapping_handler.py @@ -87,23 +87,7 @@ async def test_sqs_handler_records( "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2", - }, - { - "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", - "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", - "body": "Test message.", - "attributes": { - "ApproximateReceiveCount": "1", - "SentTimestamp": "1545082650636", - "SenderId": "AIDAIENQZJOLO23YVJ4VO", - "ApproximateFirstReceiveTimestamp": "1545082650649", - }, - "messageAttributes": {}, - "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", - "eventSource": "aws:sqs", - "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", - "awsRegion": "us-east-2", - }, + } ] }, ) @@ -111,36 +95,15 @@ async def test_sqs_handler_records( async with app.call() as (scope, receive, send): assert scope == { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "my-queue", - "state": {}, - "extensions": {"message.ack.out_of_order": {}}, - } - - assert await receive() == { - "type": "message.receive", - "id": "059f36b4-87a3-44ab-83d2-661975830a7d", "headers": [(b"myAttribute", b"myValue")], "payload": b"Test message.", - "more_messages": True, - } - await send( - { - "type": "message.ack", - "id": "059f36b4-87a3-44ab-83d2-661975830a7d", - } - ) - assert await receive() == { - "type": "message.receive", - "id": "2e1424d4-f796-459a-8184-9c92662be6da", - "headers": [], - "payload": b"Test message.", - "more_messages": False, + "state": {}, } await send( { "type": "message.ack", - "id": "2e1424d4-f796-459a-8184-9c92662be6da", } ) @@ -185,23 +148,16 @@ async def test_sqs_handler_record_nack( async with app.call() as (scope, receive, send): assert scope == { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "my-queue", - "state": {}, - "extensions": {"message.ack.out_of_order": {}}, - } - - assert await receive() == { - "type": "message.receive", - "id": "059f36b4-87a3-44ab-83d2-661975830a7d", "headers": [(b"myAttribute", b"myValue")], "payload": b"Test message.", - "more_messages": False, + "state": {}, } + await send( { "type": "message.nack", - "id": "059f36b4-87a3-44ab-83d2-661975830a7d", "message": "failed to process", } ) @@ -251,18 +207,11 @@ async def test_sqs_handler_record_unacked( async with app.call() as (scope, receive, send): assert scope == { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "my-queue", - "state": {}, - "extensions": {"message.ack.out_of_order": {}}, - } - - assert await receive() == { - "type": "message.receive", - "id": "059f36b4-87a3-44ab-83d2-661975830a7d", "headers": [(b"myAttribute", b"myValue")], "payload": b"Test message.", - "more_messages": False, + "state": {}, } batch_item_failures = await call_task @@ -309,23 +258,16 @@ async def test_sqs_handler_record_message_attribute_binary_value( async with app.call() as (scope, receive, send): assert scope == { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "my-queue", - "state": {}, - "extensions": {"message.ack.out_of_order": {}}, - } - - assert await receive() == { - "type": "message.receive", - "id": "059f36b4-87a3-44ab-83d2-661975830a7d", "headers": [(b"myAttribute", b"myValue")], "payload": b"Test message.", - "more_messages": False, + "state": {}, } + await send( { "type": "message.ack", - "id": "059f36b4-87a3-44ab-83d2-661975830a7d", } ) @@ -425,10 +367,11 @@ async def test_lifespan() -> None: async with app.call() as (scope, receive, send): assert scope == { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "my-queue", + "headers": [(b"myAttribute", b"myValue")], + "payload": b"Test message.", "state": {"item": state_item}, - "extensions": {"message.ack.out_of_order": {}}, } await call_task @@ -519,3 +462,44 @@ def test_sqs_handler_attribute_is_deprecated() -> None: def test_unknown_attribute_raises_attribute_error() -> None: with pytest.raises(AttributeError): amgi_sqs_event_source_mapping.unknown + + +async def test_sqs_handler_records_receive_not_callable( + app: MockApp, sqs_event_source_mapping_handler: SqsEventSourceMappingHandler +) -> None: + call_task = asyncio.get_running_loop().create_task( + sqs_event_source_mapping_handler._call( + { + "Records": [ + { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "Test message.", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185", + }, + "messageAttributes": { + "myAttribute": { + "stringValue": "myValue", + "stringListValues": [], + "binaryListValues": [], + "dataType": "String", + } + }, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2", + } + ] + }, + ) + ) + async with app.call() as (scope, receive, send): + with pytest.raises(RuntimeError, match="Receive should not be called"): + await receive() + + await call_task diff --git a/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_event_source_mapping_handler_integration.py b/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_event_source_mapping_handler_integration.py index 0ef12ab..6401b5c 100644 --- a/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_event_source_mapping_handler_integration.py +++ b/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_event_source_mapping_handler_integration.py @@ -1,5 +1,5 @@ import asyncio -from collections.abc import AsyncGenerator +from typing import Generator from uuid import uuid4 import pytest @@ -11,7 +11,7 @@ @pytest.fixture(scope="module") -async def localstack_container() -> AsyncGenerator[LocalStackContainer, None]: +def localstack_container() -> Generator[LocalStackContainer, None, None]: with LocalStackContainer(image="ghcr.io/asyncfast/localstack:4.9.2").with_services( "sqs" ) as localstack_container: diff --git a/packages/amgi-types/src/amgi_types/__init__.py b/packages/amgi-types/src/amgi_types/__init__.py index c052303..6f1e30e 100644 --- a/packages/amgi-types/src/amgi_types/__init__.py +++ b/packages/amgi-types/src/amgi_types/__init__.py @@ -20,12 +20,19 @@ class AMGIVersions(TypedDict): """ spec_version: str - version: Literal["1.0"] + version: Literal["2.0"] class MessageScope(TypedDict): """ :var address: The address of the batch of messages, for example, in Kafka this would be the topic + :var headers: Includes the headers of the message + :var payload: + Payload of the message, which can be :py:obj:`None` or :py:obj:`bytes`. If missing, it defaults to + :py:obj:`None` + :var bindings: + Protocol specific bindings, for example, when receiving a Kafka message the bindings could include the key: + ``{"kafka": {"key": b"key"}}`` :var state: A copy of the namespace passed into the lifespan corresponding to this batch. Optional; if missing the server does not support this feature. @@ -37,6 +44,9 @@ class MessageScope(TypedDict): type: Literal["message"] amgi: AMGIVersions address: str + headers: Sequence[tuple[bytes, bytes]] + payload: NotRequired[bytes | None] + bindings: NotRequired[dict[str, dict[str, Any]]] state: NotRequired[dict[str, Any]] extensions: NotRequired[dict[str, dict[str, Any]]] @@ -79,46 +89,12 @@ class LifespanShutdownFailedEvent(TypedDict): message: str -class MessageReceiveEvent(TypedDict): - """ - :var id: A unique id for the message, used to ack, or nack the message - :var headers: Includes the headers of the message - :var payload: - Payload of the message, which can be :py:obj:`None` or :py:obj:`bytes`. If missing, it defaults to - :py:obj:`None` - :var bindings: - Protocol specific bindings, for example, when receiving a Kafka message the bindings could include the key: - ``{"kafka": {"key": b"key"}}`` - :var more_messages: - Indicates there are more messages to process in the batch. The application should keep receiving until it - receives :py:obj:`False`. If missing it defaults to :py:obj:`False` - """ - - type: Literal["message.receive"] - id: str - headers: Sequence[tuple[bytes, bytes]] - payload: NotRequired[bytes | None] - bindings: NotRequired[dict[str, dict[str, Any]]] - more_messages: NotRequired[bool] - - class MessageAckEvent(TypedDict): - """ - :var id: The unique id of the message - """ - type: Literal["message.ack"] - id: str class MessageNackEvent(TypedDict): - """ - :var id: The unique id of the message - :var message: A message indicating why the message could not be processed - """ - type: Literal["message.nack"] - id: str message: str @@ -143,9 +119,7 @@ class MessageSendEvent(TypedDict): Scope = Union[MessageScope, LifespanScope] -AMGIReceiveEvent = Union[ - LifespanStartupEvent, LifespanShutdownEvent, MessageReceiveEvent -] +AMGIReceiveEvent = Union[LifespanStartupEvent, LifespanShutdownEvent] AMGISendEvent = Union[ LifespanStartupCompleteEvent, LifespanStartupFailedEvent, diff --git a/packages/asyncfast/src/asyncfast/_asyncfast.py b/packages/asyncfast/src/asyncfast/_asyncfast.py index 92dac75..e19beb9 100644 --- a/packages/asyncfast/src/asyncfast/_asyncfast.py +++ b/packages/asyncfast/src/asyncfast/_asyncfast.py @@ -1,5 +1,3 @@ -import asyncio -from collections.abc import AsyncGenerator from collections.abc import Callable from collections.abc import Generator from collections.abc import Iterable @@ -17,7 +15,6 @@ from amgi_types import LifespanStartupCompleteEvent from amgi_types import MessageAckEvent from amgi_types import MessageNackEvent -from amgi_types import MessageReceiveEvent from amgi_types import MessageScope from amgi_types import Scope from asyncfast._asyncapi import ChannelDefinition @@ -110,7 +107,7 @@ async def __call__( for channel in self._channels: parameters = channel.match(address) if parameters is not None: - await channel(scope, receive, send, parameters) + await channel(scope, send, parameters) return raise ChannelNotFoundError(address) @@ -172,17 +169,6 @@ def _generate_inputs( ).core_schema -async def _receive_messages( - receive: AMGIReceiveCallable, -) -> AsyncGenerator[MessageReceiveEvent, None]: - more_messages = True - while more_messages: - message = await receive() - assert message["type"] == "message.receive" - yield message - more_messages = message.get("more_messages", False) - - class _Channel: def __init__( self, @@ -208,43 +194,19 @@ def match(self, address: str) -> dict[str, str] | None: async def __call__( self, scope: MessageScope, - receive: AMGIReceiveCallable, send: AMGISendCallable, parameters: dict[str, str], - ) -> None: - ack_out_of_order = "message.ack.out_of_order" in scope.get("extensions", {}) - if ack_out_of_order: - await asyncio.gather( - *[ - self._handle_message(message, parameters, send) - async for message in _receive_messages(receive) - ] - ) - else: - async for message in _receive_messages(receive): - await self._handle_message(message, parameters, send) - - async def _handle_message( - self, - message: MessageReceiveEvent, - parameters: dict[str, str], - send: AMGISendCallable, ) -> None: try: - - await self._channel_invoker.invoke( - MessageReceive(message, parameters), send - ) + await self._channel_invoker.invoke(MessageReceive(scope, parameters), send) message_ack_event: MessageAckEvent = { "type": "message.ack", - "id": message["id"], } await send(message_ack_event) except Exception as e: message_nack_event: MessageNackEvent = { "type": "message.nack", - "id": message["id"], "message": str(e), } await send(message_nack_event) diff --git a/packages/asyncfast/src/asyncfast/_channel.py b/packages/asyncfast/src/asyncfast/_channel.py index cd12726..f81e3ba 100644 --- a/packages/asyncfast/src/asyncfast/_channel.py +++ b/packages/asyncfast/src/asyncfast/_channel.py @@ -27,7 +27,7 @@ from typing import TypeVar from amgi_types import AMGISendCallable -from amgi_types import MessageReceiveEvent +from amgi_types import MessageScope from amgi_types import MessageSendEvent from asyncfast.bindings import Binding from pydantic import TypeAdapter @@ -113,7 +113,7 @@ class Depends: @dataclass(frozen=True) class MessageReceive: - message: MessageReceiveEvent + message: MessageScope address_parameters: dict[str, str] @cached_property diff --git a/packages/asyncfast/tests_asyncfast/test_benchmarks.py b/packages/asyncfast/tests_asyncfast/test_benchmarks.py index f7baaf8..7569030 100644 --- a/packages/asyncfast/tests_asyncfast/test_benchmarks.py +++ b/packages/asyncfast/tests_asyncfast/test_benchmarks.py @@ -10,8 +10,8 @@ import pytest from amgi_types import AMGIApplication +from amgi_types import AMGIReceiveEvent from amgi_types import AMGISendEvent -from amgi_types import MessageReceiveEvent from amgi_types import MessageScope from asyncfast import AsyncFast from asyncfast import Header @@ -23,7 +23,7 @@ from pytest_benchmark.fixture import BenchmarkFixture AppBenchmark = Callable[ - [AMGIApplication, MessageScope, MessageReceiveEvent], + [AMGIApplication, MessageScope], None, ] @@ -36,12 +36,11 @@ async def _send(message: AMGISendEvent) -> None: def _app_benchmark( app: AMGIApplication, message_scope: MessageScope, - message_receive_event: MessageReceiveEvent, ) -> None: loop = asyncio.new_event_loop() - async def _receive() -> MessageReceiveEvent: - return message_receive_event + async def _receive() -> AMGIReceiveEvent: + raise RuntimeError("Receive should not be called") # pragma: no cover benchmark( lambda: loop.run_until_complete( @@ -70,12 +69,8 @@ async def topic_handler(payload: MessagePayload) -> None: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [], "payload": b'{"id":1}', }, @@ -96,12 +91,8 @@ async def topic_handler(payload: MessagePayload | None) -> None: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [], }, ) @@ -121,12 +112,8 @@ def topic_handler(payload: MessagePayload) -> None: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [], "payload": b'{"id":1}', }, @@ -144,12 +131,8 @@ async def topic_handler(etag: Annotated[str, Header(alias="ETag")]) -> None: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [(b"ETag", b"33a64df551425fcc55e4d42a148795d9f25f89d4")], }, ) @@ -166,12 +149,8 @@ async def topic_handler(id: Annotated[int, Header()]) -> None: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [(b"id", b"10")], }, ) @@ -190,12 +169,8 @@ async def topic_handler( app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [(b"idempotency-key", b"8e03978e-40d5-43e8-bc93-6894a57f9324")], }, ) @@ -215,12 +190,8 @@ async def topic_handler( app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [ (b"id", b"10"), (b"etag", b"33a64df551425fcc55e4d42a148795d9f25f89d4"), @@ -246,12 +217,8 @@ async def topic_handler(id: Annotated[Optional[str], Header()] = None) -> None: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": headers, }, ) @@ -279,12 +246,8 @@ async def topic_handler( app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": headers, }, ) @@ -305,12 +268,8 @@ async def topic_handler() -> AsyncGenerator[dict[str, Any], None]: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [], }, ) @@ -331,12 +290,8 @@ async def topic_handler(payload: MessagePayload) -> None: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [], "payload": b'{"id":1}', }, @@ -354,12 +309,8 @@ async def topic_handler(payload: Annotated[int, Payload()]) -> None: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [], "payload": b"123", }, @@ -377,12 +328,8 @@ async def order_handler(user_id: str) -> None: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "order.1234", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [], }, ) @@ -404,12 +351,8 @@ async def topic_handler() -> AsyncGenerator[SendMessage, None]: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [], }, ) @@ -434,12 +377,8 @@ async def topic_handler() -> AsyncGenerator[SendMessage, None]: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [], }, ) @@ -456,12 +395,8 @@ async def topic_handler() -> None: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [], }, ) @@ -478,12 +413,8 @@ async def topic_handler(key: Annotated[int, KafkaKey()]) -> None: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [], "bindings": {"kafka": {"key": b"1234"}}, }, @@ -504,12 +435,8 @@ async def topic_handler(key: Annotated[Optional[int], KafkaKey()] = None) -> Non app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [], "bindings": bindings, }, @@ -532,12 +459,8 @@ async def topic_handler(message_sender: MessageSender[SendMessage]) -> None: app, { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - }, - { - "type": "message.receive", - "id": "id-1", "headers": [], }, ) diff --git a/packages/asyncfast/tests_asyncfast/test_channel.py b/packages/asyncfast/tests_asyncfast/test_channel.py index 68b629c..8f2d1c2 100644 --- a/packages/asyncfast/tests_asyncfast/test_channel.py +++ b/packages/asyncfast/tests_asyncfast/test_channel.py @@ -24,8 +24,9 @@ def func(i: int) -> None: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [], "payload": b"1", }, @@ -46,8 +47,9 @@ def func(header: Annotated[str, Header()]) -> None: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [(b"header", b"value")], }, {}, @@ -67,8 +69,9 @@ def func(header: Annotated[str, Header()] = "value") -> None: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [], }, {}, @@ -88,8 +91,9 @@ def func(header_name: Annotated[str, Header()]) -> None: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [(b"header-name", b"value")], }, {}, @@ -109,8 +113,9 @@ def func(etag: Annotated[str, Header(alias="ETag")]) -> None: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [(b"ETag", b"9e30981e-02d5-11f1-9648-e323315723e1")], }, {}, @@ -130,8 +135,9 @@ def func(user: str) -> None: await channel(func, {"user"}).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel.54a08cc6-02db-11f1-afbf-f3f4688d5de4", "headers": [], }, {"user": "54a08cc6-02db-11f1-afbf-f3f4688d5de4"}, @@ -151,8 +157,9 @@ def func(key: Annotated[int, KafkaKey()]) -> None: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [], "bindings": {"kafka": {"key": b"123"}}, }, @@ -173,8 +180,9 @@ def func(key: Annotated[int, KafkaKey()] = 123) -> None: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [], }, {}, @@ -194,8 +202,9 @@ async def func(i: int) -> None: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [], "payload": b"1", }, @@ -220,8 +229,9 @@ async def func() -> AsyncGenerator[Mapping[str, Any], None]: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [], "payload": b"1", }, @@ -253,8 +263,9 @@ def func() -> Generator[Mapping[str, Any], None, None]: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [], "payload": b"1", }, @@ -288,8 +299,9 @@ async def func(message_sender: MessageSender[Mapping[str, Any]]) -> None: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [], "payload": b"1", }, @@ -325,8 +337,9 @@ def func(headers: Annotated[dict[str, int], Depends(dependency)]) -> None: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [(b"header1", b"1"), (b"header2", b"2")], }, {}, @@ -354,8 +367,9 @@ def func(headers: Annotated[dict[str, int], Depends(dependency)]) -> None: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [(b"header1", b"1"), (b"header2", b"2")], }, {}, @@ -382,8 +396,9 @@ def func( await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [], }, {}, @@ -413,8 +428,9 @@ def func( await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [], }, {}, @@ -452,8 +468,9 @@ def func(headers: Annotated[dict[str, int], Depends(dependency)]) -> None: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [(b"header1", b"1"), (b"header2", b"2")], }, {}, @@ -491,8 +508,9 @@ def func(headers: Annotated[dict[str, int], Depends(dependency)]) -> None: await channel(func, set()).invoke( MessageReceive( { - "type": "message.receive", - "id": "id", + "type": "message", + "amgi": {"version": "2.0", "spec_version": "2.0"}, + "address": "channel", "headers": [(b"header1", b"1"), (b"header2", b"2")], }, {}, diff --git a/packages/asyncfast/tests_asyncfast/test_lifespan.py b/packages/asyncfast/tests_asyncfast/test_lifespan.py index 32a51ed..472bb67 100644 --- a/packages/asyncfast/tests_asyncfast/test_lifespan.py +++ b/packages/asyncfast/tests_asyncfast/test_lifespan.py @@ -19,7 +19,7 @@ async def test_lifespan() -> None: lifespan_scope: LifespanScope = { "type": "lifespan", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, } await app( lifespan_scope, @@ -54,7 +54,7 @@ async def test_lifespan_context_manager() -> None: lifespan_scope: LifespanScope = { "type": "lifespan", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, } await app( lifespan_scope, diff --git a/packages/asyncfast/tests_asyncfast/test_message.py b/packages/asyncfast/tests_asyncfast/test_message.py index cd47207..a47ac3c 100644 --- a/packages/asyncfast/tests_asyncfast/test_message.py +++ b/packages/asyncfast/tests_asyncfast/test_message.py @@ -1,4 +1,3 @@ -from asyncio import Event from collections.abc import AsyncGenerator from collections.abc import Generator from dataclasses import dataclass @@ -14,7 +13,6 @@ import pytest from amgi_types import AMGISendEvent -from amgi_types import MessageReceiveEvent from amgi_types import MessageScope from asyncfast import AsyncFast from asyncfast import ChannelNotFoundError @@ -45,18 +43,14 @@ async def topic_handler(payload: MessagePayload) -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], "payload": b'{"id":1}', } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) @@ -77,17 +71,13 @@ async def topic_handler(payload: MessagePayload | None) -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) @@ -108,18 +98,14 @@ def topic_handler(payload: MessagePayload) -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], "payload": b'{"id":1}', } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) @@ -137,17 +123,13 @@ async def topic_handler(etag: Annotated[str, Header(alias="ETag")]) -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [(b"ETag", b"33a64df551425fcc55e4d42a148795d9f25f89d4")], } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) @@ -165,17 +147,13 @@ async def topic_handler(id: Annotated[int, Header()]) -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [(b"id", b"10")], } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) @@ -193,17 +171,13 @@ async def topic_handler(idempotency_key: Annotated[UUID, Header()]) -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [(b"idempotency-key", b"8e03978e-40d5-43e8-bc93-6894a57f9324")], } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) @@ -224,12 +198,8 @@ async def topic_handler( message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [ (b"id", b"10"), (b"etag", b"33a64df551425fcc55e4d42a148795d9f25f89d4"), @@ -237,7 +207,7 @@ async def topic_handler( } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) @@ -267,17 +237,13 @@ async def topic_handler(id: Annotated[Optional[str], Header()] = None) -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": headers, } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) @@ -312,17 +278,13 @@ async def topic_handler( message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": headers, } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) @@ -344,17 +306,13 @@ async def topic_handler() -> AsyncGenerator[dict[str, Any], None]: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), send_mock, ) @@ -387,18 +345,14 @@ async def topic_handler(payload: MessagePayload) -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], "payload": b'{"id":1}', } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) @@ -416,18 +370,14 @@ async def topic_handler(payload: Annotated[int, Payload()]) -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], "payload": b"123", } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) @@ -445,17 +395,13 @@ async def order_handler(user_id: str) -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "order.1234", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) @@ -478,17 +424,13 @@ async def topic_handler() -> AsyncGenerator[SendMessage, None]: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), send_mock, ) @@ -525,17 +467,13 @@ async def topic_handler() -> AsyncGenerator[SendMessage, None]: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), send_mock, ) @@ -564,23 +502,19 @@ async def topic_handler() -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], } send_mock = AsyncMock() await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), send_mock, ) test_mock.assert_called_once() - send_mock.assert_awaited_once_with({"type": "message.ack", "id": "id-1"}) + send_mock.assert_awaited_once_with({"type": "message.ack"}) async def test_message_nack() -> None: @@ -592,24 +526,18 @@ async def topic_handler() -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], } send_mock = AsyncMock() await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), send_mock, ) - send_mock.assert_awaited_once_with( - {"type": "message.nack", "id": "id-1", "message": "test"} - ) + send_mock.assert_awaited_once_with({"type": "message.nack", "message": "test"}) async def test_message_sending_dict_error() -> None: @@ -636,18 +564,14 @@ async def topic_handler() -> AsyncGenerator[dict[str, Any], None]: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], } send_mock = AsyncMock(side_effect=send_mock) await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), send_mock, ) @@ -680,18 +604,14 @@ async def topic_handler() -> AsyncGenerator[dict[str, Any], None]: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], } send_mock = AsyncMock(side_effect=send_mock) await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), send_mock, ) @@ -713,7 +633,7 @@ async def topic_handler() -> AsyncGenerator[dict[str, Any], None]: "payload": b"1", } ), - call({"type": "message.ack", "id": "id-1"}), + call({"type": "message.ack"}), ] ) @@ -742,18 +662,14 @@ def topic_handler() -> Generator[dict[str, Any], None, None]: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], } send_mock = AsyncMock(side_effect=send_mock) await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), send_mock, ) @@ -767,7 +683,7 @@ def topic_handler() -> Generator[dict[str, Any], None, None]: "payload": b"1", } ), - call({"type": "message.ack", "id": "id-1"}), + call({"type": "message.ack"}), ] ) after_exception_mock.assert_called_with(send_exception) @@ -782,24 +698,20 @@ async def topic_handler(id: int) -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], "payload": b"not_an_int", } send_mock = AsyncMock() await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), send_mock, ) send_mock.assert_awaited_once_with( - {"type": "message.nack", "id": "id-1", "message": IsStrMatcher()} + {"type": "message.nack", "message": IsStrMatcher()} ) @@ -814,18 +726,14 @@ async def topic_handler(key: Annotated[int, KafkaKey()]) -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], "bindings": {"kafka": {"key": b"1234"}}, } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) @@ -848,18 +756,14 @@ async def topic_handler(key: Annotated[Optional[int], KafkaKey()] = None) -> Non message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], "bindings": bindings, } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) @@ -882,17 +786,13 @@ async def topic_handler(message_sender: MessageSender[SendMessage]) -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], } await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), send_mock, ) @@ -910,45 +810,6 @@ async def topic_handler(message_sender: MessageSender[SendMessage]) -> None: ) -async def test_message_ack_out_of_order() -> None: - app = AsyncFast() - - received = set() - block_event = Event() - - @app.channel("topic") - async def topic_handler(i: int) -> None: - received.add(i) - if received == {1, 2}: - block_event.set() - await block_event.wait() - - message_scope: MessageScope = { - "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, - "address": "topic", - "extensions": {"message.ack.out_of_order": {}}, - } - message_receive_event1: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", - "headers": [], - "payload": b"1", - "more_messages": True, - } - message_receive_event2: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-2", - "payload": b"2", - "headers": [], - } - await app( - message_scope, - AsyncMock(side_effect=[message_receive_event1, message_receive_event2]), - AsyncMock(), - ) - - async def test_message_non_existant_channel() -> None: app = AsyncFast() @@ -958,12 +819,8 @@ async def topic_handler(id: int) -> None: message_scope: MessageScope = { "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "address": "not_topic", - } - message_receive_event: MessageReceiveEvent = { - "type": "message.receive", - "id": "id-1", "headers": [], } with pytest.raises( @@ -971,6 +828,6 @@ async def topic_handler(id: int) -> None: ): await app( message_scope, - AsyncMock(side_effect=[message_receive_event]), + AsyncMock(), AsyncMock(), ) diff --git a/packages/test-utils/src/test_utils/__init__.py b/packages/test-utils/src/test_utils/__init__.py index bf66e99..b0ca0dd 100644 --- a/packages/test-utils/src/test_utils/__init__.py +++ b/packages/test-utils/src/test_utils/__init__.py @@ -48,7 +48,7 @@ async def lifespan( async with self.call() as (scope, receive, send): assert scope == { - "amgi": {"spec_version": "1.0", "version": "1.0"}, + "amgi": {"version": "2.0", "spec_version": "2.0"}, "type": "lifespan", "state": {}, }