Add amgi-aiopika-amqp support.#55
Conversation
jackburridge
left a comment
There was a problem hiding this comment.
One main change is the commit history, should be in the form of Conventional Commits, they will then be picked up on merge, triggering a bump, then release
| @pytest.fixture | ||
| def queue_name() -> str: | ||
| return f"receive-{uuid4()}" |
There was a problem hiding this comment.
| @pytest.fixture | |
| def queue_name() -> str: | |
| return f"receive-{uuid4()}" | |
| @pytest.fixture | |
| def queue_name(rabbitmq_container: RabbitMqContainer) -> str: | |
| with BlockingConnection(rabbitmq_container.get_connection_params()) as connection: | |
| with connection.channel() as channel: | |
| queue_name = f"receive-{uuid4()}" | |
| channel.queue_declare(queue_name, durable=True) | |
| return queue_name |
This should probably be declaring the queue (I've used pika here)
| lifespan_shutdown = await receive() | ||
| assert lifespan_shutdown == {"type": "lifespan.shutdown"} | ||
| await send(cast(Any, {"type": "lifespan.shutdown.complete"})) | ||
| serve_task.cancel() |
There was a problem hiding this comment.
You shouldn't need to cancel the serve task here, I've made a suggestion in the implementation that should allow for graceful shutdown
| @@ -0,0 +1,47 @@ | |||
| [build-system] | |||
There was a problem hiding this comment.
Build system should be uv_build
|
|
||
| [project] | ||
| name = "amgi-aiopika-amqp" | ||
| version = "0.21.0" |
There was a problem hiding this comment.
Versions should match the current version of the overall project, they will be bumped automatically
| ## Installation | ||
|
|
||
| ``` | ||
| pip install amgi-aiopika-amqp==0.21.0 |
There was a problem hiding this comment.
This should match the global project version, it will be bumped automatically
| await message.ack() | ||
| except Exception: | ||
| await message.nack(requeue=True) |
There was a problem hiding this comment.
I think there is a little confusion here based on the AMGI spec. The n/ack should be sent via the application, the _MessageSend class should handle an n/ack
There was a problem hiding this comment.
NATS has something very similar https://github.com/asyncfast/amgi/pull/58/files#diff-e1977146c23e034f9016f9c7c850ac8588911997af82e1583465bfa9e5dd703cR32-R54
| self._connection: Optional[AbstractRobustConnection] = None | ||
| self._channel: Optional[AbstractRobustChannel] = None |
There was a problem hiding this comment.
This probably doesn't need to be properties of the class, they can be instantiated in serve() and be passed around
| async with queue.iterator() as queue_iter: | ||
| async for message in queue_iter: | ||
| if self._stop_event.is_set(): | ||
| break |
There was a problem hiding this comment.
| async with queue.iterator() as queue_iter: | |
| async for message in queue_iter: | |
| if self._stop_event.is_set(): | |
| break | |
| async with queue.iterator() as queue_iterator: | |
| queue_aiter = aiter(queue_iterator) | |
| async for message in self._stoppable.call(anext, queue_aiter): |
I think Stoppable can be used here. I believe the coroutine waiting for the next message, and this is why the serve task is having to be cancelled in the tests. Stoppable should allow you to handle this cleanly
There was a problem hiding this comment.
| @@ -0,0 +1,9 @@ | |||
| services: | |||
There was a problem hiding this comment.
its possible to load definitions on rabbitmq start using the load_definitions option, this can fully setup the queues so they do not need to be declared elsewhere
| requires = [ "hatchling" ] | ||
|
|
||
| [project] | ||
| name = "amgi-aiopika-amqp" |
There was a problem hiding this comment.
| name = "amgi-aiopika-amqp" | |
| name = "amgi-aio-pika" |
The other packages are named after the core dependency. Not to be overly pedantic but once a package is published we probably want to stick with the name (Though I have changed them later on... so who really cares)
| url: str = "amqp://guest:guest@localhost/", | ||
| durable: bool = True, | ||
| ) -> None: | ||
| run(app, queues[0], url, durable) |
There was a problem hiding this comment.
it should be possible to support multiple queues by simultaneously running multiple queue loops, look at the aiobotocore SQS implementation
Add amgi-aiopika-amqp support.
Closes #32.