Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,21 @@ At its core, AMGI defines a minimal, low-level callable interface, similar in sp

```python
async def app(scope, receive, send):
message = await receive()

await send(
{
"type": "message.ack",
"id": message["id"],
}
)
try:
# Do some message handling here!
await send(
{
"type": "message.ack",
}
)
except Exception as e:
await send({"type": "message.nack", "message": str(e)})
```

AMGI provides:

- A standard application callable
- A structured message scope
- A receive mechanism
- A send mechanism

Frameworks like AsyncFast can be built on top of this interface.
Expand Down
27 changes: 2 additions & 25 deletions docs/extensions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,5 @@
Extensions
############

The ASGI specification provides for server-specific extensions to be used outside of the core ASGI specification. This
document specifies some common extensions.

******************************
Acknowledgement Out Of Order
******************************

This is sent by the server to indicate that acknowledgements can be sent out of order.

.. code::

"scope": {
...
"extensions": {
"message.ack.out_of_order": {}
}
}

When this extension is present, the application MAY send ``message.ack`` and ``message.nack`` events for received
messages in any order. The server MUST accept out-of-order acknowledgements and MUST NOT treat them as a protocol error.

When this extension is absent, applications SHOULD assume that acknowledgement ordering is constrained by the server and
MAY be required to follow message delivery order.

This extension does not change the semantics of message delivery or batching, and only affects acknowledgement ordering.
The ASGI specification provides for server-specific extensions to be used outside of the core ASGI specification. There
are currently no extensions, but it is left here for future use.
16 changes: 16 additions & 0 deletions docs/specifications/lifespan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# mypy: ignore-errors


async def app(scope, receive, send):
if scope["type"] == "lifespan":
while True:
message = await receive()
if message["type"] == "lifespan.startup":
... # Do some startup here!
await send({"type": "lifespan.startup.complete"})
elif message["type"] == "lifespan.shutdown":
... # Do some shutdown here!
await send({"type": "lifespan.shutdown.complete"})
return
else:
pass # Handle other types
17 changes: 2 additions & 15 deletions docs/specifications/lifespan.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,9 @@ coordinate resource allocation, background tasks, and graceful shutdown.

A simple implementation would be:

.. code:: python

async def app(scope, receive, send):
if scope["type"] == "lifespan":
while True:
message = await receive()
if message["type"] == "lifespan.startup":
... # Do some startup here!
await send({"type": "lifespan.startup.complete"})
elif message["type"] == "lifespan.shutdown":
... # Do some shutdown here!
await send({"type": "lifespan.shutdown.complete"})
return
else:
pass # Handle other types

.. literalinclude:: lifespan.py
:lines: 4-

**********
Lifespan
Expand Down
24 changes: 24 additions & 0 deletions docs/specifications/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# mypy: ignore-errors


async def app(scope, receive, send):
if scope["type"] == "message":
try:
headers = scope["headers"]
payload = scope.get("payload")
bindings = scope.get("bindings", {})
... # Do some message handling here!
await send(
{
"type": "message.ack",
}
)
except Exception as e:
await send(
{
"type": "message.nack",
"message": str(e),
}
)
else:
pass # Handle other types
46 changes: 3 additions & 43 deletions docs/specifications/message.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,60 +9,20 @@ agnosticism.

A simple implementation would be:

.. code:: python

async def app(scope, receive, send):
if scope["type"] == "message":
more_messages = True
while more_messages:
message = await receive()
message_id = message["id"]
try:
headers = message["headers"]
payload = message.get("payload")
bindings = message.get("bindings", {})
... # Do some message handling here!
await send(
{
"type": "message.ack",
"id": message_id,
}
)
except Exception as e:
await send(
{
"type": "message.nack",
"id": message_id,
"message": str(e),
}
)
more_messages = message.get("more_messages")
else:
pass # Handle other types

.. literalinclude:: message.py
:lines: 4-

*********
Message
*********

A message batch has a single message scope. Your application will be called once per batch. For protocols that do not
support batched consumption a batch of one message should be sent to the application.
A message has a single message scope. Your application will be called once per message.

The message scope information passed in scope contains:

.. typeddict:: amgi_types.MessageScope
:type: scope

********************************************
Receive message - :py:func:`receive` event
********************************************

Sent to the application to indicate an incoming message in the batch.

Keys:

.. typeddict:: amgi_types.MessageReceiveEvent

**********************************************
Response message ack - :py:func:`send` event
**********************************************
Expand Down
76 changes: 45 additions & 31 deletions packages/amgi-aiobotocore/src/amgi_aiobotocore/sqs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import sys
from collections import deque
from collections.abc import Awaitable
from collections.abc import Callable
from collections.abc import Generator
Expand All @@ -17,8 +16,8 @@
from amgi_common import server_serve
from amgi_common import Stoppable
from amgi_types import AMGIApplication
from amgi_types import AMGIReceiveEvent
from amgi_types import AMGISendEvent
from amgi_types import MessageReceiveEvent
from amgi_types import MessageScope
from amgi_types import MessageSendEvent

Expand Down Expand Up @@ -82,22 +81,8 @@ def _encode_message_attributes(
yield name.encode(), encoded_value


class _Receive:
def __init__(self, messages: Iterable[Any]) -> None:
self._deque = deque(messages)

async def __call__(self) -> MessageReceiveEvent:
message = self._deque.popleft()
encoded_headers = list(
_encode_message_attributes(message.get("MessageAttributes", {}))
)
return {
"type": "message.receive",
"id": message["ReceiptHandle"],
"headers": encoded_headers,
"payload": message["Body"].encode(),
"more_messages": len(self._deque) != 0,
}
async def _receive() -> AMGIReceiveEvent:
raise RuntimeError("Receive should not be called")


async def _get_queue_url(client: Any, queue_name: str) -> str:
Expand Down Expand Up @@ -264,10 +249,12 @@ async def __aexit__(
class _Send:
def __init__(
self,
receipt_handle: str,
delete_batcher: _DeleteBatcher,
queue_url: str,
message_send: _MessageSendT,
) -> None:
self._receipt_handle = receipt_handle
self._queue_url = queue_url
self._delete_batcher = delete_batcher
self._message_send = message_send
Expand All @@ -276,7 +263,7 @@ async def __call__(self, event: AMGISendEvent) -> None:
if event["type"] == "message.ack":
await self._delete_batcher.delete_message(
self._queue_url,
event["id"],
self._receipt_handle,
)
if event["type"] == "message.send":
await self._message_send(event)
Expand Down Expand Up @@ -357,19 +344,46 @@ async def _queue_loop(
MessageAttributeNames=["All"],
):
messages = messages_response.get("Messages", ())
if messages:
scope: MessageScope = {
"type": "message",
"amgi": {"version": "1.0", "spec_version": "1.0"},
"address": queue_name,
"state": state.copy(),
"extensions": {"message.ack.out_of_order": {}},
}
await self._app(
scope,
_Receive(messages),
_Send(delete_batcher, queue_url, message_send),
await asyncio.gather(
*(
self._call_message(
message,
queue_url,
queue_name,
delete_batcher,
message_send,
state,
)
for message in messages
)
)

async def _call_message(
self,
message: Any,
queue_url: str,
queue_name: str,
delete_batcher: _DeleteBatcher,
message_send: _MessageSendT,
state: dict[str, Any],
) -> None:
encoded_headers = list(
_encode_message_attributes(message.get("MessageAttributes", {}))
)

scope: MessageScope = {
"type": "message",
"amgi": {"version": "2.0", "spec_version": "2.0"},
"address": queue_name,
"headers": encoded_headers,
"payload": message["Body"].encode(),
"state": state.copy(),
}
await self._app(
scope,
_receive,
_Send(message["ReceiptHandle"], delete_batcher, queue_url, message_send),
)

def stop(self) -> None:
self._stoppable.stop()
Loading