Skip to content

feat(openworkflow, dashboard, docs): name-addressed signals#400

Open
jamescmartinez wants to merge 7 commits intomainfrom
signals-name-addressed
Open

feat(openworkflow, dashboard, docs): name-addressed signals#400
jamescmartinez wants to merge 7 commits intomainfrom
signals-name-addressed

Conversation

@jamescmartinez
Copy link
Copy Markdown
Contributor

@jamescmartinez jamescmartinez commented Mar 27, 2026

🚧 wip alternative to #385

This PR adds workflow signal support so runs can durably wait events and resume when a matching signal arrives.

Unlike #385, these signals are name-addressed (an arbitrary string like payment:${invoiceId}).

This enables fanout, but removes signal buffering. if sendSignal() is called and no workflow is waiting on that signal name, the signal is lost.

Changes

  • ow.sendSignal(), step.sendSignal(), and step.waitForSignal()
  • Persist and consume signals in SQLite/Postgres (w/ migrations)
  • Update docs

Copilot AI review requested due to automatic review settings March 27, 2026 18:27
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds first-class “signals” to OpenWorkflow so workflows can durably wait on a named event and resume when a matching signal is delivered (with SQLite/Postgres persistence, worker support, dashboard labeling, and docs).

Changes:

  • Introduces ow.sendSignal(), step.sendSignal(), and step.waitForSignal() with persisted deliveries in workflow_signals.
  • Extends worker execution to create/complete signal-send and signal-wait step attempts and to park/resume on delivery/timeout.
  • Adds migrations, backend implementations, tests, and documentation for signal semantics and usage.

Reviewed changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
packages/openworkflow/worker/execution.ts Implements signal-send / signal-wait step execution and timeout handling.
packages/openworkflow/worker/execution.test.ts Adds StepExecutor-level tests for sending/waiting/timeout/schema validation/replay-safety.
packages/openworkflow/testing/backend.testsuite.ts Adds backend contract tests for signal fan-out, idempotency, and delivery retrieval.
packages/openworkflow/sqlite/sqlite.ts Adds SQLite migration for workflow_signals table + indexes.
packages/openworkflow/sqlite/backend.ts Implements sendSignal() / getSignalDelivery() and wake-up reconciliation in SQLite backend.
packages/openworkflow/postgres/postgres.ts Adds Postgres migration for workflow_signals table + indexes.
packages/openworkflow/postgres/backend.ts Implements sendSignal() / getSignalDelivery() and reconciliation in Postgres backend.
packages/openworkflow/core/workflow-function.ts Extends StepApi types and introduces StepWaitTimeout.
packages/openworkflow/core/step-attempt.ts Extends StepKind and adds SignalWaitStepAttemptContext.
packages/openworkflow/core/step-attempt.test.ts Tests createSignalWaitContext().
packages/openworkflow/core/backend.ts Extends backend interface with signals methods and related types.
packages/openworkflow/client/client.ts Adds OpenWorkflow.sendSignal() client API.
packages/docs/docs/workflows.mdx Updates workflow API docs to mention signal step APIs.
packages/docs/docs/steps.mdx Documents new step types: sendSignal and waitForSignal.
packages/docs/docs/signals.mdx New dedicated Signals documentation page.
packages/docs/docs/roadmap.mdx Marks signals as shipped.
packages/docs/docs/overview.mdx Highlights signals capability in overview.
packages/docs/docs.json Adds Signals doc page to nav.
packages/dashboard/src/routes/runs/$runId.tsx Formats step kind labels for signal steps in run details UI.
ARCHITECTURE.md Documents signal semantics and adds workflow_signals to system diagram.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

PRIMARY KEY ("namespace_id", "id")
);

CREATE INDEX IF NOT EXISTS "workflow_signals_step_attempt_idx"
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The migration creates workflow_signals without a uniqueness constraint tying a delivery to a step_attempt_id. Because getSignalDelivery() later selects by step_attempt_id, multiple rows per attempt would make delivery nondeterministic and effectively buffer signals. Consider adding a UNIQUE constraint (e.g., (namespace_id, step_attempt_id)) and adjusting inserts to use ON CONFLICT semantics so repeated sends don’t create duplicates.

Suggested change
CREATE INDEX IF NOT EXISTS "workflow_signals_step_attempt_idx"
CREATE UNIQUE INDEX IF NOT EXISTS "workflow_signals_step_attempt_idx"

Copilot uses AI. Check for mistakes.
Comment on lines +264 to +302
// find active waiting step attempts & insert a signal delivery row for each
const waitersStmt = this.db.prepare(`
SELECT "id", "workflow_run_id"
FROM "step_attempts"
WHERE "namespace_id" = ?
AND "kind" = 'signal-wait'
AND "status" = 'running'
AND json_extract("context", '$.signal') = ?
`);
const waiters = waitersStmt.all(this.namespaceId, params.signal) as {
id: string;
workflow_run_id: string;
}[];

if (waiters.length === 0) {
this.db.exec("COMMIT");
return Promise.resolve({ workflowRunIds: [] });
}

const insertStmt = this.db.prepare(`
INSERT INTO "workflow_signals" (
"namespace_id", "id", "signal", "data",
"sender_idempotency_key", "workflow_run_id",
"step_attempt_id", "created_at"
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`);
for (const w of waiters) {
insertStmt.run(
this.namespaceId,
generateUUID(),
params.signal,
toJSON(params.data),
params.idempotencyKey,
w.workflow_run_id,
w.id,
currentTime,
);
}
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sendSignal() can insert multiple deliveries for the same signal-wait step attempt (e.g., if the same signal is sent twice before the waiter completes). Since getSignalDelivery() later does LIMIT 1 with no ordering, the payload consumed becomes nondeterministic and signals become effectively buffered/duplicated despite docs saying they are not buffered. Consider enforcing at-most-one delivery per step_attempt_id (e.g., a UNIQUE constraint on (namespace_id, step_attempt_id) plus INSERT-or-ignore / upsert) and/or filtering waiters to exclude attempts that already have a delivery row, so repeated sends are dropped for that waiter.

Copilot uses AI. Check for mistakes.
Comment on lines +288 to +304
await tx`
INSERT INTO ${workflowSignalsTable} (
"namespace_id", "id", "signal", "data",
"sender_idempotency_key", "workflow_run_id",
"step_attempt_id", "created_at"
)
VALUES (
${this.namespaceId},
gen_random_uuid(),
${params.signal},
${tx.json(params.data)},
${params.idempotencyKey},
${w.workflowRunId},
${w.id},
NOW()
)
`;
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Postgres, sendSignal() isn't concurrency-safe for idempotency: two concurrent calls with the same idempotencyKey can both observe no existing rows, then both insert deliveries (there’s no uniqueness constraint preventing duplicates). Additionally, repeated sends (with or without idempotency) can create multiple deliveries per step_attempt_id, and getSignalDelivery() reads an arbitrary row via LIMIT 1. Consider adding a uniqueness constraint (at least (namespace_id, step_attempt_id), and/or (namespace_id, sender_idempotency_key, step_attempt_id)), using INSERT .. ON CONFLICT DO NOTHING/UPDATE, and structuring the transaction so idempotency is enforced even under concurrent requests.

Suggested change
await tx`
INSERT INTO ${workflowSignalsTable} (
"namespace_id", "id", "signal", "data",
"sender_idempotency_key", "workflow_run_id",
"step_attempt_id", "created_at"
)
VALUES (
${this.namespaceId},
gen_random_uuid(),
${params.signal},
${tx.json(params.data)},
${params.idempotencyKey},
${w.workflowRunId},
${w.id},
NOW()
)
`;
// When an idempotency key is provided, enforce idempotency at the
// database level so concurrent sendSignal() calls cannot insert
// duplicate deliveries for the same step attempt.
if (params.idempotencyKey != null) {
await tx`
INSERT INTO ${workflowSignalsTable} (
"namespace_id", "id", "signal", "data",
"sender_idempotency_key", "workflow_run_id",
"step_attempt_id", "created_at"
)
VALUES (
${this.namespaceId},
gen_random_uuid(),
${params.signal},
${tx.json(params.data)},
${params.idempotencyKey},
${w.workflowRunId},
${w.id},
NOW()
)
ON CONFLICT ("namespace_id", "sender_idempotency_key", "step_attempt_id")
DO NOTHING
`;
} else {
// Preserve existing behavior (potentially multiple deliveries)
// when no idempotency key is supplied.
await tx`
INSERT INTO ${workflowSignalsTable} (
"namespace_id", "id", "signal", "data",
"sender_idempotency_key", "workflow_run_id",
"step_attempt_id", "created_at"
)
VALUES (
${this.namespaceId},
gen_random_uuid(),
${params.signal},
${tx.json(params.data)},
${params.idempotencyKey},
${w.workflowRunId},
${w.id},
NOW()
)
`;
}

Copilot uses AI. Check for mistakes.
"created_at" TEXT NOT NULL,
PRIMARY KEY ("namespace_id", "id")
);

Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The workflow_signals table schema allows multiple rows for the same (namespace_id, step_attempt_id). Since signal waits are consumed by step_attempt_id, allowing multiple rows makes getSignalDelivery() ambiguous and enables unbounded buffering for a single waiter. Consider adding a UNIQUE index/constraint on (namespace_id, step_attempt_id) (and updating inserts to be conflict-tolerant) to enforce at-most-one delivery per waiting step attempt.

Suggested change
CREATE UNIQUE INDEX IF NOT EXISTS "workflow_signals_namespace_step_attempt_unique"
ON "workflow_signals" ("namespace_id", "step_attempt_id");

Copilot uses AI. Check for mistakes.
@jamescmartinez jamescmartinez force-pushed the signals-name-addressed branch from d479203 to 290a938 Compare March 27, 2026 18:51
@pkg-pr-new
Copy link
Copy Markdown

pkg-pr-new bot commented Mar 27, 2026

Open in StackBlitz

npm i https://pkg.pr.new/openworkflowdev/openworkflow/@openworkflow/cli@400
npm i https://pkg.pr.new/openworkflowdev/openworkflow/@openworkflow/dashboard@400
npm i https://pkg.pr.new/openworkflowdev/openworkflow@400

commit: 9de1d0b

@codecov
Copy link
Copy Markdown

codecov bot commented Mar 27, 2026

Codecov Report

❌ Patch coverage is 98.70968% with 2 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
packages/openworkflow/worker/execution.ts 97.53% 2 Missing ⚠️

📢 Thoughts on this report? Let us know!

Copilot AI review requested due to automatic review settings March 27, 2026 22:29
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 9 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +254 to +256
return await this.pg.begin(async (tx): Promise<SendSignalResult> => {
// eslint-disable-next-line sonarjs/todo-tag
const sql = tx as unknown as Postgres; // todo: come back and clean up before merging PR
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a TODO and a type cast (tx as unknown as Postgres) left in sendSignal(). Please remove the TODO before merge and use a properly-typed transaction client (or refactor helper methods to accept the transaction type) so this doesn't ship as a known-cleanup item.

Suggested change
return await this.pg.begin(async (tx): Promise<SendSignalResult> => {
// eslint-disable-next-line sonarjs/todo-tag
const sql = tx as unknown as Postgres; // todo: come back and clean up before merging PR
return await this.pg.begin(async (sql: Postgres): Promise<SendSignalResult> => {

Copilot uses AI. Check for mistakes.
Comment on lines +260 to +269
if (params.idempotencyKey !== null) {
const existing = await sql<{ workflowRunId: string }[]>`
SELECT DISTINCT "workflow_run_id" AS "workflowRunId"
FROM ${workflowSignalsTable}
WHERE "namespace_id" = ${this.namespaceId}
AND "sender_idempotency_key" = ${params.idempotencyKey}
`;
if (existing.length > 0) {
return { workflowRunIds: existing.map((r) => r.workflowRunId) };
}
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idempotency lookup for sendSignal is scoped only to sender_idempotency_key (and namespace). Reusing the same idempotency key across different signal names would incorrectly dedupe and return unrelated results. Include signal in the lookup (and consider an index/constraint that matches the intended (signal, sender_idempotency_key) semantics).

Copilot uses AI. Check for mistakes.
Comment on lines +288 to +327
for (const w of waiters) {
await sql`
INSERT INTO ${workflowSignalsTable} (
"namespace_id", "id", "signal", "data",
"sender_idempotency_key", "workflow_run_id",
"step_attempt_id", "created_at"
)
VALUES (
${this.namespaceId},
gen_random_uuid(),
${params.signal},
${sql.json(params.data)},
${params.idempotencyKey},
${w.workflowRunId},
${w.id},
NOW()
)
ON CONFLICT ("namespace_id", "step_attempt_id") DO NOTHING
`;
}

// wake each waiting workflow run
const runIds = [...new Set(waiters.map((w) => w.workflowRunId))];
await sql`
UPDATE ${workflowRunsTable}
SET
"available_at" = CASE
WHEN "available_at" IS NULL OR "available_at" > NOW()
THEN NOW()
ELSE "available_at"
END,
"updated_at" = NOW()
WHERE "namespace_id" = ${this.namespaceId}
AND "id" = ANY(${runIds})
AND "status" IN ('pending', 'running', 'sleeping')
AND "worker_id" IS NULL
`;

return { workflowRunIds: runIds };
});
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sendSignal() returns workflowRunIds for all matching waiters even if the INSERT ... ON CONFLICT DO NOTHING skipped inserting a delivery row (e.g. duplicate sends to the same signal-wait step attempt). This can make the return value imply a workflow run received this signal when it actually already had a prior delivery. Consider returning only newly-delivered run IDs, or clarify the API semantics/docs.

Copilot uses AI. Check for mistakes.
Comment on lines +283 to +306
const insertStmt = this.db.prepare(`
INSERT OR IGNORE INTO "workflow_signals" (
"namespace_id", "id", "signal", "data",
"sender_idempotency_key", "workflow_run_id",
"step_attempt_id", "created_at"
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`);
for (const w of waiters) {
insertStmt.run(
this.namespaceId,
generateUUID(),
params.signal,
toJSON(params.data),
params.idempotencyKey,
w.workflow_run_id,
w.id,
currentTime,
);
}

// wake each waiting workflow run
const runIds = [...new Set(waiters.map((w) => w.workflow_run_id))];
const wakeStmt = this.db.prepare(`
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sendSignal returns workflowRunIds based on the current waiters, even if INSERT OR IGNORE skipped inserting a delivery row for some/all of them (e.g. duplicate sends to the same running signal-wait step attempt). That can make the return value claim a workflow "received" a signal when it actually did not (because a prior delivery already exists). Consider returning only the run IDs for which a new delivery row was inserted, or adjust the API/docs to clarify the semantics.

Copilot uses AI. Check for mistakes.
ON ${quotedSchema}."workflow_signals" ("namespace_id", "step_attempt_id");

CREATE INDEX IF NOT EXISTS "workflow_signals_idempotency_idx"
ON ${quotedSchema}."workflow_signals" ("namespace_id", "sender_idempotency_key")
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The migration adds an idempotency index on (namespace_id, sender_idempotency_key) only. If sendSignal() dedupe is meant to be per-signal (to avoid cross-signal collisions), consider including signal in the index and/or adding a uniqueness constraint that matches the dedupe semantics.

Suggested change
ON ${quotedSchema}."workflow_signals" ("namespace_id", "sender_idempotency_key")
ON ${quotedSchema}."workflow_signals" ("namespace_id", "signal", "sender_idempotency_key")

Copilot uses AI. Check for mistakes.
Comment on lines +179 to +180
If a signal with the same idempotency key has already been sent, the call
returns the original result without re-delivering.
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs claim reusing the same idempotency key returns the original result without re-delivering. With the current backend behavior, if the first send had no active waiters (signal dropped), no row is written, so retrying later with the same idempotency key can still deliver once waiters exist. Either persist idempotency even when there are no waiters, or clarify this limitation in the docs.

Suggested change
If a signal with the same idempotency key has already been sent, the call
returns the original result without re-delivering.
If a signal with the same idempotency key has already been sent **and that send
had at least one active waiter**, the call returns the original result without
re-delivering. If the original send occurred when there were no active waiters
(the signal was effectively dropped), it is not recorded for idempotency, and a
later retry with the same key can still deliver once waiters exist.

Copilot uses AI. Check for mistakes.
Comment on lines 130 to 131
* Retry policy for workflow step failures (no retries - the child workflow
* is responsible for retries).
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment above TERMINAL_STEP_RETRY_POLICY still describes it as specific to "workflow step failures". This policy is now also used for other terminal step types (e.g. signal send/wait), so the comment is misleading. Update the comment to reflect that it is the no-retry policy for terminal/non-retryable step failures in general.

Suggested change
* Retry policy for workflow step failures (no retries - the child workflow
* is responsible for retries).
* Retry policy for terminal/non-retryable step failures (no retries, the
* caller or child workflow is responsible for handling retries).

Copilot uses AI. Check for mistakes.
Comment on lines +247 to +262
if (params.idempotencyKey !== null) {
const existingStmt = this.db.prepare(`
SELECT DISTINCT "workflow_run_id"
FROM "workflow_signals"
WHERE "namespace_id" = ? AND "sender_idempotency_key" = ?
`);
const existing = existingStmt.all(
this.namespaceId,
params.idempotencyKey,
) as { workflow_run_id: string }[];
if (existing.length > 0) {
this.db.exec("COMMIT");
return Promise.resolve({
workflowRunIds: existing.map((r) => r.workflow_run_id),
});
}
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idempotency lookup for sendSignal is keyed only by sender_idempotency_key (and namespace). If a caller reuses the same idempotency key for different signals, the later call will be treated as a duplicate of the earlier one and return unrelated workflowRunIds. Scope the dedupe to (signal, sender_idempotency_key) (and ideally enforce it at the DB/index level) to avoid cross-signal collisions.

Copilot uses AI. Check for mistakes.
ON "workflow_signals" ("namespace_id", "step_attempt_id");

CREATE INDEX IF NOT EXISTS "workflow_signals_idempotency_idx"
ON "workflow_signals" ("namespace_id", "sender_idempotency_key")
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The migration creates an idempotency index on (namespace_id, sender_idempotency_key) only. Since sendSignal() dedupe should be scoped to the signal name as well (to avoid cross-signal idempotency-key collisions), consider including signal in the index (and/or adding a uniqueness constraint appropriate to the intended semantics).

Suggested change
ON "workflow_signals" ("namespace_id", "sender_idempotency_key")
ON "workflow_signals" ("namespace_id", "signal", "sender_idempotency_key")

Copilot uses AI. Check for mistakes.
@jamescmartinez jamescmartinez changed the title feat(openworkflow, dashboard, docs): signals feat(openworkflow, dashboard, docs): name-addressed signals Mar 28, 2026
Copilot AI review requested due to automatic review settings March 31, 2026 22:19
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 21 out of 21 changed files in this pull request and generated 4 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +67 to +81
sendSignal: (
options: Readonly<{
name?: string;
signal: string;
data?: JsonValue;
}>,
) => Promise<{ workflowRunIds: string[] }>;
waitForSignal: <Output>(
options: Readonly<{
name?: string;
signal: string;
timeout?: StepWaitTimeout;
schema?: StandardSchemaV1<unknown, Output>;
}>,
) => Promise<Output | null>;
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waitForSignal() uses null as the timeout sentinel, but signal payloads can also be null (since sendSignal defaults data to null). That makes “timed out” indistinguishable from “signal delivered with null payload” for callers and can lead to incorrect behavior. Consider changing the API to return a discriminated result (e.g., { timedOut: true } | { timedOut: false; data: Output }), or disallow null payloads at the type/runtime level and document/enforce it consistently.

Copilot uses AI. Check for mistakes.
Comment on lines +1118 to +1125
const existingWaiter = this.activeSignalWaitStepNameBySignal.get(
options.signal,
);
if (existingWaiter && existingWaiter !== stepName) {
throw new Error(
`Signal "${options.signal}" is already being waited on by step "${existingWaiter}"`,
);
}
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The duplicate-signal waiter guard prevents a workflow from waiting on the same signal name in two different steps, even though resolveStepName() auto-indexes duplicates for other step APIs and the docs/architecture describe collision auto-indexing across all step APIs. Either remove this restriction (and track multiple stepNames per signal), or explicitly document that at most one active waitForSignal per signal is allowed per workflow run and ensure the architecture/docs reflect that limitation.

Copilot uses AI. Check for mistakes.
Comment on lines +253 to +316
async sendSignal(params: SendSignalParams): Promise<SendSignalResult> {
return await this.pg.begin(async (sql): Promise<SendSignalResult> => {
const tx = sql as unknown as Postgres;
const stepAttemptsTable = this.stepAttemptsTable(tx);
const workflowSignalsTable = this.workflowSignalsTable(tx);

if (params.idempotencyKey !== null) {
const existing = await tx<{ workflowRunId: string }[]>`
SELECT DISTINCT "workflow_run_id" AS "workflowRunId"
FROM ${workflowSignalsTable}
WHERE "namespace_id" = ${this.namespaceId}
AND "signal" = ${params.signal}
AND "sender_idempotency_key" = ${params.idempotencyKey}
`;
if (existing.length > 0) {
return { workflowRunIds: existing.map((r) => r.workflowRunId) };
}
}
const workflowRunsTable = this.workflowRunsTable(tx);

// find active waiting step attempts & insert a signal delivery row for each
const waiters = await tx<{ id: string; workflowRunId: string }[]>`
SELECT "id", "workflow_run_id" AS "workflowRunId"
FROM ${stepAttemptsTable}
WHERE "namespace_id" = ${this.namespaceId}
AND "kind" = 'signal-wait'
AND "status" = 'running'
AND "context"->>'signal' = ${params.signal}
FOR UPDATE
`;

if (waiters.length === 0) {
return { workflowRunIds: [] };
}

const deliveredRunIds = new Set<string>();
for (const w of waiters) {
const inserted = await tx`
INSERT INTO ${workflowSignalsTable} (
"namespace_id", "id", "signal", "data",
"sender_idempotency_key", "workflow_run_id",
"step_attempt_id", "created_at"
)
VALUES (
${this.namespaceId},
gen_random_uuid(),
${params.signal},
${tx.json(params.data)},
${params.idempotencyKey},
${w.workflowRunId},
${w.id},
NOW()
)
ON CONFLICT ("namespace_id", "step_attempt_id") DO NOTHING
RETURNING "workflow_run_id"
`;
if (inserted.length > 0) {
deliveredRunIds.add(w.workflowRunId);
}
}

if (deliveredRunIds.size === 0) {
return { workflowRunIds: [] };
}
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Postgres sendSignal() idempotency is not concurrency-safe: two concurrent calls with the same idempotencyKey can both see no existing rows, then the second transaction’s inserts will all DO NOTHING on the unique (namespace_id, step_attempt_id) constraint and return workflowRunIds: [] instead of the original result. To guarantee idempotency under concurrency, add a serialization mechanism keyed by (namespace_id, signal, idempotencyKey) (e.g., pg_advisory_xact_lock, or a separate signal_sends table/unique constraint that records the send result).

Copilot uses AI. Check for mistakes.
Comment on lines +116 to +140
## Timeout

`step.waitForSignal` accepts an optional `timeout`. If the signal doesn't
arrive before the timeout, the step resolves with `null` instead of blocking
forever.

```ts
const result = await step.waitForSignal({
signal: `approval:${orderId}`,
timeout: "24h",
});

if (result === null) {
// timed out — no signal arrived within 24 hours
await step.run({ name: "escalate" }, async () => {
await alerts.send("Approval timed out");
});
}
```

`timeout` accepts a [duration string](/docs/sleeping#duration-formats), a
number of milliseconds, or a `Date`.

If no timeout is specified, the wait defaults to **1 year**.

Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Signals docs state that waitForSignal() returns null on timeout, but they don’t mention that null may also be a valid signal payload (since sendSignal can send data: null). Without a contract here, callers can’t reliably distinguish timeout vs a delivered-null payload. Please document the limitation or adjust the API/result shape so timeouts are unambiguous.

Copilot uses AI. Check for mistakes.
Copilot AI review requested due to automatic review settings April 2, 2026 21:13
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 21 out of 21 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +314 to +335
// wake each waiting workflow run
const runIds = [...deliveredRunIds];
const wakeStmt = this.db.prepare(`
UPDATE "workflow_runs"
SET
"available_at" = CASE
WHEN "available_at" IS NULL OR "available_at" > ? THEN ?
ELSE "available_at"
END,
"updated_at" = ?
WHERE "namespace_id" = ?
AND "id" = ?
AND "status" IN ('pending', 'running', 'sleeping')
AND "worker_id" IS NULL
`);
for (const runId of runIds) {
wakeStmt.run(
currentTime,
currentTime,
currentTime,
this.namespaceId,
runId,
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQLite sendSignal() wakes runs with one UPDATE per workflowRunId inside a loop. For large fan-out signals this can become a write bottleneck; consider batching into a single UPDATE with an IN (...) clause (or a temp table) so wake-up is O(1) statements per send.

Suggested change
// wake each waiting workflow run
const runIds = [...deliveredRunIds];
const wakeStmt = this.db.prepare(`
UPDATE "workflow_runs"
SET
"available_at" = CASE
WHEN "available_at" IS NULL OR "available_at" > ? THEN ?
ELSE "available_at"
END,
"updated_at" = ?
WHERE "namespace_id" = ?
AND "id" = ?
AND "status" IN ('pending', 'running', 'sleeping')
AND "worker_id" IS NULL
`);
for (const runId of runIds) {
wakeStmt.run(
currentTime,
currentTime,
currentTime,
this.namespaceId,
runId,
// wake waiting workflow runs in batches to avoid one UPDATE per run ID
const runIds = [...deliveredRunIds];
const maxWakeBatchSize = 500;
for (let i = 0; i < runIds.length; i += maxWakeBatchSize) {
const runIdBatch = runIds.slice(i, i + maxWakeBatchSize);
const placeholders = runIdBatch.map(() => "?").join(", ");
const wakeStmt = this.db.prepare(`
UPDATE "workflow_runs"
SET
"available_at" = CASE
WHEN "available_at" IS NULL OR "available_at" > ? THEN ?
ELSE "available_at"
END,
"updated_at" = ?
WHERE "namespace_id" = ?
AND "id" IN (${placeholders})
AND "status" IN ('pending', 'running', 'sleeping')
AND "worker_id" IS NULL
`);
wakeStmt.run(
currentTime,
currentTime,
currentTime,
this.namespaceId,
...runIdBatch,

Copilot uses AI. Check for mistakes.
Comment on lines +273 to +313
// find active waiting step attempts & insert a signal delivery row for each
const waiters = await tx<{ id: string; workflowRunId: string }[]>`
SELECT "id", "workflow_run_id" AS "workflowRunId"
FROM ${stepAttemptsTable}
WHERE "namespace_id" = ${this.namespaceId}
AND "kind" = 'signal-wait'
AND "status" = 'running'
AND "context"->>'signal' = ${params.signal}
FOR UPDATE
`;

if (waiters.length === 0) {
return { workflowRunIds: [] };
}

const deliveredRunIds = new Set<string>();
for (const w of waiters) {
const inserted = await tx`
INSERT INTO ${workflowSignalsTable} (
"namespace_id", "id", "signal", "data",
"sender_idempotency_key", "workflow_run_id",
"step_attempt_id", "created_at"
)
VALUES (
${this.namespaceId},
gen_random_uuid(),
${params.signal},
${tx.json(params.data)},
${params.idempotencyKey},
${w.workflowRunId},
${w.id},
NOW()
)
ON CONFLICT ("namespace_id", "step_attempt_id") DO NOTHING
RETURNING "workflow_run_id"
`;
if (inserted.length > 0) {
deliveredRunIds.add(w.workflowRunId);
}
}

Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sendSignal() inserts one workflow_signals row per waiter via a per-row INSERT in a loop, causing N round-trips inside the transaction for fan-out scenarios. Consider a set-based INSERT ... SELECT (with ON CONFLICT DO NOTHING) that returns affected workflow_run_id values to reduce query overhead.

Suggested change
// find active waiting step attempts & insert a signal delivery row for each
const waiters = await tx<{ id: string; workflowRunId: string }[]>`
SELECT "id", "workflow_run_id" AS "workflowRunId"
FROM ${stepAttemptsTable}
WHERE "namespace_id" = ${this.namespaceId}
AND "kind" = 'signal-wait'
AND "status" = 'running'
AND "context"->>'signal' = ${params.signal}
FOR UPDATE
`;
if (waiters.length === 0) {
return { workflowRunIds: [] };
}
const deliveredRunIds = new Set<string>();
for (const w of waiters) {
const inserted = await tx`
INSERT INTO ${workflowSignalsTable} (
"namespace_id", "id", "signal", "data",
"sender_idempotency_key", "workflow_run_id",
"step_attempt_id", "created_at"
)
VALUES (
${this.namespaceId},
gen_random_uuid(),
${params.signal},
${tx.json(params.data)},
${params.idempotencyKey},
${w.workflowRunId},
${w.id},
NOW()
)
ON CONFLICT ("namespace_id", "step_attempt_id") DO NOTHING
RETURNING "workflow_run_id"
`;
if (inserted.length > 0) {
deliveredRunIds.add(w.workflowRunId);
}
}
// find active waiting step attempts, lock them, and insert signal deliveries in one statement
const insertedSignals = await tx<{ workflowRunId: string }[]>`
WITH waiters AS (
SELECT "id", "workflow_run_id" AS "workflowRunId"
FROM ${stepAttemptsTable}
WHERE "namespace_id" = ${this.namespaceId}
AND "kind" = 'signal-wait'
AND "status" = 'running'
AND "context"->>'signal' = ${params.signal}
FOR UPDATE
),
inserted AS (
INSERT INTO ${workflowSignalsTable} (
"namespace_id", "id", "signal", "data",
"sender_idempotency_key", "workflow_run_id",
"step_attempt_id", "created_at"
)
SELECT
${this.namespaceId},
gen_random_uuid(),
${params.signal},
${tx.json(params.data)},
${params.idempotencyKey},
waiters."workflowRunId",
waiters."id",
NOW()
FROM waiters
ON CONFLICT ("namespace_id", "step_attempt_id") DO NOTHING
RETURNING "workflow_run_id" AS "workflowRunId"
)
SELECT "workflowRunId"
FROM inserted
`;
const deliveredRunIds = new Set(
insertedSignals.map((inserted) => inserted.workflowRunId),
);

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants