diff --git a/.gitignore b/.gitignore index 1a615ca..e00f533 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,5 @@ _build # coverage .coverage +.coverage.* coverage.xml \ No newline at end of file diff --git a/.python-versions b/.python-versions index 312dcdd..f1d9c78 100644 --- a/.python-versions +++ b/.python-versions @@ -2,3 +2,4 @@ 3.11 3.12 3.13 +3.14 diff --git a/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py b/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py index dcc390a..364d823 100644 --- a/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py +++ b/packages/amgi-aiokafka/src/amgi_aiokafka/__init__.py @@ -1,3 +1,4 @@ +import asyncio import logging from asyncio import Lock from collections import deque @@ -24,7 +25,7 @@ def run( app: AMGIApplication, - *topics: Iterable[str], + *topics: str, bootstrap_servers: str | list[str] = "localhost", group_id: str | None = None, ) -> None: @@ -96,7 +97,7 @@ class Server: def __init__( self, app: AMGIApplication, - *topics: Iterable[str], + *topics: str, bootstrap_servers: str | list[str], group_id: str | None, ) -> None: @@ -127,35 +128,44 @@ async def _main_loop(self, state: dict[str, Any]) -> None: async for messages in self._stoppable.call( self._consumer.getmany, timeout_ms=1000 ): - for topic_partition, records in messages.items(): - if records: - scope: MessageScope = { - "type": "message", - "amgi": {"version": "1.0", "spec_version": "1.0"}, - "address": topic_partition.topic, - "state": state.copy(), - } - - message_receive_ids = { - f"{record.topic}:{record.partition}:{record.offset}": { - TopicPartition( - record.topic, record.partition - ): record.offset - + 1 - } - for record in records - } - - await self._app( - scope, - _Receive(records), - _Send( - self._consumer, - message_receive_ids, - self._message_send, - self._ackable_consumer, - ), - ) + await asyncio.gather( + *[ + self._handle_partition_records(topic_partition, records, state) + for topic_partition, records in messages.items() + ] + ) + + async def _handle_partition_records( + self, + topic_partition: TopicPartition, + records: list[ConsumerRecord], + state: dict[str, Any], + ) -> None: + if records: + scope: MessageScope = { + "type": "message", + "amgi": {"version": "1.0", "spec_version": "1.0"}, + "address": topic_partition.topic, + "state": state.copy(), + } + + message_receive_ids = { + f"{record.topic}:{record.partition}:{record.offset}": { + TopicPartition(record.topic, record.partition): record.offset + 1 + } + for record in records + } + + await self._app( + scope, + _Receive(records), + _Send( + self._consumer, + message_receive_ids, + self._message_send, + self._ackable_consumer, + ), + ) async def _get_producer(self) -> AIOKafkaProducer: async with self._producer_lock: diff --git a/packages/amgi-sqs-event-source-mapping/src/amgi_sqs_event_source_mapping/__init__.py b/packages/amgi-sqs-event-source-mapping/src/amgi_sqs_event_source_mapping/__init__.py index c73a8e3..e4fc146 100644 --- a/packages/amgi-sqs-event-source-mapping/src/amgi_sqs_event_source_mapping/__init__.py +++ b/packages/amgi-sqs-event-source-mapping/src/amgi_sqs_event_source_mapping/__init__.py @@ -221,7 +221,7 @@ def __init__( self._endpoint_url = endpoint_url self._aws_access_key_id = aws_access_key_id self._aws_secret_access_key = aws_secret_access_key - self._loop = asyncio.get_event_loop() + self._loop = asyncio.new_event_loop() self._lifespan = lifespan self._lifespan_context: Lifespan | None = None self._state: dict[str, Any] = {} diff --git a/packages/test-utils/src/test_utils/__init__.py b/packages/test-utils/src/test_utils/__init__.py index 963571f..bf66e99 100644 --- a/packages/test-utils/src/test_utils/__init__.py +++ b/packages/test-utils/src/test_utils/__init__.py @@ -1,9 +1,10 @@ import asyncio -import multiprocessing +import multiprocessing.synchronize from asyncio import Event from asyncio import Queue from collections.abc import AsyncGenerator from contextlib import asynccontextmanager +from functools import partial from typing import Any from typing import Callable from typing import Optional @@ -74,21 +75,25 @@ async def __call__( await return_event.wait() +async def _app( + scope: Scope, + receive: AMGIReceiveCallable, + send: AMGISendCallable, + lifespan_event: multiprocessing.synchronize.Event, +) -> None: + if scope["type"] == "lifespan": + lifespan_event.set() + raise Exception + + def assert_run_can_terminate( run: Callable[..., None], *args: Any, **kwargs: Any ) -> None: lifespan_event = multiprocessing.Event() - async def _app( - scope: Scope, receive: AMGIReceiveCallable, send: AMGISendCallable - ) -> None: - if scope["type"] == "lifespan": - lifespan_event.set() - raise Exception - process = multiprocessing.Process( target=run, - args=(_app, *args), + args=(partial(_app, lifespan_event=lifespan_event), *args), kwargs=kwargs, ) process.start() diff --git a/pyproject.toml b/pyproject.toml index b186547..18a3f0f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,7 +41,7 @@ members = [ [tool.pytest.ini_options] asyncio_mode = "auto" -timeout = 60 +timeout = 10 timeout_func_only = true filterwarnings = [ "ignore:^The wait_for_logs function with string or callable predicates is deprecated:DeprecationWarning", diff --git a/tox.ini b/tox.ini index 9159f3f..7fb0861 100644 --- a/tox.ini +++ b/tox.ini @@ -2,11 +2,12 @@ requires = tox>=4.2 env_list = + py314-asyncfast-pydantic212 py313-asyncfast-pydantic2{8-12} clean - py3{10-13}-{amgi-aiobotocore, amgi-aiokafka, amgi-common, amgi-paho-mqtt, amgi-redis, amgi-sqs-event-source-mapping, asyncfast-cli} + py3{10-14}-{amgi-aiobotocore, amgi-aiokafka, amgi-common, amgi-paho-mqtt, amgi-redis, amgi-sqs-event-source-mapping, asyncfast-cli} py3{10-12}-asyncfast-pydantic2{0-12} - py3{10-13}-{amgi-aiobotocore, amgi-aiokafka, amgi-common, amgi-paho-mqtt, amgi-redis, amgi-sqs-event-source-mapping, amgi-types, asyncfast, asyncfast-cli}-import + py3{10-14}-{amgi-aiobotocore, amgi-aiokafka, amgi-common, amgi-paho-mqtt, amgi-redis, amgi-sqs-event-source-mapping, amgi-types, asyncfast, asyncfast-cli}-import [testenv] runner = uv-venv-lock-runner @@ -21,7 +22,7 @@ pass_env = commands = pytest {tty:--color=yes} {posargs:-v --cov --cov-append --cov-report=xml} depends = - {py313, py312, py311, py310}: clean + {py314, py313, py312, py311, py310}: clean uv_sync_flags = --all-packages [testenv:clean] @@ -30,7 +31,7 @@ commands = coverage erase uv_sync_flags = --all-packages -[testenv:py3{10-13}-asyncfast-pydantic2{0-12}] +[testenv:py3{10-14}-asyncfast-pydantic2{0-12}] commands_pre = pydantic20: uv pip install "pydantic>=2.0,<2.1" pydantic21: uv pip install "pydantic>=2.1,<2.2" @@ -49,72 +50,72 @@ commands = {[testenv]commands} packages/asyncfast uv_sync_flags = --package=asyncfast -[testenv:py3{10-13}-amgi-aiobotocore] +[testenv:py3{10-14}-amgi-aiobotocore] commands = {[testenv]commands} packages/amgi-aiobotocore uv_sync_flags = --package=amgi-aiobotocore -[testenv:py3{10-13}-amgi-aiokafka] +[testenv:py3{10-14}-amgi-aiokafka] commands = {[testenv]commands} packages/amgi-aiokafka uv_sync_flags = --package=amgi-aiokafka -[testenv:py3{10-13}-amgi-common] +[testenv:py3{10-14}-amgi-common] commands = {[testenv]commands} packages/amgi-common uv_sync_flags = --package=amgi-common -[testenv:py3{10-13}-amgi-paho-mqtt] +[testenv:py3{10-14}-amgi-paho-mqtt] commands = {[testenv]commands} packages/amgi-paho-mqtt uv_sync_flags = --package=amgi-paho-mqtt -[testenv:py3{10-13}-amgi-redis] +[testenv:py3{10-14}-amgi-redis] commands = {[testenv]commands} packages/amgi-redis uv_sync_flags = --package=amgi-redis -[testenv:py3{10-13}-amgi-sqs-event-source-mapping] +[testenv:py3{10-14}-amgi-sqs-event-source-mapping] commands = {[testenv]commands} packages/amgi-sqs-event-source-mapping uv_sync_flags = --package=amgi-sqs-event-source-mapping -[testenv:py3{10-13}-amgi-aiobotocore-import] +[testenv:py3{10-14}-amgi-aiobotocore-import] commands = python -c "import amgi_aiobotocore" uv_sync_flags = --package=amgi-aiobotocore --no-dev -[testenv:py3{10-13}-amgi-aiokafka-import] +[testenv:py3{10-14}-amgi-aiokafka-import] commands = python -c "import amgi_aiokafka" uv_sync_flags = --package=amgi-aiokafka --no-dev -[testenv:py3{10-13}-amgi-common-import] +[testenv:py3{10-14}-amgi-common-import] commands = python -c "import amgi_common" uv_sync_flags = --package=amgi-common --no-dev -[testenv:py3{10-13}-amgi-paho-mqtt-import] +[testenv:py3{10-14}-amgi-paho-mqtt-import] commands = python -c "import amgi_paho_mqtt" uv_sync_flags = --package=amgi-paho-mqtt --no-dev -[testenv:py3{10-13}-amgi-redis-import] +[testenv:py3{10-14}-amgi-redis-import] commands = python -c "import amgi_redis" uv_sync_flags = --package=amgi-redis --no-dev -[testenv:py3{10-13}-amgi-sqs-event-source-mapping-import] +[testenv:py3{10-14}-amgi-sqs-event-source-mapping-import] commands = python -c "import amgi_sqs_event_source_mapping" uv_sync_flags = @@ -122,26 +123,26 @@ uv_sync_flags = --no-dev --extra=boto3 -[testenv:py3{10-13}-amgi-types-import] +[testenv:py3{10-14}-amgi-types-import] commands = python -c "import amgi_types" uv_sync_flags = --package=amgi-types --no-dev -[testenv:py3{10-13}-asyncfast-import] +[testenv:py3{10-14}-asyncfast-import] commands = python -c "import asyncfast" uv_sync_flags = --package=asyncfast --no-dev -[testenv:py3{10-13}-asyncfast-cli] +[testenv:py3{10-14}-asyncfast-cli] commands = {[testenv]commands} packages/asyncfast-cli uv_sync_flags = --package=asyncfast-cli -[testenv:py3{10-13}-asyncfast-cli-import] +[testenv:py3{10-14}-asyncfast-cli-import] commands = python -c "import asyncfast_cli" uv_sync_flags =