diff --git a/README.md b/README.md index ee52212..1499b51 100644 --- a/README.md +++ b/README.md @@ -1,52 +1,205 @@ # AMGI -[![Tests](https://github.com/asyncfast/amgi/actions/workflows/tests.yml/badge.svg)](https://github.com/asyncfast/amgi/actions/workflows/tests.yml) [![codecov](https://codecov.io/gh/asyncfast/amgi/graph/badge.svg?token=QXLUFJ0GOO)](https://codecov.io/gh/asyncfast/amgi) +[![Tests](https://github.com/asyncfast/amgi/actions/workflows/tests.yml/badge.svg)](https://github.com/asyncfast/amgi/actions/workflows/tests.yml) +[![codecov](https://codecov.io/gh/asyncfast/amgi/graph/badge.svg?token=QXLUFJ0GOO)](https://codecov.io/gh/asyncfast/amgi) -AMGI (*Asynchronous Messaging Gateway Interface*) is the spiritual sibling of -[ASGI](https://asgi.readthedocs.io/en/latest/). While the focus of [ASGI](https://asgi.readthedocs.io/en/latest/) is -HTTP, the focus of AMGI is event-based applications. +**AMGI** (*Asynchronous Messaging Gateway Interface*) is the message-driven sibling of +[ASGI](https://asgi.readthedocs.io/en/latest/). -This repository contains multiple AMGI implementations, as well as a typed microframework, AsyncFast, in the spirit of -[FastAPI](https://fastapi.tiangolo.com/) it attempts to make typed asynchronous APIs easy to develop. +Where ASGI standardises the interface between Python applications and HTTP servers, AMGI standardises the interface +between Python applications and message brokers. -## Core Aims +It enables you to build broker-agnostic, strongly-typed, message-driven applications that are portable across protocols +and compute environments without rewriting business logic. -Core aims of both AMGI and AsyncFast: +## Architecture -- **Portable**: Following [AMGI](https://amgi.readthedocs.io/en/latest/) should allow for implementations of any - protocol, applications should be able to run anywhere regardless of compute. Running in - [Lambda](https://aws.amazon.com/lambda/) should be no more difficult than running on - [EC2](https://aws.amazon.com/ec2/) +![AMGI Layered Architecture](layers.svg) -- **Standards-based**: Based on [AsyncAPI](https://www.asyncapi.com/), and [JSON Schema](https://json-schema.org/). The - framework should allow for easy documentation generation +AMGI defines a clean separation between: -- **Clean Implementation**: Each protocol should be implemented well; this means easy to use, and as optimal as possible +- Your application logic +- The framework layer (e.g. AsyncFast) +- The AMGI interface contract +- The underlying protocol implementation -## Documentation +Protocol implementations may include Kafka, SQS, MQTT, Redis, and others - but your application code remains unchanged. + +## Why AMGI? + +Modern systems are message-driven - but each broker has: + +- Different client libraries +- Different runtime models +- Different infrastructure constraints +- Different deployment assumptions + +AMGI introduces a minimal interface boundary so that: + +- Switching brokers does not require rewriting your application +- Running in Lambda is no more complex than running on EC2 +- Local development mirrors production semantics + +If it runs on one protocol, it should run on another. + +# AsyncFast - Typed Message APIs + +This repository also includes **AsyncFast**, a typed microframework built on AMGI, inspired by +[FastAPI](https://fastapi.tiangolo.com/). + +If FastAPI made HTTP APIs easy to build, AsyncFast aims to do the same for message-driven systems. + +## Quick Example (AsyncFast) + +```python +from typing import Annotated +from asyncfast import AsyncFast +from asyncfast import Header +from pydantic import BaseModel + +app = AsyncFast() + + +class UserCreated(BaseModel): + id: int + email: str + + +@app.channel("user.created") +async def handle_user_created( + payload: UserCreated, + correlation_id: Annotated[str, Header()], +) -> None: + print(f"User created: {payload.email} ({correlation_id})") +``` + +What you get automatically: + +- Typed payload validation +- Typed headers +- Clean dependency injection +- AsyncAPI-aligned schema generation +- Broker portability + +## Running on Kafka + +```python +from amgi_aiokafka import Server + +server = Server( + app, "my-topic", bootstrap_servers="localhost:9092", group_id="my-group" +) + +server.run() +``` + +or + +```commandline +asyncfast run amgi-aiokafka main:app my-topic --group-id my-group +``` + +## Running in AWS Lambda (SQS Trigger) + +```python +from amgi_sqs_event_source_mapping import SqsEventSourceMappingHandler + +handler = SqsEventSourceMappingHandler(app) +``` -- For documentation on the [AMGI specification](https://amgi.readthedocs.io/en/latest/) -- For documentation on [AsyncFast](https://asyncfast.readthedocs.io/en/latest/) +The application code remains unchanged. -## Servers +# What Is AMGI? -At the moment there are base implementations for the following protocols, these are all working -[AMGI](https://amgi.readthedocs.io/en/latest/) servers: +At its core, AMGI defines a minimal, low-level callable interface, similar in spirit to ASGI: -**Kafka:** [amgi-aiokafka](https://pypi.org/project/amgi-aiokafka/) is a basic [Kafka](https://kafka.apache.org/) sever -implementation +```python +async def app(scope, receive, send): + message = await receive() -**MQTT:** [amgi-paho-mqtt](https://pypi.org/project/amgi-paho-mqtt/) is a basic [MQTT](https://mqtt.org/) sever -implementation + await send( + { + "type": "message.ack", + "id": message["id"], + } + ) +``` + +AMGI provides: + +- A standard application callable +- A structured message scope +- A receive mechanism +- A send mechanism + +Frameworks like AsyncFast can be built on top of this interface. + +# Core Aims + +## Portable + +Applications written against AMGI: + +- Are independent of a specific broker +- Are independent of compute environment +- Can run in containers, VMs, or Lambda + +Infrastructure changes should not rewrite business logic. + +## Standards-Based + +AMGI and AsyncFast are aligned with: + +- [AsyncAPI](https://www.asyncapi.com/) +- [JSON Schema](https://json-schema.org/) +- Strong Python typing (Pydantic v2) + +Schemas are first-class. Validation and documentation are built into the design. + +## Clean Implementation + +Each protocol implementation aims to be: + +- Minimal +- Explicit +- Correct +- Efficient + +This is engineered abstraction. + +# Protocol Implementations + +The following AMGI servers are currently available: + +### Kafka + +- **[amgi-aiokafka](https://pypi.org/project/amgi-aiokafka/)** - [Kafka](https://kafka.apache.org/) server + implementation +- **[amgi-kafka-event-source-mapping](https://pypi.org/project/amgi-kafka-event-source-mapping/)** - + [AWS Lambda](https://aws.amazon.com/lambda/) integration for Kafka + +### MQTT + +- **[amgi-paho-mqtt](https://pypi.org/project/amgi-paho-mqtt/)** - [MQTT](https://mqtt.org/) server implementation + +### Redis + +- **[amgi-redis](https://pypi.org/project/amgi-redis/)** - [Redis](https://redis.io/) server implementation + +### SQS + +- **[amgi-aiobotocore](https://pypi.org/project/amgi-aiobotocore/)** - Contains a [SQS](https://aws.amazon.com/sqs/) + server implementation +- **[amgi-sqs-event-source-mapping](https://pypi.org/project/amgi-sqs-event-source-mapping/)** - + [AWS Lambda](https://aws.amazon.com/lambda/) integration for SQS + +## Documentation -**Redis:** [amgi-redis](https://pypi.org/project/amgi-redis/) is a basic [Redis](https://redis.io/) server -implementation, soon with support for [Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/) +- AMGI Specification\ + https://amgi.readthedocs.io/en/latest/ -**SQS:** [amgi-aiobotocore](https://pypi.org/project/amgi-aiobotocore/) contains a basic -[SQS](https://aws.amazon.com/sqs/) sever implementation. -[amgi-sqs-event-source-mapping](https://pypi.org/project/amgi-sqs-event-source-mapping/) allows you to run an -application in [Lambda](https://aws.amazon.com/lambda/), with it translating the SQS Lambda event to the necessary -[AMGI](https://amgi.readthedocs.io/en/latest/) calls +- AsyncFast Documentation\ + https://asyncfast.readthedocs.io/en/latest/ ## Contact diff --git a/layers.svg b/layers.svg new file mode 100644 index 0000000..7d8cd60 --- /dev/null +++ b/layers.svg @@ -0,0 +1,61 @@ +ApplicationAsyncFastAMGISpecProtocols \ No newline at end of file diff --git a/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py b/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py index fea521d..8123179 100644 --- a/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py +++ b/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py @@ -22,7 +22,7 @@ def __eq__(self, other: Any) -> bool: @pytest.fixture(scope="module") async def localstack_container() -> AsyncGenerator[LocalStackContainer, None]: - with LocalStackContainer(image="localstack/localstack:4.9.2").with_services( + with LocalStackContainer(image="ghcr.io/asyncfast/localstack:4.9.2").with_services( "sqs" ) as localstack_container: yield localstack_container diff --git a/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py b/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py index aa7e22f..dc147bc 100644 --- a/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py +++ b/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py @@ -18,7 +18,7 @@ @pytest.fixture(scope="module") async def kafka_container() -> AsyncGenerator[KafkaContainer, None]: - with KafkaContainer() as kafka_container: + with KafkaContainer(image="ghcr.io/asyncfast/cp-kafka:7.6.0") as kafka_container: yield kafka_container diff --git a/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler_integration.py b/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler_integration.py index b751a31..f004a7f 100644 --- a/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler_integration.py +++ b/packages/amgi-kafka-event-source-mapping/tests_amgi_kafka_event_source_mapping/test_kafka_event_source_mapping_handler_integration.py @@ -16,7 +16,7 @@ @pytest.fixture(scope="module") async def kafka_container() -> AsyncGenerator[KafkaContainer, None]: - with KafkaContainer() as kafka_container: + with KafkaContainer(image="ghcr.io/asyncfast/cp-kafka:7.6.0") as kafka_container: yield kafka_container diff --git a/packages/amgi-paho-mqtt/tests_amgi_paho_mqtt/test_mqtt_message_integration.py b/packages/amgi-paho-mqtt/tests_amgi_paho_mqtt/test_mqtt_message_integration.py index dbac489..b9f6ca4 100644 --- a/packages/amgi-paho-mqtt/tests_amgi_paho_mqtt/test_mqtt_message_integration.py +++ b/packages/amgi-paho-mqtt/tests_amgi_paho_mqtt/test_mqtt_message_integration.py @@ -26,7 +26,7 @@ def topic() -> str: @pytest.fixture(scope="module") async def mosquitto_container() -> AsyncGenerator[MosquittoContainer, None]: mosquitto_container = MosquittoContainer( - image="eclipse-mosquitto:2.0.22" + image="ghcr.io/asyncfast/eclipse-mosquitto:2.0.22" ).with_volume_mapping( Path(__file__).parent / "mqtt.acl", "/mosquitto/config/mqtt.acl", diff --git a/packages/amgi-redis/tests_amgi_redis/test_redis_message_integration.py b/packages/amgi-redis/tests_amgi_redis/test_redis_message_integration.py index 1c03b5e..bd6071a 100644 --- a/packages/amgi-redis/tests_amgi_redis/test_redis_message_integration.py +++ b/packages/amgi-redis/tests_amgi_redis/test_redis_message_integration.py @@ -14,7 +14,7 @@ @pytest.fixture(scope="module") async def redis_container() -> AsyncGenerator[AsyncRedisContainer, None]: - with AsyncRedisContainer(image="redis:8.2.2") as redis_container: + with AsyncRedisContainer(image="ghcr.io/asyncfast/redis:8.2.2") as redis_container: yield redis_container diff --git a/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_event_source_mapping_handler_integration.py b/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_event_source_mapping_handler_integration.py index 32325f1..0ef12ab 100644 --- a/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_event_source_mapping_handler_integration.py +++ b/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_event_source_mapping_handler_integration.py @@ -12,7 +12,7 @@ @pytest.fixture(scope="module") async def localstack_container() -> AsyncGenerator[LocalStackContainer, None]: - with LocalStackContainer(image="localstack/localstack:4.9.2").with_services( + with LocalStackContainer(image="ghcr.io/asyncfast/localstack:4.9.2").with_services( "sqs" ) as localstack_container: yield localstack_container