Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ furo>=2025.7.19
sphinx-copybutton>=0.5.2
redis>=7.0.1
packages/asyncfast
packages/amgi-types
packages/amgi-types
packages/amgi-aiobotocore
packages/amgi-aiokafka
69 changes: 69 additions & 0 deletions packages/asyncfast/docs/examples/message_send_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import os
from dataclasses import dataclass
from typing import Annotated

from amgi_aiobotocore.sqs import MessageSend as SQSMessageSend
from amgi_aiokafka import MessageSend as KafkaMessageSend
from amgi_aiokafka import run
from asyncfast import AsyncFast
from asyncfast import Header
from asyncfast import Message
from asyncfast import MessageSender
from asyncfast.message_send import MessageSendRouter

BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")

app = AsyncFast()


@dataclass
class Item:
sku_id: str
amount: int


@dataclass
class Order:
items: list[Item]
status: str


@dataclass
class EmailProcessingOrder(Message, address="order-email-processing-order"):
order_id: Annotated[str, Header()]
order: Order


@dataclass
class CancelShipping(Message, address="cancel-shipping"):
order_id: Annotated[str, Header()]


@app.channel("orders")
async def handle_order(
order: Order,
order_id: Annotated[str, Header()],
message_sender: MessageSender[EmailProcessingOrder | CancelShipping],
) -> None:
if order.status == "processing":
await message_sender.send(EmailProcessingOrder(order_id=order_id, order=order))

if order.status == "cancelled":
await message_sender.send(CancelShipping(order_id=order_id))


message_send_router = MessageSendRouter()

message_send_router.add_route(
"order-email-processing-order",
KafkaMessageSend(bootstrap_servers=BOOTSTRAP_SERVERS),
)
message_send_router.add_route("cancel-shipping", SQSMessageSend())

if __name__ == "__main__":
run(
app,
"orders",
bootstrap_servers=BOOTSTRAP_SERVERS,
message_send=message_send_router,
)
1 change: 1 addition & 0 deletions packages/asyncfast/docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Taking ideas from:

receiving
sending
message_send_router
dependencies
lifespan
middleware
Expand Down
64 changes: 64 additions & 0 deletions packages/asyncfast/docs/message_send_router.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#####################
Message Send Router
#####################

``MessageSendRouter`` helps AMGI servers route ``message.send`` events to different backends based on the event address.
It manages setup and teardown of the underlying send callables using async context managers. This is useful when you
need to send different messages to different brokers, or you want to keep a single server running while routing
outbound traffic by address.

*************
Basic Usage
*************

The router is an async context manager. Create it, register routes, and then enter it when the server starts so the
message senders can establish connections:

.. async-fast-example:: examples/message_send_router.py

In the example above, one address is routed to Kafka, and another to SQS. The app itself stays the same; routing is
configured at the server boundary.

***********************
Integrating With Run
***********************

AMGI servers expect a ``send`` callable. ``MessageSendRouter`` provides that callable when you enter it, so pass the
router instance to the server and let it manage resource lifetimes:

.. code:: python

from amgi_aiokafka import run
from asyncfast.message_send import MessageSendRouter

message_send_router = MessageSendRouter()
# add routes...

run(
app,
"orders",
message_send=message_send_router,
)

When the server starts, it enters the router and uses the callable it yields. When the server shuts down, it exits the
router and closes all message senders cleanly.

*******************
Address Patterns
*******************

Routes use the same pattern syntax as channel parameters, so ``priority.{id}`` will match addresses like
``priority.123``. Register routes before entering the router so they are included in its setup.

If you need a catch-all pattern, register a default route instead of a broad pattern; this keeps route matching
explicit and easier to reason about.

****************
Default Route
****************

If you pass ``default=``, the router will use that send callable when no route matches. Without a default, you should
ensure every outgoing address has a route registered, otherwise the send will fail at runtime.

The default sender should be an async context manager just like the routed senders. This allows you to share connection
pools or client lifecycles with explicit cleanup on shutdown.
4 changes: 2 additions & 2 deletions packages/asyncfast/src/asyncfast/__init__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
from asyncfast._asyncfast import AsyncFast
from asyncfast._asyncfast import Middleware
from asyncfast._channel import ChannelNotFoundError
from asyncfast._channel import Depends
from asyncfast._channel import Header
from asyncfast._channel import InvalidChannelDefinitionError
from asyncfast._channel import MessageSender
from asyncfast._channel import Parameter
from asyncfast._channel import Payload
from asyncfast._message import Message
from asyncfast._utils import ChannelNotFoundError

__all__ = [
"AsyncFast",
"Middleware",
"ChannelNotFoundError",
"Depends",
"Header",
"InvalidChannelDefinitionError",
"MessageSender",
"Parameter",
"Payload",
"Message",
"ChannelNotFoundError",
]
4 changes: 2 additions & 2 deletions packages/asyncfast/src/asyncfast/_asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
from asyncfast._channel import BindingResolver
from asyncfast._channel import CallableResolver
from asyncfast._channel import Channel
from asyncfast._channel import ChannelRouter
from asyncfast._channel import HeaderResolver
from asyncfast._channel import MessageSenderResolver
from asyncfast._channel import PayloadResolver
from asyncfast._channel import Resolver
from asyncfast._channel import Router
from asyncfast._message import Message
from pydantic import BaseModel
from pydantic import create_model
Expand Down Expand Up @@ -319,7 +319,7 @@ def get_asyncapi(
*,
title: str,
version: str,
router: Router,
router: ChannelRouter,
) -> dict[str, Any]:
channel_definitions = tuple(
ChannelDefinition(channel) for channel in router.channels
Expand Down
4 changes: 2 additions & 2 deletions packages/asyncfast/src/asyncfast/_asyncfast.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from amgi_types import LifespanStartupCompleteEvent
from amgi_types import Scope
from asyncfast._asyncapi import get_asyncapi
from asyncfast._channel import Router
from asyncfast._channel import ChannelRouter
from asyncfast.middleware.errors import ServerErrorMiddleware

P = ParamSpec("P")
Expand Down Expand Up @@ -54,7 +54,7 @@ def __init__(
self._version = version
self._lifespan_context = lifespan
self._middleware = list(middleware) if middleware else []
self._router = Router()
self._router = ChannelRouter()
self._lifespan: AbstractAsyncContextManager[None] | None = None
self._asyncapi_schema: dict[str, Any] | None = None
self._middleware_stack: AMGIApplication | None = None
Expand Down
57 changes: 13 additions & 44 deletions packages/asyncfast/src/asyncfast/_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from dataclasses import KW_ONLY
from functools import cached_property
from functools import wraps
from re import Pattern
from typing import Annotated
from typing import Any
from typing import Generic
Expand All @@ -33,7 +32,7 @@
from amgi_types import MessageScope
from amgi_types import MessageSendEvent
from asyncfast._utils import get_address_parameters
from asyncfast._utils import get_address_pattern
from asyncfast._utils import Router
from asyncfast.bindings import Binding
from pydantic import TypeAdapter
from pydantic.fields import FieldInfo
Expand Down Expand Up @@ -98,16 +97,6 @@ class InvalidChannelDefinitionError(ValueError):
"""


class RouteInvariantError(RuntimeError):
"""Raised when a selected route fails to match its address."""


class ChannelNotFoundError(LookupError):
def __init__(self, address: str) -> None:
super().__init__(f"Couldn't resolve address: {address}")
self.address = address


class Header(FieldInfo):
pass

Expand Down Expand Up @@ -379,32 +368,20 @@ def resolve(
@dataclass(frozen=True)
class Channel(CallableResolver, ABC):
address: str
address_pattern: Pattern[str]
parameters: set[str]

async def __call__(
self,
scope: MessageScope,
receive: AMGIReceiveCallable,
send: AMGISendCallable,
parameters: dict[str, str] | None = None,
parameters: dict[str, str],
) -> None:
parameters = self.match(scope["address"]) if parameters is None else parameters
if parameters is None:
raise RouteInvariantError(
f"Selected route did not match address {scope['address']!r}"
)
message_receive = MessageReceive(scope, parameters)
dependency_cache = DependencyCache(asyncio.get_event_loop())
async with AsyncExitStack() as async_exit_stack:
await self.call(message_receive, send, dependency_cache, async_exit_stack)

def match(self, address: str) -> dict[str, str] | None:
match = self.address_pattern.match(address)
if match:
return match.groupdict()
return None


@dataclass(frozen=True)
class SyncChannel(Channel):
Expand Down Expand Up @@ -574,7 +551,6 @@ def resolvers_dependencies(

def get_channel(func: Callable[..., Any], address: str) -> Channel:
address_parameters = get_address_parameters(address)
address_pattern = get_address_pattern(address)
resolvers, dependencies = resolvers_dependencies(func, address_parameters)

payloads = sum(isinstance(resolver, PayloadResolver) for _, resolver in resolvers)
Expand All @@ -590,36 +566,29 @@ def get_channel(func: Callable[..., Any], address: str) -> Channel:
)

if inspect.iscoroutinefunction(func):
return AsyncChannel(
func, resolvers, dependencies, address, address_pattern, address_parameters
)
return AsyncChannel(func, resolvers, dependencies, address, address_parameters)
if inspect.isasyncgenfunction(func):
return AsyncGeneratorChannel(
func, resolvers, dependencies, address, address_pattern, address_parameters
func, resolvers, dependencies, address, address_parameters
)
if inspect.isgeneratorfunction(func):
return SyncGeneratorChannel(
func, resolvers, dependencies, address, address_pattern, address_parameters
func, resolvers, dependencies, address, address_parameters
)
return SyncChannel(
func, resolvers, dependencies, address, address_pattern, address_parameters
)
return SyncChannel(func, resolvers, dependencies, address, address_parameters)


class Router:
def __init__(self) -> None:
self.channels: list[Channel] = []
class ChannelRouter(Router[Channel]):
@property
def channels(self) -> Sequence[Channel]:
return [channel for _, channel in self.routes]

def add_channel(self, address: str, func: Callable[..., Any]) -> None:
self.channels.append(get_channel(func, address))
self.add_route(address, get_channel(func, address))

async def __call__(
self, scope: MessageScope, receive: AMGIReceiveCallable, send: AMGISendCallable
) -> None:
address = scope["address"]
for channel in self.channels:
parameters = channel.match(address)
if parameters is not None:
await channel(scope, receive, send, parameters)
return
raise ChannelNotFoundError(address)
parameters, channel = self.get(address)
await channel(scope, receive, send, parameters)
Loading
Loading