Skip to content

Commit 39b104b

Browse files
[cross-repo from server#301] Conformance: prove malformed signal/query payload typed errors (#106)
1 parent f2842d8 commit 39b104b

5 files changed

Lines changed: 221 additions & 2 deletions

File tree

README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,29 @@ server. Query routing and synchronous pre-accept update validator execution are
160160
still server-side follow-ups; use those paths only with deployments that
161161
advertise support for the target workflow type.
162162

163+
Malformed signal and query payloads are reported as typed client errors with
164+
the server's documented reason and status preserved:
165+
166+
```python
167+
from durable_workflow import Client, QueryFailed, SignalFailed
168+
169+
client = Client("http://localhost:8080")
170+
171+
try:
172+
await client.signal_workflow("counter-1", "increment", args=["not-an-int"])
173+
except SignalFailed as exc:
174+
assert exc.reason == "invalid_signal_arguments"
175+
assert exc.status == 422
176+
assert exc.validation_errors is not None
177+
178+
try:
179+
await client.query_workflow("counter-1", "current-at", args=["not-an-int"])
180+
except QueryFailed as exc:
181+
assert exc.reason == "invalid_query_arguments"
182+
assert exc.status == 422
183+
assert exc.validation_errors is not None
184+
```
185+
163186
Use `yield ctx.wait_condition(lambda: self.approved, key="approved",
164187
timeout=30)` to wait for signal- or update-mutated workflow state without
165188
polling timers by hand. The SDK sends a stable predicate fingerprint with the

src/durable_workflow/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
ScheduleAlreadyExists,
6464
ScheduleNotFound,
6565
ServerError,
66+
SignalFailed,
6667
Unauthorized,
6768
UpdateRejected,
6869
WorkflowAlreadyStarted,
@@ -280,6 +281,7 @@
280281
"RetryPolicy",
281282
"S3ExternalStorage",
282283
"ServerError",
284+
"SignalFailed",
283285
"TransportRetryPolicy",
284286
"Unauthorized",
285287
"UpdateRejected",

src/durable_workflow/errors.py

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,54 @@ def __init__(self, schedule_id: str) -> None:
122122
class QueryFailed(DurableWorkflowError):
123123
"""A workflow query was rejected or the workflow raised while handling it."""
124124

125-
def __init__(self, message: str, *, reason: str | None = None, body: object | None = None) -> None:
125+
def __init__(
126+
self,
127+
message: str,
128+
*,
129+
reason: str | None = None,
130+
status: int | None = None,
131+
body: object | None = None,
132+
) -> None:
133+
super().__init__(message)
134+
self.reason = reason
135+
self.status = status
136+
self.body = body
137+
138+
@property
139+
def validation_errors(self) -> dict[str, Any] | None:
140+
"""Return structured query argument validation errors, if the server provided them."""
141+
if isinstance(self.body, dict):
142+
errors = self.body.get("validation_errors") or self.body.get("errors")
143+
if isinstance(errors, dict):
144+
return errors
145+
return None
146+
147+
148+
class SignalFailed(DurableWorkflowError):
149+
"""A workflow signal was rejected before it could be delivered."""
150+
151+
def __init__(
152+
self,
153+
message: str,
154+
*,
155+
reason: str | None = None,
156+
status: int | None = None,
157+
body: object | None = None,
158+
) -> None:
126159
super().__init__(message)
127160
self.reason = reason
161+
self.status = status
128162
self.body = body
129163

164+
@property
165+
def validation_errors(self) -> dict[str, Any] | None:
166+
"""Return structured signal argument validation errors, if the server provided them."""
167+
if isinstance(self.body, dict):
168+
errors = self.body.get("validation_errors") or self.body.get("errors")
169+
if isinstance(errors, dict):
170+
return errors
171+
return None
172+
130173

131174
class WorkflowPayloadDecodeError(DurableWorkflowError):
132175
"""A committed workflow history payload could not be decoded during replay."""
@@ -282,14 +325,30 @@ def _raise_for_status(status: int, body: object, *, context: str = "") -> None:
282325

283326
reason = body.get("reason") if isinstance(body, dict) else None
284327
message = body.get("message", "") if isinstance(body, dict) else str(body)
328+
operation = _control_plane_operation(body)
285329

286330
if status == 401:
287331
raise Unauthorized(message or "unauthorized")
288332

289333
def query_failed(default: str) -> QueryFailed:
290-
return QueryFailed(message or default, reason=reason if isinstance(reason, str) else None, body=body)
334+
return QueryFailed(
335+
message or default,
336+
reason=reason if isinstance(reason, str) else None,
337+
status=status,
338+
body=body,
339+
)
340+
341+
def signal_failed(default: str) -> SignalFailed:
342+
return SignalFailed(
343+
message or default,
344+
reason=reason if isinstance(reason, str) else None,
345+
status=status,
346+
body=body,
347+
)
291348

292349
if status == 404:
350+
if reason == "unknown_signal":
351+
raise signal_failed("signal not found")
293352
if reason in ("query_not_found", "rejected_unknown_query"):
294353
raise query_failed("query not found")
295354
if reason == "schedule_not_found":
@@ -305,6 +364,11 @@ def query_failed(default: str) -> QueryFailed:
305364
raise ScheduleAlreadyExists(context)
306365
if reason == "duplicate_not_allowed":
307366
raise WorkflowAlreadyStarted(context)
367+
if reason == "run_not_active":
368+
if operation == "signal":
369+
raise signal_failed("signal rejected")
370+
if operation == "query":
371+
raise query_failed("query rejected")
308372
if reason in (
309373
"query_rejected",
310374
"query_worker_unavailable",
@@ -326,9 +390,30 @@ def query_failed(default: str) -> QueryFailed:
326390
raise ServerError(status, body)
327391

328392
if status == 422:
393+
if reason == "invalid_signal_arguments":
394+
raise signal_failed("signal argument validation failed")
395+
if reason == "invalid_query_arguments":
396+
raise query_failed("query argument validation failed")
329397
errors = None
330398
if isinstance(body, dict):
331399
errors = body.get("errors") or body.get("validation_errors")
332400
raise InvalidArgument(message, errors)
333401

334402
raise ServerError(status, body)
403+
404+
405+
def _control_plane_operation(body: object) -> str | None:
406+
if not isinstance(body, dict):
407+
return None
408+
409+
control_plane = body.get("control_plane")
410+
if isinstance(control_plane, dict):
411+
operation = control_plane.get("operation")
412+
if isinstance(operation, str) and operation:
413+
return operation
414+
415+
operation = body.get("control_plane_operation")
416+
if isinstance(operation, str) and operation:
417+
return operation
418+
419+
return None

tests/test_client.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
InvalidArgument,
2424
QueryFailed,
2525
ServerError,
26+
SignalFailed,
2627
Unauthorized,
2728
UpdateRejected,
2829
WorkflowAlreadyStarted,
@@ -573,6 +574,36 @@ async def test_signal_request_matches_polyglot_fixture(self, client: Client) ->
573574
assert sdk["args"]["workflow_id"] == semantic["workflow_id"]
574575
assert sdk["args"]["signal_name"] == semantic["signal_name"]
575576

577+
@pytest.mark.asyncio
578+
async def test_unknown_signal_raises_signal_failed(self, client: Client) -> None:
579+
resp = _mock_response(404, {"reason": "unknown_signal", "message": "signal [missing] not declared"})
580+
with (
581+
patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp),
582+
pytest.raises(SignalFailed) as excinfo,
583+
):
584+
await client.signal_workflow("wf-1", "missing")
585+
586+
assert excinfo.value.reason == "unknown_signal"
587+
assert excinfo.value.status == 404
588+
589+
@pytest.mark.asyncio
590+
async def test_malformed_signal_payload_raises_signal_failed(self, client: Client) -> None:
591+
body = {
592+
"reason": "invalid_signal_arguments",
593+
"message": "signal argument validation failed",
594+
"validation_errors": {"n": ["The n argument must be an integer."]},
595+
}
596+
resp = _mock_response(422, body)
597+
with (
598+
patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp),
599+
pytest.raises(SignalFailed) as excinfo,
600+
):
601+
await client.signal_workflow("wf-1", "increment", args=["not-an-int"])
602+
603+
assert excinfo.value.reason == "invalid_signal_arguments"
604+
assert excinfo.value.status == 422
605+
assert excinfo.value.validation_errors == {"n": ["The n argument must be an integer."]}
606+
576607

577608
class TestWebhookBridgeAdapters:
578609
@pytest.mark.asyncio
@@ -828,6 +859,25 @@ async def test_worker_routed_unknown_query_raises_query_failed(self, client: Cli
828859
):
829860
await client.query_workflow("wf-1", "status")
830861

862+
@pytest.mark.asyncio
863+
async def test_malformed_query_payload_raises_query_failed(self, client: Client) -> None:
864+
body = {
865+
"reason": "invalid_query_arguments",
866+
"message": "query argument validation failed",
867+
"validation_errors": {"n": ["The n argument must be an integer."]},
868+
}
869+
resp = _mock_response(422, body)
870+
with (
871+
patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp),
872+
pytest.raises(QueryFailed) as excinfo,
873+
):
874+
await client.query_workflow("wf-1", "current", args=["not-an-int"])
875+
876+
assert excinfo.value.reason == "invalid_query_arguments"
877+
assert excinfo.value.status == 422
878+
assert excinfo.value.body == body
879+
assert excinfo.value.validation_errors == {"n": ["The n argument must be an integer."]}
880+
831881
@pytest.mark.asyncio
832882
async def test_query_rejected(self, client: Client) -> None:
833883
resp = _mock_response(409, {"reason": "query_rejected", "message": "workflow unavailable"})

tests/test_errors.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
NamespaceNotFound,
1010
QueryFailed,
1111
ServerError,
12+
SignalFailed,
1213
Unauthorized,
1314
UpdateRejected,
1415
WorkflowAlreadyStarted,
@@ -38,6 +39,12 @@ def test_404_query_not_found(self) -> None:
3839
with pytest.raises(QueryFailed):
3940
_raise_for_status(404, {"reason": "query_not_found", "message": "query [status] not declared"})
4041

42+
def test_404_unknown_signal(self) -> None:
43+
with pytest.raises(SignalFailed) as exc_info:
44+
_raise_for_status(404, {"reason": "unknown_signal", "message": "signal [advance] not declared"})
45+
assert exc_info.value.reason == "unknown_signal"
46+
assert exc_info.value.status == 404
47+
4148
def test_404_namespace(self) -> None:
4249
with pytest.raises(NamespaceNotFound):
4350
_raise_for_status(404, {"reason": "namespace_not_found", "message": "ns missing"})
@@ -54,6 +61,32 @@ def test_409_query_rejected(self) -> None:
5461
with pytest.raises(QueryFailed):
5562
_raise_for_status(409, {"reason": "query_rejected", "message": "workflow unavailable"})
5663

64+
def test_409_terminal_signal_rejected(self) -> None:
65+
with pytest.raises(SignalFailed) as exc_info:
66+
_raise_for_status(
67+
409,
68+
{
69+
"reason": "run_not_active",
70+
"message": "workflow is completed",
71+
"control_plane": {"operation": "signal"},
72+
},
73+
)
74+
assert exc_info.value.reason == "run_not_active"
75+
assert exc_info.value.status == 409
76+
77+
def test_409_terminal_query_rejected(self) -> None:
78+
with pytest.raises(QueryFailed) as exc_info:
79+
_raise_for_status(
80+
409,
81+
{
82+
"reason": "run_not_active",
83+
"message": "workflow is completed",
84+
"control_plane": {"operation": "query"},
85+
},
86+
)
87+
assert exc_info.value.reason == "run_not_active"
88+
assert exc_info.value.status == 409
89+
5790
def test_409_update_rejected(self) -> None:
5891
with pytest.raises(UpdateRejected):
5992
_raise_for_status(409, {"reason": "update_rejected", "message": "rejected by handler"})
@@ -67,6 +100,32 @@ def test_422_invalid(self) -> None:
67100
_raise_for_status(422, {"message": "bad", "errors": {"f": ["req"]}})
68101
assert exc_info.value.errors == {"f": ["req"]}
69102

103+
def test_422_invalid_signal_arguments(self) -> None:
104+
body = {
105+
"reason": "invalid_signal_arguments",
106+
"message": "signal argument validation failed",
107+
"validation_errors": {"name": ["The name argument must be a string."]},
108+
}
109+
with pytest.raises(SignalFailed) as exc_info:
110+
_raise_for_status(422, body)
111+
assert exc_info.value.reason == "invalid_signal_arguments"
112+
assert exc_info.value.status == 422
113+
assert exc_info.value.body == body
114+
assert exc_info.value.validation_errors == {"name": ["The name argument must be a string."]}
115+
116+
def test_422_invalid_query_arguments(self) -> None:
117+
body = {
118+
"reason": "invalid_query_arguments",
119+
"message": "query argument validation failed",
120+
"validation_errors": {"prefix": ["The prefix argument is required."]},
121+
}
122+
with pytest.raises(QueryFailed) as exc_info:
123+
_raise_for_status(422, body)
124+
assert exc_info.value.reason == "invalid_query_arguments"
125+
assert exc_info.value.status == 422
126+
assert exc_info.value.body == body
127+
assert exc_info.value.validation_errors == {"prefix": ["The prefix argument is required."]}
128+
70129
def test_500_server_error(self) -> None:
71130
with pytest.raises(ServerError) as exc_info:
72131
_raise_for_status(500, {"message": "internal"})

0 commit comments

Comments
 (0)