Skip to content
217 changes: 185 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,52 +1,205 @@
# AMGI

[![Tests](https://github.com/asyncfast/amgi/actions/workflows/tests.yml/badge.svg)](https://github.com/asyncfast/amgi/actions/workflows/tests.yml) [![codecov](https://codecov.io/gh/asyncfast/amgi/graph/badge.svg?token=QXLUFJ0GOO)](https://codecov.io/gh/asyncfast/amgi)
[![Tests](https://github.com/asyncfast/amgi/actions/workflows/tests.yml/badge.svg)](https://github.com/asyncfast/amgi/actions/workflows/tests.yml)
[![codecov](https://codecov.io/gh/asyncfast/amgi/graph/badge.svg?token=QXLUFJ0GOO)](https://codecov.io/gh/asyncfast/amgi)

AMGI (*Asynchronous Messaging Gateway Interface*) is the spiritual sibling of
[ASGI](https://asgi.readthedocs.io/en/latest/). While the focus of [ASGI](https://asgi.readthedocs.io/en/latest/) is
HTTP, the focus of AMGI is event-based applications.
**AMGI** (*Asynchronous Messaging Gateway Interface*) is the message-driven sibling of
[ASGI](https://asgi.readthedocs.io/en/latest/).

This repository contains multiple AMGI implementations, as well as a typed microframework, AsyncFast, in the spirit of
[FastAPI](https://fastapi.tiangolo.com/) it attempts to make typed asynchronous APIs easy to develop.
Where ASGI standardises the interface between Python applications and HTTP servers, AMGI standardises the interface
between Python applications and message brokers.

## Core Aims
It enables you to build broker-agnostic, strongly-typed, message-driven applications that are portable across protocols
and compute environments without rewriting business logic.

Core aims of both AMGI and AsyncFast:
## Architecture

- **Portable**: Following [AMGI](https://amgi.readthedocs.io/en/latest/) should allow for implementations of any
protocol, applications should be able to run anywhere regardless of compute. Running in
[Lambda](https://aws.amazon.com/lambda/) should be no more difficult than running on
[EC2](https://aws.amazon.com/ec2/)
![AMGI Layered Architecture](layers.svg)

- **Standards-based**: Based on [AsyncAPI](https://www.asyncapi.com/), and [JSON Schema](https://json-schema.org/). The
framework should allow for easy documentation generation
AMGI defines a clean separation between:

- **Clean Implementation**: Each protocol should be implemented well; this means easy to use, and as optimal as possible
- Your application logic
- The framework layer (e.g. AsyncFast)
- The AMGI interface contract
- The underlying protocol implementation

## Documentation
Protocol implementations may include Kafka, SQS, MQTT, Redis, and others - but your application code remains unchanged.

## Why AMGI?

Modern systems are message-driven - but each broker has:

- Different client libraries
- Different runtime models
- Different infrastructure constraints
- Different deployment assumptions

AMGI introduces a minimal interface boundary so that:

- Switching brokers does not require rewriting your application
- Running in Lambda is no more complex than running on EC2
- Local development mirrors production semantics

If it runs on one protocol, it should run on another.

# AsyncFast - Typed Message APIs

This repository also includes **AsyncFast**, a typed microframework built on AMGI, inspired by
[FastAPI](https://fastapi.tiangolo.com/).

If FastAPI made HTTP APIs easy to build, AsyncFast aims to do the same for message-driven systems.

## Quick Example (AsyncFast)

```python
from typing import Annotated
from asyncfast import AsyncFast
from asyncfast import Header
from pydantic import BaseModel

app = AsyncFast()


class UserCreated(BaseModel):
id: int
email: str


@app.channel("user.created")
async def handle_user_created(
payload: UserCreated,
correlation_id: Annotated[str, Header()],
) -> None:
print(f"User created: {payload.email} ({correlation_id})")
```

What you get automatically:

- Typed payload validation
- Typed headers
- Clean dependency injection
- AsyncAPI-aligned schema generation
- Broker portability

## Running on Kafka

```python
from amgi_aiokafka import Server

server = Server(
app, "my-topic", bootstrap_servers="localhost:9092", group_id="my-group"
)

server.run()
```

or

```commandline
asyncfast run amgi-aiokafka main:app my-topic --group-id my-group
```

## Running in AWS Lambda (SQS Trigger)

```python
from amgi_sqs_event_source_mapping import SqsEventSourceMappingHandler

handler = SqsEventSourceMappingHandler(app)
```

- For documentation on the [AMGI specification](https://amgi.readthedocs.io/en/latest/)
- For documentation on [AsyncFast](https://asyncfast.readthedocs.io/en/latest/)
The application code remains unchanged.

## Servers
# What Is AMGI?

At the moment there are base implementations for the following protocols, these are all working
[AMGI](https://amgi.readthedocs.io/en/latest/) servers:
At its core, AMGI defines a minimal, low-level callable interface, similar in spirit to ASGI:

**Kafka:** [amgi-aiokafka](https://pypi.org/project/amgi-aiokafka/) is a basic [Kafka](https://kafka.apache.org/) sever
implementation
```python
async def app(scope, receive, send):
message = await receive()

**MQTT:** [amgi-paho-mqtt](https://pypi.org/project/amgi-paho-mqtt/) is a basic [MQTT](https://mqtt.org/) sever
implementation
await send(
{
"type": "message.ack",
"id": message["id"],
}
)
```

AMGI provides:

- A standard application callable
- A structured message scope
- A receive mechanism
- A send mechanism

Frameworks like AsyncFast can be built on top of this interface.

# Core Aims

## Portable

Applications written against AMGI:

- Are independent of a specific broker
- Are independent of compute environment
- Can run in containers, VMs, or Lambda

Infrastructure changes should not rewrite business logic.

## Standards-Based

AMGI and AsyncFast are aligned with:

- [AsyncAPI](https://www.asyncapi.com/)
- [JSON Schema](https://json-schema.org/)
- Strong Python typing (Pydantic v2)

Schemas are first-class. Validation and documentation are built into the design.

## Clean Implementation

Each protocol implementation aims to be:

- Minimal
- Explicit
- Correct
- Efficient

This is engineered abstraction.

# Protocol Implementations

The following AMGI servers are currently available:

### Kafka

- **[amgi-aiokafka](https://pypi.org/project/amgi-aiokafka/)** - [Kafka](https://kafka.apache.org/) server
implementation
- **[amgi-kafka-event-source-mapping](https://pypi.org/project/amgi-kafka-event-source-mapping/)** -
[AWS Lambda](https://aws.amazon.com/lambda/) integration for Kafka

### MQTT

- **[amgi-paho-mqtt](https://pypi.org/project/amgi-paho-mqtt/)** - [MQTT](https://mqtt.org/) server implementation

### Redis

- **[amgi-redis](https://pypi.org/project/amgi-redis/)** - [Redis](https://redis.io/) server implementation

### SQS

- **[amgi-aiobotocore](https://pypi.org/project/amgi-aiobotocore/)** - Contains a [SQS](https://aws.amazon.com/sqs/)
server implementation
- **[amgi-sqs-event-source-mapping](https://pypi.org/project/amgi-sqs-event-source-mapping/)** -
[AWS Lambda](https://aws.amazon.com/lambda/) integration for SQS

## Documentation

**Redis:** [amgi-redis](https://pypi.org/project/amgi-redis/) is a basic [Redis](https://redis.io/) server
implementation, soon with support for [Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/)
- AMGI Specification\
https://amgi.readthedocs.io/en/latest/

**SQS:** [amgi-aiobotocore](https://pypi.org/project/amgi-aiobotocore/) contains a basic
[SQS](https://aws.amazon.com/sqs/) sever implementation.
[amgi-sqs-event-source-mapping](https://pypi.org/project/amgi-sqs-event-source-mapping/) allows you to run an
application in [Lambda](https://aws.amazon.com/lambda/), with it translating the SQS Lambda event to the necessary
[AMGI](https://amgi.readthedocs.io/en/latest/) calls
- AsyncFast Documentation\
https://asyncfast.readthedocs.io/en/latest/

## Contact

Expand Down
61 changes: 61 additions & 0 deletions layers.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __eq__(self, other: Any) -> bool:

@pytest.fixture(scope="module")
async def localstack_container() -> AsyncGenerator[LocalStackContainer, None]:
with LocalStackContainer(image="localstack/localstack:4.9.2").with_services(
with LocalStackContainer(image="ghcr.io/asyncfast/localstack:4.9.2").with_services(
"sqs"
) as localstack_container:
yield localstack_container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

@pytest.fixture(scope="module")
async def kafka_container() -> AsyncGenerator[KafkaContainer, None]:
with KafkaContainer() as kafka_container:
with KafkaContainer(image="ghcr.io/asyncfast/cp-kafka:7.6.0") as kafka_container:
yield kafka_container


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

@pytest.fixture(scope="module")
async def kafka_container() -> AsyncGenerator[KafkaContainer, None]:
with KafkaContainer() as kafka_container:
with KafkaContainer(image="ghcr.io/asyncfast/cp-kafka:7.6.0") as kafka_container:
yield kafka_container


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def topic() -> str:
@pytest.fixture(scope="module")
async def mosquitto_container() -> AsyncGenerator[MosquittoContainer, None]:
mosquitto_container = MosquittoContainer(
image="eclipse-mosquitto:2.0.22"
image="ghcr.io/asyncfast/eclipse-mosquitto:2.0.22"
).with_volume_mapping(
Path(__file__).parent / "mqtt.acl",
"/mosquitto/config/mqtt.acl",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

@pytest.fixture(scope="module")
async def redis_container() -> AsyncGenerator[AsyncRedisContainer, None]:
with AsyncRedisContainer(image="redis:8.2.2") as redis_container:
with AsyncRedisContainer(image="ghcr.io/asyncfast/redis:8.2.2") as redis_container:
yield redis_container


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

@pytest.fixture(scope="module")
async def localstack_container() -> AsyncGenerator[LocalStackContainer, None]:
with LocalStackContainer(image="localstack/localstack:4.9.2").with_services(
with LocalStackContainer(image="ghcr.io/asyncfast/localstack:4.9.2").with_services(
"sqs"
) as localstack_container:
yield localstack_container
Expand Down