Skip to content

Commit e2efefe

Browse files
Expose task queue admission in Python SDK
Issue: zorporation/durable-workflow#488 Loop-ID: build-01
1 parent 013456b commit e2efefe

4 files changed

Lines changed: 272 additions & 1 deletion

File tree

README.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,17 @@ restart the worker process with a new id before serving changed workflow code.
159159
Workers also advertise their local workflow and activity concurrency limits
160160
during registration. Tune `max_concurrent_workflow_tasks` and
161161
`max_concurrent_activity_tasks` on `Worker(...)` to align local semaphores with
162-
the server's task-queue admission and operator visibility surfaces.
162+
the server's task-queue admission and operator visibility surfaces. Use
163+
`Client.list_task_queues()` or `Client.describe_task_queue("orders")` to read
164+
the server-side workflow, activity, and query-task admission status before
165+
tuning those local limits:
166+
167+
```python
168+
queues = await client.list_task_queues()
169+
for queue in queues.task_queues:
170+
workflow_admission = queue.admission.workflow_tasks if queue.admission else None
171+
print(queue.name, workflow_admission.status if workflow_admission else "unknown")
172+
```
163173

164174
## Replay captured histories
165175

src/durable_workflow/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
ScheduleList,
1818
ScheduleSpec,
1919
ScheduleTriggerResult,
20+
TaskQueueAdmission,
21+
TaskQueueDescription,
22+
TaskQueueList,
23+
TaskQueueQueryAdmission,
24+
TaskQueueTaskAdmission,
2025
WorkflowExecution,
2126
WorkflowHandle,
2227
WorkflowList,
@@ -100,6 +105,11 @@
100105
"ScheduleSpec",
101106
"ScheduleTriggerResult",
102107
"StartChildWorkflow",
108+
"TaskQueueAdmission",
109+
"TaskQueueDescription",
110+
"TaskQueueList",
111+
"TaskQueueQueryAdmission",
112+
"TaskQueueTaskAdmission",
103113
"Worker",
104114
"WorkerInterceptor",
105115
"WorkflowExecution",

src/durable_workflow/client.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from importlib.metadata import PackageNotFoundError
2626
from importlib.metadata import version as _pkg_version
2727
from typing import Any
28+
from urllib.parse import quote
2829

2930
import httpx
3031

@@ -102,6 +103,127 @@ class WorkflowList:
102103
next_page_token: str | None = None
103104

104105

106+
@dataclass
107+
class TaskQueueTaskAdmission:
108+
"""Workflow/activity admission state for one task queue."""
109+
110+
status: str | None = None
111+
budget_source: str | None = None
112+
server_budget_source: str | None = None
113+
active_worker_count: int | None = None
114+
configured_slot_count: int | None = None
115+
leased_count: int | None = None
116+
ready_count: int | None = None
117+
available_slot_count: int | None = None
118+
server_max_active_leases_per_queue: int | None = None
119+
server_active_lease_count: int | None = None
120+
server_remaining_active_lease_capacity: int | None = None
121+
server_lock_required: bool | None = None
122+
server_lock_supported: bool | None = None
123+
124+
@classmethod
125+
def from_dict(cls, data: dict[str, Any] | None) -> TaskQueueTaskAdmission | None:
126+
if data is None:
127+
return None
128+
return cls(
129+
status=data.get("status"),
130+
budget_source=data.get("budget_source"),
131+
server_budget_source=data.get("server_budget_source"),
132+
active_worker_count=data.get("active_worker_count"),
133+
configured_slot_count=data.get("configured_slot_count"),
134+
leased_count=data.get("leased_count"),
135+
ready_count=data.get("ready_count"),
136+
available_slot_count=data.get("available_slot_count"),
137+
server_max_active_leases_per_queue=data.get("server_max_active_leases_per_queue"),
138+
server_active_lease_count=data.get("server_active_lease_count"),
139+
server_remaining_active_lease_capacity=data.get("server_remaining_active_lease_capacity"),
140+
server_lock_required=data.get("server_lock_required"),
141+
server_lock_supported=data.get("server_lock_supported"),
142+
)
143+
144+
145+
@dataclass
146+
class TaskQueueQueryAdmission:
147+
"""Worker-routed query-task admission state for one task queue."""
148+
149+
status: str | None = None
150+
budget_source: str | None = None
151+
max_pending_per_queue: int | None = None
152+
approximate_pending_count: int | None = None
153+
remaining_pending_capacity: int | None = None
154+
lock_required: bool | None = None
155+
lock_supported: bool | None = None
156+
157+
@classmethod
158+
def from_dict(cls, data: dict[str, Any] | None) -> TaskQueueQueryAdmission | None:
159+
if data is None:
160+
return None
161+
return cls(
162+
status=data.get("status"),
163+
budget_source=data.get("budget_source"),
164+
max_pending_per_queue=data.get("max_pending_per_queue"),
165+
approximate_pending_count=data.get("approximate_pending_count"),
166+
remaining_pending_capacity=data.get("remaining_pending_capacity"),
167+
lock_required=data.get("lock_required"),
168+
lock_supported=data.get("lock_supported"),
169+
)
170+
171+
172+
@dataclass
173+
class TaskQueueAdmission:
174+
"""Server-side admission budgets for workflow, activity, and query tasks."""
175+
176+
workflow_tasks: TaskQueueTaskAdmission | None = None
177+
activity_tasks: TaskQueueTaskAdmission | None = None
178+
query_tasks: TaskQueueQueryAdmission | None = None
179+
raw: dict[str, Any] | None = None
180+
181+
@classmethod
182+
def from_dict(cls, data: dict[str, Any] | None) -> TaskQueueAdmission:
183+
payload = data or {}
184+
return cls(
185+
workflow_tasks=TaskQueueTaskAdmission.from_dict(payload.get("workflow_tasks")),
186+
activity_tasks=TaskQueueTaskAdmission.from_dict(payload.get("activity_tasks")),
187+
query_tasks=TaskQueueQueryAdmission.from_dict(payload.get("query_tasks")),
188+
raw=payload,
189+
)
190+
191+
192+
@dataclass
193+
class TaskQueueDescription:
194+
"""Current server visibility and admission state for one task queue."""
195+
196+
name: str
197+
namespace: str | None = None
198+
stats: dict[str, Any] | None = None
199+
admission: TaskQueueAdmission | None = None
200+
pollers: list[dict[str, Any]] | None = None
201+
current_leases: list[dict[str, Any]] | None = None
202+
raw: dict[str, Any] | None = None
203+
204+
@classmethod
205+
def from_dict(cls, data: dict[str, Any]) -> TaskQueueDescription:
206+
pollers = data.get("pollers")
207+
current_leases = data.get("current_leases")
208+
return cls(
209+
name=data.get("name", ""),
210+
namespace=data.get("namespace"),
211+
stats=data.get("stats"),
212+
admission=TaskQueueAdmission.from_dict(data.get("admission")),
213+
pollers=pollers if isinstance(pollers, list) else None,
214+
current_leases=current_leases if isinstance(current_leases, list) else None,
215+
raw=data,
216+
)
217+
218+
219+
@dataclass
220+
class TaskQueueList:
221+
"""One task-queue visibility page returned by the server."""
222+
223+
namespace: str | None
224+
task_queues: list[TaskQueueDescription]
225+
226+
105227
@dataclass
106228
class ScheduleSpec:
107229
"""Calendar or interval rules for a scheduled workflow."""
@@ -571,6 +693,38 @@ async def health(self) -> dict[str, Any]:
571693
)
572694
return result
573695

696+
# ── Task queues ────────────────────────────────────────────────────
697+
async def list_task_queues(self) -> TaskQueueList:
698+
"""List task queues with server-side admission status.
699+
700+
Admission data describes server budgets and observed backlog. Worker
701+
constructor limits remain local semaphores that are advertised during
702+
registration.
703+
"""
704+
data = await self._request("GET", "/task-queues")
705+
items = data.get("task_queues", []) if isinstance(data, dict) else []
706+
return TaskQueueList(
707+
namespace=data.get("namespace") if isinstance(data, dict) else None,
708+
task_queues=[
709+
TaskQueueDescription.from_dict(item)
710+
for item in items
711+
if isinstance(item, dict)
712+
],
713+
)
714+
715+
async def describe_task_queue(self, name: str) -> TaskQueueDescription:
716+
"""Return backlog, poller, lease, and admission detail for ``name``."""
717+
data = await self._request("GET", f"/task-queues/{quote(name, safe='')}", context=name)
718+
if not isinstance(data, dict):
719+
raise ServerError(
720+
200,
721+
{
722+
"reason": "invalid_task_queue_response",
723+
"message": f"expected JSON object, got {type(data).__name__}",
724+
},
725+
)
726+
return TaskQueueDescription.from_dict(data)
727+
574728
# ── Workflows ──────────────────────────────────────────────────────
575729
async def start_workflow(
576730
self,

tests/test_client.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,103 @@ async def test_list(self, client: Client) -> None:
361361
assert result.executions[0].workflow_id == "wf-1"
362362

363363

364+
class TestTaskQueues:
365+
@pytest.mark.asyncio
366+
async def test_list_task_queues_parses_admission(self, client: Client) -> None:
367+
resp = _mock_response(200, {
368+
"namespace": "ns1",
369+
"task_queues": [
370+
{
371+
"name": "orders",
372+
"stats": {"approximate_backlog_count": 2},
373+
"admission": {
374+
"workflow_tasks": {
375+
"status": "throttled",
376+
"budget_source": "worker_registration.max_concurrent_workflow_tasks",
377+
"active_worker_count": 2,
378+
"configured_slot_count": 10,
379+
"leased_count": 1,
380+
"ready_count": 2,
381+
"available_slot_count": 9,
382+
"server_budget_source": "server.admission.workflow_tasks.max_active_leases_per_queue",
383+
"server_max_active_leases_per_queue": 1,
384+
"server_active_lease_count": 1,
385+
"server_remaining_active_lease_capacity": 0,
386+
"server_lock_required": True,
387+
"server_lock_supported": True,
388+
},
389+
"activity_tasks": {"status": "accepting", "configured_slot_count": 5},
390+
"query_tasks": {
391+
"status": "full",
392+
"budget_source": "server.query_tasks.max_pending_per_queue",
393+
"max_pending_per_queue": 10,
394+
"approximate_pending_count": 10,
395+
"remaining_pending_capacity": 0,
396+
"lock_required": True,
397+
"lock_supported": True,
398+
},
399+
},
400+
}
401+
],
402+
})
403+
with patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp) as mock:
404+
result = await client.list_task_queues()
405+
406+
assert result.namespace == "ns1"
407+
assert len(result.task_queues) == 1
408+
queue = result.task_queues[0]
409+
assert queue.name == "orders"
410+
assert queue.stats == {"approximate_backlog_count": 2}
411+
assert queue.admission is not None
412+
assert queue.admission.workflow_tasks is not None
413+
assert queue.admission.workflow_tasks.status == "throttled"
414+
assert queue.admission.workflow_tasks.server_remaining_active_lease_capacity == 0
415+
assert queue.admission.activity_tasks is not None
416+
assert queue.admission.activity_tasks.configured_slot_count == 5
417+
assert queue.admission.query_tasks is not None
418+
assert queue.admission.query_tasks.status == "full"
419+
assert queue.admission.query_tasks.lock_supported is True
420+
assert queue.admission.raw is not None
421+
assert queue.admission.raw["query_tasks"]["max_pending_per_queue"] == 10
422+
assert mock.call_args.args[:2] == ("GET", "/api/task-queues")
423+
424+
@pytest.mark.asyncio
425+
async def test_describe_task_queue_parses_details_and_escapes_name(self, client: Client) -> None:
426+
resp = _mock_response(200, {
427+
"name": "orders/high priority",
428+
"namespace": "ns1",
429+
"stats": {"pollers": {"active_count": 1}},
430+
"pollers": [{"worker_id": "w1", "status": "active"}],
431+
"current_leases": [{"task_id": "t1", "task_type": "workflow"}],
432+
"admission": {"workflow_tasks": {"status": "accepting"}},
433+
})
434+
with patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp) as mock:
435+
result = await client.describe_task_queue("orders/high priority")
436+
437+
assert result.name == "orders/high priority"
438+
assert result.namespace == "ns1"
439+
assert result.pollers == [{"worker_id": "w1", "status": "active"}]
440+
assert result.current_leases == [{"task_id": "t1", "task_type": "workflow"}]
441+
assert result.admission is not None
442+
assert result.admission.workflow_tasks is not None
443+
assert result.admission.workflow_tasks.status == "accepting"
444+
assert mock.call_args.args[:2] == ("GET", "/api/task-queues/orders%2Fhigh%20priority")
445+
446+
@pytest.mark.asyncio
447+
async def test_describe_task_queue_rejects_non_object_response(self, client: Client) -> None:
448+
resp = httpx.Response(
449+
status_code=200,
450+
content=b"[]",
451+
headers={"content-type": "application/json"},
452+
request=httpx.Request("GET", "http://test"),
453+
)
454+
with (
455+
patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp),
456+
pytest.raises(ServerError, match="invalid_task_queue_response"),
457+
):
458+
await client.describe_task_queue("orders")
459+
460+
364461
class TestErrorMapping:
365462
@pytest.mark.asyncio
366463
async def test_401_unauthorized(self, client: Client) -> None:

0 commit comments

Comments
 (0)