feat(openworkflow, dashboard, docs): name-addressed signals#400
feat(openworkflow, dashboard, docs): name-addressed signals#400jamescmartinez wants to merge 7 commits intomainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds first-class “signals” to OpenWorkflow so workflows can durably wait on a named event and resume when a matching signal is delivered (with SQLite/Postgres persistence, worker support, dashboard labeling, and docs).
Changes:
- Introduces
ow.sendSignal(),step.sendSignal(), andstep.waitForSignal()with persisted deliveries inworkflow_signals. - Extends worker execution to create/complete
signal-sendandsignal-waitstep attempts and to park/resume on delivery/timeout. - Adds migrations, backend implementations, tests, and documentation for signal semantics and usage.
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/openworkflow/worker/execution.ts | Implements signal-send / signal-wait step execution and timeout handling. |
| packages/openworkflow/worker/execution.test.ts | Adds StepExecutor-level tests for sending/waiting/timeout/schema validation/replay-safety. |
| packages/openworkflow/testing/backend.testsuite.ts | Adds backend contract tests for signal fan-out, idempotency, and delivery retrieval. |
| packages/openworkflow/sqlite/sqlite.ts | Adds SQLite migration for workflow_signals table + indexes. |
| packages/openworkflow/sqlite/backend.ts | Implements sendSignal() / getSignalDelivery() and wake-up reconciliation in SQLite backend. |
| packages/openworkflow/postgres/postgres.ts | Adds Postgres migration for workflow_signals table + indexes. |
| packages/openworkflow/postgres/backend.ts | Implements sendSignal() / getSignalDelivery() and reconciliation in Postgres backend. |
| packages/openworkflow/core/workflow-function.ts | Extends StepApi types and introduces StepWaitTimeout. |
| packages/openworkflow/core/step-attempt.ts | Extends StepKind and adds SignalWaitStepAttemptContext. |
| packages/openworkflow/core/step-attempt.test.ts | Tests createSignalWaitContext(). |
| packages/openworkflow/core/backend.ts | Extends backend interface with signals methods and related types. |
| packages/openworkflow/client/client.ts | Adds OpenWorkflow.sendSignal() client API. |
| packages/docs/docs/workflows.mdx | Updates workflow API docs to mention signal step APIs. |
| packages/docs/docs/steps.mdx | Documents new step types: sendSignal and waitForSignal. |
| packages/docs/docs/signals.mdx | New dedicated Signals documentation page. |
| packages/docs/docs/roadmap.mdx | Marks signals as shipped. |
| packages/docs/docs/overview.mdx | Highlights signals capability in overview. |
| packages/docs/docs.json | Adds Signals doc page to nav. |
| packages/dashboard/src/routes/runs/$runId.tsx | Formats step kind labels for signal steps in run details UI. |
| ARCHITECTURE.md | Documents signal semantics and adds workflow_signals to system diagram. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| PRIMARY KEY ("namespace_id", "id") | ||
| ); | ||
|
|
||
| CREATE INDEX IF NOT EXISTS "workflow_signals_step_attempt_idx" |
There was a problem hiding this comment.
The migration creates workflow_signals without a uniqueness constraint tying a delivery to a step_attempt_id. Because getSignalDelivery() later selects by step_attempt_id, multiple rows per attempt would make delivery nondeterministic and effectively buffer signals. Consider adding a UNIQUE constraint (e.g., (namespace_id, step_attempt_id)) and adjusting inserts to use ON CONFLICT semantics so repeated sends don’t create duplicates.
| CREATE INDEX IF NOT EXISTS "workflow_signals_step_attempt_idx" | |
| CREATE UNIQUE INDEX IF NOT EXISTS "workflow_signals_step_attempt_idx" |
| // find active waiting step attempts & insert a signal delivery row for each | ||
| const waitersStmt = this.db.prepare(` | ||
| SELECT "id", "workflow_run_id" | ||
| FROM "step_attempts" | ||
| WHERE "namespace_id" = ? | ||
| AND "kind" = 'signal-wait' | ||
| AND "status" = 'running' | ||
| AND json_extract("context", '$.signal') = ? | ||
| `); | ||
| const waiters = waitersStmt.all(this.namespaceId, params.signal) as { | ||
| id: string; | ||
| workflow_run_id: string; | ||
| }[]; | ||
|
|
||
| if (waiters.length === 0) { | ||
| this.db.exec("COMMIT"); | ||
| return Promise.resolve({ workflowRunIds: [] }); | ||
| } | ||
|
|
||
| const insertStmt = this.db.prepare(` | ||
| INSERT INTO "workflow_signals" ( | ||
| "namespace_id", "id", "signal", "data", | ||
| "sender_idempotency_key", "workflow_run_id", | ||
| "step_attempt_id", "created_at" | ||
| ) | ||
| VALUES (?, ?, ?, ?, ?, ?, ?, ?) | ||
| `); | ||
| for (const w of waiters) { | ||
| insertStmt.run( | ||
| this.namespaceId, | ||
| generateUUID(), | ||
| params.signal, | ||
| toJSON(params.data), | ||
| params.idempotencyKey, | ||
| w.workflow_run_id, | ||
| w.id, | ||
| currentTime, | ||
| ); | ||
| } |
There was a problem hiding this comment.
sendSignal() can insert multiple deliveries for the same signal-wait step attempt (e.g., if the same signal is sent twice before the waiter completes). Since getSignalDelivery() later does LIMIT 1 with no ordering, the payload consumed becomes nondeterministic and signals become effectively buffered/duplicated despite docs saying they are not buffered. Consider enforcing at-most-one delivery per step_attempt_id (e.g., a UNIQUE constraint on (namespace_id, step_attempt_id) plus INSERT-or-ignore / upsert) and/or filtering waiters to exclude attempts that already have a delivery row, so repeated sends are dropped for that waiter.
| await tx` | ||
| INSERT INTO ${workflowSignalsTable} ( | ||
| "namespace_id", "id", "signal", "data", | ||
| "sender_idempotency_key", "workflow_run_id", | ||
| "step_attempt_id", "created_at" | ||
| ) | ||
| VALUES ( | ||
| ${this.namespaceId}, | ||
| gen_random_uuid(), | ||
| ${params.signal}, | ||
| ${tx.json(params.data)}, | ||
| ${params.idempotencyKey}, | ||
| ${w.workflowRunId}, | ||
| ${w.id}, | ||
| NOW() | ||
| ) | ||
| `; |
There was a problem hiding this comment.
In Postgres, sendSignal() isn't concurrency-safe for idempotency: two concurrent calls with the same idempotencyKey can both observe no existing rows, then both insert deliveries (there’s no uniqueness constraint preventing duplicates). Additionally, repeated sends (with or without idempotency) can create multiple deliveries per step_attempt_id, and getSignalDelivery() reads an arbitrary row via LIMIT 1. Consider adding a uniqueness constraint (at least (namespace_id, step_attempt_id), and/or (namespace_id, sender_idempotency_key, step_attempt_id)), using INSERT .. ON CONFLICT DO NOTHING/UPDATE, and structuring the transaction so idempotency is enforced even under concurrent requests.
| await tx` | |
| INSERT INTO ${workflowSignalsTable} ( | |
| "namespace_id", "id", "signal", "data", | |
| "sender_idempotency_key", "workflow_run_id", | |
| "step_attempt_id", "created_at" | |
| ) | |
| VALUES ( | |
| ${this.namespaceId}, | |
| gen_random_uuid(), | |
| ${params.signal}, | |
| ${tx.json(params.data)}, | |
| ${params.idempotencyKey}, | |
| ${w.workflowRunId}, | |
| ${w.id}, | |
| NOW() | |
| ) | |
| `; | |
| // When an idempotency key is provided, enforce idempotency at the | |
| // database level so concurrent sendSignal() calls cannot insert | |
| // duplicate deliveries for the same step attempt. | |
| if (params.idempotencyKey != null) { | |
| await tx` | |
| INSERT INTO ${workflowSignalsTable} ( | |
| "namespace_id", "id", "signal", "data", | |
| "sender_idempotency_key", "workflow_run_id", | |
| "step_attempt_id", "created_at" | |
| ) | |
| VALUES ( | |
| ${this.namespaceId}, | |
| gen_random_uuid(), | |
| ${params.signal}, | |
| ${tx.json(params.data)}, | |
| ${params.idempotencyKey}, | |
| ${w.workflowRunId}, | |
| ${w.id}, | |
| NOW() | |
| ) | |
| ON CONFLICT ("namespace_id", "sender_idempotency_key", "step_attempt_id") | |
| DO NOTHING | |
| `; | |
| } else { | |
| // Preserve existing behavior (potentially multiple deliveries) | |
| // when no idempotency key is supplied. | |
| await tx` | |
| INSERT INTO ${workflowSignalsTable} ( | |
| "namespace_id", "id", "signal", "data", | |
| "sender_idempotency_key", "workflow_run_id", | |
| "step_attempt_id", "created_at" | |
| ) | |
| VALUES ( | |
| ${this.namespaceId}, | |
| gen_random_uuid(), | |
| ${params.signal}, | |
| ${tx.json(params.data)}, | |
| ${params.idempotencyKey}, | |
| ${w.workflowRunId}, | |
| ${w.id}, | |
| NOW() | |
| ) | |
| `; | |
| } |
| "created_at" TEXT NOT NULL, | ||
| PRIMARY KEY ("namespace_id", "id") | ||
| ); | ||
|
|
There was a problem hiding this comment.
The workflow_signals table schema allows multiple rows for the same (namespace_id, step_attempt_id). Since signal waits are consumed by step_attempt_id, allowing multiple rows makes getSignalDelivery() ambiguous and enables unbounded buffering for a single waiter. Consider adding a UNIQUE index/constraint on (namespace_id, step_attempt_id) (and updating inserts to be conflict-tolerant) to enforce at-most-one delivery per waiting step attempt.
| CREATE UNIQUE INDEX IF NOT EXISTS "workflow_signals_namespace_step_attempt_unique" | |
| ON "workflow_signals" ("namespace_id", "step_attempt_id"); |
d479203 to
290a938
Compare
commit: |
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 9 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return await this.pg.begin(async (tx): Promise<SendSignalResult> => { | ||
| // eslint-disable-next-line sonarjs/todo-tag | ||
| const sql = tx as unknown as Postgres; // todo: come back and clean up before merging PR |
There was a problem hiding this comment.
There is a TODO and a type cast (tx as unknown as Postgres) left in sendSignal(). Please remove the TODO before merge and use a properly-typed transaction client (or refactor helper methods to accept the transaction type) so this doesn't ship as a known-cleanup item.
| return await this.pg.begin(async (tx): Promise<SendSignalResult> => { | |
| // eslint-disable-next-line sonarjs/todo-tag | |
| const sql = tx as unknown as Postgres; // todo: come back and clean up before merging PR | |
| return await this.pg.begin(async (sql: Postgres): Promise<SendSignalResult> => { |
| if (params.idempotencyKey !== null) { | ||
| const existing = await sql<{ workflowRunId: string }[]>` | ||
| SELECT DISTINCT "workflow_run_id" AS "workflowRunId" | ||
| FROM ${workflowSignalsTable} | ||
| WHERE "namespace_id" = ${this.namespaceId} | ||
| AND "sender_idempotency_key" = ${params.idempotencyKey} | ||
| `; | ||
| if (existing.length > 0) { | ||
| return { workflowRunIds: existing.map((r) => r.workflowRunId) }; | ||
| } |
There was a problem hiding this comment.
The idempotency lookup for sendSignal is scoped only to sender_idempotency_key (and namespace). Reusing the same idempotency key across different signal names would incorrectly dedupe and return unrelated results. Include signal in the lookup (and consider an index/constraint that matches the intended (signal, sender_idempotency_key) semantics).
| for (const w of waiters) { | ||
| await sql` | ||
| INSERT INTO ${workflowSignalsTable} ( | ||
| "namespace_id", "id", "signal", "data", | ||
| "sender_idempotency_key", "workflow_run_id", | ||
| "step_attempt_id", "created_at" | ||
| ) | ||
| VALUES ( | ||
| ${this.namespaceId}, | ||
| gen_random_uuid(), | ||
| ${params.signal}, | ||
| ${sql.json(params.data)}, | ||
| ${params.idempotencyKey}, | ||
| ${w.workflowRunId}, | ||
| ${w.id}, | ||
| NOW() | ||
| ) | ||
| ON CONFLICT ("namespace_id", "step_attempt_id") DO NOTHING | ||
| `; | ||
| } | ||
|
|
||
| // wake each waiting workflow run | ||
| const runIds = [...new Set(waiters.map((w) => w.workflowRunId))]; | ||
| await sql` | ||
| UPDATE ${workflowRunsTable} | ||
| SET | ||
| "available_at" = CASE | ||
| WHEN "available_at" IS NULL OR "available_at" > NOW() | ||
| THEN NOW() | ||
| ELSE "available_at" | ||
| END, | ||
| "updated_at" = NOW() | ||
| WHERE "namespace_id" = ${this.namespaceId} | ||
| AND "id" = ANY(${runIds}) | ||
| AND "status" IN ('pending', 'running', 'sleeping') | ||
| AND "worker_id" IS NULL | ||
| `; | ||
|
|
||
| return { workflowRunIds: runIds }; | ||
| }); |
There was a problem hiding this comment.
sendSignal() returns workflowRunIds for all matching waiters even if the INSERT ... ON CONFLICT DO NOTHING skipped inserting a delivery row (e.g. duplicate sends to the same signal-wait step attempt). This can make the return value imply a workflow run received this signal when it actually already had a prior delivery. Consider returning only newly-delivered run IDs, or clarify the API semantics/docs.
| const insertStmt = this.db.prepare(` | ||
| INSERT OR IGNORE INTO "workflow_signals" ( | ||
| "namespace_id", "id", "signal", "data", | ||
| "sender_idempotency_key", "workflow_run_id", | ||
| "step_attempt_id", "created_at" | ||
| ) | ||
| VALUES (?, ?, ?, ?, ?, ?, ?, ?) | ||
| `); | ||
| for (const w of waiters) { | ||
| insertStmt.run( | ||
| this.namespaceId, | ||
| generateUUID(), | ||
| params.signal, | ||
| toJSON(params.data), | ||
| params.idempotencyKey, | ||
| w.workflow_run_id, | ||
| w.id, | ||
| currentTime, | ||
| ); | ||
| } | ||
|
|
||
| // wake each waiting workflow run | ||
| const runIds = [...new Set(waiters.map((w) => w.workflow_run_id))]; | ||
| const wakeStmt = this.db.prepare(` |
There was a problem hiding this comment.
sendSignal returns workflowRunIds based on the current waiters, even if INSERT OR IGNORE skipped inserting a delivery row for some/all of them (e.g. duplicate sends to the same running signal-wait step attempt). That can make the return value claim a workflow "received" a signal when it actually did not (because a prior delivery already exists). Consider returning only the run IDs for which a new delivery row was inserted, or adjust the API/docs to clarify the semantics.
| ON ${quotedSchema}."workflow_signals" ("namespace_id", "step_attempt_id"); | ||
|
|
||
| CREATE INDEX IF NOT EXISTS "workflow_signals_idempotency_idx" | ||
| ON ${quotedSchema}."workflow_signals" ("namespace_id", "sender_idempotency_key") |
There was a problem hiding this comment.
The migration adds an idempotency index on (namespace_id, sender_idempotency_key) only. If sendSignal() dedupe is meant to be per-signal (to avoid cross-signal collisions), consider including signal in the index and/or adding a uniqueness constraint that matches the dedupe semantics.
| ON ${quotedSchema}."workflow_signals" ("namespace_id", "sender_idempotency_key") | |
| ON ${quotedSchema}."workflow_signals" ("namespace_id", "signal", "sender_idempotency_key") |
| If a signal with the same idempotency key has already been sent, the call | ||
| returns the original result without re-delivering. |
There was a problem hiding this comment.
Docs claim reusing the same idempotency key returns the original result without re-delivering. With the current backend behavior, if the first send had no active waiters (signal dropped), no row is written, so retrying later with the same idempotency key can still deliver once waiters exist. Either persist idempotency even when there are no waiters, or clarify this limitation in the docs.
| If a signal with the same idempotency key has already been sent, the call | |
| returns the original result without re-delivering. | |
| If a signal with the same idempotency key has already been sent **and that send | |
| had at least one active waiter**, the call returns the original result without | |
| re-delivering. If the original send occurred when there were no active waiters | |
| (the signal was effectively dropped), it is not recorded for idempotency, and a | |
| later retry with the same key can still deliver once waiters exist. |
| * Retry policy for workflow step failures (no retries - the child workflow | ||
| * is responsible for retries). |
There was a problem hiding this comment.
The comment above TERMINAL_STEP_RETRY_POLICY still describes it as specific to "workflow step failures". This policy is now also used for other terminal step types (e.g. signal send/wait), so the comment is misleading. Update the comment to reflect that it is the no-retry policy for terminal/non-retryable step failures in general.
| * Retry policy for workflow step failures (no retries - the child workflow | |
| * is responsible for retries). | |
| * Retry policy for terminal/non-retryable step failures (no retries, the | |
| * caller or child workflow is responsible for handling retries). |
| if (params.idempotencyKey !== null) { | ||
| const existingStmt = this.db.prepare(` | ||
| SELECT DISTINCT "workflow_run_id" | ||
| FROM "workflow_signals" | ||
| WHERE "namespace_id" = ? AND "sender_idempotency_key" = ? | ||
| `); | ||
| const existing = existingStmt.all( | ||
| this.namespaceId, | ||
| params.idempotencyKey, | ||
| ) as { workflow_run_id: string }[]; | ||
| if (existing.length > 0) { | ||
| this.db.exec("COMMIT"); | ||
| return Promise.resolve({ | ||
| workflowRunIds: existing.map((r) => r.workflow_run_id), | ||
| }); | ||
| } |
There was a problem hiding this comment.
The idempotency lookup for sendSignal is keyed only by sender_idempotency_key (and namespace). If a caller reuses the same idempotency key for different signals, the later call will be treated as a duplicate of the earlier one and return unrelated workflowRunIds. Scope the dedupe to (signal, sender_idempotency_key) (and ideally enforce it at the DB/index level) to avoid cross-signal collisions.
| ON "workflow_signals" ("namespace_id", "step_attempt_id"); | ||
|
|
||
| CREATE INDEX IF NOT EXISTS "workflow_signals_idempotency_idx" | ||
| ON "workflow_signals" ("namespace_id", "sender_idempotency_key") |
There was a problem hiding this comment.
The migration creates an idempotency index on (namespace_id, sender_idempotency_key) only. Since sendSignal() dedupe should be scoped to the signal name as well (to avoid cross-signal idempotency-key collisions), consider including signal in the index (and/or adding a uniqueness constraint appropriate to the intended semantics).
| ON "workflow_signals" ("namespace_id", "sender_idempotency_key") | |
| ON "workflow_signals" ("namespace_id", "signal", "sender_idempotency_key") |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 21 out of 21 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| sendSignal: ( | ||
| options: Readonly<{ | ||
| name?: string; | ||
| signal: string; | ||
| data?: JsonValue; | ||
| }>, | ||
| ) => Promise<{ workflowRunIds: string[] }>; | ||
| waitForSignal: <Output>( | ||
| options: Readonly<{ | ||
| name?: string; | ||
| signal: string; | ||
| timeout?: StepWaitTimeout; | ||
| schema?: StandardSchemaV1<unknown, Output>; | ||
| }>, | ||
| ) => Promise<Output | null>; |
There was a problem hiding this comment.
waitForSignal() uses null as the timeout sentinel, but signal payloads can also be null (since sendSignal defaults data to null). That makes “timed out” indistinguishable from “signal delivered with null payload” for callers and can lead to incorrect behavior. Consider changing the API to return a discriminated result (e.g., { timedOut: true } | { timedOut: false; data: Output }), or disallow null payloads at the type/runtime level and document/enforce it consistently.
| const existingWaiter = this.activeSignalWaitStepNameBySignal.get( | ||
| options.signal, | ||
| ); | ||
| if (existingWaiter && existingWaiter !== stepName) { | ||
| throw new Error( | ||
| `Signal "${options.signal}" is already being waited on by step "${existingWaiter}"`, | ||
| ); | ||
| } |
There was a problem hiding this comment.
The duplicate-signal waiter guard prevents a workflow from waiting on the same signal name in two different steps, even though resolveStepName() auto-indexes duplicates for other step APIs and the docs/architecture describe collision auto-indexing across all step APIs. Either remove this restriction (and track multiple stepNames per signal), or explicitly document that at most one active waitForSignal per signal is allowed per workflow run and ensure the architecture/docs reflect that limitation.
| async sendSignal(params: SendSignalParams): Promise<SendSignalResult> { | ||
| return await this.pg.begin(async (sql): Promise<SendSignalResult> => { | ||
| const tx = sql as unknown as Postgres; | ||
| const stepAttemptsTable = this.stepAttemptsTable(tx); | ||
| const workflowSignalsTable = this.workflowSignalsTable(tx); | ||
|
|
||
| if (params.idempotencyKey !== null) { | ||
| const existing = await tx<{ workflowRunId: string }[]>` | ||
| SELECT DISTINCT "workflow_run_id" AS "workflowRunId" | ||
| FROM ${workflowSignalsTable} | ||
| WHERE "namespace_id" = ${this.namespaceId} | ||
| AND "signal" = ${params.signal} | ||
| AND "sender_idempotency_key" = ${params.idempotencyKey} | ||
| `; | ||
| if (existing.length > 0) { | ||
| return { workflowRunIds: existing.map((r) => r.workflowRunId) }; | ||
| } | ||
| } | ||
| const workflowRunsTable = this.workflowRunsTable(tx); | ||
|
|
||
| // find active waiting step attempts & insert a signal delivery row for each | ||
| const waiters = await tx<{ id: string; workflowRunId: string }[]>` | ||
| SELECT "id", "workflow_run_id" AS "workflowRunId" | ||
| FROM ${stepAttemptsTable} | ||
| WHERE "namespace_id" = ${this.namespaceId} | ||
| AND "kind" = 'signal-wait' | ||
| AND "status" = 'running' | ||
| AND "context"->>'signal' = ${params.signal} | ||
| FOR UPDATE | ||
| `; | ||
|
|
||
| if (waiters.length === 0) { | ||
| return { workflowRunIds: [] }; | ||
| } | ||
|
|
||
| const deliveredRunIds = new Set<string>(); | ||
| for (const w of waiters) { | ||
| const inserted = await tx` | ||
| INSERT INTO ${workflowSignalsTable} ( | ||
| "namespace_id", "id", "signal", "data", | ||
| "sender_idempotency_key", "workflow_run_id", | ||
| "step_attempt_id", "created_at" | ||
| ) | ||
| VALUES ( | ||
| ${this.namespaceId}, | ||
| gen_random_uuid(), | ||
| ${params.signal}, | ||
| ${tx.json(params.data)}, | ||
| ${params.idempotencyKey}, | ||
| ${w.workflowRunId}, | ||
| ${w.id}, | ||
| NOW() | ||
| ) | ||
| ON CONFLICT ("namespace_id", "step_attempt_id") DO NOTHING | ||
| RETURNING "workflow_run_id" | ||
| `; | ||
| if (inserted.length > 0) { | ||
| deliveredRunIds.add(w.workflowRunId); | ||
| } | ||
| } | ||
|
|
||
| if (deliveredRunIds.size === 0) { | ||
| return { workflowRunIds: [] }; | ||
| } |
There was a problem hiding this comment.
Postgres sendSignal() idempotency is not concurrency-safe: two concurrent calls with the same idempotencyKey can both see no existing rows, then the second transaction’s inserts will all DO NOTHING on the unique (namespace_id, step_attempt_id) constraint and return workflowRunIds: [] instead of the original result. To guarantee idempotency under concurrency, add a serialization mechanism keyed by (namespace_id, signal, idempotencyKey) (e.g., pg_advisory_xact_lock, or a separate signal_sends table/unique constraint that records the send result).
| ## Timeout | ||
|
|
||
| `step.waitForSignal` accepts an optional `timeout`. If the signal doesn't | ||
| arrive before the timeout, the step resolves with `null` instead of blocking | ||
| forever. | ||
|
|
||
| ```ts | ||
| const result = await step.waitForSignal({ | ||
| signal: `approval:${orderId}`, | ||
| timeout: "24h", | ||
| }); | ||
|
|
||
| if (result === null) { | ||
| // timed out — no signal arrived within 24 hours | ||
| await step.run({ name: "escalate" }, async () => { | ||
| await alerts.send("Approval timed out"); | ||
| }); | ||
| } | ||
| ``` | ||
|
|
||
| `timeout` accepts a [duration string](/docs/sleeping#duration-formats), a | ||
| number of milliseconds, or a `Date`. | ||
|
|
||
| If no timeout is specified, the wait defaults to **1 year**. | ||
|
|
There was a problem hiding this comment.
The Signals docs state that waitForSignal() returns null on timeout, but they don’t mention that null may also be a valid signal payload (since sendSignal can send data: null). Without a contract here, callers can’t reliably distinguish timeout vs a delivered-null payload. Please document the limitation or adjust the API/result shape so timeouts are unambiguous.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 21 out of 21 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // wake each waiting workflow run | ||
| const runIds = [...deliveredRunIds]; | ||
| const wakeStmt = this.db.prepare(` | ||
| UPDATE "workflow_runs" | ||
| SET | ||
| "available_at" = CASE | ||
| WHEN "available_at" IS NULL OR "available_at" > ? THEN ? | ||
| ELSE "available_at" | ||
| END, | ||
| "updated_at" = ? | ||
| WHERE "namespace_id" = ? | ||
| AND "id" = ? | ||
| AND "status" IN ('pending', 'running', 'sleeping') | ||
| AND "worker_id" IS NULL | ||
| `); | ||
| for (const runId of runIds) { | ||
| wakeStmt.run( | ||
| currentTime, | ||
| currentTime, | ||
| currentTime, | ||
| this.namespaceId, | ||
| runId, |
There was a problem hiding this comment.
SQLite sendSignal() wakes runs with one UPDATE per workflowRunId inside a loop. For large fan-out signals this can become a write bottleneck; consider batching into a single UPDATE with an IN (...) clause (or a temp table) so wake-up is O(1) statements per send.
| // wake each waiting workflow run | |
| const runIds = [...deliveredRunIds]; | |
| const wakeStmt = this.db.prepare(` | |
| UPDATE "workflow_runs" | |
| SET | |
| "available_at" = CASE | |
| WHEN "available_at" IS NULL OR "available_at" > ? THEN ? | |
| ELSE "available_at" | |
| END, | |
| "updated_at" = ? | |
| WHERE "namespace_id" = ? | |
| AND "id" = ? | |
| AND "status" IN ('pending', 'running', 'sleeping') | |
| AND "worker_id" IS NULL | |
| `); | |
| for (const runId of runIds) { | |
| wakeStmt.run( | |
| currentTime, | |
| currentTime, | |
| currentTime, | |
| this.namespaceId, | |
| runId, | |
| // wake waiting workflow runs in batches to avoid one UPDATE per run ID | |
| const runIds = [...deliveredRunIds]; | |
| const maxWakeBatchSize = 500; | |
| for (let i = 0; i < runIds.length; i += maxWakeBatchSize) { | |
| const runIdBatch = runIds.slice(i, i + maxWakeBatchSize); | |
| const placeholders = runIdBatch.map(() => "?").join(", "); | |
| const wakeStmt = this.db.prepare(` | |
| UPDATE "workflow_runs" | |
| SET | |
| "available_at" = CASE | |
| WHEN "available_at" IS NULL OR "available_at" > ? THEN ? | |
| ELSE "available_at" | |
| END, | |
| "updated_at" = ? | |
| WHERE "namespace_id" = ? | |
| AND "id" IN (${placeholders}) | |
| AND "status" IN ('pending', 'running', 'sleeping') | |
| AND "worker_id" IS NULL | |
| `); | |
| wakeStmt.run( | |
| currentTime, | |
| currentTime, | |
| currentTime, | |
| this.namespaceId, | |
| ...runIdBatch, |
| // find active waiting step attempts & insert a signal delivery row for each | ||
| const waiters = await tx<{ id: string; workflowRunId: string }[]>` | ||
| SELECT "id", "workflow_run_id" AS "workflowRunId" | ||
| FROM ${stepAttemptsTable} | ||
| WHERE "namespace_id" = ${this.namespaceId} | ||
| AND "kind" = 'signal-wait' | ||
| AND "status" = 'running' | ||
| AND "context"->>'signal' = ${params.signal} | ||
| FOR UPDATE | ||
| `; | ||
|
|
||
| if (waiters.length === 0) { | ||
| return { workflowRunIds: [] }; | ||
| } | ||
|
|
||
| const deliveredRunIds = new Set<string>(); | ||
| for (const w of waiters) { | ||
| const inserted = await tx` | ||
| INSERT INTO ${workflowSignalsTable} ( | ||
| "namespace_id", "id", "signal", "data", | ||
| "sender_idempotency_key", "workflow_run_id", | ||
| "step_attempt_id", "created_at" | ||
| ) | ||
| VALUES ( | ||
| ${this.namespaceId}, | ||
| gen_random_uuid(), | ||
| ${params.signal}, | ||
| ${tx.json(params.data)}, | ||
| ${params.idempotencyKey}, | ||
| ${w.workflowRunId}, | ||
| ${w.id}, | ||
| NOW() | ||
| ) | ||
| ON CONFLICT ("namespace_id", "step_attempt_id") DO NOTHING | ||
| RETURNING "workflow_run_id" | ||
| `; | ||
| if (inserted.length > 0) { | ||
| deliveredRunIds.add(w.workflowRunId); | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
sendSignal() inserts one workflow_signals row per waiter via a per-row INSERT in a loop, causing N round-trips inside the transaction for fan-out scenarios. Consider a set-based INSERT ... SELECT (with ON CONFLICT DO NOTHING) that returns affected workflow_run_id values to reduce query overhead.
| // find active waiting step attempts & insert a signal delivery row for each | |
| const waiters = await tx<{ id: string; workflowRunId: string }[]>` | |
| SELECT "id", "workflow_run_id" AS "workflowRunId" | |
| FROM ${stepAttemptsTable} | |
| WHERE "namespace_id" = ${this.namespaceId} | |
| AND "kind" = 'signal-wait' | |
| AND "status" = 'running' | |
| AND "context"->>'signal' = ${params.signal} | |
| FOR UPDATE | |
| `; | |
| if (waiters.length === 0) { | |
| return { workflowRunIds: [] }; | |
| } | |
| const deliveredRunIds = new Set<string>(); | |
| for (const w of waiters) { | |
| const inserted = await tx` | |
| INSERT INTO ${workflowSignalsTable} ( | |
| "namespace_id", "id", "signal", "data", | |
| "sender_idempotency_key", "workflow_run_id", | |
| "step_attempt_id", "created_at" | |
| ) | |
| VALUES ( | |
| ${this.namespaceId}, | |
| gen_random_uuid(), | |
| ${params.signal}, | |
| ${tx.json(params.data)}, | |
| ${params.idempotencyKey}, | |
| ${w.workflowRunId}, | |
| ${w.id}, | |
| NOW() | |
| ) | |
| ON CONFLICT ("namespace_id", "step_attempt_id") DO NOTHING | |
| RETURNING "workflow_run_id" | |
| `; | |
| if (inserted.length > 0) { | |
| deliveredRunIds.add(w.workflowRunId); | |
| } | |
| } | |
| // find active waiting step attempts, lock them, and insert signal deliveries in one statement | |
| const insertedSignals = await tx<{ workflowRunId: string }[]>` | |
| WITH waiters AS ( | |
| SELECT "id", "workflow_run_id" AS "workflowRunId" | |
| FROM ${stepAttemptsTable} | |
| WHERE "namespace_id" = ${this.namespaceId} | |
| AND "kind" = 'signal-wait' | |
| AND "status" = 'running' | |
| AND "context"->>'signal' = ${params.signal} | |
| FOR UPDATE | |
| ), | |
| inserted AS ( | |
| INSERT INTO ${workflowSignalsTable} ( | |
| "namespace_id", "id", "signal", "data", | |
| "sender_idempotency_key", "workflow_run_id", | |
| "step_attempt_id", "created_at" | |
| ) | |
| SELECT | |
| ${this.namespaceId}, | |
| gen_random_uuid(), | |
| ${params.signal}, | |
| ${tx.json(params.data)}, | |
| ${params.idempotencyKey}, | |
| waiters."workflowRunId", | |
| waiters."id", | |
| NOW() | |
| FROM waiters | |
| ON CONFLICT ("namespace_id", "step_attempt_id") DO NOTHING | |
| RETURNING "workflow_run_id" AS "workflowRunId" | |
| ) | |
| SELECT "workflowRunId" | |
| FROM inserted | |
| `; | |
| const deliveredRunIds = new Set( | |
| insertedSignals.map((inserted) => inserted.workflowRunId), | |
| ); |
🚧 wip alternative to #385
This PR adds workflow signal support so runs can durably wait events and resume when a matching signal arrives.
Unlike #385, these signals are name-addressed (an arbitrary string like
payment:${invoiceId}).This enables fanout, but removes signal buffering. if
sendSignal()is called and no workflow is waiting on that signal name, the signal is lost.Changes