From 290a938e5c45cae2c9730c1aa64884d640fe1941 Mon Sep 17 00:00:00 2001 From: James Martinez Date: Fri, 27 Mar 2026 13:51:34 -0500 Subject: [PATCH 01/11] feat(openworkflow, dashboard, docs): signals --- ARCHITECTURE.md | 46 ++- packages/dashboard/src/routes/runs/$runId.tsx | 17 +- packages/docs/docs.json | 1 + packages/docs/docs/overview.mdx | 2 + packages/docs/docs/roadmap.mdx | 2 +- packages/docs/docs/signals.mdx | 345 ++++++++++++++++++ packages/docs/docs/steps.mdx | 27 +- packages/docs/docs/workflows.mdx | 12 +- packages/openworkflow/client/client.ts | 30 +- packages/openworkflow/core/backend.ts | 20 + .../openworkflow/core/step-attempt.test.ts | 21 ++ packages/openworkflow/core/step-attempt.ts | 36 +- .../openworkflow/core/workflow-function.ts | 27 +- packages/openworkflow/postgres/backend.ts | 134 ++++++- packages/openworkflow/postgres/postgres.ts | 33 ++ packages/openworkflow/sqlite/backend.ts | 153 +++++++- packages/openworkflow/sqlite/sqlite.ts | 31 ++ .../openworkflow/testing/backend.testsuite.ts | 254 ++++++++++++- .../openworkflow/worker/execution.test.ts | 249 ++++++++++++- packages/openworkflow/worker/execution.ts | 298 +++++++++++++-- 20 files changed, 1660 insertions(+), 78 deletions(-) create mode 100644 packages/docs/docs/signals.mdx diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 87a38b79..d824cc1c 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -36,6 +36,10 @@ durable execution with minimal operational complexity. discovery paths, and optional ignore patterns for CLI commands. It typically imports the shared `backend` from `openworkflow/client.*` so app code and CLI use the same connection. +- **Signal**: A named, point-in-time message sent to all workflows currently + waiting on that signal name. Signals carry an optional JSON payload. When + sent, a row is written to `workflow_signals` for each waiting workflow and + the workflow is woken. If no workflow is waiting, the signal is dropped. - **`availableAt`**: A critical timestamp on a workflow run that controls its visibility to workers. It is used for scheduling, heartbeating, crash recovery, and durable timers. @@ -105,6 +109,7 @@ of coordination. There is no separate orchestrator server. | | | - workflow_runs | | - step_attempts | + | - workflow_signals | +------------------------------+ ``` @@ -122,9 +127,10 @@ of coordination. There is no separate orchestrator server. `npx @openworkflow/cli worker start` with auto-discovery of workflow files. - **Backend**: The source of truth. It stores workflow runs and step attempts. - The `workflow_runs` table serves as the job queue for the workers, while the - `step_attempts` table serves as a record of started and completed work, - enabling memoization. + The `workflow_runs` table serves as the job queue for the workers, the + `step_attempts` table serves as a record of started and completed work + enabling memoization, and the `workflow_signals` table records signal + deliveries so a waking workflow can read the payload. ### 2.3. Basic Execution Flow @@ -149,7 +155,7 @@ of coordination. There is no separate orchestrator server. `step_attempt` record with status `running`, executes the step function, and then updates the `step_attempt` to `completed` upon completion. The Worker continues executing inline until the workflow code completes or encounters a - sleep. + durable wait such as sleep, child-workflow waiting, or signal waiting. 6. **State Update**: The Worker updates the Backend with each `step_attempt` as it is created and completed, and updates the status of the `workflow_run` (e.g., `completed`, `running` for parked waits). @@ -234,10 +240,34 @@ target workflow name in `spec`) and `options.timeout` controls the wait timeout (default 1y). When the timeout is reached, the parent step fails but the child workflow continues running independently. -All step APIs (`step.run`, `step.sleep`, and `step.runWorkflow`) share the same -collision logic for durable keys. If duplicate base names are encountered in one -execution pass, OpenWorkflow auto-indexes them as `name`, `name:1`, `name:2`, -and so on so each step call maps to a distinct step attempt. +**`step.sendSignal(options)`**: Sends a named signal to all workflows currently +waiting on it. The send is recorded as a step attempt so it won't repeat on +replay. If no workflow is waiting, the signal is silently dropped. + +```ts +await step.sendSignal({ + signal: `approval:${orderId}`, + data: { approved: true }, +}); +``` + +**`step.waitForSignal(options)`**: Parks the workflow until a matching signal +arrives or the timeout expires. When a signal is sent targeting this signal +name, a delivery row is written to `workflow_signals` and the workflow is woken. +If the timeout expires first, the step resolves with `null`. + +```ts +const data = await step.waitForSignal({ + signal: `approval:${orderId}`, + timeout: "7d", +}); +``` + +All step APIs (`step.run`, `step.sleep`, `step.runWorkflow`, `step.sendSignal`, +and `step.waitForSignal`) share the same collision logic for durable keys. If +duplicate base names are encountered in one execution pass, OpenWorkflow +auto-indexes them as `name`, `name:1`, `name:2`, and so on so each step call +maps to a distinct step attempt. ## 4. Error Handling & Retries diff --git a/packages/dashboard/src/routes/runs/$runId.tsx b/packages/dashboard/src/routes/runs/$runId.tsx index 4952f4cd..0a22579e 100644 --- a/packages/dashboard/src/routes/runs/$runId.tsx +++ b/packages/dashboard/src/routes/runs/$runId.tsx @@ -352,8 +352,7 @@ function RunDetailsPage() { const config = STEP_STATUS_CONFIG[step.status]; const StatusIcon = config.icon; const iconColor = config.color; - const stepTypeLabel = - step.kind === "function" ? "function" : step.kind; + const stepTypeLabel = formatStepKindLabel(step.kind); const stepDuration = computeDuration( step.startedAt, step.finishedAt, @@ -1045,6 +1044,20 @@ function getDefaultSelectedStepId( return steps.at(-1)?.id ?? null; } +function formatStepKindLabel(kind: string): string { + switch (kind) { + case "signal-send": { + return "signal send"; + } + case "signal-wait": { + return "signal wait"; + } + default: { + return kind; + } + } +} + function getRunStatusHelp(status: string): string { switch (status as WorkflowRunStatus) { case "pending": { diff --git a/packages/docs/docs.json b/packages/docs/docs.json index 13eb3f90..4d34e3bf 100644 --- a/packages/docs/docs.json +++ b/packages/docs/docs.json @@ -42,6 +42,7 @@ "docs/parallel-steps", "docs/dynamic-steps", "docs/child-workflows", + "docs/signals", "docs/retries", "docs/type-safety", "docs/versioning", diff --git a/packages/docs/docs/overview.mdx b/packages/docs/docs/overview.mdx index 75ac999c..08edaf17 100644 --- a/packages/docs/docs/overview.mdx +++ b/packages/docs/docs/overview.mdx @@ -37,6 +37,8 @@ work. - **Memoized steps** prevent duplicate side effects on retries - **Durable sleep** (`step.sleep`) pauses runs without holding worker capacity +- **Signals** (`step.sendSignal`, `step.waitForSignal`) enable runtime + communication to & between workflows - **Heartbeats + leases** (`availableAt`) allow automatic crash recovery - **Database as source of truth** avoids a separate orchestration service diff --git a/packages/docs/docs/roadmap.mdx b/packages/docs/docs/roadmap.mdx index f97bc187..6a51854c 100644 --- a/packages/docs/docs/roadmap.mdx +++ b/packages/docs/docs/roadmap.mdx @@ -20,10 +20,10 @@ description: What's coming next for OpenWorkflow - ✅ Idempotency keys - ✅ Prometheus `/metrics` endpoint - ✅ Child workflows (`step.runWorkflow`) +- ✅ Signals (`step.sendSignal`, `step.waitForSignal`) ## Coming Soon -- Signals - Cron / scheduling - Rollback / compensation functions - Priority and concurrency controls diff --git a/packages/docs/docs/signals.mdx b/packages/docs/docs/signals.mdx new file mode 100644 index 00000000..4c999f7f --- /dev/null +++ b/packages/docs/docs/signals.mdx @@ -0,0 +1,345 @@ +--- +title: Signals +description: Send data between workflows at runtime without polling +--- + +Signals let workflows communicate at runtime. A workflow can pause and wait for +a signal, and another workflow (or your application code) can send that signal +to wake it up with data attached. + +This is useful any time a workflow needs to wait for something that isn't on a +timer: a human approval, a webhook callback, a payment confirmation, or a +coordination message from another workflow. + +## Basic Usage + +### Waiting for a Signal + +Use `step.waitForSignal()` inside a workflow to pause until a matching signal +arrives: + +```ts +import { defineWorkflow } from "openworkflow"; + +export const approvalWorkflow = defineWorkflow( + { name: "approval-workflow" }, + async ({ input, step }) => { + await step.run({ name: "request-approval" }, async () => { + await slack.send({ + channel: "#approvals", + text: `Please approve order ${input.orderId}`, + }); + }); + + // wait until someone sends the "approval" signal + const decision = await step.waitForSignal<{ approved: boolean }>({ + signal: `approval:${input.orderId}`, + timeout: "7d", + }); + + if (decision?.approved) { + await step.run({ name: "process-order" }, async () => { + await orders.process(input.orderId); + }); + } + }, +); +``` + +### Sending a Signal + +Send a signal from another workflow using `step.sendSignal()`: + +```ts +export const reviewWorkflow = defineWorkflow( + { name: "review-workflow" }, + async ({ input, step }) => { + const verdict = await step.run({ name: "run-review" }, async () => { + return await reviewService.evaluate(input.orderId); + }); + + await step.sendSignal({ + signal: `approval:${input.orderId}`, + data: { approved: verdict.passed }, + }); + }, +); +``` + +Or send a signal from your application code using the client: + +```ts +import { ow } from "./openworkflow/client"; + +// from an API route, webhook handler, etc. +await ow.sendSignal({ + signal: `approval:${orderId}`, + data: { approved: true }, +}); +``` + + + Signals are not buffered. If you send a signal before any workflow is waiting + for it, the signal is lost. + + +## Signal Names + +Signal names are arbitrary strings scoped to the backend namespace. Use +descriptive, unique names — often including an entity ID — to avoid collisions: + +```ts +// Good - scoped to a specific entity +await step.waitForSignal({ signal: `payment:${invoiceId}` }); +await step.waitForSignal({ signal: `approval:order:${orderId}` }); + +// Bad - too generic, could collide across workflows +await step.waitForSignal({ signal: "done" }); +``` + +## Step Names + +Like other step types, signal steps need unique names within a workflow. If you +don't provide one, OpenWorkflow uses the signal name as the step name. + +```ts +// Implicit — step name defaults to signal name +await step.waitForSignal({ signal: `payment:${invoiceId}` }); + +// Explicit step name +await step.waitForSignal({ + name: "wait-for-payment", + signal: `payment:${invoiceId}`, +}); +``` + +## Timeout + +`step.waitForSignal` accepts an optional `timeout`. If the signal doesn't +arrive before the timeout, the step resolves with `null` instead of blocking +forever. + +```ts +const result = await step.waitForSignal({ + signal: `approval:${orderId}`, + timeout: "24h", +}); + +if (result === null) { + // timed out — no signal arrived within 24 hours + await step.run({ name: "escalate" }, async () => { + await alerts.send("Approval timed out"); + }); +} +``` + +`timeout` accepts a [duration string](/docs/sleeping#duration-formats), a +number of milliseconds, or a `Date`. + +If no timeout is specified, the wait defaults to **1 year**. + +## Schema Validation + +Validate signal payloads at receive time using any +[Standard Schema](/docs/standard-schema) compatible validator: + +```ts +import { z } from "zod"; + +const approvalSchema = z.object({ + approved: z.boolean(), + reviewedBy: z.string(), +}); + +const decision = await step.waitForSignal({ + signal: `approval:${orderId}`, + schema: approvalSchema, + timeout: "7d", +}); + +// `decision` is typed as { approved: boolean; reviewedBy: string } | null +``` + +If the signal data doesn't match the schema, the step fails permanently (no +retries) to surface the contract violation immediately. + +## Idempotency + +When sending signals from the client, you can provide an idempotency key to +safely retry without delivering the signal twice: + +```ts +await ow.sendSignal({ + signal: `payment:${invoiceId}`, + data: { amount: 99.99 }, + idempotencyKey: `payment-confirmed:${invoiceId}`, +}); +``` + +If a signal with the same idempotency key has already been sent, the call +returns the original result without re-delivering. + +## Fan-Out: One Signal, Many Waiters + +A single `sendSignal` call delivers to **every** workflow currently waiting on +that signal name. This makes fan-out coordination straightforward: + +```ts +// Multiple workflows waiting on the same signal +const workflowA = defineWorkflow({ name: "listener-a" }, async ({ step }) => { + const data = await step.waitForSignal({ signal: "config-updated" }); + // handle update +}); + +const workflowB = defineWorkflow({ name: "listener-b" }, async ({ step }) => { + const data = await step.waitForSignal({ signal: "config-updated" }); + // handle update +}); + +// One signal wakes both workflows +await ow.sendSignal({ + signal: "config-updated", + data: { version: 42 }, +}); +``` + +## Sending Signals from Workflows + +Use `step.sendSignal()` inside a workflow to send signals durably. The send is +recorded as a step attempt, so it won't be repeated on replay: + +```ts +const result = await step.sendSignal({ + signal: `approval:${orderId}`, + data: { approved: true }, +}); + +// result.workflowRunIds contains IDs of workflows that received the signal +``` + +## Common Patterns + +### Human-in-the-Loop Approval + +```ts +export const purchaseWorkflow = defineWorkflow( + { name: "purchase" }, + async ({ input, step }) => { + await step.run({ name: "send-approval-request" }, async () => { + await email.send({ + to: input.managerEmail, + subject: `Approve purchase: $${input.amount}`, + body: `Click to approve: ${approvalUrl(input.purchaseId)}`, + }); + }); + + const approval = await step.waitForSignal<{ approved: boolean }>({ + signal: `purchase-approval:${input.purchaseId}`, + timeout: "3d", + }); + + if (!approval?.approved) { + await step.run({ name: "notify-rejected" }, async () => { + await email.send({ + to: input.requesterEmail, + subject: "Purchase request denied", + }); + }); + return { status: "rejected" }; + } + + await step.run({ name: "process-purchase" }, async () => { + await purchasing.submit(input.purchaseId); + }); + + return { status: "approved" }; + }, +); + +// in your API route handler: +app.post("/approve/:purchaseId", async (req, res) => { + await ow.sendSignal({ + signal: `purchase-approval:${req.params.purchaseId}`, + data: { approved: req.body.approved }, + }); + res.json({ ok: true }); +}); +``` + +### Webhook Callback + +```ts +export const paymentWorkflow = defineWorkflow( + { name: "payment" }, + async ({ input, step }) => { + const checkout = await step.run({ name: "create-checkout" }, async () => { + return await stripe.checkout.sessions.create({ + metadata: { workflowSignal: `payment:${input.orderId}` }, + // ... + }); + }); + + const payment = await step.waitForSignal({ + signal: `payment:${input.orderId}`, + timeout: "1h", + }); + + if (!payment) { + return { status: "expired" }; + } + + await step.run({ name: "fulfill-order" }, async () => { + await orders.fulfill(input.orderId); + }); + + return { status: "paid" }; + }, +); + +// in your Stripe webhook handler: +app.post("/webhooks/stripe", async (req, res) => { + const event = req.body; + if (event.type === "checkout.session.completed") { + await ow.sendSignal({ + signal: event.data.object.metadata.workflowSignal, + data: { sessionId: event.data.object.id }, + idempotencyKey: event.id, + }); + } + res.sendStatus(200); +}); +``` + +### Workflow-to-Workflow Coordination + +```ts +const producer = defineWorkflow( + { name: "data-producer" }, + async ({ input, step }) => { + const data = await step.run({ name: "generate-data" }, async () => { + return await heavyComputation(input); + }); + + await step.sendSignal({ + signal: `data-ready:${input.batchId}`, + data: { recordCount: data.length }, + }); + }, +); + +const consumer = defineWorkflow( + { name: "data-consumer" }, + async ({ input, step }) => { + const notification = await step.waitForSignal({ + signal: `data-ready:${input.batchId}`, + timeout: "1h", + }); + + if (notification) { + await step.run({ name: "process-data" }, async () => { + await processRecords(input.batchId, notification.recordCount); + }); + } + }, +); +``` diff --git a/packages/docs/docs/steps.mdx b/packages/docs/docs/steps.mdx index 5eaa2f80..5ceffd71 100644 --- a/packages/docs/docs/steps.mdx +++ b/packages/docs/docs/steps.mdx @@ -115,7 +115,7 @@ await step.run({ name: "log-event" }, async () => { ## Step Types -OpenWorkflow provides three step types: +OpenWorkflow provides five step types: ### `step.run()` @@ -148,6 +148,31 @@ const childOutput = await step.runWorkflow( ); ``` +### `step.sendSignal()` + +Sends a signal to all workflows currently waiting on that signal name: + +```ts +const { workflowRunIds } = await step.sendSignal({ + signal: `approval:${orderId}`, + data: { approved: true }, +}); +``` + +### `step.waitForSignal()` + +Pauses the workflow until a matching signal is sent, or the timeout expires: + +```ts +const data = await step.waitForSignal({ + signal: `approval:${orderId}`, + timeout: "7d", +}); +// data is the signal payload, or null if timed out +``` + +See [Signals](/docs/signals) for details. + ## Retry Policy (Optional) Control backoff and retry limits for an individual step: diff --git a/packages/docs/docs/workflows.mdx b/packages/docs/docs/workflows.mdx index 5d8ae39e..ca480568 100644 --- a/packages/docs/docs/workflows.mdx +++ b/packages/docs/docs/workflows.mdx @@ -213,12 +213,12 @@ create a separate run. The workflow function receives an object with four properties: -| Parameter | Type | Description | -| --------- | --------------------- | --------------------------------------------------------------------- | -| `input` | Generic | The input data passed when starting the workflow | -| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`, `step.runWorkflow`) | -| `version` | `string \| null` | The workflow version, if specified | -| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) | +| Parameter | Type | Description | +| --------- | --------------------- | -------------------------------------------------------------------------------------------------------------- | +| `input` | Generic | The input data passed when starting the workflow | +| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`, `step.runWorkflow`, `step.sendSignal`, `step.waitForSignal`) | +| `version` | `string \| null` | The workflow version, if specified | +| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) | ```ts defineWorkflow({ name: "example" }, async ({ input, step, version, run }) => { diff --git a/packages/openworkflow/client/client.ts b/packages/openworkflow/client/client.ts index bb936dce..7f7029d0 100644 --- a/packages/openworkflow/client/client.ts +++ b/packages/openworkflow/client/client.ts @@ -1,5 +1,6 @@ -import type { Backend } from "../core/backend.js"; +import type { Backend, SendSignalResult } from "../core/backend.js"; import type { DurationString } from "../core/duration.js"; +import { JsonValue } from "../core/json.js"; import type { StandardSchemaV1 } from "../core/standard-schema.js"; import { calculateDateFromDuration } from "../core/step-attempt.js"; import { @@ -173,6 +174,33 @@ export class OpenWorkflow { async cancelWorkflowRun(workflowRunId: string): Promise { await this.backend.cancelWorkflowRun({ workflowRunId }); } + + /** + * Send a signal to all workflows currently waiting on the given signal + * string. If no workflow is waiting, the signal is silently dropped. + * @param options - Signal options + * @returns IDs of workflow runs that received the signal + * @example + * ```ts + * const { workflowRunIds } = await ow.sendSignal({ + * signal: "approval:order_456", + * data: { approved: true }, + * }); + * ``` + */ + async sendSignal( + options: Readonly<{ + signal: string; + data?: JsonValue; + idempotencyKey?: string; + }>, + ): Promise { + return this.backend.sendSignal({ + signal: options.signal, + data: options.data ?? null, + idempotencyKey: options.idempotencyKey ?? null, + }); + } } /** diff --git a/packages/openworkflow/core/backend.ts b/packages/openworkflow/core/backend.ts index d79dc5e6..9edf79be 100644 --- a/packages/openworkflow/core/backend.ts +++ b/packages/openworkflow/core/backend.ts @@ -68,6 +68,12 @@ export interface Backend { params: Readonly, ): Promise; + // Signals + sendSignal(params: Readonly): Promise; + getSignalDelivery( + params: Readonly, + ): Promise; + // Lifecycle stop(): Promise; } @@ -173,6 +179,20 @@ export interface SetStepAttemptChildWorkflowRunParams { childWorkflowRunId: string; } +export interface SendSignalParams { + signal: string; + data: JsonValue | null; + idempotencyKey: string | null; +} + +export interface SendSignalResult { + workflowRunIds: string[]; +} + +export interface GetSignalDeliveryParams { + stepAttemptId: string; +} + export interface PaginationOptions { limit?: number; after?: string; diff --git a/packages/openworkflow/core/step-attempt.test.ts b/packages/openworkflow/core/step-attempt.test.ts index ce5f207f..502693bd 100644 --- a/packages/openworkflow/core/step-attempt.test.ts +++ b/packages/openworkflow/core/step-attempt.test.ts @@ -7,6 +7,7 @@ import { calculateDateFromDuration, createSleepContext, createWorkflowContext, + createSignalWaitContext, } from "./step-attempt.js"; import type { StepAttempt, StepAttemptCache } from "./step-attempt.js"; import { describe, expect, test } from "vitest"; @@ -339,6 +340,26 @@ describe("createWorkflowContext", () => { }); }); +describe("createSignalWaitContext", () => { + test("creates signal-wait context with signal and timeout", () => { + const timeoutAt = new Date("2025-06-15T10:30:00.000Z"); + const context = createSignalWaitContext("approval:order_456", timeoutAt); + + expect(context).toEqual({ + kind: "signal-wait", + signal: "approval:order_456", + timeoutAt: "2025-06-15T10:30:00.000Z", + }); + }); + + test("preserves millisecond precision in timeout", () => { + const timeoutAt = new Date("2025-01-01T00:00:00.123Z"); + const context = createSignalWaitContext("test-signal", timeoutAt); + + expect(context.timeoutAt).toBe("2025-01-01T00:00:00.123Z"); + }); +}); + function createMockStepAttempt( overrides: Partial = {}, ): StepAttempt { diff --git a/packages/openworkflow/core/step-attempt.ts b/packages/openworkflow/core/step-attempt.ts index 888120ac..3730db67 100644 --- a/packages/openworkflow/core/step-attempt.ts +++ b/packages/openworkflow/core/step-attempt.ts @@ -7,7 +7,12 @@ import { err, ok } from "./result.js"; /** * The kind of step in a workflow. */ -export type StepKind = "function" | "sleep" | "workflow"; +export type StepKind = + | "function" + | "sleep" + | "workflow" + | "signal-send" + | "signal-wait"; /** * Status of a step attempt through its lifecycle. @@ -34,12 +39,22 @@ export interface WorkflowStepAttemptContext { timeoutAt: string | null; } +/** + * Context for a signal-wait step attempt. + */ +export interface SignalWaitStepAttemptContext { + kind: "signal-wait"; + signal: string; + timeoutAt: string; +} + /** * Context for a step attempt. */ export type StepAttemptContext = | SleepStepAttemptContext - | WorkflowStepAttemptContext; + | WorkflowStepAttemptContext + | SignalWaitStepAttemptContext; /** * StepAttempt represents a single attempt of a step within a workflow. @@ -171,3 +186,20 @@ export function createWorkflowContext( timeoutAt: timeoutAt?.toISOString() ?? null, }; } + +/** + * Create the context object for a signal-wait step attempt. + * @param signal - Signal address string + * @param timeoutAt - Wait timeout deadline + * @returns The context object for the signal-wait step + */ +export function createSignalWaitContext( + signal: string, + timeoutAt: Readonly, +): SignalWaitStepAttemptContext { + return { + kind: "signal-wait", + signal, + timeoutAt: timeoutAt.toISOString(), + }; +} diff --git a/packages/openworkflow/core/workflow-function.ts b/packages/openworkflow/core/workflow-function.ts index 01745679..dc322a61 100644 --- a/packages/openworkflow/core/workflow-function.ts +++ b/packages/openworkflow/core/workflow-function.ts @@ -1,7 +1,15 @@ import type { DurationString } from "./duration.js"; +import type { JsonValue } from "./json.js"; +import type { StandardSchemaV1 } from "./standard-schema.js"; import type { RetryPolicy, WorkflowSpec } from "./workflow-definition.js"; import type { WorkflowRun } from "./workflow-run.js"; +/** + * Timeout for a step wait. Accepts milliseconds (number), a duration string + * (e.g. "5m", "1h"), or an absolute Date. + */ +export type StepWaitTimeout = number | string | Date; + /** * Config for an individual step defined with `step.run()`. */ @@ -37,13 +45,13 @@ export interface StepRunWorkflowOptions { /** * Maximum time to wait for the child workflow to complete. */ - timeout?: number | string | Date; + timeout?: StepWaitTimeout; } /** * Represents the API for defining steps within a workflow. Used within a * workflow handler to define steps by calling `step.run()`, `step.sleep()`, - * and `step.runWorkflow()`. + * `step.runWorkflow()`, `step.sendSignal()`, and `step.waitForSignal()`. */ export interface StepApi { run: ( @@ -56,6 +64,21 @@ export interface StepApi { options?: Readonly, ) => Promise; sleep: (name: string, duration: DurationString) => Promise; + sendSignal: ( + options: Readonly<{ + name?: string; + signal: string; + data?: JsonValue; + }>, + ) => Promise<{ workflowRunIds: string[] }>; + waitForSignal: ( + options: Readonly<{ + name?: string; + signal: string; + timeout?: StepWaitTimeout; + schema?: StandardSchemaV1; + }>, + ) => Promise; } /** diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index c2ac3187..10846218 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -21,6 +21,9 @@ import { RescheduleWorkflowRunAfterFailedStepAttemptParams, CompleteWorkflowRunParams, SleepWorkflowRunParams, + SendSignalParams, + SendSignalResult, + GetSignalDeliveryParams, } from "../core/backend.js"; import { wrapError } from "../core/error.js"; import { JsonValue } from "../core/json.js"; @@ -247,6 +250,98 @@ export class BackendPostgres implements Backend { return workflowRun ?? null; } + async sendSignal(params: SendSignalParams): Promise { + return await this.pg.begin(async (tx): Promise => { + // eslint-disable-next-line sonarjs/todo-tag + const sql = tx as unknown as Postgres; // todo: come back and clean up before merging PR + const stepAttemptsTable = this.stepAttemptsTable(sql); + const workflowSignalsTable = this.workflowSignalsTable(sql); + + if (params.idempotencyKey !== null) { + const existing = await sql<{ workflowRunId: string }[]>` + SELECT DISTINCT "workflow_run_id" AS "workflowRunId" + FROM ${workflowSignalsTable} + WHERE "namespace_id" = ${this.namespaceId} + AND "sender_idempotency_key" = ${params.idempotencyKey} + `; + if (existing.length > 0) { + return { workflowRunIds: existing.map((r) => r.workflowRunId) }; + } + } + const workflowRunsTable = this.workflowRunsTable(sql); + + // find active waiting step attempts & insert a signal delivery row for each + const waiters = await sql<{ id: string; workflowRunId: string }[]>` + SELECT "id", "workflow_run_id" AS "workflowRunId" + FROM ${stepAttemptsTable} + WHERE "namespace_id" = ${this.namespaceId} + AND "kind" = 'signal-wait' + AND "status" = 'running' + AND "context"->>'signal' = ${params.signal} + FOR UPDATE + `; + + if (waiters.length === 0) { + return { workflowRunIds: [] }; + } + + for (const w of waiters) { + await sql` + INSERT INTO ${workflowSignalsTable} ( + "namespace_id", "id", "signal", "data", + "sender_idempotency_key", "workflow_run_id", + "step_attempt_id", "created_at" + ) + VALUES ( + ${this.namespaceId}, + gen_random_uuid(), + ${params.signal}, + ${sql.json(params.data)}, + ${params.idempotencyKey}, + ${w.workflowRunId}, + ${w.id}, + NOW() + ) + `; + } + + // wake each waiting workflow run + const runIds = [...new Set(waiters.map((w) => w.workflowRunId))]; + await sql` + UPDATE ${workflowRunsTable} + SET + "available_at" = CASE + WHEN "available_at" IS NULL OR "available_at" > NOW() + THEN NOW() + ELSE "available_at" + END, + "updated_at" = NOW() + WHERE "namespace_id" = ${this.namespaceId} + AND "id" = ANY(${runIds}) + AND "status" IN ('pending', 'running', 'sleeping') + AND "worker_id" IS NULL + `; + + return { workflowRunIds: runIds }; + }); + } + + async getSignalDelivery( + params: GetSignalDeliveryParams, + ): Promise { + const workflowSignalsTable = this.workflowSignalsTable(); + + const [row] = await this.pg<{ data: JsonValue | null }[]>` + SELECT "data" + FROM ${workflowSignalsTable} + WHERE "namespace_id" = ${this.namespaceId} + AND "step_attempt_id" = ${params.stepAttemptId} + LIMIT 1 + `; + + return row ? (row.data ?? null) : undefined; + } + async listWorkflowRuns( params: ListWorkflowRunsParams, ): Promise> { @@ -449,17 +544,30 @@ export class BackendPostgres implements Backend { AND wr."id" = ${workflowRunId} AND wr."status" = 'running' AND wr."worker_id" IS NULL - AND EXISTS ( - SELECT 1 - FROM ${stepAttemptsTable} sa - JOIN ${workflowRunsTable} child - ON child."namespace_id" = sa."child_workflow_run_namespace_id" - AND child."id" = sa."child_workflow_run_id" - WHERE sa."namespace_id" = wr."namespace_id" - AND sa."workflow_run_id" = wr."id" - AND sa."kind" = 'workflow' - AND sa."status" = 'running' - AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') + AND ( + EXISTS ( + SELECT 1 + FROM ${stepAttemptsTable} sa + JOIN ${workflowRunsTable} child + ON child."namespace_id" = sa."child_workflow_run_namespace_id" + AND child."id" = sa."child_workflow_run_id" + WHERE sa."namespace_id" = wr."namespace_id" + AND sa."workflow_run_id" = wr."id" + AND sa."kind" = 'workflow' + AND sa."status" = 'running' + AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') + ) + OR EXISTS ( + SELECT 1 + FROM ${stepAttemptsTable} sa + JOIN ${this.workflowSignalsTable()} ws + ON ws."namespace_id" = sa."namespace_id" + AND ws."step_attempt_id" = sa."id" + WHERE sa."namespace_id" = wr."namespace_id" + AND sa."workflow_run_id" = wr."id" + AND sa."kind" = 'signal-wait' + AND sa."status" = 'running' + ) ) RETURNING wr.* `; @@ -942,6 +1050,10 @@ export class BackendPostgres implements Backend { private stepAttemptsTable(pg: Postgres = this.pg) { return pg`${pg(this.schema)}.${pg("step_attempts")}`; } + + private workflowSignalsTable(pg: Postgres = this.pg) { + return pg`${pg(this.schema)}.${pg("workflow_signals")}`; + } } /** diff --git a/packages/openworkflow/postgres/postgres.ts b/packages/openworkflow/postgres/postgres.ts index 7d641e44..554fc732 100644 --- a/packages/openworkflow/postgres/postgres.ts +++ b/packages/openworkflow/postgres/postgres.ts @@ -206,6 +206,39 @@ export function migrations(schema: string): string[] { ON CONFLICT DO NOTHING; COMMIT;`, + + // 5 - workflow signals + `BEGIN; + + CREATE TABLE IF NOT EXISTS ${quotedSchema}."workflow_signals" ( + "namespace_id" TEXT NOT NULL, + "id" TEXT NOT NULL, + -- + "signal" TEXT NOT NULL, + "data" JSONB, + "sender_idempotency_key" TEXT, + "workflow_run_id" TEXT NOT NULL, + "step_attempt_id" TEXT NOT NULL, + "created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY ("namespace_id", "id") + ); + + CREATE INDEX IF NOT EXISTS "workflow_signals_step_attempt_idx" + ON ${quotedSchema}."workflow_signals" ("namespace_id", "step_attempt_id"); + + CREATE INDEX IF NOT EXISTS "workflow_signals_idempotency_idx" + ON ${quotedSchema}."workflow_signals" ("namespace_id", "sender_idempotency_key") + WHERE "sender_idempotency_key" IS NOT NULL; + + CREATE INDEX IF NOT EXISTS "step_attempts_signal_wait_idx" + ON ${quotedSchema}."step_attempts" ("namespace_id", ("context"->>'signal')) + WHERE "kind" = 'signal-wait' AND "status" = 'running'; + + INSERT INTO ${quotedSchema}."openworkflow_migrations"("version") + VALUES (5) + ON CONFLICT DO NOTHING; + + COMMIT;`, ]; } diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index e12c4aed..9fef2aba 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -20,6 +20,9 @@ import { RescheduleWorkflowRunAfterFailedStepAttemptParams, CompleteWorkflowRunParams, SleepWorkflowRunParams, + SendSignalParams, + SendSignalResult, + GetSignalDeliveryParams, toWorkflowRunCounts, } from "../core/backend.js"; import { wrapError } from "../core/error.js"; @@ -236,6 +239,121 @@ export class BackendSqlite implements Backend { return Promise.resolve(row ? rowToWorkflowRun(row) : null); } + sendSignal(params: SendSignalParams): Promise { + const currentTime = now(); + + this.db.exec("BEGIN IMMEDIATE"); + try { + if (params.idempotencyKey !== null) { + const existingStmt = this.db.prepare(` + SELECT DISTINCT "workflow_run_id" + FROM "workflow_signals" + WHERE "namespace_id" = ? AND "sender_idempotency_key" = ? + `); + const existing = existingStmt.all( + this.namespaceId, + params.idempotencyKey, + ) as { workflow_run_id: string }[]; + if (existing.length > 0) { + this.db.exec("COMMIT"); + return Promise.resolve({ + workflowRunIds: existing.map((r) => r.workflow_run_id), + }); + } + } + // find active waiting step attempts & insert a signal delivery row for each + const waitersStmt = this.db.prepare(` + SELECT "id", "workflow_run_id" + FROM "step_attempts" + WHERE "namespace_id" = ? + AND "kind" = 'signal-wait' + AND "status" = 'running' + AND json_extract("context", '$.signal') = ? + `); + const waiters = waitersStmt.all(this.namespaceId, params.signal) as { + id: string; + workflow_run_id: string; + }[]; + + if (waiters.length === 0) { + this.db.exec("COMMIT"); + return Promise.resolve({ workflowRunIds: [] }); + } + + const insertStmt = this.db.prepare(` + INSERT INTO "workflow_signals" ( + "namespace_id", "id", "signal", "data", + "sender_idempotency_key", "workflow_run_id", + "step_attempt_id", "created_at" + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `); + for (const w of waiters) { + insertStmt.run( + this.namespaceId, + generateUUID(), + params.signal, + toJSON(params.data), + params.idempotencyKey, + w.workflow_run_id, + w.id, + currentTime, + ); + } + + // wake each waiting workflow run + const runIds = [...new Set(waiters.map((w) => w.workflow_run_id))]; + const wakeStmt = this.db.prepare(` + UPDATE "workflow_runs" + SET + "available_at" = CASE + WHEN "available_at" IS NULL OR "available_at" > ? THEN ? + ELSE "available_at" + END, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "id" = ? + AND "status" IN ('pending', 'running', 'sleeping') + AND "worker_id" IS NULL + `); + for (const runId of runIds) { + wakeStmt.run( + currentTime, + currentTime, + currentTime, + this.namespaceId, + runId, + ); + } + + this.db.exec("COMMIT"); + return Promise.resolve({ workflowRunIds: runIds }); + } catch (error) { + this.db.exec("ROLLBACK"); + throw error; + } + } + + getSignalDelivery( + params: GetSignalDeliveryParams, + ): Promise { + const stmt = this.db.prepare(` + SELECT "data" + FROM "workflow_signals" + WHERE "namespace_id" = ? AND "step_attempt_id" = ? + LIMIT 1 + `); + const row = stmt.get(this.namespaceId, params.stepAttemptId) as + | { data: string | null } + | undefined; + + // eslint-disable-next-line unicorn/no-useless-undefined + if (!row) return Promise.resolve(undefined); + return Promise.resolve( + (fromJSON(row.data) as JsonValue | undefined) ?? null, + ); + } + async claimWorkflowRun( params: ClaimWorkflowRunParams, ): Promise { @@ -367,17 +485,30 @@ export class BackendSqlite implements Backend { SET "status" = 'running', "available_at" = CASE - WHEN EXISTS ( - SELECT 1 - FROM "step_attempts" sa - JOIN "workflow_runs" child - ON child."namespace_id" = sa."child_workflow_run_namespace_id" - AND child."id" = sa."child_workflow_run_id" - WHERE sa."namespace_id" = "workflow_runs"."namespace_id" - AND sa."workflow_run_id" = "workflow_runs"."id" - AND sa."kind" = 'workflow' - AND sa."status" = 'running' - AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') + WHEN ( + EXISTS ( + SELECT 1 + FROM "step_attempts" sa + JOIN "workflow_runs" child + ON child."namespace_id" = sa."child_workflow_run_namespace_id" + AND child."id" = sa."child_workflow_run_id" + WHERE sa."namespace_id" = "workflow_runs"."namespace_id" + AND sa."workflow_run_id" = "workflow_runs"."id" + AND sa."kind" = 'workflow' + AND sa."status" = 'running' + AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') + ) + OR EXISTS ( + SELECT 1 + FROM "step_attempts" sa + JOIN "workflow_signals" ws + ON ws."namespace_id" = sa."namespace_id" + AND ws."step_attempt_id" = sa."id" + WHERE sa."namespace_id" = "workflow_runs"."namespace_id" + AND sa."workflow_run_id" = "workflow_runs"."id" + AND sa."kind" = 'signal-wait' + AND sa."status" = 'running' + ) ) AND ? > ? THEN ? ELSE ? END, diff --git a/packages/openworkflow/sqlite/sqlite.ts b/packages/openworkflow/sqlite/sqlite.ts index bf3fd163..d43c03a6 100644 --- a/packages/openworkflow/sqlite/sqlite.ts +++ b/packages/openworkflow/sqlite/sqlite.ts @@ -193,6 +193,37 @@ export function migrations(): string[] { VALUES (4); COMMIT;`, + + // 5 - workflow signals + `BEGIN; + + CREATE TABLE IF NOT EXISTS "workflow_signals" ( + "namespace_id" TEXT NOT NULL, + "id" TEXT NOT NULL, + "signal" TEXT NOT NULL, + "data" TEXT, + "sender_idempotency_key" TEXT, + "workflow_run_id" TEXT NOT NULL, + "step_attempt_id" TEXT NOT NULL, + "created_at" TEXT NOT NULL, + PRIMARY KEY ("namespace_id", "id") + ); + + CREATE INDEX IF NOT EXISTS "workflow_signals_step_attempt_idx" + ON "workflow_signals" ("namespace_id", "step_attempt_id"); + + CREATE INDEX IF NOT EXISTS "workflow_signals_idempotency_idx" + ON "workflow_signals" ("namespace_id", "sender_idempotency_key") + WHERE "sender_idempotency_key" IS NOT NULL; + + CREATE INDEX IF NOT EXISTS "step_attempts_signal_wait_idx" + ON "step_attempts" ("namespace_id", json_extract("context", '$.signal')) + WHERE "kind" = 'signal-wait' AND "status" = 'running'; + + INSERT OR IGNORE INTO "openworkflow_migrations" ("version") + VALUES (5); + + COMMIT;`, ]; } diff --git a/packages/openworkflow/testing/backend.testsuite.ts b/packages/openworkflow/testing/backend.testsuite.ts index 6383cb4c..15c2530b 100644 --- a/packages/openworkflow/testing/backend.testsuite.ts +++ b/packages/openworkflow/testing/backend.testsuite.ts @@ -1093,7 +1093,61 @@ export function testBackend(options: TestBackendOptions): void { farFuture.getTime(), ); expect(parentAfter.availableAt.getTime()).toBeLessThanOrEqual( - Date.now() + 1000, + Date.now() + 5000, + ); + + await teardown(backend); + }); + + test("wakes a parked run when a signal delivery arrives before parking", async () => { + const backend = await setup(); + + const run = await createClaimedWorkflowRun(backend); + const workerId = run.workerId ?? ""; + const signalString = `reconcile-${randomUUID()}`; + + await backend.createStepAttempt({ + workflowRunId: run.id, + workerId, + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: signalString, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + + const sendResult = await backend.sendSignal({ + signal: signalString, + data: { value: "arrived-early" }, + idempotencyKey: null, + }); + expect(sendResult.workflowRunIds).toContain(run.id); + + const farFuture = new Date(Date.now() + 5 * 60 * 1000); + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId, + availableAt: farFuture, + }); + + const runAfter = await backend.getWorkflowRun({ + workflowRunId: run.id, + }); + expect(runAfter?.status).toBe("running"); + expect(runAfter?.workerId).toBeNull(); + expect(runAfter?.availableAt).not.toBeNull(); + if (!runAfter?.availableAt) { + throw new Error("Expected availableAt after signal reconciliation"); + } + + expect(runAfter.availableAt.getTime()).toBeLessThan( + farFuture.getTime(), + ); + expect(runAfter.availableAt.getTime()).toBeLessThanOrEqual( + Date.now() + 5000, ); await teardown(backend); @@ -2228,6 +2282,204 @@ export function testBackend(options: TestBackendOptions): void { await teardown(backend); }); }); + + describe("sendSignal()", () => { + test("returns empty when no active waiters", async () => { + const result = await backend.sendSignal({ + signal: `no-waiters-${randomUUID()}`, + data: { value: 42 }, + idempotencyKey: null, + }); + expect(result.workflowRunIds).toEqual([]); + }); + + test("delivers to one active waiter and wakes run", async () => { + const run = await createClaimedWorkflowRun(backend); + const signalString = `test-signal-${randomUUID()}`; + + const step = await backend.createStepAttempt({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: signalString, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + availableAt: newDateInOneYear(), + }); + + const result = await backend.sendSignal({ + signal: signalString, + data: { approved: true }, + idempotencyKey: null, + }); + + expect(result.workflowRunIds).toContain(run.id); + + const delivered = await backend.getSignalDelivery({ + stepAttemptId: step.id, + }); + expect(delivered).toEqual({ approved: true }); + + const wokenRun = await backend.getWorkflowRun({ + workflowRunId: run.id, + }); + expect(wokenRun).not.toBeNull(); + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + expect(wokenRun!.availableAt).not.toBeNull(); + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + expect(wokenRun!.availableAt!.getTime()).toBeLessThanOrEqual( + Date.now() + 5000, + ); + }); + + test("fans out to multiple waiters", async () => { + const signalString = `fan-out-${randomUUID()}`; + const runs: WorkflowRun[] = []; + + for (let i = 0; i < 3; i++) { + const run = await createClaimedWorkflowRun(backend); + await backend.createStepAttempt({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: signalString, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + availableAt: newDateInOneYear(), + }); + runs.push(run); + } + + const result = await backend.sendSignal({ + signal: signalString, + data: "hello", + idempotencyKey: null, + }); + + expect(result.workflowRunIds).toHaveLength(3); + for (const run of runs) { + expect(result.workflowRunIds).toContain(run.id); + } + }); + + test("idempotent send returns same result", async () => { + const run = await createClaimedWorkflowRun(backend); + const signalString = `idempotent-${randomUUID()}`; + const idempotencyKey = randomUUID(); + + await backend.createStepAttempt({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: signalString, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + availableAt: newDateInOneYear(), + }); + + const first = await backend.sendSignal({ + signal: signalString, + data: { val: 1 }, + idempotencyKey, + }); + + const second = await backend.sendSignal({ + signal: signalString, + data: { val: 2 }, + idempotencyKey, + }); + + expect(first.workflowRunIds).toEqual(second.workflowRunIds); + }); + }); + + describe("getSignalDelivery()", () => { + test("returns undefined when no delivery exists", async () => { + const run = await createClaimedWorkflowRun(backend); + const step = await backend.createStepAttempt({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: `no-delivery-${randomUUID()}`, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + + const result = await backend.getSignalDelivery({ + stepAttemptId: step.id, + }); + expect(result).toBeUndefined(); + }); + + test("returns null data when signal delivered with null", async () => { + const run = await createClaimedWorkflowRun(backend); + const signalString = `null-data-${randomUUID()}`; + + await backend.createStepAttempt({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: signalString, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + availableAt: newDateInOneYear(), + }); + + await backend.sendSignal({ + signal: signalString, + data: null, + idempotencyKey: null, + }); + + const steps = await backend.listStepAttempts({ + workflowRunId: run.id, + }); + const step = steps.data.find((s) => s.stepName === "wait-step"); + expect(step).toBeDefined(); + + const result = await backend.getSignalDelivery({ + stepAttemptId: step!.id, // eslint-disable-line @typescript-eslint/no-non-null-assertion + }); + expect(result).toBeNull(); + }); + }); }); } diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index 2dba8674..7ad443eb 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -721,7 +721,7 @@ describe("StepExecutor", () => { expect(status).toBe("failed"); await expect(handle.result()).rejects.toThrow( - /Workflow timeout must be a non-negative number/, + /Timeout must be a non-negative number/, ); }); @@ -2480,6 +2480,253 @@ describe("StepExecutor", () => { ); expect(childStatus).toBe("completed"); }); + + // ---- step.sendSignal / step.waitForSignal -------------------------------- + + test("sendSignal sends a signal and returns workflow run IDs", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const signalString = `test-signal-${randomUUID()}`; + + const waiter = client.defineWorkflow( + { name: `signal-waiter-${randomUUID()}` }, + async ({ step }) => { + const data = await step.waitForSignal({ + signal: signalString, + timeout: 30_000, + }); + return data; + }, + ); + + const sender = client.defineWorkflow( + { name: `signal-sender-${randomUUID()}` }, + async ({ step }) => { + const result = await step.sendSignal({ + signal: signalString, + data: { approved: true }, + }); + return result; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + + const waiterHandle = await waiter.run(); + await tickUntilParked(backend, worker, waiterHandle.workflowRun.id, 20, 50); + + const senderHandle = await sender.run(); + const senderStatus = await tickUntilTerminal( + backend, + worker, + senderHandle.workflowRun.id, + 20, + 50, + ); + expect(senderStatus).toBe("completed"); + + const senderResult = await senderHandle.result(); + expect(senderResult).toEqual({ + workflowRunIds: [waiterHandle.workflowRun.id], + }); + + const waiterStatus = await tickUntilTerminal( + backend, + worker, + waiterHandle.workflowRun.id, + 20, + 50, + ); + expect(waiterStatus).toBe("completed"); + const waiterResult = await waiterHandle.result(); + expect(waiterResult).toEqual({ approved: true }); + }); + + test("waitForSignal returns null on timeout", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-timeout-${randomUUID()}` }, + async ({ step }) => { + const data = await step.waitForSignal({ + signal: `never-sent-${randomUUID()}`, + timeout: 1, // 1ms — expires immediately + }); + return data; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + + // first tick: creates the signal-wait step and parks (timeout is in the future at creation time) + // subsequent ticks: timeout has expired, completes with null + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("completed"); + const result = await handle.result(); + expect(result).toBeNull(); + }); + + test("waitForSignal validates data against schema", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const signalString = `schema-signal-${randomUUID()}`; + + const waiter = client.defineWorkflow( + { name: `signal-schema-${randomUUID()}` }, + async ({ step }) => { + const data = await step.waitForSignal({ + signal: signalString, + timeout: 30_000, + schema: z.object({ approved: z.boolean() }), + }); + return data; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const waiterHandle = await waiter.run(); + await tickUntilParked(backend, worker, waiterHandle.workflowRun.id, 20, 50); + + // w/ valid data + await client.sendSignal({ + signal: signalString, + data: { approved: true }, + }); + + const status = await tickUntilTerminal( + backend, + worker, + waiterHandle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("completed"); + const result = await waiterHandle.result(); + expect(result).toEqual({ approved: true }); + }); + + test("waitForSignal fails step when schema validation fails", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const signalString = `bad-schema-signal-${randomUUID()}`; + + const waiter = client.defineWorkflow( + { name: `signal-bad-schema-${randomUUID()}` }, + async ({ step }) => { + const data = await step.waitForSignal({ + signal: signalString, + timeout: 30_000, + schema: z.object({ approved: z.boolean() }), + }); + return data; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const waiterHandle = await waiter.run(); + await tickUntilParked(backend, worker, waiterHandle.workflowRun.id, 20, 50); + + // w/ invalid data + await client.sendSignal({ + signal: signalString, + data: { approved: "not-a-boolean" }, + }); + + const status = await tickUntilTerminal( + backend, + worker, + waiterHandle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("failed"); + }); + + test("sendSignal is replay-safe across re-executions", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const signalString = `replay-signal-${randomUUID()}`; + + const workflow = client.defineWorkflow( + { name: `signal-replay-${randomUUID()}` }, + async ({ step }) => { + const first = await step.sendSignal({ + signal: signalString, + data: { v: 1 }, + }); + const value = await step.run({ name: "compute" }, () => 42); + return { sent: first, value }; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("completed"); + + const result = await handle.result(); + expect(result).toEqual({ + sent: { workflowRunIds: [] }, + value: 42, + }); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const signalStep = steps.data.find((s) => s.kind === "signal-send"); + expect(signalStep).toBeDefined(); + expect(signalStep?.status).toBe("completed"); + }); + + test("sendSignal auto-indexes duplicate signal step names", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-autoindex-${randomUUID()}` }, + async ({ step }) => { + await step.sendSignal({ signal: "notify", data: "first" }); + await step.sendSignal({ signal: "notify", data: "second" }); + return "done"; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("completed"); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const signalSteps = steps.data + .filter((s) => s.kind === "signal-send") + .map((s) => s.stepName) + .toSorted((a, b) => a.localeCompare(b)); + expect(signalSteps).toEqual(["notify", "notify:1"]); + }); }); describe("executeWorkflow", () => { diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index d825d737..448c63bc 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -6,6 +6,7 @@ import { type SerializedError, } from "../core/error.js"; import type { JsonValue } from "../core/json.js"; +import type { StandardSchemaV1 } from "../core/standard-schema.js"; import type { StepAttempt, StepAttemptCache } from "../core/step-attempt.js"; import { getCachedStepAttempt, @@ -14,6 +15,7 @@ import { calculateDateFromDuration, createSleepContext, createWorkflowContext, + createSignalWaitContext, } from "../core/step-attempt.js"; import { computeFailedWorkflowRunUpdate, @@ -26,6 +28,7 @@ import type { StepApi, StepFunction, StepFunctionConfig, + StepWaitTimeout, WorkflowFunction, WorkflowRunMetadata, } from "../core/workflow-function.js"; @@ -127,7 +130,7 @@ const DEFAULT_STEP_RETRY_POLICY: RetryPolicy = { * Retry policy for workflow step failures (no retries - the child workflow * is responsible for retries). */ -const WORKFLOW_STEP_FAILURE_RETRY_POLICY: RetryPolicy = { +const TERMINAL_STEP_RETRY_POLICY: RetryPolicy = { ...DEFAULT_STEP_RETRY_POLICY, maximumAttempts: 1, }; @@ -237,16 +240,16 @@ export function createStepExecutionStateFromAttempts( } /** - * Resolve workflow timeout input to an absolute deadline. + * Resolve wait timeout input to an absolute deadline. * @param timeout - Relative/absolute timeout input * @returns Absolute timeout deadline * @throws {Error} When timeout is invalid */ -function resolveWorkflowTimeoutAt( +function resolveWaitTimeoutAt( timeout: number | string | Date | undefined, ): Date { if (timeout === undefined) { - return defaultWorkflowTimeoutAt(); + return defaultWaitTimeoutAt(); } if (timeout instanceof Date) { @@ -255,7 +258,7 @@ function resolveWorkflowTimeoutAt( if (typeof timeout === "number") { if (!Number.isFinite(timeout) || timeout < 0) { - throw new Error("Workflow timeout must be a non-negative number"); + throw new Error("Timeout must be a non-negative number"); } return new Date(Date.now() + timeout); } @@ -268,32 +271,37 @@ function resolveWorkflowTimeoutAt( } /** - * Default workflow timeout: 1 year from a base time. + * Default wait timeout: 1 year from a base time. * @param base - Base timestamp (defaults to now) * @returns Timeout deadline */ -function defaultWorkflowTimeoutAt(base: Readonly = new Date()): Date { +function defaultWaitTimeoutAt(base: Readonly = new Date()): Date { const timeoutAt = new Date(base); timeoutAt.setFullYear(timeoutAt.getFullYear() + 1); return timeoutAt; } /** - * Extract the workflow timeout from a persisted step attempt's context. - * @param attempt - Running workflow step attempt - * @returns Timeout deadline, or null when context is not workflow + * Extract the timeout from a persisted step attempt's context. + * Works for both workflow and signal-wait step types. + * @param attempt - Running step attempt + * @returns Timeout deadline, or null when context has no timeout */ -function getWorkflowTimeoutAt(attempt: Readonly): Date | null { - if (attempt.context?.kind !== "workflow") { +function getContextTimeoutAt(attempt: Readonly): Date | null { + if ( + attempt.context?.kind !== "workflow" && + attempt.context?.kind !== "signal-wait" + ) { return null; } - if (attempt.context.timeoutAt === null) { - // Backward compatibility for previously persisted workflow contexts. - return defaultWorkflowTimeoutAt(attempt.createdAt); + const { timeoutAt } = attempt.context; + if (timeoutAt === null) { + // backward compatibility for previously persisted workflow contexts + // (signal-wait timeoutAt is never null per SignalWaitStepAttemptContext). + return defaultWaitTimeoutAt(attempt.createdAt); } - - return new Date(attempt.context.timeoutAt); + return new Date(timeoutAt); } /** @@ -306,7 +314,7 @@ function hasWorkflowTimedOut( attempt: Readonly, childRun: Readonly, ): boolean { - const timeoutAt = getWorkflowTimeoutAt(attempt); + const timeoutAt = getContextTimeoutAt(attempt); if (!timeoutAt) return false; const timeoutMs = timeoutAt.getTime(); @@ -337,19 +345,26 @@ function getRunningWaitAttemptResumeAt( return Number.isFinite(resumeAt.getTime()) ? resumeAt : null; } + if (attempt.kind === "signal-wait") { + const timeoutAt = + getContextTimeoutAt(attempt) ?? defaultWaitTimeoutAt(attempt.createdAt); + return Number.isFinite(timeoutAt.getTime()) + ? timeoutAt + : defaultWaitTimeoutAt(attempt.createdAt); + } + if (attempt.kind !== "workflow") { return null; } const timeoutAt = - getWorkflowTimeoutAt(attempt) ?? - defaultWorkflowTimeoutAt(attempt.createdAt); + getContextTimeoutAt(attempt) ?? defaultWaitTimeoutAt(attempt.createdAt); if (Number.isFinite(timeoutAt.getTime())) { return timeoutAt; } // Backward compatibility for malformed historical workflow timeout values. - return defaultWorkflowTimeoutAt(attempt.createdAt); + return defaultWaitTimeoutAt(attempt.createdAt); } /** @@ -463,6 +478,19 @@ function buildWorkflowIdempotencyKey(attempt: Readonly): string { return `__workflow:${attempt.namespaceId}:${attempt.id}`; } +/** + * Build deterministic idempotency key for signal send invocation. + * @param workflowRunId - Workflow run id + * @param stepName - Step name + * @returns Stable idempotency key + */ +function buildSignalIdempotencyKey( + workflowRunId: string, + stepName: string, +): string { + return `__signal:${workflowRunId}:${stepName}`; +} + /** * Configures the options for a StepExecutor. */ @@ -502,6 +530,7 @@ class StepExecutor implements StepApi { private readonly expectedNextStepIndexByName: Map; private readonly resolvedStepNames: Set; private readonly executionFence: ExecutionFenceController; + private readonly activeSignalWaitStepNameBySignal: Map; constructor(options: Readonly) { this.backend = options.backend; @@ -518,6 +547,20 @@ class StepExecutor implements StepApi { this.expectedNextStepIndexByName = new Map(); this.resolvedStepNames = new Set(); this.executionFence = options.executionFence; + + // build signal → step name map from running signal-wait attempts. + this.activeSignalWaitStepNameBySignal = new Map(); + for (const [, attempt] of state.runningByStepName) { + if ( + attempt.kind === "signal-wait" && + attempt.context?.kind === "signal-wait" + ) { + this.activeSignalWaitStepNameBySignal.set( + attempt.context.signal, + attempt.stepName, + ); + } + } } private assertExecutionActive(): void { @@ -713,7 +756,7 @@ class StepExecutor implements StepApi { throw new StepError({ stepName, stepFailedAttempts: this.failedCountsByStepName.get(stepName) ?? 1, - retryPolicy: WORKFLOW_STEP_FAILURE_RETRY_POLICY, + retryPolicy: TERMINAL_STEP_RETRY_POLICY, error: failedError, }); } @@ -729,7 +772,7 @@ class StepExecutor implements StepApi { } // First encounter — create the workflow step and child workflow run - const timeoutAt = resolveWorkflowTimeoutAt(request.timeout); + const timeoutAt = resolveWaitTimeoutAt(request.timeout); this.assertExecutionActive(); this.ensureStepLimitNotReached(); const attempt = await this.backend.createStepAttempt({ @@ -784,7 +827,7 @@ class StepExecutor implements StepApi { new Error( `Workflow step "${stepName}" could not find linked child workflow run`, ), - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } @@ -798,7 +841,7 @@ class StepExecutor implements StepApi { new Error( `Workflow step "${stepName}" could not find linked child workflow run "${childId}"`, ), - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } @@ -808,7 +851,7 @@ class StepExecutor implements StepApi { stepName, workflowAttempt.id, new Error("Timed out waiting for child workflow to complete"), - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } @@ -835,7 +878,7 @@ class StepExecutor implements StepApi { stepName, workflowAttempt.id, childError, - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } @@ -847,16 +890,16 @@ class StepExecutor implements StepApi { new Error( `Workflow step "${stepName}" failed because child workflow run "${childRun.id}" was canceled`, ), - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } // Child still running — sleep until timeout - const timeoutAt = getWorkflowTimeoutAt(workflowAttempt); + const timeoutAt = getContextTimeoutAt(workflowAttempt); const resumeAt = timeoutAt && Number.isFinite(timeoutAt.getTime()) ? timeoutAt - : defaultWorkflowTimeoutAt(workflowAttempt.createdAt); + : defaultWaitTimeoutAt(workflowAttempt.createdAt); throw new SleepSignal(this.resolveEarliestRunningWaitResumeAt(resumeAt)); } @@ -973,8 +1016,201 @@ class StepExecutor implements StepApi { stepName, stepAttemptId, error, - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, + ); + } + + // ---- step.sendSignal ---------------------------------------------------- + + async sendSignal( + options: Readonly<{ + name?: string; + signal: string; + data?: JsonValue; + }>, + ): Promise<{ workflowRunIds: string[] }> { + const stepName = this.resolveStepName(options.name ?? options.signal); + + const existingAttempt = getCachedStepAttempt(this.cache, stepName); + if (existingAttempt) { + return existingAttempt.output as { workflowRunIds: string[] }; + } + + const runningAttempt = this.runningByStepName.get(stepName); + if (runningAttempt?.kind === "signal-send") { + return await this.resolveSignalSend(stepName, runningAttempt, options); + } + + this.assertExecutionActive(); + this.ensureStepLimitNotReached(); + const attempt = await this.backend.createStepAttempt({ + workflowRunId: this.workflowRunId, + workerId: this.workerId, + stepName, + kind: "signal-send", + config: {}, + context: null, + }); + this.stepCount += 1; + this.runningByStepName.set(stepName, attempt); + + return await this.resolveSignalSend(stepName, attempt, options); + } + + private async resolveSignalSend( + stepName: string, + attempt: Readonly, + options: Readonly<{ signal: string; data?: JsonValue }>, + ): Promise<{ workflowRunIds: string[] }> { + try { + const result = await this.backend.sendSignal({ + signal: options.signal, + data: options.data ?? null, + idempotencyKey: buildSignalIdempotencyKey(this.workflowRunId, stepName), + }); + + const output = { workflowRunIds: result.workflowRunIds }; + const completed = await this.backend.completeStepAttempt({ + workflowRunId: this.workflowRunId, + stepAttemptId: attempt.id, + workerId: this.workerId, + output, + }); + this.cache = addToStepAttemptCache(this.cache, completed); + this.runningByStepName.delete(stepName); + return completed.output as { workflowRunIds: string[] }; + } catch (error) { + return await this.failStepWithError( + stepName, + attempt.id, + error, + TERMINAL_STEP_RETRY_POLICY, + ); + } + } + + // ---- step.waitForSignal ------------------------------------------------ + + async waitForSignal( + options: Readonly<{ + name?: string; + signal: string; + timeout?: StepWaitTimeout; + schema?: StandardSchemaV1; + }>, + ): Promise { + const stepName = this.resolveStepName(options.name ?? options.signal); + + const existingAttempt = getCachedStepAttempt(this.cache, stepName); + if (existingAttempt) { + return existingAttempt.output as Output | null; + } + + const runningAttempt = this.runningByStepName.get(stepName); + if (runningAttempt?.kind === "signal-wait") { + return await this.resolveSignalWait( + stepName, + runningAttempt, + options, + ); + } + + const existingWaiter = this.activeSignalWaitStepNameBySignal.get( + options.signal, ); + if (existingWaiter && existingWaiter !== stepName) { + throw new Error( + `Signal "${options.signal}" is already being waited on by step "${existingWaiter}"`, + ); + } + + const timeoutAt = resolveWaitTimeoutAt(options.timeout); + this.assertExecutionActive(); + this.ensureStepLimitNotReached(); + const attempt = await this.backend.createStepAttempt({ + workflowRunId: this.workflowRunId, + workerId: this.workerId, + stepName, + kind: "signal-wait", + config: {}, + context: createSignalWaitContext(options.signal, timeoutAt), + }); + this.stepCount += 1; + this.runningByStepName.set(stepName, attempt); + this.activeSignalWaitStepNameBySignal.set(options.signal, stepName); + + return await this.resolveSignalWait(stepName, attempt, options); + } + + private async resolveSignalWait( + stepName: string, + attempt: Readonly, + options: Readonly<{ + schema?: StandardSchemaV1; + }>, + ): Promise { + const signalData = await this.backend.getSignalDelivery({ + stepAttemptId: attempt.id, + }); + + if (signalData !== undefined) { + let outputValue: unknown = signalData; + + if (options.schema) { + const validationResult = await validateInput( + options.schema, + signalData, + ); + if (!validationResult.success) { + return await this.failStepWithError( + stepName, + attempt.id, + new Error( + `Signal schema validation failed: ${validationResult.error}`, + ), + TERMINAL_STEP_RETRY_POLICY, + ); + } + outputValue = validationResult.value; + } + + return await this.completeSignalWaitStep( + attempt, + normalizeStepOutput(outputValue), + ); + } + + const timeoutAt = + getContextTimeoutAt(attempt) ?? defaultWaitTimeoutAt(attempt.createdAt); + if (Date.now() >= timeoutAt.getTime()) { + return await this.completeSignalWaitStep(attempt, null); + } + + throw new SleepSignal(this.resolveEarliestRunningWaitResumeAt(timeoutAt)); + } + + /** + * Complete a signal-wait step attempt and update internal maps. + * @param attempt - Step attempt being completed + * @param output - Output value (null for timeout) + * @returns The completed step output + */ + private async completeSignalWaitStep( + attempt: Readonly, + output: JsonValue | null, + ): Promise { + const completed = await this.backend.completeStepAttempt({ + workflowRunId: this.workflowRunId, + stepAttemptId: attempt.id, + workerId: this.workerId, + output, + }); + this.cache = addToStepAttemptCache(this.cache, completed); + this.runningByStepName.delete(attempt.stepName); + if (attempt.context?.kind === "signal-wait") { + this.activeSignalWaitStepNameBySignal.delete(attempt.context.signal); + } + return completed.output as Output | null; } private ensureStepLimitNotReached(): void { From 0e6e8b64b972b4464925de7f890666823af7cda9 Mon Sep 17 00:00:00 2001 From: James Martinez Date: Fri, 27 Mar 2026 17:29:15 -0500 Subject: [PATCH 02/11] feat: feedback --- packages/openworkflow/postgres/backend.ts | 1 + packages/openworkflow/postgres/postgres.ts | 2 +- packages/openworkflow/sqlite/backend.ts | 2 +- packages/openworkflow/sqlite/sqlite.ts | 2 +- .../openworkflow/testing/backend.testsuite.ts | 84 +++++++++++++++++++ .../openworkflow/worker/execution.test.ts | 37 ++++++++ 6 files changed, 125 insertions(+), 3 deletions(-) diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index 10846218..81a0f153 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -302,6 +302,7 @@ export class BackendPostgres implements Backend { ${w.id}, NOW() ) + ON CONFLICT ("namespace_id", "step_attempt_id") DO NOTHING `; } diff --git a/packages/openworkflow/postgres/postgres.ts b/packages/openworkflow/postgres/postgres.ts index 554fc732..2c53245f 100644 --- a/packages/openworkflow/postgres/postgres.ts +++ b/packages/openworkflow/postgres/postgres.ts @@ -223,7 +223,7 @@ export function migrations(schema: string): string[] { PRIMARY KEY ("namespace_id", "id") ); - CREATE INDEX IF NOT EXISTS "workflow_signals_step_attempt_idx" + CREATE UNIQUE INDEX IF NOT EXISTS "workflow_signals_step_attempt_idx" ON ${quotedSchema}."workflow_signals" ("namespace_id", "step_attempt_id"); CREATE INDEX IF NOT EXISTS "workflow_signals_idempotency_idx" diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index 9fef2aba..30d423f8 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -281,7 +281,7 @@ export class BackendSqlite implements Backend { } const insertStmt = this.db.prepare(` - INSERT INTO "workflow_signals" ( + INSERT OR IGNORE INTO "workflow_signals" ( "namespace_id", "id", "signal", "data", "sender_idempotency_key", "workflow_run_id", "step_attempt_id", "created_at" diff --git a/packages/openworkflow/sqlite/sqlite.ts b/packages/openworkflow/sqlite/sqlite.ts index d43c03a6..155802ad 100644 --- a/packages/openworkflow/sqlite/sqlite.ts +++ b/packages/openworkflow/sqlite/sqlite.ts @@ -209,7 +209,7 @@ export function migrations(): string[] { PRIMARY KEY ("namespace_id", "id") ); - CREATE INDEX IF NOT EXISTS "workflow_signals_step_attempt_idx" + CREATE UNIQUE INDEX IF NOT EXISTS "workflow_signals_step_attempt_idx" ON "workflow_signals" ("namespace_id", "step_attempt_id"); CREATE INDEX IF NOT EXISTS "workflow_signals_idempotency_idx" diff --git a/packages/openworkflow/testing/backend.testsuite.ts b/packages/openworkflow/testing/backend.testsuite.ts index 15c2530b..7805a0af 100644 --- a/packages/openworkflow/testing/backend.testsuite.ts +++ b/packages/openworkflow/testing/backend.testsuite.ts @@ -2416,6 +2416,90 @@ export function testBackend(options: TestBackendOptions): void { expect(first.workflowRunIds).toEqual(second.workflowRunIds); }); + + test("duplicate send to same waiter delivers at most once", async () => { + const run = await createClaimedWorkflowRun(backend); + const signalString = `dup-send-${randomUUID()}`; + + const step = await backend.createStepAttempt({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: signalString, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + availableAt: newDateInOneYear(), + }); + + // send the same signal twice (no idempotency key — different senders) + await backend.sendSignal({ + signal: signalString, + data: { first: true }, + idempotencyKey: null, + }); + await backend.sendSignal({ + signal: signalString, + data: { second: true }, + idempotencyKey: null, + }); + + // only the first delivery should be stored + const delivered = await backend.getSignalDelivery({ + stepAttemptId: step.id, + }); + expect(delivered).toEqual({ first: true }); + }); + + test("idempotent send does not create duplicate delivery rows", async () => { + const run = await createClaimedWorkflowRun(backend); + const signalString = `idem-dup-${randomUUID()}`; + const idempotencyKey = randomUUID(); + + const step = await backend.createStepAttempt({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: signalString, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + availableAt: newDateInOneYear(), + }); + + await backend.sendSignal({ + signal: signalString, + data: { original: true }, + idempotencyKey, + }); + + // second send with same idempotency key + await backend.sendSignal({ + signal: signalString, + data: { duplicate: true }, + idempotencyKey, + }); + + // should still return the original delivery, not the duplicate + const delivered = await backend.getSignalDelivery({ + stepAttemptId: step.id, + }); + expect(delivered).toEqual({ original: true }); + }); }); describe("getSignalDelivery()", () => { diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index 7ad443eb..3f88af21 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -2727,6 +2727,43 @@ describe("StepExecutor", () => { .toSorted((a, b) => a.localeCompare(b)); expect(signalSteps).toEqual(["notify", "notify:1"]); }); + + test("waitForSignal is replay-safe across re-executions", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const signalString = `replay-wait-${randomUUID()}`; + + const workflow = client.defineWorkflow( + { name: `signal-wait-replay-${randomUUID()}` }, + async ({ step }) => { + const data = await step.waitForSignal({ + signal: signalString, + timeout: 30_000, + }); + const value = await step.run({ name: "after-signal" }, () => "ok"); + return { data, value }; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 20, 50); + + await client.sendSignal({ signal: signalString, data: { x: 1 } }); + + // resume — the first replay hits the cached waitForSignal path + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("completed"); + const result = await handle.result(); + expect(result).toEqual({ data: { x: 1 }, value: "ok" }); + }); }); describe("executeWorkflow", () => { From 2856db33e1827176cd1726fce305939ded8defeb Mon Sep 17 00:00:00 2001 From: James Martinez Date: Tue, 31 Mar 2026 16:40:03 -0500 Subject: [PATCH 03/11] feat: feedback --- packages/openworkflow/postgres/backend.ts | 33 ++++++++++++++-------- packages/openworkflow/postgres/postgres.ts | 2 +- packages/openworkflow/sqlite/backend.ts | 16 +++++++++-- packages/openworkflow/sqlite/sqlite.ts | 2 +- packages/openworkflow/worker/execution.ts | 4 +-- 5 files changed, 38 insertions(+), 19 deletions(-) diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index 81a0f153..6f0f03b2 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -251,27 +251,27 @@ export class BackendPostgres implements Backend { } async sendSignal(params: SendSignalParams): Promise { - return await this.pg.begin(async (tx): Promise => { - // eslint-disable-next-line sonarjs/todo-tag - const sql = tx as unknown as Postgres; // todo: come back and clean up before merging PR - const stepAttemptsTable = this.stepAttemptsTable(sql); - const workflowSignalsTable = this.workflowSignalsTable(sql); + return await this.pg.begin(async (sql): Promise => { + const tx = sql as unknown as Postgres; + const stepAttemptsTable = this.stepAttemptsTable(tx); + const workflowSignalsTable = this.workflowSignalsTable(tx); if (params.idempotencyKey !== null) { - const existing = await sql<{ workflowRunId: string }[]>` + const existing = await tx<{ workflowRunId: string }[]>` SELECT DISTINCT "workflow_run_id" AS "workflowRunId" FROM ${workflowSignalsTable} WHERE "namespace_id" = ${this.namespaceId} + AND "signal" = ${params.signal} AND "sender_idempotency_key" = ${params.idempotencyKey} `; if (existing.length > 0) { return { workflowRunIds: existing.map((r) => r.workflowRunId) }; } } - const workflowRunsTable = this.workflowRunsTable(sql); + const workflowRunsTable = this.workflowRunsTable(tx); // find active waiting step attempts & insert a signal delivery row for each - const waiters = await sql<{ id: string; workflowRunId: string }[]>` + const waiters = await tx<{ id: string; workflowRunId: string }[]>` SELECT "id", "workflow_run_id" AS "workflowRunId" FROM ${stepAttemptsTable} WHERE "namespace_id" = ${this.namespaceId} @@ -285,8 +285,9 @@ export class BackendPostgres implements Backend { return { workflowRunIds: [] }; } + const deliveredRunIds = new Set(); for (const w of waiters) { - await sql` + const inserted = await tx` INSERT INTO ${workflowSignalsTable} ( "namespace_id", "id", "signal", "data", "sender_idempotency_key", "workflow_run_id", @@ -296,19 +297,27 @@ export class BackendPostgres implements Backend { ${this.namespaceId}, gen_random_uuid(), ${params.signal}, - ${sql.json(params.data)}, + ${tx.json(params.data)}, ${params.idempotencyKey}, ${w.workflowRunId}, ${w.id}, NOW() ) ON CONFLICT ("namespace_id", "step_attempt_id") DO NOTHING + RETURNING "workflow_run_id" `; + if (inserted.length > 0) { + deliveredRunIds.add(w.workflowRunId); + } + } + + if (deliveredRunIds.size === 0) { + return { workflowRunIds: [] }; } // wake each waiting workflow run - const runIds = [...new Set(waiters.map((w) => w.workflowRunId))]; - await sql` + const runIds = [...deliveredRunIds]; + await tx` UPDATE ${workflowRunsTable} SET "available_at" = CASE diff --git a/packages/openworkflow/postgres/postgres.ts b/packages/openworkflow/postgres/postgres.ts index 2c53245f..b4620fea 100644 --- a/packages/openworkflow/postgres/postgres.ts +++ b/packages/openworkflow/postgres/postgres.ts @@ -227,7 +227,7 @@ export function migrations(schema: string): string[] { ON ${quotedSchema}."workflow_signals" ("namespace_id", "step_attempt_id"); CREATE INDEX IF NOT EXISTS "workflow_signals_idempotency_idx" - ON ${quotedSchema}."workflow_signals" ("namespace_id", "sender_idempotency_key") + ON ${quotedSchema}."workflow_signals" ("namespace_id", "signal", "sender_idempotency_key") WHERE "sender_idempotency_key" IS NOT NULL; CREATE INDEX IF NOT EXISTS "step_attempts_signal_wait_idx" diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index 30d423f8..4c8f455a 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -248,10 +248,11 @@ export class BackendSqlite implements Backend { const existingStmt = this.db.prepare(` SELECT DISTINCT "workflow_run_id" FROM "workflow_signals" - WHERE "namespace_id" = ? AND "sender_idempotency_key" = ? + WHERE "namespace_id" = ? AND "signal" = ? AND "sender_idempotency_key" = ? `); const existing = existingStmt.all( this.namespaceId, + params.signal, params.idempotencyKey, ) as { workflow_run_id: string }[]; if (existing.length > 0) { @@ -288,8 +289,9 @@ export class BackendSqlite implements Backend { ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) `); + const deliveredRunIds = new Set(); for (const w of waiters) { - insertStmt.run( + const result = insertStmt.run( this.namespaceId, generateUUID(), params.signal, @@ -299,10 +301,18 @@ export class BackendSqlite implements Backend { w.id, currentTime, ); + if (result.changes > 0) { + deliveredRunIds.add(w.workflow_run_id); + } + } + + if (deliveredRunIds.size === 0) { + this.db.exec("COMMIT"); + return Promise.resolve({ workflowRunIds: [] }); } // wake each waiting workflow run - const runIds = [...new Set(waiters.map((w) => w.workflow_run_id))]; + const runIds = [...deliveredRunIds]; const wakeStmt = this.db.prepare(` UPDATE "workflow_runs" SET diff --git a/packages/openworkflow/sqlite/sqlite.ts b/packages/openworkflow/sqlite/sqlite.ts index 155802ad..8d11736f 100644 --- a/packages/openworkflow/sqlite/sqlite.ts +++ b/packages/openworkflow/sqlite/sqlite.ts @@ -213,7 +213,7 @@ export function migrations(): string[] { ON "workflow_signals" ("namespace_id", "step_attempt_id"); CREATE INDEX IF NOT EXISTS "workflow_signals_idempotency_idx" - ON "workflow_signals" ("namespace_id", "sender_idempotency_key") + ON "workflow_signals" ("namespace_id", "signal", "sender_idempotency_key") WHERE "sender_idempotency_key" IS NOT NULL; CREATE INDEX IF NOT EXISTS "step_attempts_signal_wait_idx" diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index 448c63bc..490a4ad1 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -127,8 +127,8 @@ const DEFAULT_STEP_RETRY_POLICY: RetryPolicy = { }; /** - * Retry policy for workflow step failures (no retries - the child workflow - * is responsible for retries). + * Retry policy for terminal/non-retryable step failures (no retries, the + * caller or child workflow is responsible for handling retries). */ const TERMINAL_STEP_RETRY_POLICY: RetryPolicy = { ...DEFAULT_STEP_RETRY_POLICY, From 03724ef06293981948d93dd7b1af3566514d53fb Mon Sep 17 00:00:00 2001 From: James Martinez Date: Tue, 31 Mar 2026 17:19:25 -0500 Subject: [PATCH 04/11] feat: more tests --- packages/openworkflow/sqlite/backend.test.ts | 32 +++ .../openworkflow/worker/execution.test.ts | 239 ++++++++++++++++++ 2 files changed, 271 insertions(+) diff --git a/packages/openworkflow/sqlite/backend.test.ts b/packages/openworkflow/sqlite/backend.test.ts index 05e07d61..81ca4f16 100644 --- a/packages/openworkflow/sqlite/backend.test.ts +++ b/packages/openworkflow/sqlite/backend.test.ts @@ -566,3 +566,35 @@ describe("BackendSqlite workflow wake-up reconciliation", () => { } }); }); + +describe("BackendSqlite.sendSignal error handling", () => { + test("rolls back transaction and rethrows when an error occurs inside sendSignal", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + try { + const internalBackend = backend as unknown as { + db: Database; + }; + // Make prepare throw to trigger the catch/rollback path. + vi.spyOn(internalBackend.db, "prepare").mockImplementation(() => { + // Restore immediately so the rollback can work + vi.restoreAllMocks(); + throw new Error("simulated prepare failure"); + }); + + expect(() => + backend.sendSignal({ + signal: "test-signal", + data: null, + idempotencyKey: null, + }), + ).toThrow("simulated prepare failure"); + + vi.restoreAllMocks(); + } finally { + await backend.stop(); + } + }); +}); diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index 3f88af21..066a1d70 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -2764,6 +2764,166 @@ describe("StepExecutor", () => { const result = await handle.result(); expect(result).toEqual({ data: { x: 1 }, value: "ok" }); }); + + test("sendSignal replays from cache when followed by a sleep step", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-send-cache-replay-${randomUUID()}` }, + async ({ step }) => { + const result = await step.sendSignal({ + signal: "no-one-listening", + data: null, + }); + // sleep forces a park; on resume the workflow replays and sendSignal + // hits the cached step attempt path. + await step.sleep("pause", "10ms"); + return result; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("completed"); + const result = await handle.result(); + expect(result).toEqual({ workflowRunIds: [] }); + }); + + test("waitForSignal replays from cache when followed by a sleep step", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const signalString = `wait-cache-replay-${randomUUID()}`; + + const workflow = client.defineWorkflow( + { name: `signal-wait-cache-replay-${randomUUID()}` }, + async ({ step }) => { + const data = await step.waitForSignal({ + signal: signalString, + timeout: 30_000, + }); + // sleep forces a park; on resume waitForSignal hits the cached path + await step.sleep("pause", "10ms"); + return data; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + await tickUntilParked(backend, worker, handle.workflowRun.id, 20, 50); + + await client.sendSignal({ signal: signalString, data: { cached: true } }); + + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("completed"); + const result = await handle.result(); + expect(result).toEqual({ cached: true }); + }); + + test("waitForSignal throws when another step already waits on the same signal", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const signalString = `dup-signal-${randomUUID()}`; + + // first execution: create signal-wait step "waiter-a" on signalString, + // parks. Then we force a re-execution. Constructor loads running "waiter-a" + // → activeSignalWaitStepNameBySignal maps signalString → "waiter-a". + // Workflow code now calls waitForSignal with name "waiter-b" on the same + // signal → duplicate signal guard fires. + let firstExecution = true; + const workflow = client.defineWorkflow( + { name: `signal-dup-waiter-${randomUUID()}` }, + async ({ step }) => { + if (firstExecution) { + firstExecution = false; + await step.waitForSignal({ + name: "waiter-a", + signal: signalString, + timeout: 30_000, + }); + } else { + await step.waitForSignal({ + name: "waiter-b", + signal: signalString, + timeout: 30_000, + }); + } + return "done"; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + // first tick: creates waiter-a, parks with availableAt ~30s in the future. + await tickUntilParked(backend, worker, handle.workflowRun.id, 20, 50); + + // force re-execution by resetting availableAt to now via direct SQL. + const pg = ( + backend as unknown as { + pg: { unsafe: (q: string, p?: unknown[]) => Promise }; + } + ).pg; + await pg.unsafe( + `UPDATE "openworkflow"."workflow_runs" SET "available_at" = NOW() WHERE "id" = $1`, + [handle.workflowRun.id], + ); + + // second tick: workflow code calls waiter-b on same signal → fails + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("failed"); + }); + + test("sendSignal step fails when backend.sendSignal throws", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + // spy on the backend to force an error during signal send + const sendSignalSpy = vi + .spyOn(backend, "sendSignal") + .mockRejectedValueOnce(new Error("signal delivery failed")); + + const workflow = client.defineWorkflow( + { name: `signal-send-error-${randomUUID()}` }, + async ({ step }) => { + const result = await step.sendSignal({ + signal: "will-fail", + data: null, + }); + return result; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("failed"); + sendSignalSpy.mockRestore(); + }); }); describe("executeWorkflow", () => { @@ -3542,6 +3702,85 @@ describe("executeWorkflow", () => { expect(snapshots[0]).toEqual(snapshots[1]); }); }); + + describe("signal step resume from running attempt", () => { + test("sendSignal resolves a pre-existing running signal-send step attempt", async () => { + const runningAttempt = createMockStepAttempt({ + id: "running-signal-send", + stepName: "notify", + kind: "signal-send", + status: "running", + output: null, + finishedAt: null, + }); + + const listStepAttempts = vi.fn(() => + Promise.resolve({ + data: [runningAttempt], + pagination: { next: null, prev: null }, + }), + ); + const sendSignalMock = vi.fn(() => + Promise.resolve({ workflowRunIds: ["run-abc"] }), + ); + const completeStepAttempt = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockStepAttempt({ + id: params.stepAttemptId, + stepName: "notify", + status: "completed", + output: params.output ?? null, + }), + ), + ); + const completeWorkflowRun = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockWorkflowRun({ + id: params.workflowRunId, + status: "completed", + output: params.output ?? null, + }), + ), + ); + + const workflowRun = createMockWorkflowRun({ + id: "signal-send-resume-run", + workerId: "worker-resume", + }); + + await executeWorkflow({ + backend: { + listStepAttempts, + sendSignal: sendSignalMock, + completeStepAttempt, + completeWorkflowRun, + } as unknown as Backend, + workflowRun, + workflowFn: async ({ step }) => { + const result = await step.sendSignal({ + signal: "notify", + data: { v: 1 }, + }); + return result; + }, + workflowVersion: null, + workerId: "worker-resume", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + expect(sendSignalMock).toHaveBeenCalled(); + expect(completeStepAttempt).toHaveBeenCalledWith( + expect.objectContaining({ stepAttemptId: "running-signal-send" }), + ); + expect(completeWorkflowRun).toHaveBeenCalledWith( + expect.objectContaining({ + output: { workflowRunIds: ["run-abc"] }, + }), + ); + }); + }); }); describe("createStepExecutionStateFromAttempts", () => { From bbb3c4a8029e69183b35ec0bd12e536f69068a31 Mon Sep 17 00:00:00 2001 From: James Martinez Date: Thu, 2 Apr 2026 06:53:04 -0500 Subject: [PATCH 05/11] feat: feedback --- packages/openworkflow/worker/execution.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index 490a4ad1..319e921c 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -127,8 +127,9 @@ const DEFAULT_STEP_RETRY_POLICY: RetryPolicy = { }; /** - * Retry policy for terminal/non-retryable step failures (no retries, the - * caller or child workflow is responsible for handling retries). + * No-retry policy for terminal/non-retryable steps (child-workflow results, + * signal sends, signal waits). The caller or child workflow is responsible + * for handling retries. */ const TERMINAL_STEP_RETRY_POLICY: RetryPolicy = { ...DEFAULT_STEP_RETRY_POLICY, From 1df7a40d2ba57aafa7bfa4ef9ebe6a4a651b4318 Mon Sep 17 00:00:00 2001 From: James Martinez Date: Thu, 2 Apr 2026 07:01:55 -0500 Subject: [PATCH 06/11] feat: feedback --- packages/dashboard/src/routes/runs/$runId.tsx | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/packages/dashboard/src/routes/runs/$runId.tsx b/packages/dashboard/src/routes/runs/$runId.tsx index 0a22579e..10beee0f 100644 --- a/packages/dashboard/src/routes/runs/$runId.tsx +++ b/packages/dashboard/src/routes/runs/$runId.tsx @@ -352,7 +352,7 @@ function RunDetailsPage() { const config = STEP_STATUS_CONFIG[step.status]; const StatusIcon = config.icon; const iconColor = config.color; - const stepTypeLabel = formatStepKindLabel(step.kind); + const stepTypeLabel = step.kind.replaceAll("-", " "); const stepDuration = computeDuration( step.startedAt, step.finishedAt, @@ -1044,20 +1044,6 @@ function getDefaultSelectedStepId( return steps.at(-1)?.id ?? null; } -function formatStepKindLabel(kind: string): string { - switch (kind) { - case "signal-send": { - return "signal send"; - } - case "signal-wait": { - return "signal wait"; - } - default: { - return kind; - } - } -} - function getRunStatusHelp(status: string): string { switch (status as WorkflowRunStatus) { case "pending": { From 9de1d0b34ca76a1ecc7d629f1d499ab8f6b216ad Mon Sep 17 00:00:00 2001 From: James Martinez Date: Thu, 2 Apr 2026 16:13:48 -0500 Subject: [PATCH 07/11] feat: feedback --- packages/openworkflow/worker/execution.ts | 59 ++++++----------------- 1 file changed, 16 insertions(+), 43 deletions(-) diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index 319e921c..f341857f 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -346,26 +346,15 @@ function getRunningWaitAttemptResumeAt( return Number.isFinite(resumeAt.getTime()) ? resumeAt : null; } - if (attempt.kind === "signal-wait") { - const timeoutAt = - getContextTimeoutAt(attempt) ?? defaultWaitTimeoutAt(attempt.createdAt); - return Number.isFinite(timeoutAt.getTime()) - ? timeoutAt - : defaultWaitTimeoutAt(attempt.createdAt); - } - - if (attempt.kind !== "workflow") { + if (attempt.kind !== "signal-wait" && attempt.kind !== "workflow") { return null; } const timeoutAt = getContextTimeoutAt(attempt) ?? defaultWaitTimeoutAt(attempt.createdAt); - if (Number.isFinite(timeoutAt.getTime())) { - return timeoutAt; - } - - // Backward compatibility for malformed historical workflow timeout values. - return defaultWaitTimeoutAt(attempt.createdAt); + return Number.isFinite(timeoutAt.getTime()) + ? timeoutAt + : defaultWaitTimeoutAt(attempt.createdAt); } /** @@ -531,7 +520,6 @@ class StepExecutor implements StepApi { private readonly expectedNextStepIndexByName: Map; private readonly resolvedStepNames: Set; private readonly executionFence: ExecutionFenceController; - private readonly activeSignalWaitStepNameBySignal: Map; constructor(options: Readonly) { this.backend = options.backend; @@ -548,20 +536,6 @@ class StepExecutor implements StepApi { this.expectedNextStepIndexByName = new Map(); this.resolvedStepNames = new Set(); this.executionFence = options.executionFence; - - // build signal → step name map from running signal-wait attempts. - this.activeSignalWaitStepNameBySignal = new Map(); - for (const [, attempt] of state.runningByStepName) { - if ( - attempt.kind === "signal-wait" && - attempt.context?.kind === "signal-wait" - ) { - this.activeSignalWaitStepNameBySignal.set( - attempt.context.signal, - attempt.stepName, - ); - } - } } private assertExecutionActive(): void { @@ -1070,12 +1044,11 @@ class StepExecutor implements StepApi { idempotencyKey: buildSignalIdempotencyKey(this.workflowRunId, stepName), }); - const output = { workflowRunIds: result.workflowRunIds }; const completed = await this.backend.completeStepAttempt({ workflowRunId: this.workflowRunId, stepAttemptId: attempt.id, workerId: this.workerId, - output, + output: { ...result }, }); this.cache = addToStepAttemptCache(this.cache, completed); this.runningByStepName.delete(stepName); @@ -1116,13 +1089,17 @@ class StepExecutor implements StepApi { ); } - const existingWaiter = this.activeSignalWaitStepNameBySignal.get( - options.signal, - ); - if (existingWaiter && existingWaiter !== stepName) { - throw new Error( - `Signal "${options.signal}" is already being waited on by step "${existingWaiter}"`, - ); + for (const [name, a] of this.runningByStepName) { + if ( + name !== stepName && + a.kind === "signal-wait" && + a.context?.kind === "signal-wait" && + a.context.signal === options.signal + ) { + throw new Error( + `Signal "${options.signal}" is already being waited on by step "${name}"`, + ); + } } const timeoutAt = resolveWaitTimeoutAt(options.timeout); @@ -1138,7 +1115,6 @@ class StepExecutor implements StepApi { }); this.stepCount += 1; this.runningByStepName.set(stepName, attempt); - this.activeSignalWaitStepNameBySignal.set(options.signal, stepName); return await this.resolveSignalWait(stepName, attempt, options); } @@ -1208,9 +1184,6 @@ class StepExecutor implements StepApi { }); this.cache = addToStepAttemptCache(this.cache, completed); this.runningByStepName.delete(attempt.stepName); - if (attempt.context?.kind === "signal-wait") { - this.activeSignalWaitStepNameBySignal.delete(attempt.context.signal); - } return completed.output as Output | null; } From a8a60d026e99eda09dc017fbaae457d965ca6a54 Mon Sep 17 00:00:00 2001 From: James Martinez Date: Fri, 3 Apr 2026 08:48:44 -0500 Subject: [PATCH 08/11] feat: feedback --- .../openworkflow/worker/execution.test.ts | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index 066a1d70..14be533c 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -2574,6 +2574,60 @@ describe("StepExecutor", () => { expect(result).toBeNull(); }); + test("waitForSignal handles legacy null timeoutAt in signal-wait context", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-wait-null-timeout-${randomUUID()}` }, + async ({ step }) => { + const data = await step.waitForSignal({ + signal: `legacy-null-${randomUUID()}`, + timeout: 1, // expires immediately + }); + return data; + }, + ); + + // corrupt the signal-wait context to have null timeoutAt (simulates + // legacy persisted data where timeoutAt was missing) and backdate + // createdAt so the 1-year default fallback has already expired. + const twoYearsAgo = new Date(); + twoYearsAgo.setFullYear(twoYearsAgo.getFullYear() - 2); + const originalCreateStepAttempt = backend.createStepAttempt.bind(backend); + const createStepAttemptSpy = vi + .spyOn(backend, "createStepAttempt") + .mockImplementation(async (params) => { + const attempt = await originalCreateStepAttempt(params); + if (attempt.kind === "signal-wait") { + return { + ...attempt, + createdAt: twoYearsAgo, + context: null, + }; + } + return attempt; + }); + + try { + const worker = client.newWorker(); + const handle = await workflow.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + + expect(status).toBe("completed"); + const result = await handle.result(); + expect(result).toBeNull(); + } finally { + createStepAttemptSpy.mockRestore(); + } + }); + test("waitForSignal validates data against schema", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); From 2370ff104a9b86e8c8246f43668f7977695dbd7f Mon Sep 17 00:00:00 2001 From: James Martinez Date: Fri, 3 Apr 2026 10:36:40 -0500 Subject: [PATCH 09/11] feat: feedback --- packages/docs/docs/signals.mdx | 5 +++-- packages/openworkflow/worker/execution.test.ts | 12 ++++++------ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/packages/docs/docs/signals.mdx b/packages/docs/docs/signals.mdx index 4c999f7f..4a2a25b5 100644 --- a/packages/docs/docs/signals.mdx +++ b/packages/docs/docs/signals.mdx @@ -176,8 +176,9 @@ await ow.sendSignal({ }); ``` -If a signal with the same idempotency key has already been sent, the call -returns the original result without re-delivering. +If a signal with the same idempotency key has already been sent and delivered +to at least one waiter, the call returns the original result without +re-delivering. ## Fan-Out: One Signal, Many Waiters diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index 14be533c..3af6ba5b 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -2574,24 +2574,24 @@ describe("StepExecutor", () => { expect(result).toBeNull(); }); - test("waitForSignal handles legacy null timeoutAt in signal-wait context", async () => { + test("waitForSignal handles missing context gracefully", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); const workflow = client.defineWorkflow( - { name: `signal-wait-null-timeout-${randomUUID()}` }, + { name: `signal-wait-null-context-${randomUUID()}` }, async ({ step }) => { const data = await step.waitForSignal({ - signal: `legacy-null-${randomUUID()}`, + signal: `missing-ctx-${randomUUID()}`, timeout: 1, // expires immediately }); return data; }, ); - // corrupt the signal-wait context to have null timeoutAt (simulates - // legacy persisted data where timeoutAt was missing) and backdate - // createdAt so the 1-year default fallback has already expired. + // corrupt the signal-wait step attempt to have a null context (simulates + // a corrupted or partially-written row) and backdate createdAt so the + // 1-year default fallback timeout has already expired. const twoYearsAgo = new Date(); twoYearsAgo.setFullYear(twoYearsAgo.getFullYear() - 2); const originalCreateStepAttempt = backend.createStepAttempt.bind(backend); From a7089cee9939fbe947b537ef295243be19752e6d Mon Sep 17 00:00:00 2001 From: James Martinez Date: Fri, 3 Apr 2026 13:14:34 -0500 Subject: [PATCH 10/11] feat: wrap signal result in envelope --- ARCHITECTURE.md | 4 ++-- packages/docs/docs/signals.mdx | 12 +++++------ packages/docs/docs/steps.mdx | 4 ++-- .../openworkflow/core/workflow-function.ts | 2 +- .../openworkflow/worker/execution.test.ts | 11 ++++++---- packages/openworkflow/worker/execution.ts | 21 +++++++++---------- 6 files changed, 28 insertions(+), 26 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index d824cc1c..f1350623 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -254,10 +254,10 @@ await step.sendSignal({ **`step.waitForSignal(options)`**: Parks the workflow until a matching signal arrives or the timeout expires. When a signal is sent targeting this signal name, a delivery row is written to `workflow_signals` and the workflow is woken. -If the timeout expires first, the step resolves with `null`. +Returns `{ data }` on delivery, or `null` on timeout. ```ts -const data = await step.waitForSignal({ +const result = await step.waitForSignal({ signal: `approval:${orderId}`, timeout: "7d", }); diff --git a/packages/docs/docs/signals.mdx b/packages/docs/docs/signals.mdx index 4a2a25b5..976a77e3 100644 --- a/packages/docs/docs/signals.mdx +++ b/packages/docs/docs/signals.mdx @@ -37,7 +37,7 @@ export const approvalWorkflow = defineWorkflow( timeout: "7d", }); - if (decision?.approved) { + if (decision?.data.approved) { await step.run({ name: "process-order" }, async () => { await orders.process(input.orderId); }); @@ -157,7 +157,7 @@ const decision = await step.waitForSignal({ timeout: "7d", }); -// `decision` is typed as { approved: boolean; reviewedBy: string } | null +// `decision` is typed as { data: { approved: boolean; reviewedBy: string } } | null ``` If the signal data doesn't match the schema, the step fails permanently (no @@ -188,12 +188,12 @@ that signal name. This makes fan-out coordination straightforward: ```ts // Multiple workflows waiting on the same signal const workflowA = defineWorkflow({ name: "listener-a" }, async ({ step }) => { - const data = await step.waitForSignal({ signal: "config-updated" }); + const config = await step.waitForSignal({ signal: "config-updated" }); // handle update }); const workflowB = defineWorkflow({ name: "listener-b" }, async ({ step }) => { - const data = await step.waitForSignal({ signal: "config-updated" }); + const config = await step.waitForSignal({ signal: "config-updated" }); // handle update }); @@ -239,7 +239,7 @@ export const purchaseWorkflow = defineWorkflow( timeout: "3d", }); - if (!approval?.approved) { + if (!approval?.data.approved) { await step.run({ name: "notify-rejected" }, async () => { await email.send({ to: input.requesterEmail, @@ -338,7 +338,7 @@ const consumer = defineWorkflow( if (notification) { await step.run({ name: "process-data" }, async () => { - await processRecords(input.batchId, notification.recordCount); + await processRecords(input.batchId, notification.data.recordCount); }); } }, diff --git a/packages/docs/docs/steps.mdx b/packages/docs/docs/steps.mdx index 5ceffd71..8d00cbdd 100644 --- a/packages/docs/docs/steps.mdx +++ b/packages/docs/docs/steps.mdx @@ -164,11 +164,11 @@ const { workflowRunIds } = await step.sendSignal({ Pauses the workflow until a matching signal is sent, or the timeout expires: ```ts -const data = await step.waitForSignal({ +const result = await step.waitForSignal({ signal: `approval:${orderId}`, timeout: "7d", }); -// data is the signal payload, or null if timed out +// result is { data } or null if timed out ``` See [Signals](/docs/signals) for details. diff --git a/packages/openworkflow/core/workflow-function.ts b/packages/openworkflow/core/workflow-function.ts index dc322a61..6fc0315b 100644 --- a/packages/openworkflow/core/workflow-function.ts +++ b/packages/openworkflow/core/workflow-function.ts @@ -78,7 +78,7 @@ export interface StepApi { timeout?: StepWaitTimeout; schema?: StandardSchemaV1; }>, - ) => Promise; + ) => Promise<{ data: Output } | null>; } /** diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index 3af6ba5b..11d0d805 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -2539,7 +2539,7 @@ describe("StepExecutor", () => { ); expect(waiterStatus).toBe("completed"); const waiterResult = await waiterHandle.result(); - expect(waiterResult).toEqual({ approved: true }); + expect(waiterResult).toEqual({ data: { approved: true } }); }); test("waitForSignal returns null on timeout", async () => { @@ -2664,7 +2664,7 @@ describe("StepExecutor", () => { ); expect(status).toBe("completed"); const result = await waiterHandle.result(); - expect(result).toEqual({ approved: true }); + expect(result).toEqual({ data: { approved: true } }); }); test("waitForSignal fails step when schema validation fails", async () => { @@ -2816,7 +2816,10 @@ describe("StepExecutor", () => { ); expect(status).toBe("completed"); const result = await handle.result(); - expect(result).toEqual({ data: { x: 1 }, value: "ok" }); + expect(result).toEqual({ + data: { data: { x: 1 } }, + value: "ok", + }); }); test("sendSignal replays from cache when followed by a sleep step", async () => { @@ -2884,7 +2887,7 @@ describe("StepExecutor", () => { ); expect(status).toBe("completed"); const result = await handle.result(); - expect(result).toEqual({ cached: true }); + expect(result).toEqual({ data: { cached: true } }); }); test("waitForSignal throws when another step already waits on the same signal", async () => { diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index f341857f..04754c53 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -1072,12 +1072,12 @@ class StepExecutor implements StepApi { timeout?: StepWaitTimeout; schema?: StandardSchemaV1; }>, - ): Promise { + ): Promise<{ data: Output } | null> { const stepName = this.resolveStepName(options.name ?? options.signal); const existingAttempt = getCachedStepAttempt(this.cache, stepName); if (existingAttempt) { - return existingAttempt.output as Output | null; + return existingAttempt.output as { data: Output } | null; } const runningAttempt = this.runningByStepName.get(stepName); @@ -1125,7 +1125,7 @@ class StepExecutor implements StepApi { options: Readonly<{ schema?: StandardSchemaV1; }>, - ): Promise { + ): Promise<{ data: Output } | null> { const signalData = await this.backend.getSignalDelivery({ stepAttemptId: attempt.id, }); @@ -1151,10 +1151,9 @@ class StepExecutor implements StepApi { outputValue = validationResult.value; } - return await this.completeSignalWaitStep( - attempt, - normalizeStepOutput(outputValue), - ); + return await this.completeSignalWaitStep(attempt, { + data: normalizeStepOutput(outputValue) as Output, + }); } const timeoutAt = @@ -1169,13 +1168,13 @@ class StepExecutor implements StepApi { /** * Complete a signal-wait step attempt and update internal maps. * @param attempt - Step attempt being completed - * @param output - Output value (null for timeout) + * @param output - Envelope with data, or null for timeout * @returns The completed step output */ private async completeSignalWaitStep( attempt: Readonly, - output: JsonValue | null, - ): Promise { + output: { data: Output } | null, + ): Promise<{ data: Output } | null> { const completed = await this.backend.completeStepAttempt({ workflowRunId: this.workflowRunId, stepAttemptId: attempt.id, @@ -1184,7 +1183,7 @@ class StepExecutor implements StepApi { }); this.cache = addToStepAttemptCache(this.cache, completed); this.runningByStepName.delete(attempt.stepName); - return completed.output as Output | null; + return completed.output as { data: Output } | null; } private ensureStepLimitNotReached(): void { From ae07804670e57b325b4e25029ef568cb2e342a2f Mon Sep 17 00:00:00 2001 From: James Martinez Date: Fri, 3 Apr 2026 15:09:00 -0500 Subject: [PATCH 11/11] feat: type err --- packages/openworkflow/worker/execution.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index 04754c53..66606eca 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -1179,7 +1179,7 @@ class StepExecutor implements StepApi { workflowRunId: this.workflowRunId, stepAttemptId: attempt.id, workerId: this.workerId, - output, + output: output as JsonValue | null, }); this.cache = addToStepAttemptCache(this.cache, completed); this.runningByStepName.delete(attempt.stepName);