Skip to content

Commit 1b1ffd7

Browse files
committed
test: add targeted unit/state tests to reach 90% coverage
Add 10 new test files targeting previously uncovered branches: Unit tests: - tests/unit/test_lease.py — validate_lease_shape, validate_lease_op, is_lease_subset, assert_lease_subset, canonicalize_target, initial_budget_from_lease, _parse_iso_utc helpers (~45 tests) - tests/unit/test_job_unit.py — Job.apply_cost_metric, emit_result, emit_error, JobContext property accessors and all emitters (~50 tests) - tests/unit/test_job_handle.py — JobHandle property accessors and streaming (chunks, collect_chunks, done awaitable) (13 tests) - tests/unit/test_client_dispatch.py — dispatch handlers: session.ping, session.error, session.bye, job.event, job.result, job.error (14 tests) State/integration tests (in-memory transport): - tests/state/test_heartbeat.py — heartbeat_loop: cancellation, loss detection, ping enqueue, done-future guard (5 tests) - tests/state/test_result_stream.py — ResultStream open/write/close (9 tests) - tests/state/test_list_jobs_filters.py — list_jobs state/agent filters (5 tests) - tests/state/test_subscribe_history.py — replay history on subscribe (4 tests) - tests/state/test_cancel_and_unsubscribe.py — cancel/unsubscribe paths (5 tests) - tests/state/test_ping_pong.py — session.ping/pong round-trip (2 tests) Result: 304 passed, 3 skipped; total coverage 90.25% (fail-under=90 ✓)
1 parent a7e6dd9 commit 1b1ffd7

10 files changed

Lines changed: 2412 additions & 0 deletions
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
"""§8.6 job.cancel + §10.3 job.unsubscribe handler coverage."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import contextlib
7+
8+
import pytest
9+
10+
from arcp import (
11+
Capabilities,
12+
ClientInfo,
13+
InvalidRequestError,
14+
JobNotFoundError,
15+
PermissionDeniedError,
16+
RuntimeInfo,
17+
pair_memory_transports,
18+
)
19+
from arcp._envelope import Envelope
20+
from arcp._messages.execution import JobCancelPayload
21+
from arcp._ulid import new_envelope_id
22+
from arcp.client import ARCPClient
23+
from arcp.runtime import ARCPRuntime, StaticBearerVerifier
24+
25+
26+
def _make_rt() -> ARCPRuntime:
27+
return ARCPRuntime(
28+
runtime=RuntimeInfo(name="r", version="1"),
29+
bearer=StaticBearerVerifier({"tok": "p1"}),
30+
heartbeat_interval_sec=None,
31+
)
32+
33+
34+
async def test_cancel_unknown_job_raises_not_found() -> None:
35+
from arcp._runtime._handlers import handle_cancel
36+
37+
rt = _make_rt()
38+
server_t, client_t = pair_memory_transports()
39+
accept_task = asyncio.create_task(rt.accept(server_t))
40+
41+
client = ARCPClient(
42+
client=ClientInfo(name="c", version="1"),
43+
token="tok",
44+
capabilities=Capabilities(features=rt.capabilities.features),
45+
)
46+
welcome = await client.connect(client_t)
47+
ctx = rt._sessions[welcome.session_id]
48+
49+
env = Envelope(
50+
id=new_envelope_id(),
51+
type="job.cancel",
52+
session_id=welcome.session_id,
53+
job_id="nonexistent-job",
54+
payload=JobCancelPayload(reason="test").model_dump(mode="json"),
55+
)
56+
with pytest.raises(JobNotFoundError):
57+
await handle_cancel(rt, ctx, env)
58+
59+
await client.close()
60+
accept_task.cancel()
61+
with contextlib.suppress(asyncio.CancelledError, Exception):
62+
await accept_task
63+
await rt.close()
64+
65+
66+
async def test_cancel_missing_job_id_raises() -> None:
67+
from arcp._runtime._handlers import handle_cancel
68+
69+
rt = _make_rt()
70+
server_t, client_t = pair_memory_transports()
71+
accept_task = asyncio.create_task(rt.accept(server_t))
72+
73+
client = ARCPClient(
74+
client=ClientInfo(name="c", version="1"),
75+
token="tok",
76+
capabilities=Capabilities(features=rt.capabilities.features),
77+
)
78+
welcome = await client.connect(client_t)
79+
ctx = rt._sessions[welcome.session_id]
80+
81+
env = Envelope(
82+
id=new_envelope_id(),
83+
type="job.cancel",
84+
session_id=welcome.session_id,
85+
job_id=None, # missing job_id
86+
payload=JobCancelPayload(reason="test").model_dump(mode="json"),
87+
)
88+
with pytest.raises(InvalidRequestError, match="job_id"):
89+
await handle_cancel(rt, ctx, env)
90+
91+
await client.close()
92+
accept_task.cancel()
93+
with contextlib.suppress(asyncio.CancelledError, Exception):
94+
await accept_task
95+
await rt.close()
96+
97+
98+
async def test_cancel_other_principals_job_raises_permission_denied() -> None:
99+
from arcp._runtime._handlers import handle_cancel
100+
101+
rt = ARCPRuntime(
102+
runtime=RuntimeInfo(name="r", version="1"),
103+
bearer=StaticBearerVerifier({"tok": "p1", "other": "p2"}),
104+
heartbeat_interval_sec=None,
105+
)
106+
107+
long_running_started = asyncio.Event()
108+
109+
async def slow_agent(input_value, ctx):
110+
long_running_started.set()
111+
await asyncio.sleep(10)
112+
113+
rt.register_agent("slow", slow_agent)
114+
115+
# Connect as p1 and submit
116+
server_a, client_a = pair_memory_transports()
117+
server_b, client_b = pair_memory_transports()
118+
task_a = asyncio.create_task(rt.accept(server_a))
119+
task_b = asyncio.create_task(rt.accept(server_b))
120+
121+
ca = ARCPClient(
122+
client=ClientInfo(name="a", version="1"),
123+
token="tok",
124+
capabilities=Capabilities(features=rt.capabilities.features),
125+
)
126+
cb = ARCPClient(
127+
client=ClientInfo(name="b", version="1"),
128+
token="other",
129+
capabilities=Capabilities(features=rt.capabilities.features),
130+
)
131+
welcome_a = await ca.connect(client_a)
132+
welcome_b = await cb.connect(client_b)
133+
134+
handle = await ca.submit(agent="slow")
135+
await asyncio.wait_for(long_running_started.wait(), timeout=2.0)
136+
137+
# p2's ctx tries to cancel p1's job
138+
ctx_b = rt._sessions[welcome_b.session_id]
139+
job = rt._jobs[handle.job_id]
140+
env = Envelope(
141+
id=new_envelope_id(),
142+
type="job.cancel",
143+
session_id=welcome_b.session_id,
144+
job_id=handle.job_id,
145+
payload=JobCancelPayload(reason="hostile").model_dump(mode="json"),
146+
)
147+
148+
with pytest.raises(PermissionDeniedError):
149+
await handle_cancel(rt, ctx_b, env)
150+
151+
for cli in (ca, cb):
152+
await cli.close()
153+
for t in (task_a, task_b):
154+
t.cancel()
155+
with contextlib.suppress(asyncio.CancelledError, Exception):
156+
await t
157+
await rt.close()
158+
159+
160+
async def test_cancel_own_job_cancels_task() -> None:
161+
"""Cancelling a running job via client.cancel_job cancels the underlying task."""
162+
rt = _make_rt()
163+
started = asyncio.Event()
164+
cancelled = asyncio.Event()
165+
166+
async def slow_agent(input_value, ctx):
167+
started.set()
168+
try:
169+
await asyncio.sleep(10)
170+
except asyncio.CancelledError:
171+
cancelled.set()
172+
raise
173+
174+
rt.register_agent("slow", slow_agent)
175+
176+
server_t, client_t = pair_memory_transports()
177+
accept_task = asyncio.create_task(rt.accept(server_t))
178+
179+
client = ARCPClient(
180+
client=ClientInfo(name="c", version="1"),
181+
token="tok",
182+
capabilities=Capabilities(features=rt.capabilities.features),
183+
)
184+
await client.connect(client_t)
185+
186+
handle = await client.submit(agent="slow")
187+
await asyncio.wait_for(started.wait(), timeout=2.0)
188+
await client.cancel_job(handle.job_id)
189+
await asyncio.wait_for(cancelled.wait(), timeout=2.0)
190+
assert cancelled.is_set()
191+
192+
await client.close()
193+
accept_task.cancel()
194+
with contextlib.suppress(asyncio.CancelledError, Exception):
195+
await accept_task
196+
await rt.close()
197+
198+
199+
async def test_unsubscribe_unknown_job_is_noop() -> None:
200+
"""handle_unsubscribe with unknown job_id must silently return."""
201+
from arcp._runtime._handlers import handle_unsubscribe
202+
from arcp._messages.execution import JobUnsubscribePayload
203+
204+
rt = _make_rt()
205+
server_t, client_t = pair_memory_transports()
206+
accept_task = asyncio.create_task(rt.accept(server_t))
207+
208+
client = ARCPClient(
209+
client=ClientInfo(name="c", version="1"),
210+
token="tok",
211+
capabilities=Capabilities(features=rt.capabilities.features),
212+
)
213+
welcome = await client.connect(client_t)
214+
ctx = rt._sessions[welcome.session_id]
215+
216+
env = Envelope(
217+
id=new_envelope_id(),
218+
type="job.unsubscribe",
219+
session_id=welcome.session_id,
220+
payload=JobUnsubscribePayload(job_id="ghost-job").model_dump(mode="json"),
221+
)
222+
# Must not raise
223+
await handle_unsubscribe(rt, ctx, env)
224+
225+
await client.close()
226+
accept_task.cancel()
227+
with contextlib.suppress(asyncio.CancelledError, Exception):
228+
await accept_task
229+
await rt.close()

tests/state/test_heartbeat.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
"""§6.4 heartbeat_loop — ping cycle, cancellation, and loss detection coverage."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import contextlib
7+
from datetime import UTC, datetime
8+
from unittest.mock import MagicMock
9+
10+
import pytest
11+
12+
from arcp._errors import HeartbeatLostError
13+
from arcp._runtime.session import SessionContext, SessionState, heartbeat_loop
14+
from arcp._transport.base import Transport
15+
16+
17+
def _make_ctx() -> SessionContext:
18+
"""Construct a minimal SessionContext for heartbeat testing."""
19+
state = SessionState(
20+
session_id="heartbeat-test-session",
21+
resume_token="tok",
22+
principal="p1",
23+
negotiated_features=(),
24+
heartbeat_interval_sec=None,
25+
resume_window_sec=60,
26+
accepted_at=datetime.now(UTC),
27+
)
28+
transport = MagicMock(spec=Transport)
29+
return SessionContext(
30+
transport=transport,
31+
state=state,
32+
send_queue=asyncio.Queue(),
33+
)
34+
35+
36+
async def test_heartbeat_cancelled_returns_cleanly() -> None:
37+
"""CancelledError during sleep causes heartbeat_loop to exit without setting exception."""
38+
ctx = _make_ctx()
39+
loop = asyncio.get_running_loop()
40+
ctx.heartbeat_outcome = loop.create_future()
41+
42+
task = asyncio.create_task(heartbeat_loop(ctx, interval=60.0))
43+
await asyncio.sleep(0.01) # let the task enter asyncio.sleep
44+
task.cancel()
45+
46+
with contextlib.suppress(asyncio.CancelledError):
47+
await task
48+
49+
# heartbeat_outcome must not have been set
50+
assert not ctx.heartbeat_outcome.done()
51+
ctx.heartbeat_outcome.cancel() # clean up the future
52+
53+
54+
async def test_heartbeat_lost_sets_exception_on_outcome() -> None:
55+
"""Gap >= interval * miss_threshold sets HeartbeatLostError on heartbeat_outcome."""
56+
ctx = _make_ctx()
57+
loop = asyncio.get_running_loop()
58+
ctx.heartbeat_outcome = loop.create_future()
59+
60+
# Back-date last_inbound_at so gap is always enormous
61+
ctx._last_inbound_at = datetime(2000, 1, 1, tzinfo=UTC)
62+
63+
# interval=0.05 s, miss_threshold=1 → threshold=0.05 s; gap >> threshold
64+
await asyncio.wait_for(
65+
heartbeat_loop(ctx, interval=0.05, miss_threshold=1),
66+
timeout=1.0,
67+
)
68+
69+
assert ctx.heartbeat_outcome.done()
70+
with pytest.raises(HeartbeatLostError):
71+
ctx.heartbeat_outcome.result()
72+
73+
74+
async def test_heartbeat_loss_without_outcome_future_is_safe() -> None:
75+
"""heartbeat_loop handles heartbeat_outcome=None gracefully when loss fires."""
76+
ctx = _make_ctx()
77+
ctx.heartbeat_outcome = None # no future attached
78+
79+
# Back-date to trigger loss on first check
80+
ctx._last_inbound_at = datetime(2000, 1, 1, tzinfo=UTC)
81+
82+
# Should complete without AttributeError or TypeError
83+
await asyncio.wait_for(
84+
heartbeat_loop(ctx, interval=0.05, miss_threshold=1),
85+
timeout=1.0,
86+
)
87+
# No assertion needed — reaching here means no exception was raised
88+
89+
90+
async def test_heartbeat_sends_ping_and_invokes_on_ping_callback() -> None:
91+
"""Normal interval enqueues session.ping and calls on_ping with the nonce."""
92+
ctx = _make_ctx()
93+
received_nonces: list[str] = []
94+
95+
# Use large miss_threshold so we never trigger the lost-heartbeat branch
96+
task = asyncio.create_task(
97+
heartbeat_loop(ctx, interval=0.05, miss_threshold=20, on_ping=received_nonces.append)
98+
)
99+
# Wait long enough for at least one complete iteration
100+
await asyncio.sleep(0.13)
101+
task.cancel()
102+
with contextlib.suppress(asyncio.CancelledError):
103+
await task
104+
105+
assert len(received_nonces) >= 1
106+
107+
# The send queue must contain at least one session.ping envelope
108+
found_ping = False
109+
while not ctx._send_queue.empty():
110+
item = ctx._send_queue.get_nowait()
111+
if item is not None and item.type == "session.ping":
112+
found_ping = True
113+
break
114+
assert found_ping, "expected at least one session.ping in the send queue"
115+
116+
117+
async def test_heartbeat_loss_with_done_outcome_does_not_raise() -> None:
118+
"""If heartbeat_outcome is already done, set_exception is skipped (guarded by .done())."""
119+
ctx = _make_ctx()
120+
loop = asyncio.get_running_loop()
121+
fut: asyncio.Future[None] = loop.create_future()
122+
fut.cancel() # mark future as done (cancelled)
123+
ctx.heartbeat_outcome = fut
124+
125+
ctx._last_inbound_at = datetime(2000, 1, 1, tzinfo=UTC)
126+
127+
# Should complete without InvalidStateError (future already done guard)
128+
await asyncio.wait_for(
129+
heartbeat_loop(ctx, interval=0.05, miss_threshold=1),
130+
timeout=1.0,
131+
)

0 commit comments

Comments
 (0)