Skip to content

Commit d20ac25

Browse files
Retry activity polls with request id
1 parent 356cdad commit d20ac25

3 files changed

Lines changed: 54 additions & 9 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "durable-workflow"
7-
version = "0.4.34"
7+
version = "0.4.35"
88
description = "Python SDK for the Durable Workflow server (language-neutral HTTP protocol)"
99
readme = "README.md"
1010
requires-python = ">=3.10"

src/durable_workflow/client.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3276,14 +3276,22 @@ async def poll_activity_task(
32763276
Returns the task payload, or ``None`` on poll timeout. Worker-plane
32773277
endpoint — typically used by :class:`~durable_workflow.Worker`.
32783278
"""
3279-
body: dict[str, Any] = {"worker_id": worker_id, "task_queue": task_queue}
3280-
try:
3281-
data = await self._request(
3282-
"POST", "/worker/activity-tasks/poll", worker=True, json=body, timeout=timeout
3283-
)
3284-
except httpx.TimeoutException:
3285-
return None
3286-
return (data or {}).get("task")
3279+
body: dict[str, Any] = {
3280+
"worker_id": worker_id,
3281+
"task_queue": task_queue,
3282+
"poll_request_id": f"activity-poll-{uuid.uuid4().hex}",
3283+
}
3284+
for _ in range(2):
3285+
try:
3286+
data = await self._request(
3287+
"POST", "/worker/activity-tasks/poll", worker=True, json=body, timeout=timeout
3288+
)
3289+
except httpx.TimeoutException:
3290+
continue
3291+
3292+
return (data or {}).get("task")
3293+
3294+
return None
32873295

32883296
async def complete_activity_task(
32893297
self,

tests/test_client.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2072,6 +2072,43 @@ async def test_poll_workflow_task_retries_once_with_same_poll_request_id_after_t
20722072
assert first_body["task_queue"] == "queue-1"
20732073
assert first_body["poll_request_id"] == second_body["poll_request_id"]
20742074

2075+
@pytest.mark.asyncio
2076+
async def test_poll_activity_task_sends_poll_request_id(self, client: Client) -> None:
2077+
response_task = {"task": {"task_id": "activity-task-123"}}
2078+
2079+
with patch.object(client, "_request", new_callable=AsyncMock, return_value=response_task) as mock:
2080+
task = await client.poll_activity_task(worker_id="worker-1", task_queue="queue-1")
2081+
2082+
assert task == response_task["task"]
2083+
request_body = mock.await_args.kwargs["json"]
2084+
assert request_body["worker_id"] == "worker-1"
2085+
assert request_body["task_queue"] == "queue-1"
2086+
assert isinstance(request_body["poll_request_id"], str)
2087+
assert request_body["poll_request_id"] != ""
2088+
2089+
@pytest.mark.asyncio
2090+
async def test_poll_activity_task_retries_once_with_same_poll_request_id_after_timeout(
2091+
self, client: Client
2092+
) -> None:
2093+
response_task = {"task": {"task_id": "activity-task-123"}}
2094+
2095+
with patch.object(
2096+
client,
2097+
"_request",
2098+
new_callable=AsyncMock,
2099+
side_effect=[httpx.TimeoutException("timeout"), response_task],
2100+
) as mock:
2101+
task = await client.poll_activity_task(worker_id="worker-1", task_queue="queue-1")
2102+
2103+
assert task == response_task["task"]
2104+
assert mock.await_count == 2
2105+
2106+
first_body = mock.await_args_list[0].kwargs["json"]
2107+
second_body = mock.await_args_list[1].kwargs["json"]
2108+
assert first_body["worker_id"] == "worker-1"
2109+
assert first_body["task_queue"] == "queue-1"
2110+
assert first_body["poll_request_id"] == second_body["poll_request_id"]
2111+
20752112
@pytest.mark.asyncio
20762113
async def test_complete_workflow_task_matches_polyglot_fixture(self, client: Client) -> None:
20772114
fixture_path = Path(__file__).parent / "fixtures" / "control-plane" / "workflow-task-complete-parity.json"

0 commit comments

Comments
 (0)