Skip to content

Commit 75bc71b

Browse files
committed
Use running loop for asyncio futures
Signed-off-by: Miro <200482516+Mirochill@users.noreply.github.com>
1 parent 98897be commit 75bc71b

4 files changed

Lines changed: 33 additions & 3 deletions

File tree

src/arcp/_client/handles.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def __init__(self, job_id: str, accepted: JobAcceptedPayload) -> None:
2525
self.accepted = accepted
2626
self._events: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue()
2727
self._chunks: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue()
28-
loop = asyncio.get_event_loop()
28+
loop = asyncio.get_running_loop()
2929
self._terminal: asyncio.Future[JobResultPayload] = loop.create_future()
3030

3131
@property

src/arcp/_client/ops.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ async def submit_job( # noqa: PLR0913
6969
trace_id=trace_id,
7070
payload=submit.model_dump(mode="json", exclude_none=True),
7171
)
72-
accept_fut: asyncio.Future[JobAcceptedPayload] = asyncio.get_event_loop().create_future()
72+
accept_fut: asyncio.Future[JobAcceptedPayload] = asyncio.get_running_loop().create_future()
7373
client._pending_accepts.append(accept_fut)
7474
await client._transport.send(env.to_wire())
7575
try:

src/arcp/_runtime/_accept.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ async def _accept_or_close(runtime: ARCPRuntime, transport: Transport) -> Sessio
5757
def _maybe_start_heartbeat(ctx: SessionContext) -> asyncio.Task[Any] | None:
5858
if not (ctx.has_feature("heartbeat") and ctx.state.heartbeat_interval_sec):
5959
return None
60-
ctx.heartbeat_outcome = asyncio.get_event_loop().create_future()
60+
ctx.heartbeat_outcome = asyncio.get_running_loop().create_future()
6161
return asyncio.create_task(
6262
heartbeat_loop(ctx, interval=float(ctx.state.heartbeat_interval_sec))
6363
)

tests/e2e/test_e2e_memory.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
import asyncio
6+
import warnings
67

78
from arcp import ClientInfo, RuntimeInfo, pair_memory_transports
89
from arcp.client import ARCPClient
@@ -70,3 +71,32 @@ async def echo(input_value, ctx: JobContext):
7071
await client.close()
7172
await rt.close()
7273
server.cancel()
74+
75+
76+
async def test_submit_does_not_emit_deprecation_warning() -> None:
77+
rt = ARCPRuntime(
78+
runtime=RuntimeInfo(name="r", version="1"),
79+
bearer=StaticBearerVerifier({"tok": "p1"}),
80+
heartbeat_interval_sec=60.0,
81+
)
82+
83+
async def echo(input_value, ctx: JobContext):
84+
return input_value
85+
86+
rt.register_agent("echo", echo)
87+
88+
a, b = pair_memory_transports()
89+
server = asyncio.create_task(rt.accept(a))
90+
client = ARCPClient(client=ClientInfo(name="c", version="1"), token="tok")
91+
try:
92+
with warnings.catch_warnings():
93+
warnings.simplefilter("error", DeprecationWarning)
94+
await client.connect(b)
95+
handle = await client.submit(agent="echo", input={"x": 1})
96+
result = await handle.done
97+
assert result.final_status == "success"
98+
assert result.result == {"x": 1}
99+
finally:
100+
await client.close()
101+
await rt.close()
102+
server.cancel()

0 commit comments

Comments
 (0)