Skip to content

Commit cb19627

Browse files
Apply payload warnings to schedule inputs
Issue: zorporation/durable-workflow#444 Loop-ID: build-01
1 parent 9315fb4 commit cb19627

4 files changed

Lines changed: 117 additions & 8 deletions

File tree

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ restart the worker process with a new id before serving changed workflow code.
156156
- **Polyglot**: Works alongside PHP workers on the same task queue
157157
- **HTTP/JSON protocol**: No gRPC, no protobuf dependencies
158158
- **Codec envelopes**: Avro payloads by default, with JSON decode compatibility for existing history
159-
- **Payload-size warnings**: Structured warnings before oversized workflow, activity, signal, update, query, or search-attribute payloads reach the server
159+
- **Payload-size warnings**: Structured warnings before oversized workflow, activity, schedule, signal, update, query, or search-attribute payloads reach the server
160160
- **Workflow definition guard**: Worker registration refuses same-id hot reloads when a workflow class definition changed
161161
- **Worker interceptors**: Typed hooks around workflow tasks, activity calls, and query tasks for tracing, logging, and custom metrics
162162
- **Metrics hooks**: Pluggable counters and histograms, with an optional Prometheus adapter
@@ -165,9 +165,9 @@ restart the worker process with a new id before serving changed workflow code.
165165

166166
The SDK logs a structured warning before an encoded payload reaches 80% of the
167167
default 2 MiB server payload limit. Warnings include context such as
168-
`workflow_id`, `activity_name`, `signal_name`, `update_name`, `query_name`,
169-
`payload_size`, `threshold_bytes`, and `limit_bytes` when those fields are
170-
known at the call site.
168+
`workflow_id`, `workflow_type`, `activity_name`, `schedule_id`, `signal_name`,
169+
`update_name`, `query_name`, `payload_size`, `threshold_bytes`, and
170+
`limit_bytes` when those fields are known at the call site.
171171

172172
Tune or disable the warning threshold on the client:
173173

src/durable_workflow/client.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import asyncio
2222
import time
23+
from collections.abc import Callable
2324
from dataclasses import dataclass
2425
from importlib.metadata import PackageNotFoundError
2526
from importlib.metadata import version as _pkg_version
@@ -130,12 +131,16 @@ class ScheduleAction:
130131
execution_timeout_seconds: int | None = None
131132
run_timeout_seconds: int | None = None
132133

133-
def to_dict(self) -> dict[str, Any]:
134+
def to_dict(
135+
self,
136+
*,
137+
input_encoder: Callable[[list[Any]], dict[str, str]] | None = None,
138+
) -> dict[str, Any]:
134139
d: dict[str, Any] = {"workflow_type": self.workflow_type}
135140
if self.task_queue is not None:
136141
d["task_queue"] = self.task_queue
137142
if self.input is not None:
138-
d["input"] = serializer.envelope(self.input)
143+
d["input"] = input_encoder(self.input) if input_encoder else serializer.envelope(self.input)
139144
if self.execution_timeout_seconds is not None:
140145
d["execution_timeout_seconds"] = self.execution_timeout_seconds
141146
if self.run_timeout_seconds is not None:
@@ -392,6 +397,7 @@ def _payload_context(
392397
*,
393398
kind: str,
394399
workflow_id: str | None = None,
400+
workflow_type: str | None = None,
395401
run_id: str | None = None,
396402
activity_name: str | None = None,
397403
signal_name: str | None = None,
@@ -403,6 +409,7 @@ def _payload_context(
403409
return serializer.PayloadSizeWarningContext(
404410
kind=kind,
405411
workflow_id=workflow_id,
412+
workflow_type=workflow_type,
406413
run_id=run_id,
407414
activity_name=activity_name,
408415
signal_name=signal_name,
@@ -420,6 +427,7 @@ def _payload_envelope(
420427
kind: str,
421428
codec: str = serializer.AVRO_CODEC,
422429
workflow_id: str | None = None,
430+
workflow_type: str | None = None,
423431
run_id: str | None = None,
424432
activity_name: str | None = None,
425433
signal_name: str | None = None,
@@ -435,6 +443,7 @@ def _payload_envelope(
435443
warning_context=self._payload_context(
436444
kind=kind,
437445
workflow_id=workflow_id,
446+
workflow_type=workflow_type,
438447
run_id=run_id,
439448
activity_name=activity_name,
440449
signal_name=signal_name,
@@ -901,7 +910,15 @@ async def create_schedule(
901910
"""
902911
body: dict[str, Any] = {
903912
"spec": spec.to_dict(),
904-
"action": action.to_dict(),
913+
"action": action.to_dict(
914+
input_encoder=lambda value: self._payload_envelope(
915+
value,
916+
kind="schedule_input",
917+
workflow_type=action.workflow_type,
918+
schedule_id=schedule_id,
919+
task_queue=action.task_queue,
920+
)
921+
),
905922
}
906923
if schedule_id is not None:
907924
body["schedule_id"] = schedule_id
@@ -999,7 +1016,15 @@ async def update_schedule(
9991016
if spec is not None:
10001017
body["spec"] = spec.to_dict()
10011018
if action is not None:
1002-
body["action"] = action.to_dict()
1019+
body["action"] = action.to_dict(
1020+
input_encoder=lambda value: self._payload_envelope(
1021+
value,
1022+
kind="schedule_input",
1023+
workflow_type=action.workflow_type,
1024+
schedule_id=schedule_id,
1025+
task_queue=action.task_queue,
1026+
)
1027+
)
10031028
if overlap_policy is not None:
10041029
body["overlap_policy"] = overlap_policy
10051030
if jitter_seconds is not None:

src/durable_workflow/serializer.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class PayloadSizeWarningContext:
5454

5555
kind: str
5656
workflow_id: str | None = None
57+
workflow_type: str | None = None
5758
run_id: str | None = None
5859
activity_name: str | None = None
5960
signal_name: str | None = None
@@ -67,6 +68,7 @@ def to_log_context(self) -> dict[str, str]:
6768
values = {
6869
"kind": self.kind,
6970
"workflow_id": self.workflow_id,
71+
"workflow_type": self.workflow_type,
7072
"run_id": self.run_id,
7173
"activity_name": self.activity_name,
7274
"signal_name": self.signal_name,

tests/test_schedules.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import json
4+
import logging
45
from unittest.mock import AsyncMock, patch
56

67
import httpx
@@ -112,6 +113,59 @@ async def test_action_input_uses_codec_envelope(self, client: Client) -> None:
112113
assert action_input["codec"] == "avro"
113114
assert serializer.decode(action_input["blob"], codec="avro") == ["Alice", 42]
114115

116+
@pytest.mark.asyncio
117+
async def test_action_input_warning_uses_client_policy(
118+
self, client: Client, caplog: pytest.LogCaptureFixture
119+
) -> None:
120+
client.payload_size_warning_config = serializer.PayloadSizeWarningConfig(
121+
limit_bytes=10,
122+
threshold_percent=50,
123+
)
124+
resp = _mock_response(201, {"schedule_id": "sched-large", "outcome": "created"})
125+
with (
126+
patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp),
127+
caplog.at_level(logging.WARNING, logger="durable_workflow.serializer"),
128+
):
129+
await client.create_schedule(
130+
schedule_id="sched-large",
131+
spec=ScheduleSpec(cron_expressions=["0 * * * *"]),
132+
action=ScheduleAction(
133+
workflow_type="greeter",
134+
task_queue="q1",
135+
input=["this payload is intentionally large"],
136+
),
137+
)
138+
139+
payload = caplog.records[0].durable_workflow_payload
140+
assert payload["kind"] == "schedule_input"
141+
assert payload["workflow_type"] == "greeter"
142+
assert payload["schedule_id"] == "sched-large"
143+
assert payload["task_queue"] == "q1"
144+
assert payload["namespace"] == "ns1"
145+
assert payload["threshold_bytes"] == 5
146+
147+
@pytest.mark.asyncio
148+
async def test_action_input_warning_can_be_disabled(
149+
self, client: Client, caplog: pytest.LogCaptureFixture
150+
) -> None:
151+
client.payload_size_warning_config = None
152+
resp = _mock_response(201, {"schedule_id": "sched-quiet", "outcome": "created"})
153+
with (
154+
patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp),
155+
caplog.at_level(logging.WARNING, logger="durable_workflow.serializer"),
156+
):
157+
await client.create_schedule(
158+
schedule_id="sched-quiet",
159+
spec=ScheduleSpec(cron_expressions=["0 * * * *"]),
160+
action=ScheduleAction(
161+
workflow_type="greeter",
162+
task_queue="q1",
163+
input=["this payload is intentionally large"],
164+
),
165+
)
166+
167+
assert caplog.records == []
168+
115169
@pytest.mark.asyncio
116170
async def test_minimal(self, client: Client) -> None:
117171
resp = _mock_response(201, {"schedule_id": "auto-id", "outcome": "created"})
@@ -255,6 +309,34 @@ async def test_update_note(self, client: Client) -> None:
255309
body = mock.call_args.kwargs.get("json") or mock.call_args[1].get("json")
256310
assert body["note"] == "Updated note"
257311

312+
@pytest.mark.asyncio
313+
async def test_update_action_input_warning_uses_schedule_id(
314+
self, client: Client, caplog: pytest.LogCaptureFixture
315+
) -> None:
316+
client.payload_size_warning_config = serializer.PayloadSizeWarningConfig(
317+
limit_bytes=10,
318+
threshold_percent=50,
319+
)
320+
resp = _mock_response(200, {"schedule_id": "sched-1", "outcome": "updated"})
321+
with (
322+
patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp),
323+
caplog.at_level(logging.WARNING, logger="durable_workflow.serializer"),
324+
):
325+
await client.update_schedule(
326+
"sched-1",
327+
action=ScheduleAction(
328+
workflow_type="ticker",
329+
task_queue="q2",
330+
input=["this update payload is intentionally large"],
331+
),
332+
)
333+
334+
payload = caplog.records[0].durable_workflow_payload
335+
assert payload["kind"] == "schedule_input"
336+
assert payload["workflow_type"] == "ticker"
337+
assert payload["schedule_id"] == "sched-1"
338+
assert payload["task_queue"] == "q2"
339+
258340
@pytest.mark.asyncio
259341
async def test_not_found(self, client: Client) -> None:
260342
resp = _mock_response(404, {"reason": "schedule_not_found", "message": "not found"})

0 commit comments

Comments
 (0)