diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 87a38b79..f1350623 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -36,6 +36,10 @@ durable execution with minimal operational complexity. discovery paths, and optional ignore patterns for CLI commands. It typically imports the shared `backend` from `openworkflow/client.*` so app code and CLI use the same connection. +- **Signal**: A named, point-in-time message sent to all workflows currently + waiting on that signal name. Signals carry an optional JSON payload. When + sent, a row is written to `workflow_signals` for each waiting workflow and + the workflow is woken. If no workflow is waiting, the signal is dropped. - **`availableAt`**: A critical timestamp on a workflow run that controls its visibility to workers. It is used for scheduling, heartbeating, crash recovery, and durable timers. @@ -105,6 +109,7 @@ of coordination. There is no separate orchestrator server. | | | - workflow_runs | | - step_attempts | + | - workflow_signals | +------------------------------+ ``` @@ -122,9 +127,10 @@ of coordination. There is no separate orchestrator server. `npx @openworkflow/cli worker start` with auto-discovery of workflow files. - **Backend**: The source of truth. It stores workflow runs and step attempts. - The `workflow_runs` table serves as the job queue for the workers, while the - `step_attempts` table serves as a record of started and completed work, - enabling memoization. + The `workflow_runs` table serves as the job queue for the workers, the + `step_attempts` table serves as a record of started and completed work + enabling memoization, and the `workflow_signals` table records signal + deliveries so a waking workflow can read the payload. ### 2.3. Basic Execution Flow @@ -149,7 +155,7 @@ of coordination. There is no separate orchestrator server. `step_attempt` record with status `running`, executes the step function, and then updates the `step_attempt` to `completed` upon completion. The Worker continues executing inline until the workflow code completes or encounters a - sleep. + durable wait such as sleep, child-workflow waiting, or signal waiting. 6. **State Update**: The Worker updates the Backend with each `step_attempt` as it is created and completed, and updates the status of the `workflow_run` (e.g., `completed`, `running` for parked waits). @@ -234,10 +240,34 @@ target workflow name in `spec`) and `options.timeout` controls the wait timeout (default 1y). When the timeout is reached, the parent step fails but the child workflow continues running independently. -All step APIs (`step.run`, `step.sleep`, and `step.runWorkflow`) share the same -collision logic for durable keys. If duplicate base names are encountered in one -execution pass, OpenWorkflow auto-indexes them as `name`, `name:1`, `name:2`, -and so on so each step call maps to a distinct step attempt. +**`step.sendSignal(options)`**: Sends a named signal to all workflows currently +waiting on it. The send is recorded as a step attempt so it won't repeat on +replay. If no workflow is waiting, the signal is silently dropped. + +```ts +await step.sendSignal({ + signal: `approval:${orderId}`, + data: { approved: true }, +}); +``` + +**`step.waitForSignal(options)`**: Parks the workflow until a matching signal +arrives or the timeout expires. When a signal is sent targeting this signal +name, a delivery row is written to `workflow_signals` and the workflow is woken. +Returns `{ data }` on delivery, or `null` on timeout. + +```ts +const result = await step.waitForSignal({ + signal: `approval:${orderId}`, + timeout: "7d", +}); +``` + +All step APIs (`step.run`, `step.sleep`, `step.runWorkflow`, `step.sendSignal`, +and `step.waitForSignal`) share the same collision logic for durable keys. If +duplicate base names are encountered in one execution pass, OpenWorkflow +auto-indexes them as `name`, `name:1`, `name:2`, and so on so each step call +maps to a distinct step attempt. ## 4. Error Handling & Retries diff --git a/packages/dashboard/src/routes/runs/$runId.tsx b/packages/dashboard/src/routes/runs/$runId.tsx index 4952f4cd..10beee0f 100644 --- a/packages/dashboard/src/routes/runs/$runId.tsx +++ b/packages/dashboard/src/routes/runs/$runId.tsx @@ -352,8 +352,7 @@ function RunDetailsPage() { const config = STEP_STATUS_CONFIG[step.status]; const StatusIcon = config.icon; const iconColor = config.color; - const stepTypeLabel = - step.kind === "function" ? "function" : step.kind; + const stepTypeLabel = step.kind.replaceAll("-", " "); const stepDuration = computeDuration( step.startedAt, step.finishedAt, diff --git a/packages/docs/docs.json b/packages/docs/docs.json index 13eb3f90..4d34e3bf 100644 --- a/packages/docs/docs.json +++ b/packages/docs/docs.json @@ -42,6 +42,7 @@ "docs/parallel-steps", "docs/dynamic-steps", "docs/child-workflows", + "docs/signals", "docs/retries", "docs/type-safety", "docs/versioning", diff --git a/packages/docs/docs/overview.mdx b/packages/docs/docs/overview.mdx index 75ac999c..08edaf17 100644 --- a/packages/docs/docs/overview.mdx +++ b/packages/docs/docs/overview.mdx @@ -37,6 +37,8 @@ work. - **Memoized steps** prevent duplicate side effects on retries - **Durable sleep** (`step.sleep`) pauses runs without holding worker capacity +- **Signals** (`step.sendSignal`, `step.waitForSignal`) enable runtime + communication to & between workflows - **Heartbeats + leases** (`availableAt`) allow automatic crash recovery - **Database as source of truth** avoids a separate orchestration service diff --git a/packages/docs/docs/roadmap.mdx b/packages/docs/docs/roadmap.mdx index f97bc187..6a51854c 100644 --- a/packages/docs/docs/roadmap.mdx +++ b/packages/docs/docs/roadmap.mdx @@ -20,10 +20,10 @@ description: What's coming next for OpenWorkflow - ✅ Idempotency keys - ✅ Prometheus `/metrics` endpoint - ✅ Child workflows (`step.runWorkflow`) +- ✅ Signals (`step.sendSignal`, `step.waitForSignal`) ## Coming Soon -- Signals - Cron / scheduling - Rollback / compensation functions - Priority and concurrency controls diff --git a/packages/docs/docs/signals.mdx b/packages/docs/docs/signals.mdx new file mode 100644 index 00000000..976a77e3 --- /dev/null +++ b/packages/docs/docs/signals.mdx @@ -0,0 +1,346 @@ +--- +title: Signals +description: Send data between workflows at runtime without polling +--- + +Signals let workflows communicate at runtime. A workflow can pause and wait for +a signal, and another workflow (or your application code) can send that signal +to wake it up with data attached. + +This is useful any time a workflow needs to wait for something that isn't on a +timer: a human approval, a webhook callback, a payment confirmation, or a +coordination message from another workflow. + +## Basic Usage + +### Waiting for a Signal + +Use `step.waitForSignal()` inside a workflow to pause until a matching signal +arrives: + +```ts +import { defineWorkflow } from "openworkflow"; + +export const approvalWorkflow = defineWorkflow( + { name: "approval-workflow" }, + async ({ input, step }) => { + await step.run({ name: "request-approval" }, async () => { + await slack.send({ + channel: "#approvals", + text: `Please approve order ${input.orderId}`, + }); + }); + + // wait until someone sends the "approval" signal + const decision = await step.waitForSignal<{ approved: boolean }>({ + signal: `approval:${input.orderId}`, + timeout: "7d", + }); + + if (decision?.data.approved) { + await step.run({ name: "process-order" }, async () => { + await orders.process(input.orderId); + }); + } + }, +); +``` + +### Sending a Signal + +Send a signal from another workflow using `step.sendSignal()`: + +```ts +export const reviewWorkflow = defineWorkflow( + { name: "review-workflow" }, + async ({ input, step }) => { + const verdict = await step.run({ name: "run-review" }, async () => { + return await reviewService.evaluate(input.orderId); + }); + + await step.sendSignal({ + signal: `approval:${input.orderId}`, + data: { approved: verdict.passed }, + }); + }, +); +``` + +Or send a signal from your application code using the client: + +```ts +import { ow } from "./openworkflow/client"; + +// from an API route, webhook handler, etc. +await ow.sendSignal({ + signal: `approval:${orderId}`, + data: { approved: true }, +}); +``` + + + Signals are not buffered. If you send a signal before any workflow is waiting + for it, the signal is lost. + + +## Signal Names + +Signal names are arbitrary strings scoped to the backend namespace. Use +descriptive, unique names — often including an entity ID — to avoid collisions: + +```ts +// Good - scoped to a specific entity +await step.waitForSignal({ signal: `payment:${invoiceId}` }); +await step.waitForSignal({ signal: `approval:order:${orderId}` }); + +// Bad - too generic, could collide across workflows +await step.waitForSignal({ signal: "done" }); +``` + +## Step Names + +Like other step types, signal steps need unique names within a workflow. If you +don't provide one, OpenWorkflow uses the signal name as the step name. + +```ts +// Implicit — step name defaults to signal name +await step.waitForSignal({ signal: `payment:${invoiceId}` }); + +// Explicit step name +await step.waitForSignal({ + name: "wait-for-payment", + signal: `payment:${invoiceId}`, +}); +``` + +## Timeout + +`step.waitForSignal` accepts an optional `timeout`. If the signal doesn't +arrive before the timeout, the step resolves with `null` instead of blocking +forever. + +```ts +const result = await step.waitForSignal({ + signal: `approval:${orderId}`, + timeout: "24h", +}); + +if (result === null) { + // timed out — no signal arrived within 24 hours + await step.run({ name: "escalate" }, async () => { + await alerts.send("Approval timed out"); + }); +} +``` + +`timeout` accepts a [duration string](/docs/sleeping#duration-formats), a +number of milliseconds, or a `Date`. + +If no timeout is specified, the wait defaults to **1 year**. + +## Schema Validation + +Validate signal payloads at receive time using any +[Standard Schema](/docs/standard-schema) compatible validator: + +```ts +import { z } from "zod"; + +const approvalSchema = z.object({ + approved: z.boolean(), + reviewedBy: z.string(), +}); + +const decision = await step.waitForSignal({ + signal: `approval:${orderId}`, + schema: approvalSchema, + timeout: "7d", +}); + +// `decision` is typed as { data: { approved: boolean; reviewedBy: string } } | null +``` + +If the signal data doesn't match the schema, the step fails permanently (no +retries) to surface the contract violation immediately. + +## Idempotency + +When sending signals from the client, you can provide an idempotency key to +safely retry without delivering the signal twice: + +```ts +await ow.sendSignal({ + signal: `payment:${invoiceId}`, + data: { amount: 99.99 }, + idempotencyKey: `payment-confirmed:${invoiceId}`, +}); +``` + +If a signal with the same idempotency key has already been sent and delivered +to at least one waiter, the call returns the original result without +re-delivering. + +## Fan-Out: One Signal, Many Waiters + +A single `sendSignal` call delivers to **every** workflow currently waiting on +that signal name. This makes fan-out coordination straightforward: + +```ts +// Multiple workflows waiting on the same signal +const workflowA = defineWorkflow({ name: "listener-a" }, async ({ step }) => { + const config = await step.waitForSignal({ signal: "config-updated" }); + // handle update +}); + +const workflowB = defineWorkflow({ name: "listener-b" }, async ({ step }) => { + const config = await step.waitForSignal({ signal: "config-updated" }); + // handle update +}); + +// One signal wakes both workflows +await ow.sendSignal({ + signal: "config-updated", + data: { version: 42 }, +}); +``` + +## Sending Signals from Workflows + +Use `step.sendSignal()` inside a workflow to send signals durably. The send is +recorded as a step attempt, so it won't be repeated on replay: + +```ts +const result = await step.sendSignal({ + signal: `approval:${orderId}`, + data: { approved: true }, +}); + +// result.workflowRunIds contains IDs of workflows that received the signal +``` + +## Common Patterns + +### Human-in-the-Loop Approval + +```ts +export const purchaseWorkflow = defineWorkflow( + { name: "purchase" }, + async ({ input, step }) => { + await step.run({ name: "send-approval-request" }, async () => { + await email.send({ + to: input.managerEmail, + subject: `Approve purchase: $${input.amount}`, + body: `Click to approve: ${approvalUrl(input.purchaseId)}`, + }); + }); + + const approval = await step.waitForSignal<{ approved: boolean }>({ + signal: `purchase-approval:${input.purchaseId}`, + timeout: "3d", + }); + + if (!approval?.data.approved) { + await step.run({ name: "notify-rejected" }, async () => { + await email.send({ + to: input.requesterEmail, + subject: "Purchase request denied", + }); + }); + return { status: "rejected" }; + } + + await step.run({ name: "process-purchase" }, async () => { + await purchasing.submit(input.purchaseId); + }); + + return { status: "approved" }; + }, +); + +// in your API route handler: +app.post("/approve/:purchaseId", async (req, res) => { + await ow.sendSignal({ + signal: `purchase-approval:${req.params.purchaseId}`, + data: { approved: req.body.approved }, + }); + res.json({ ok: true }); +}); +``` + +### Webhook Callback + +```ts +export const paymentWorkflow = defineWorkflow( + { name: "payment" }, + async ({ input, step }) => { + const checkout = await step.run({ name: "create-checkout" }, async () => { + return await stripe.checkout.sessions.create({ + metadata: { workflowSignal: `payment:${input.orderId}` }, + // ... + }); + }); + + const payment = await step.waitForSignal({ + signal: `payment:${input.orderId}`, + timeout: "1h", + }); + + if (!payment) { + return { status: "expired" }; + } + + await step.run({ name: "fulfill-order" }, async () => { + await orders.fulfill(input.orderId); + }); + + return { status: "paid" }; + }, +); + +// in your Stripe webhook handler: +app.post("/webhooks/stripe", async (req, res) => { + const event = req.body; + if (event.type === "checkout.session.completed") { + await ow.sendSignal({ + signal: event.data.object.metadata.workflowSignal, + data: { sessionId: event.data.object.id }, + idempotencyKey: event.id, + }); + } + res.sendStatus(200); +}); +``` + +### Workflow-to-Workflow Coordination + +```ts +const producer = defineWorkflow( + { name: "data-producer" }, + async ({ input, step }) => { + const data = await step.run({ name: "generate-data" }, async () => { + return await heavyComputation(input); + }); + + await step.sendSignal({ + signal: `data-ready:${input.batchId}`, + data: { recordCount: data.length }, + }); + }, +); + +const consumer = defineWorkflow( + { name: "data-consumer" }, + async ({ input, step }) => { + const notification = await step.waitForSignal({ + signal: `data-ready:${input.batchId}`, + timeout: "1h", + }); + + if (notification) { + await step.run({ name: "process-data" }, async () => { + await processRecords(input.batchId, notification.data.recordCount); + }); + } + }, +); +``` diff --git a/packages/docs/docs/steps.mdx b/packages/docs/docs/steps.mdx index 5eaa2f80..8d00cbdd 100644 --- a/packages/docs/docs/steps.mdx +++ b/packages/docs/docs/steps.mdx @@ -115,7 +115,7 @@ await step.run({ name: "log-event" }, async () => { ## Step Types -OpenWorkflow provides three step types: +OpenWorkflow provides five step types: ### `step.run()` @@ -148,6 +148,31 @@ const childOutput = await step.runWorkflow( ); ``` +### `step.sendSignal()` + +Sends a signal to all workflows currently waiting on that signal name: + +```ts +const { workflowRunIds } = await step.sendSignal({ + signal: `approval:${orderId}`, + data: { approved: true }, +}); +``` + +### `step.waitForSignal()` + +Pauses the workflow until a matching signal is sent, or the timeout expires: + +```ts +const result = await step.waitForSignal({ + signal: `approval:${orderId}`, + timeout: "7d", +}); +// result is { data } or null if timed out +``` + +See [Signals](/docs/signals) for details. + ## Retry Policy (Optional) Control backoff and retry limits for an individual step: diff --git a/packages/docs/docs/workflows.mdx b/packages/docs/docs/workflows.mdx index 5d8ae39e..ca480568 100644 --- a/packages/docs/docs/workflows.mdx +++ b/packages/docs/docs/workflows.mdx @@ -213,12 +213,12 @@ create a separate run. The workflow function receives an object with four properties: -| Parameter | Type | Description | -| --------- | --------------------- | --------------------------------------------------------------------- | -| `input` | Generic | The input data passed when starting the workflow | -| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`, `step.runWorkflow`) | -| `version` | `string \| null` | The workflow version, if specified | -| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) | +| Parameter | Type | Description | +| --------- | --------------------- | -------------------------------------------------------------------------------------------------------------- | +| `input` | Generic | The input data passed when starting the workflow | +| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`, `step.runWorkflow`, `step.sendSignal`, `step.waitForSignal`) | +| `version` | `string \| null` | The workflow version, if specified | +| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) | ```ts defineWorkflow({ name: "example" }, async ({ input, step, version, run }) => { diff --git a/packages/openworkflow/client/client.ts b/packages/openworkflow/client/client.ts index bb936dce..7f7029d0 100644 --- a/packages/openworkflow/client/client.ts +++ b/packages/openworkflow/client/client.ts @@ -1,5 +1,6 @@ -import type { Backend } from "../core/backend.js"; +import type { Backend, SendSignalResult } from "../core/backend.js"; import type { DurationString } from "../core/duration.js"; +import { JsonValue } from "../core/json.js"; import type { StandardSchemaV1 } from "../core/standard-schema.js"; import { calculateDateFromDuration } from "../core/step-attempt.js"; import { @@ -173,6 +174,33 @@ export class OpenWorkflow { async cancelWorkflowRun(workflowRunId: string): Promise { await this.backend.cancelWorkflowRun({ workflowRunId }); } + + /** + * Send a signal to all workflows currently waiting on the given signal + * string. If no workflow is waiting, the signal is silently dropped. + * @param options - Signal options + * @returns IDs of workflow runs that received the signal + * @example + * ```ts + * const { workflowRunIds } = await ow.sendSignal({ + * signal: "approval:order_456", + * data: { approved: true }, + * }); + * ``` + */ + async sendSignal( + options: Readonly<{ + signal: string; + data?: JsonValue; + idempotencyKey?: string; + }>, + ): Promise { + return this.backend.sendSignal({ + signal: options.signal, + data: options.data ?? null, + idempotencyKey: options.idempotencyKey ?? null, + }); + } } /** diff --git a/packages/openworkflow/core/backend.ts b/packages/openworkflow/core/backend.ts index d79dc5e6..9edf79be 100644 --- a/packages/openworkflow/core/backend.ts +++ b/packages/openworkflow/core/backend.ts @@ -68,6 +68,12 @@ export interface Backend { params: Readonly, ): Promise; + // Signals + sendSignal(params: Readonly): Promise; + getSignalDelivery( + params: Readonly, + ): Promise; + // Lifecycle stop(): Promise; } @@ -173,6 +179,20 @@ export interface SetStepAttemptChildWorkflowRunParams { childWorkflowRunId: string; } +export interface SendSignalParams { + signal: string; + data: JsonValue | null; + idempotencyKey: string | null; +} + +export interface SendSignalResult { + workflowRunIds: string[]; +} + +export interface GetSignalDeliveryParams { + stepAttemptId: string; +} + export interface PaginationOptions { limit?: number; after?: string; diff --git a/packages/openworkflow/core/step-attempt.test.ts b/packages/openworkflow/core/step-attempt.test.ts index ce5f207f..502693bd 100644 --- a/packages/openworkflow/core/step-attempt.test.ts +++ b/packages/openworkflow/core/step-attempt.test.ts @@ -7,6 +7,7 @@ import { calculateDateFromDuration, createSleepContext, createWorkflowContext, + createSignalWaitContext, } from "./step-attempt.js"; import type { StepAttempt, StepAttemptCache } from "./step-attempt.js"; import { describe, expect, test } from "vitest"; @@ -339,6 +340,26 @@ describe("createWorkflowContext", () => { }); }); +describe("createSignalWaitContext", () => { + test("creates signal-wait context with signal and timeout", () => { + const timeoutAt = new Date("2025-06-15T10:30:00.000Z"); + const context = createSignalWaitContext("approval:order_456", timeoutAt); + + expect(context).toEqual({ + kind: "signal-wait", + signal: "approval:order_456", + timeoutAt: "2025-06-15T10:30:00.000Z", + }); + }); + + test("preserves millisecond precision in timeout", () => { + const timeoutAt = new Date("2025-01-01T00:00:00.123Z"); + const context = createSignalWaitContext("test-signal", timeoutAt); + + expect(context.timeoutAt).toBe("2025-01-01T00:00:00.123Z"); + }); +}); + function createMockStepAttempt( overrides: Partial = {}, ): StepAttempt { diff --git a/packages/openworkflow/core/step-attempt.ts b/packages/openworkflow/core/step-attempt.ts index 888120ac..3730db67 100644 --- a/packages/openworkflow/core/step-attempt.ts +++ b/packages/openworkflow/core/step-attempt.ts @@ -7,7 +7,12 @@ import { err, ok } from "./result.js"; /** * The kind of step in a workflow. */ -export type StepKind = "function" | "sleep" | "workflow"; +export type StepKind = + | "function" + | "sleep" + | "workflow" + | "signal-send" + | "signal-wait"; /** * Status of a step attempt through its lifecycle. @@ -34,12 +39,22 @@ export interface WorkflowStepAttemptContext { timeoutAt: string | null; } +/** + * Context for a signal-wait step attempt. + */ +export interface SignalWaitStepAttemptContext { + kind: "signal-wait"; + signal: string; + timeoutAt: string; +} + /** * Context for a step attempt. */ export type StepAttemptContext = | SleepStepAttemptContext - | WorkflowStepAttemptContext; + | WorkflowStepAttemptContext + | SignalWaitStepAttemptContext; /** * StepAttempt represents a single attempt of a step within a workflow. @@ -171,3 +186,20 @@ export function createWorkflowContext( timeoutAt: timeoutAt?.toISOString() ?? null, }; } + +/** + * Create the context object for a signal-wait step attempt. + * @param signal - Signal address string + * @param timeoutAt - Wait timeout deadline + * @returns The context object for the signal-wait step + */ +export function createSignalWaitContext( + signal: string, + timeoutAt: Readonly, +): SignalWaitStepAttemptContext { + return { + kind: "signal-wait", + signal, + timeoutAt: timeoutAt.toISOString(), + }; +} diff --git a/packages/openworkflow/core/workflow-function.ts b/packages/openworkflow/core/workflow-function.ts index 01745679..6fc0315b 100644 --- a/packages/openworkflow/core/workflow-function.ts +++ b/packages/openworkflow/core/workflow-function.ts @@ -1,7 +1,15 @@ import type { DurationString } from "./duration.js"; +import type { JsonValue } from "./json.js"; +import type { StandardSchemaV1 } from "./standard-schema.js"; import type { RetryPolicy, WorkflowSpec } from "./workflow-definition.js"; import type { WorkflowRun } from "./workflow-run.js"; +/** + * Timeout for a step wait. Accepts milliseconds (number), a duration string + * (e.g. "5m", "1h"), or an absolute Date. + */ +export type StepWaitTimeout = number | string | Date; + /** * Config for an individual step defined with `step.run()`. */ @@ -37,13 +45,13 @@ export interface StepRunWorkflowOptions { /** * Maximum time to wait for the child workflow to complete. */ - timeout?: number | string | Date; + timeout?: StepWaitTimeout; } /** * Represents the API for defining steps within a workflow. Used within a * workflow handler to define steps by calling `step.run()`, `step.sleep()`, - * and `step.runWorkflow()`. + * `step.runWorkflow()`, `step.sendSignal()`, and `step.waitForSignal()`. */ export interface StepApi { run: ( @@ -56,6 +64,21 @@ export interface StepApi { options?: Readonly, ) => Promise; sleep: (name: string, duration: DurationString) => Promise; + sendSignal: ( + options: Readonly<{ + name?: string; + signal: string; + data?: JsonValue; + }>, + ) => Promise<{ workflowRunIds: string[] }>; + waitForSignal: ( + options: Readonly<{ + name?: string; + signal: string; + timeout?: StepWaitTimeout; + schema?: StandardSchemaV1; + }>, + ) => Promise<{ data: Output } | null>; } /** diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index c2ac3187..6f0f03b2 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -21,6 +21,9 @@ import { RescheduleWorkflowRunAfterFailedStepAttemptParams, CompleteWorkflowRunParams, SleepWorkflowRunParams, + SendSignalParams, + SendSignalResult, + GetSignalDeliveryParams, } from "../core/backend.js"; import { wrapError } from "../core/error.js"; import { JsonValue } from "../core/json.js"; @@ -247,6 +250,108 @@ export class BackendPostgres implements Backend { return workflowRun ?? null; } + async sendSignal(params: SendSignalParams): Promise { + return await this.pg.begin(async (sql): Promise => { + const tx = sql as unknown as Postgres; + const stepAttemptsTable = this.stepAttemptsTable(tx); + const workflowSignalsTable = this.workflowSignalsTable(tx); + + if (params.idempotencyKey !== null) { + const existing = await tx<{ workflowRunId: string }[]>` + SELECT DISTINCT "workflow_run_id" AS "workflowRunId" + FROM ${workflowSignalsTable} + WHERE "namespace_id" = ${this.namespaceId} + AND "signal" = ${params.signal} + AND "sender_idempotency_key" = ${params.idempotencyKey} + `; + if (existing.length > 0) { + return { workflowRunIds: existing.map((r) => r.workflowRunId) }; + } + } + const workflowRunsTable = this.workflowRunsTable(tx); + + // find active waiting step attempts & insert a signal delivery row for each + const waiters = await tx<{ id: string; workflowRunId: string }[]>` + SELECT "id", "workflow_run_id" AS "workflowRunId" + FROM ${stepAttemptsTable} + WHERE "namespace_id" = ${this.namespaceId} + AND "kind" = 'signal-wait' + AND "status" = 'running' + AND "context"->>'signal' = ${params.signal} + FOR UPDATE + `; + + if (waiters.length === 0) { + return { workflowRunIds: [] }; + } + + const deliveredRunIds = new Set(); + for (const w of waiters) { + const inserted = await tx` + INSERT INTO ${workflowSignalsTable} ( + "namespace_id", "id", "signal", "data", + "sender_idempotency_key", "workflow_run_id", + "step_attempt_id", "created_at" + ) + VALUES ( + ${this.namespaceId}, + gen_random_uuid(), + ${params.signal}, + ${tx.json(params.data)}, + ${params.idempotencyKey}, + ${w.workflowRunId}, + ${w.id}, + NOW() + ) + ON CONFLICT ("namespace_id", "step_attempt_id") DO NOTHING + RETURNING "workflow_run_id" + `; + if (inserted.length > 0) { + deliveredRunIds.add(w.workflowRunId); + } + } + + if (deliveredRunIds.size === 0) { + return { workflowRunIds: [] }; + } + + // wake each waiting workflow run + const runIds = [...deliveredRunIds]; + await tx` + UPDATE ${workflowRunsTable} + SET + "available_at" = CASE + WHEN "available_at" IS NULL OR "available_at" > NOW() + THEN NOW() + ELSE "available_at" + END, + "updated_at" = NOW() + WHERE "namespace_id" = ${this.namespaceId} + AND "id" = ANY(${runIds}) + AND "status" IN ('pending', 'running', 'sleeping') + AND "worker_id" IS NULL + `; + + return { workflowRunIds: runIds }; + }); + } + + async getSignalDelivery( + params: GetSignalDeliveryParams, + ): Promise { + const workflowSignalsTable = this.workflowSignalsTable(); + + const [row] = await this.pg<{ data: JsonValue | null }[]>` + SELECT "data" + FROM ${workflowSignalsTable} + WHERE "namespace_id" = ${this.namespaceId} + AND "step_attempt_id" = ${params.stepAttemptId} + LIMIT 1 + `; + + return row ? (row.data ?? null) : undefined; + } + async listWorkflowRuns( params: ListWorkflowRunsParams, ): Promise> { @@ -449,17 +554,30 @@ export class BackendPostgres implements Backend { AND wr."id" = ${workflowRunId} AND wr."status" = 'running' AND wr."worker_id" IS NULL - AND EXISTS ( - SELECT 1 - FROM ${stepAttemptsTable} sa - JOIN ${workflowRunsTable} child - ON child."namespace_id" = sa."child_workflow_run_namespace_id" - AND child."id" = sa."child_workflow_run_id" - WHERE sa."namespace_id" = wr."namespace_id" - AND sa."workflow_run_id" = wr."id" - AND sa."kind" = 'workflow' - AND sa."status" = 'running' - AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') + AND ( + EXISTS ( + SELECT 1 + FROM ${stepAttemptsTable} sa + JOIN ${workflowRunsTable} child + ON child."namespace_id" = sa."child_workflow_run_namespace_id" + AND child."id" = sa."child_workflow_run_id" + WHERE sa."namespace_id" = wr."namespace_id" + AND sa."workflow_run_id" = wr."id" + AND sa."kind" = 'workflow' + AND sa."status" = 'running' + AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') + ) + OR EXISTS ( + SELECT 1 + FROM ${stepAttemptsTable} sa + JOIN ${this.workflowSignalsTable()} ws + ON ws."namespace_id" = sa."namespace_id" + AND ws."step_attempt_id" = sa."id" + WHERE sa."namespace_id" = wr."namespace_id" + AND sa."workflow_run_id" = wr."id" + AND sa."kind" = 'signal-wait' + AND sa."status" = 'running' + ) ) RETURNING wr.* `; @@ -942,6 +1060,10 @@ export class BackendPostgres implements Backend { private stepAttemptsTable(pg: Postgres = this.pg) { return pg`${pg(this.schema)}.${pg("step_attempts")}`; } + + private workflowSignalsTable(pg: Postgres = this.pg) { + return pg`${pg(this.schema)}.${pg("workflow_signals")}`; + } } /** diff --git a/packages/openworkflow/postgres/postgres.ts b/packages/openworkflow/postgres/postgres.ts index 7d641e44..b4620fea 100644 --- a/packages/openworkflow/postgres/postgres.ts +++ b/packages/openworkflow/postgres/postgres.ts @@ -206,6 +206,39 @@ export function migrations(schema: string): string[] { ON CONFLICT DO NOTHING; COMMIT;`, + + // 5 - workflow signals + `BEGIN; + + CREATE TABLE IF NOT EXISTS ${quotedSchema}."workflow_signals" ( + "namespace_id" TEXT NOT NULL, + "id" TEXT NOT NULL, + -- + "signal" TEXT NOT NULL, + "data" JSONB, + "sender_idempotency_key" TEXT, + "workflow_run_id" TEXT NOT NULL, + "step_attempt_id" TEXT NOT NULL, + "created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY ("namespace_id", "id") + ); + + CREATE UNIQUE INDEX IF NOT EXISTS "workflow_signals_step_attempt_idx" + ON ${quotedSchema}."workflow_signals" ("namespace_id", "step_attempt_id"); + + CREATE INDEX IF NOT EXISTS "workflow_signals_idempotency_idx" + ON ${quotedSchema}."workflow_signals" ("namespace_id", "signal", "sender_idempotency_key") + WHERE "sender_idempotency_key" IS NOT NULL; + + CREATE INDEX IF NOT EXISTS "step_attempts_signal_wait_idx" + ON ${quotedSchema}."step_attempts" ("namespace_id", ("context"->>'signal')) + WHERE "kind" = 'signal-wait' AND "status" = 'running'; + + INSERT INTO ${quotedSchema}."openworkflow_migrations"("version") + VALUES (5) + ON CONFLICT DO NOTHING; + + COMMIT;`, ]; } diff --git a/packages/openworkflow/sqlite/backend.test.ts b/packages/openworkflow/sqlite/backend.test.ts index 05e07d61..81ca4f16 100644 --- a/packages/openworkflow/sqlite/backend.test.ts +++ b/packages/openworkflow/sqlite/backend.test.ts @@ -566,3 +566,35 @@ describe("BackendSqlite workflow wake-up reconciliation", () => { } }); }); + +describe("BackendSqlite.sendSignal error handling", () => { + test("rolls back transaction and rethrows when an error occurs inside sendSignal", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + try { + const internalBackend = backend as unknown as { + db: Database; + }; + // Make prepare throw to trigger the catch/rollback path. + vi.spyOn(internalBackend.db, "prepare").mockImplementation(() => { + // Restore immediately so the rollback can work + vi.restoreAllMocks(); + throw new Error("simulated prepare failure"); + }); + + expect(() => + backend.sendSignal({ + signal: "test-signal", + data: null, + idempotencyKey: null, + }), + ).toThrow("simulated prepare failure"); + + vi.restoreAllMocks(); + } finally { + await backend.stop(); + } + }); +}); diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index e12c4aed..4c8f455a 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -20,6 +20,9 @@ import { RescheduleWorkflowRunAfterFailedStepAttemptParams, CompleteWorkflowRunParams, SleepWorkflowRunParams, + SendSignalParams, + SendSignalResult, + GetSignalDeliveryParams, toWorkflowRunCounts, } from "../core/backend.js"; import { wrapError } from "../core/error.js"; @@ -236,6 +239,131 @@ export class BackendSqlite implements Backend { return Promise.resolve(row ? rowToWorkflowRun(row) : null); } + sendSignal(params: SendSignalParams): Promise { + const currentTime = now(); + + this.db.exec("BEGIN IMMEDIATE"); + try { + if (params.idempotencyKey !== null) { + const existingStmt = this.db.prepare(` + SELECT DISTINCT "workflow_run_id" + FROM "workflow_signals" + WHERE "namespace_id" = ? AND "signal" = ? AND "sender_idempotency_key" = ? + `); + const existing = existingStmt.all( + this.namespaceId, + params.signal, + params.idempotencyKey, + ) as { workflow_run_id: string }[]; + if (existing.length > 0) { + this.db.exec("COMMIT"); + return Promise.resolve({ + workflowRunIds: existing.map((r) => r.workflow_run_id), + }); + } + } + // find active waiting step attempts & insert a signal delivery row for each + const waitersStmt = this.db.prepare(` + SELECT "id", "workflow_run_id" + FROM "step_attempts" + WHERE "namespace_id" = ? + AND "kind" = 'signal-wait' + AND "status" = 'running' + AND json_extract("context", '$.signal') = ? + `); + const waiters = waitersStmt.all(this.namespaceId, params.signal) as { + id: string; + workflow_run_id: string; + }[]; + + if (waiters.length === 0) { + this.db.exec("COMMIT"); + return Promise.resolve({ workflowRunIds: [] }); + } + + const insertStmt = this.db.prepare(` + INSERT OR IGNORE INTO "workflow_signals" ( + "namespace_id", "id", "signal", "data", + "sender_idempotency_key", "workflow_run_id", + "step_attempt_id", "created_at" + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `); + const deliveredRunIds = new Set(); + for (const w of waiters) { + const result = insertStmt.run( + this.namespaceId, + generateUUID(), + params.signal, + toJSON(params.data), + params.idempotencyKey, + w.workflow_run_id, + w.id, + currentTime, + ); + if (result.changes > 0) { + deliveredRunIds.add(w.workflow_run_id); + } + } + + if (deliveredRunIds.size === 0) { + this.db.exec("COMMIT"); + return Promise.resolve({ workflowRunIds: [] }); + } + + // wake each waiting workflow run + const runIds = [...deliveredRunIds]; + const wakeStmt = this.db.prepare(` + UPDATE "workflow_runs" + SET + "available_at" = CASE + WHEN "available_at" IS NULL OR "available_at" > ? THEN ? + ELSE "available_at" + END, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "id" = ? + AND "status" IN ('pending', 'running', 'sleeping') + AND "worker_id" IS NULL + `); + for (const runId of runIds) { + wakeStmt.run( + currentTime, + currentTime, + currentTime, + this.namespaceId, + runId, + ); + } + + this.db.exec("COMMIT"); + return Promise.resolve({ workflowRunIds: runIds }); + } catch (error) { + this.db.exec("ROLLBACK"); + throw error; + } + } + + getSignalDelivery( + params: GetSignalDeliveryParams, + ): Promise { + const stmt = this.db.prepare(` + SELECT "data" + FROM "workflow_signals" + WHERE "namespace_id" = ? AND "step_attempt_id" = ? + LIMIT 1 + `); + const row = stmt.get(this.namespaceId, params.stepAttemptId) as + | { data: string | null } + | undefined; + + // eslint-disable-next-line unicorn/no-useless-undefined + if (!row) return Promise.resolve(undefined); + return Promise.resolve( + (fromJSON(row.data) as JsonValue | undefined) ?? null, + ); + } + async claimWorkflowRun( params: ClaimWorkflowRunParams, ): Promise { @@ -367,17 +495,30 @@ export class BackendSqlite implements Backend { SET "status" = 'running', "available_at" = CASE - WHEN EXISTS ( - SELECT 1 - FROM "step_attempts" sa - JOIN "workflow_runs" child - ON child."namespace_id" = sa."child_workflow_run_namespace_id" - AND child."id" = sa."child_workflow_run_id" - WHERE sa."namespace_id" = "workflow_runs"."namespace_id" - AND sa."workflow_run_id" = "workflow_runs"."id" - AND sa."kind" = 'workflow' - AND sa."status" = 'running' - AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') + WHEN ( + EXISTS ( + SELECT 1 + FROM "step_attempts" sa + JOIN "workflow_runs" child + ON child."namespace_id" = sa."child_workflow_run_namespace_id" + AND child."id" = sa."child_workflow_run_id" + WHERE sa."namespace_id" = "workflow_runs"."namespace_id" + AND sa."workflow_run_id" = "workflow_runs"."id" + AND sa."kind" = 'workflow' + AND sa."status" = 'running' + AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled') + ) + OR EXISTS ( + SELECT 1 + FROM "step_attempts" sa + JOIN "workflow_signals" ws + ON ws."namespace_id" = sa."namespace_id" + AND ws."step_attempt_id" = sa."id" + WHERE sa."namespace_id" = "workflow_runs"."namespace_id" + AND sa."workflow_run_id" = "workflow_runs"."id" + AND sa."kind" = 'signal-wait' + AND sa."status" = 'running' + ) ) AND ? > ? THEN ? ELSE ? END, diff --git a/packages/openworkflow/sqlite/sqlite.ts b/packages/openworkflow/sqlite/sqlite.ts index bf3fd163..8d11736f 100644 --- a/packages/openworkflow/sqlite/sqlite.ts +++ b/packages/openworkflow/sqlite/sqlite.ts @@ -193,6 +193,37 @@ export function migrations(): string[] { VALUES (4); COMMIT;`, + + // 5 - workflow signals + `BEGIN; + + CREATE TABLE IF NOT EXISTS "workflow_signals" ( + "namespace_id" TEXT NOT NULL, + "id" TEXT NOT NULL, + "signal" TEXT NOT NULL, + "data" TEXT, + "sender_idempotency_key" TEXT, + "workflow_run_id" TEXT NOT NULL, + "step_attempt_id" TEXT NOT NULL, + "created_at" TEXT NOT NULL, + PRIMARY KEY ("namespace_id", "id") + ); + + CREATE UNIQUE INDEX IF NOT EXISTS "workflow_signals_step_attempt_idx" + ON "workflow_signals" ("namespace_id", "step_attempt_id"); + + CREATE INDEX IF NOT EXISTS "workflow_signals_idempotency_idx" + ON "workflow_signals" ("namespace_id", "signal", "sender_idempotency_key") + WHERE "sender_idempotency_key" IS NOT NULL; + + CREATE INDEX IF NOT EXISTS "step_attempts_signal_wait_idx" + ON "step_attempts" ("namespace_id", json_extract("context", '$.signal')) + WHERE "kind" = 'signal-wait' AND "status" = 'running'; + + INSERT OR IGNORE INTO "openworkflow_migrations" ("version") + VALUES (5); + + COMMIT;`, ]; } diff --git a/packages/openworkflow/testing/backend.testsuite.ts b/packages/openworkflow/testing/backend.testsuite.ts index 6383cb4c..7805a0af 100644 --- a/packages/openworkflow/testing/backend.testsuite.ts +++ b/packages/openworkflow/testing/backend.testsuite.ts @@ -1093,7 +1093,61 @@ export function testBackend(options: TestBackendOptions): void { farFuture.getTime(), ); expect(parentAfter.availableAt.getTime()).toBeLessThanOrEqual( - Date.now() + 1000, + Date.now() + 5000, + ); + + await teardown(backend); + }); + + test("wakes a parked run when a signal delivery arrives before parking", async () => { + const backend = await setup(); + + const run = await createClaimedWorkflowRun(backend); + const workerId = run.workerId ?? ""; + const signalString = `reconcile-${randomUUID()}`; + + await backend.createStepAttempt({ + workflowRunId: run.id, + workerId, + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: signalString, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + + const sendResult = await backend.sendSignal({ + signal: signalString, + data: { value: "arrived-early" }, + idempotencyKey: null, + }); + expect(sendResult.workflowRunIds).toContain(run.id); + + const farFuture = new Date(Date.now() + 5 * 60 * 1000); + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId, + availableAt: farFuture, + }); + + const runAfter = await backend.getWorkflowRun({ + workflowRunId: run.id, + }); + expect(runAfter?.status).toBe("running"); + expect(runAfter?.workerId).toBeNull(); + expect(runAfter?.availableAt).not.toBeNull(); + if (!runAfter?.availableAt) { + throw new Error("Expected availableAt after signal reconciliation"); + } + + expect(runAfter.availableAt.getTime()).toBeLessThan( + farFuture.getTime(), + ); + expect(runAfter.availableAt.getTime()).toBeLessThanOrEqual( + Date.now() + 5000, ); await teardown(backend); @@ -2228,6 +2282,288 @@ export function testBackend(options: TestBackendOptions): void { await teardown(backend); }); }); + + describe("sendSignal()", () => { + test("returns empty when no active waiters", async () => { + const result = await backend.sendSignal({ + signal: `no-waiters-${randomUUID()}`, + data: { value: 42 }, + idempotencyKey: null, + }); + expect(result.workflowRunIds).toEqual([]); + }); + + test("delivers to one active waiter and wakes run", async () => { + const run = await createClaimedWorkflowRun(backend); + const signalString = `test-signal-${randomUUID()}`; + + const step = await backend.createStepAttempt({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: signalString, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + availableAt: newDateInOneYear(), + }); + + const result = await backend.sendSignal({ + signal: signalString, + data: { approved: true }, + idempotencyKey: null, + }); + + expect(result.workflowRunIds).toContain(run.id); + + const delivered = await backend.getSignalDelivery({ + stepAttemptId: step.id, + }); + expect(delivered).toEqual({ approved: true }); + + const wokenRun = await backend.getWorkflowRun({ + workflowRunId: run.id, + }); + expect(wokenRun).not.toBeNull(); + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + expect(wokenRun!.availableAt).not.toBeNull(); + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + expect(wokenRun!.availableAt!.getTime()).toBeLessThanOrEqual( + Date.now() + 5000, + ); + }); + + test("fans out to multiple waiters", async () => { + const signalString = `fan-out-${randomUUID()}`; + const runs: WorkflowRun[] = []; + + for (let i = 0; i < 3; i++) { + const run = await createClaimedWorkflowRun(backend); + await backend.createStepAttempt({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: signalString, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + availableAt: newDateInOneYear(), + }); + runs.push(run); + } + + const result = await backend.sendSignal({ + signal: signalString, + data: "hello", + idempotencyKey: null, + }); + + expect(result.workflowRunIds).toHaveLength(3); + for (const run of runs) { + expect(result.workflowRunIds).toContain(run.id); + } + }); + + test("idempotent send returns same result", async () => { + const run = await createClaimedWorkflowRun(backend); + const signalString = `idempotent-${randomUUID()}`; + const idempotencyKey = randomUUID(); + + await backend.createStepAttempt({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: signalString, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + availableAt: newDateInOneYear(), + }); + + const first = await backend.sendSignal({ + signal: signalString, + data: { val: 1 }, + idempotencyKey, + }); + + const second = await backend.sendSignal({ + signal: signalString, + data: { val: 2 }, + idempotencyKey, + }); + + expect(first.workflowRunIds).toEqual(second.workflowRunIds); + }); + + test("duplicate send to same waiter delivers at most once", async () => { + const run = await createClaimedWorkflowRun(backend); + const signalString = `dup-send-${randomUUID()}`; + + const step = await backend.createStepAttempt({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: signalString, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + availableAt: newDateInOneYear(), + }); + + // send the same signal twice (no idempotency key — different senders) + await backend.sendSignal({ + signal: signalString, + data: { first: true }, + idempotencyKey: null, + }); + await backend.sendSignal({ + signal: signalString, + data: { second: true }, + idempotencyKey: null, + }); + + // only the first delivery should be stored + const delivered = await backend.getSignalDelivery({ + stepAttemptId: step.id, + }); + expect(delivered).toEqual({ first: true }); + }); + + test("idempotent send does not create duplicate delivery rows", async () => { + const run = await createClaimedWorkflowRun(backend); + const signalString = `idem-dup-${randomUUID()}`; + const idempotencyKey = randomUUID(); + + const step = await backend.createStepAttempt({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: signalString, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + availableAt: newDateInOneYear(), + }); + + await backend.sendSignal({ + signal: signalString, + data: { original: true }, + idempotencyKey, + }); + + // second send with same idempotency key + await backend.sendSignal({ + signal: signalString, + data: { duplicate: true }, + idempotencyKey, + }); + + // should still return the original delivery, not the duplicate + const delivered = await backend.getSignalDelivery({ + stepAttemptId: step.id, + }); + expect(delivered).toEqual({ original: true }); + }); + }); + + describe("getSignalDelivery()", () => { + test("returns undefined when no delivery exists", async () => { + const run = await createClaimedWorkflowRun(backend); + const step = await backend.createStepAttempt({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: `no-delivery-${randomUUID()}`, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + + const result = await backend.getSignalDelivery({ + stepAttemptId: step.id, + }); + expect(result).toBeUndefined(); + }); + + test("returns null data when signal delivered with null", async () => { + const run = await createClaimedWorkflowRun(backend); + const signalString = `null-data-${randomUUID()}`; + + await backend.createStepAttempt({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: "wait-step", + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: signalString, + timeoutAt: newDateInOneYear().toISOString(), + }, + }); + await backend.sleepWorkflowRun({ + workflowRunId: run.id, + workerId: run.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + availableAt: newDateInOneYear(), + }); + + await backend.sendSignal({ + signal: signalString, + data: null, + idempotencyKey: null, + }); + + const steps = await backend.listStepAttempts({ + workflowRunId: run.id, + }); + const step = steps.data.find((s) => s.stepName === "wait-step"); + expect(step).toBeDefined(); + + const result = await backend.getSignalDelivery({ + stepAttemptId: step!.id, // eslint-disable-line @typescript-eslint/no-non-null-assertion + }); + expect(result).toBeNull(); + }); + }); }); } diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index 2dba8674..11d0d805 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -721,7 +721,7 @@ describe("StepExecutor", () => { expect(status).toBe("failed"); await expect(handle.result()).rejects.toThrow( - /Workflow timeout must be a non-negative number/, + /Timeout must be a non-negative number/, ); }); @@ -2480,6 +2480,507 @@ describe("StepExecutor", () => { ); expect(childStatus).toBe("completed"); }); + + // ---- step.sendSignal / step.waitForSignal -------------------------------- + + test("sendSignal sends a signal and returns workflow run IDs", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const signalString = `test-signal-${randomUUID()}`; + + const waiter = client.defineWorkflow( + { name: `signal-waiter-${randomUUID()}` }, + async ({ step }) => { + const data = await step.waitForSignal({ + signal: signalString, + timeout: 30_000, + }); + return data; + }, + ); + + const sender = client.defineWorkflow( + { name: `signal-sender-${randomUUID()}` }, + async ({ step }) => { + const result = await step.sendSignal({ + signal: signalString, + data: { approved: true }, + }); + return result; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + + const waiterHandle = await waiter.run(); + await tickUntilParked(backend, worker, waiterHandle.workflowRun.id, 20, 50); + + const senderHandle = await sender.run(); + const senderStatus = await tickUntilTerminal( + backend, + worker, + senderHandle.workflowRun.id, + 20, + 50, + ); + expect(senderStatus).toBe("completed"); + + const senderResult = await senderHandle.result(); + expect(senderResult).toEqual({ + workflowRunIds: [waiterHandle.workflowRun.id], + }); + + const waiterStatus = await tickUntilTerminal( + backend, + worker, + waiterHandle.workflowRun.id, + 20, + 50, + ); + expect(waiterStatus).toBe("completed"); + const waiterResult = await waiterHandle.result(); + expect(waiterResult).toEqual({ data: { approved: true } }); + }); + + test("waitForSignal returns null on timeout", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-timeout-${randomUUID()}` }, + async ({ step }) => { + const data = await step.waitForSignal({ + signal: `never-sent-${randomUUID()}`, + timeout: 1, // 1ms — expires immediately + }); + return data; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + + // first tick: creates the signal-wait step and parks (timeout is in the future at creation time) + // subsequent ticks: timeout has expired, completes with null + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("completed"); + const result = await handle.result(); + expect(result).toBeNull(); + }); + + test("waitForSignal handles missing context gracefully", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-wait-null-context-${randomUUID()}` }, + async ({ step }) => { + const data = await step.waitForSignal({ + signal: `missing-ctx-${randomUUID()}`, + timeout: 1, // expires immediately + }); + return data; + }, + ); + + // corrupt the signal-wait step attempt to have a null context (simulates + // a corrupted or partially-written row) and backdate createdAt so the + // 1-year default fallback timeout has already expired. + const twoYearsAgo = new Date(); + twoYearsAgo.setFullYear(twoYearsAgo.getFullYear() - 2); + const originalCreateStepAttempt = backend.createStepAttempt.bind(backend); + const createStepAttemptSpy = vi + .spyOn(backend, "createStepAttempt") + .mockImplementation(async (params) => { + const attempt = await originalCreateStepAttempt(params); + if (attempt.kind === "signal-wait") { + return { + ...attempt, + createdAt: twoYearsAgo, + context: null, + }; + } + return attempt; + }); + + try { + const worker = client.newWorker(); + const handle = await workflow.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + + expect(status).toBe("completed"); + const result = await handle.result(); + expect(result).toBeNull(); + } finally { + createStepAttemptSpy.mockRestore(); + } + }); + + test("waitForSignal validates data against schema", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const signalString = `schema-signal-${randomUUID()}`; + + const waiter = client.defineWorkflow( + { name: `signal-schema-${randomUUID()}` }, + async ({ step }) => { + const data = await step.waitForSignal({ + signal: signalString, + timeout: 30_000, + schema: z.object({ approved: z.boolean() }), + }); + return data; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const waiterHandle = await waiter.run(); + await tickUntilParked(backend, worker, waiterHandle.workflowRun.id, 20, 50); + + // w/ valid data + await client.sendSignal({ + signal: signalString, + data: { approved: true }, + }); + + const status = await tickUntilTerminal( + backend, + worker, + waiterHandle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("completed"); + const result = await waiterHandle.result(); + expect(result).toEqual({ data: { approved: true } }); + }); + + test("waitForSignal fails step when schema validation fails", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const signalString = `bad-schema-signal-${randomUUID()}`; + + const waiter = client.defineWorkflow( + { name: `signal-bad-schema-${randomUUID()}` }, + async ({ step }) => { + const data = await step.waitForSignal({ + signal: signalString, + timeout: 30_000, + schema: z.object({ approved: z.boolean() }), + }); + return data; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const waiterHandle = await waiter.run(); + await tickUntilParked(backend, worker, waiterHandle.workflowRun.id, 20, 50); + + // w/ invalid data + await client.sendSignal({ + signal: signalString, + data: { approved: "not-a-boolean" }, + }); + + const status = await tickUntilTerminal( + backend, + worker, + waiterHandle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("failed"); + }); + + test("sendSignal is replay-safe across re-executions", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const signalString = `replay-signal-${randomUUID()}`; + + const workflow = client.defineWorkflow( + { name: `signal-replay-${randomUUID()}` }, + async ({ step }) => { + const first = await step.sendSignal({ + signal: signalString, + data: { v: 1 }, + }); + const value = await step.run({ name: "compute" }, () => 42); + return { sent: first, value }; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("completed"); + + const result = await handle.result(); + expect(result).toEqual({ + sent: { workflowRunIds: [] }, + value: 42, + }); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const signalStep = steps.data.find((s) => s.kind === "signal-send"); + expect(signalStep).toBeDefined(); + expect(signalStep?.status).toBe("completed"); + }); + + test("sendSignal auto-indexes duplicate signal step names", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-autoindex-${randomUUID()}` }, + async ({ step }) => { + await step.sendSignal({ signal: "notify", data: "first" }); + await step.sendSignal({ signal: "notify", data: "second" }); + return "done"; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("completed"); + + const steps = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 100, + }); + const signalSteps = steps.data + .filter((s) => s.kind === "signal-send") + .map((s) => s.stepName) + .toSorted((a, b) => a.localeCompare(b)); + expect(signalSteps).toEqual(["notify", "notify:1"]); + }); + + test("waitForSignal is replay-safe across re-executions", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const signalString = `replay-wait-${randomUUID()}`; + + const workflow = client.defineWorkflow( + { name: `signal-wait-replay-${randomUUID()}` }, + async ({ step }) => { + const data = await step.waitForSignal({ + signal: signalString, + timeout: 30_000, + }); + const value = await step.run({ name: "after-signal" }, () => "ok"); + return { data, value }; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const handle = await workflow.run(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 20, 50); + + await client.sendSignal({ signal: signalString, data: { x: 1 } }); + + // resume — the first replay hits the cached waitForSignal path + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("completed"); + const result = await handle.result(); + expect(result).toEqual({ + data: { data: { x: 1 } }, + value: "ok", + }); + }); + + test("sendSignal replays from cache when followed by a sleep step", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-send-cache-replay-${randomUUID()}` }, + async ({ step }) => { + const result = await step.sendSignal({ + signal: "no-one-listening", + data: null, + }); + // sleep forces a park; on resume the workflow replays and sendSignal + // hits the cached step attempt path. + await step.sleep("pause", "10ms"); + return result; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("completed"); + const result = await handle.result(); + expect(result).toEqual({ workflowRunIds: [] }); + }); + + test("waitForSignal replays from cache when followed by a sleep step", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const signalString = `wait-cache-replay-${randomUUID()}`; + + const workflow = client.defineWorkflow( + { name: `signal-wait-cache-replay-${randomUUID()}` }, + async ({ step }) => { + const data = await step.waitForSignal({ + signal: signalString, + timeout: 30_000, + }); + // sleep forces a park; on resume waitForSignal hits the cached path + await step.sleep("pause", "10ms"); + return data; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + await tickUntilParked(backend, worker, handle.workflowRun.id, 20, 50); + + await client.sendSignal({ signal: signalString, data: { cached: true } }); + + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("completed"); + const result = await handle.result(); + expect(result).toEqual({ data: { cached: true } }); + }); + + test("waitForSignal throws when another step already waits on the same signal", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + const signalString = `dup-signal-${randomUUID()}`; + + // first execution: create signal-wait step "waiter-a" on signalString, + // parks. Then we force a re-execution. Constructor loads running "waiter-a" + // → activeSignalWaitStepNameBySignal maps signalString → "waiter-a". + // Workflow code now calls waitForSignal with name "waiter-b" on the same + // signal → duplicate signal guard fires. + let firstExecution = true; + const workflow = client.defineWorkflow( + { name: `signal-dup-waiter-${randomUUID()}` }, + async ({ step }) => { + if (firstExecution) { + firstExecution = false; + await step.waitForSignal({ + name: "waiter-a", + signal: signalString, + timeout: 30_000, + }); + } else { + await step.waitForSignal({ + name: "waiter-b", + signal: signalString, + timeout: 30_000, + }); + } + return "done"; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + // first tick: creates waiter-a, parks with availableAt ~30s in the future. + await tickUntilParked(backend, worker, handle.workflowRun.id, 20, 50); + + // force re-execution by resetting availableAt to now via direct SQL. + const pg = ( + backend as unknown as { + pg: { unsafe: (q: string, p?: unknown[]) => Promise }; + } + ).pg; + await pg.unsafe( + `UPDATE "openworkflow"."workflow_runs" SET "available_at" = NOW() WHERE "id" = $1`, + [handle.workflowRun.id], + ); + + // second tick: workflow code calls waiter-b on same signal → fails + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("failed"); + }); + + test("sendSignal step fails when backend.sendSignal throws", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + // spy on the backend to force an error during signal send + const sendSignalSpy = vi + .spyOn(backend, "sendSignal") + .mockRejectedValueOnce(new Error("signal delivery failed")); + + const workflow = client.defineWorkflow( + { name: `signal-send-error-${randomUUID()}` }, + async ({ step }) => { + const result = await step.sendSignal({ + signal: "will-fail", + data: null, + }); + return result; + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 20, + 50, + ); + expect(status).toBe("failed"); + sendSignalSpy.mockRestore(); + }); }); describe("executeWorkflow", () => { @@ -3258,6 +3759,85 @@ describe("executeWorkflow", () => { expect(snapshots[0]).toEqual(snapshots[1]); }); }); + + describe("signal step resume from running attempt", () => { + test("sendSignal resolves a pre-existing running signal-send step attempt", async () => { + const runningAttempt = createMockStepAttempt({ + id: "running-signal-send", + stepName: "notify", + kind: "signal-send", + status: "running", + output: null, + finishedAt: null, + }); + + const listStepAttempts = vi.fn(() => + Promise.resolve({ + data: [runningAttempt], + pagination: { next: null, prev: null }, + }), + ); + const sendSignalMock = vi.fn(() => + Promise.resolve({ workflowRunIds: ["run-abc"] }), + ); + const completeStepAttempt = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockStepAttempt({ + id: params.stepAttemptId, + stepName: "notify", + status: "completed", + output: params.output ?? null, + }), + ), + ); + const completeWorkflowRun = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockWorkflowRun({ + id: params.workflowRunId, + status: "completed", + output: params.output ?? null, + }), + ), + ); + + const workflowRun = createMockWorkflowRun({ + id: "signal-send-resume-run", + workerId: "worker-resume", + }); + + await executeWorkflow({ + backend: { + listStepAttempts, + sendSignal: sendSignalMock, + completeStepAttempt, + completeWorkflowRun, + } as unknown as Backend, + workflowRun, + workflowFn: async ({ step }) => { + const result = await step.sendSignal({ + signal: "notify", + data: { v: 1 }, + }); + return result; + }, + workflowVersion: null, + workerId: "worker-resume", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + expect(sendSignalMock).toHaveBeenCalled(); + expect(completeStepAttempt).toHaveBeenCalledWith( + expect.objectContaining({ stepAttemptId: "running-signal-send" }), + ); + expect(completeWorkflowRun).toHaveBeenCalledWith( + expect.objectContaining({ + output: { workflowRunIds: ["run-abc"] }, + }), + ); + }); + }); }); describe("createStepExecutionStateFromAttempts", () => { diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index d825d737..04754c53 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -6,6 +6,7 @@ import { type SerializedError, } from "../core/error.js"; import type { JsonValue } from "../core/json.js"; +import type { StandardSchemaV1 } from "../core/standard-schema.js"; import type { StepAttempt, StepAttemptCache } from "../core/step-attempt.js"; import { getCachedStepAttempt, @@ -14,6 +15,7 @@ import { calculateDateFromDuration, createSleepContext, createWorkflowContext, + createSignalWaitContext, } from "../core/step-attempt.js"; import { computeFailedWorkflowRunUpdate, @@ -26,6 +28,7 @@ import type { StepApi, StepFunction, StepFunctionConfig, + StepWaitTimeout, WorkflowFunction, WorkflowRunMetadata, } from "../core/workflow-function.js"; @@ -124,10 +127,11 @@ const DEFAULT_STEP_RETRY_POLICY: RetryPolicy = { }; /** - * Retry policy for workflow step failures (no retries - the child workflow - * is responsible for retries). + * No-retry policy for terminal/non-retryable steps (child-workflow results, + * signal sends, signal waits). The caller or child workflow is responsible + * for handling retries. */ -const WORKFLOW_STEP_FAILURE_RETRY_POLICY: RetryPolicy = { +const TERMINAL_STEP_RETRY_POLICY: RetryPolicy = { ...DEFAULT_STEP_RETRY_POLICY, maximumAttempts: 1, }; @@ -237,16 +241,16 @@ export function createStepExecutionStateFromAttempts( } /** - * Resolve workflow timeout input to an absolute deadline. + * Resolve wait timeout input to an absolute deadline. * @param timeout - Relative/absolute timeout input * @returns Absolute timeout deadline * @throws {Error} When timeout is invalid */ -function resolveWorkflowTimeoutAt( +function resolveWaitTimeoutAt( timeout: number | string | Date | undefined, ): Date { if (timeout === undefined) { - return defaultWorkflowTimeoutAt(); + return defaultWaitTimeoutAt(); } if (timeout instanceof Date) { @@ -255,7 +259,7 @@ function resolveWorkflowTimeoutAt( if (typeof timeout === "number") { if (!Number.isFinite(timeout) || timeout < 0) { - throw new Error("Workflow timeout must be a non-negative number"); + throw new Error("Timeout must be a non-negative number"); } return new Date(Date.now() + timeout); } @@ -268,32 +272,37 @@ function resolveWorkflowTimeoutAt( } /** - * Default workflow timeout: 1 year from a base time. + * Default wait timeout: 1 year from a base time. * @param base - Base timestamp (defaults to now) * @returns Timeout deadline */ -function defaultWorkflowTimeoutAt(base: Readonly = new Date()): Date { +function defaultWaitTimeoutAt(base: Readonly = new Date()): Date { const timeoutAt = new Date(base); timeoutAt.setFullYear(timeoutAt.getFullYear() + 1); return timeoutAt; } /** - * Extract the workflow timeout from a persisted step attempt's context. - * @param attempt - Running workflow step attempt - * @returns Timeout deadline, or null when context is not workflow + * Extract the timeout from a persisted step attempt's context. + * Works for both workflow and signal-wait step types. + * @param attempt - Running step attempt + * @returns Timeout deadline, or null when context has no timeout */ -function getWorkflowTimeoutAt(attempt: Readonly): Date | null { - if (attempt.context?.kind !== "workflow") { +function getContextTimeoutAt(attempt: Readonly): Date | null { + if ( + attempt.context?.kind !== "workflow" && + attempt.context?.kind !== "signal-wait" + ) { return null; } - if (attempt.context.timeoutAt === null) { - // Backward compatibility for previously persisted workflow contexts. - return defaultWorkflowTimeoutAt(attempt.createdAt); + const { timeoutAt } = attempt.context; + if (timeoutAt === null) { + // backward compatibility for previously persisted workflow contexts + // (signal-wait timeoutAt is never null per SignalWaitStepAttemptContext). + return defaultWaitTimeoutAt(attempt.createdAt); } - - return new Date(attempt.context.timeoutAt); + return new Date(timeoutAt); } /** @@ -306,7 +315,7 @@ function hasWorkflowTimedOut( attempt: Readonly, childRun: Readonly, ): boolean { - const timeoutAt = getWorkflowTimeoutAt(attempt); + const timeoutAt = getContextTimeoutAt(attempt); if (!timeoutAt) return false; const timeoutMs = timeoutAt.getTime(); @@ -337,19 +346,15 @@ function getRunningWaitAttemptResumeAt( return Number.isFinite(resumeAt.getTime()) ? resumeAt : null; } - if (attempt.kind !== "workflow") { + if (attempt.kind !== "signal-wait" && attempt.kind !== "workflow") { return null; } const timeoutAt = - getWorkflowTimeoutAt(attempt) ?? - defaultWorkflowTimeoutAt(attempt.createdAt); - if (Number.isFinite(timeoutAt.getTime())) { - return timeoutAt; - } - - // Backward compatibility for malformed historical workflow timeout values. - return defaultWorkflowTimeoutAt(attempt.createdAt); + getContextTimeoutAt(attempt) ?? defaultWaitTimeoutAt(attempt.createdAt); + return Number.isFinite(timeoutAt.getTime()) + ? timeoutAt + : defaultWaitTimeoutAt(attempt.createdAt); } /** @@ -463,6 +468,19 @@ function buildWorkflowIdempotencyKey(attempt: Readonly): string { return `__workflow:${attempt.namespaceId}:${attempt.id}`; } +/** + * Build deterministic idempotency key for signal send invocation. + * @param workflowRunId - Workflow run id + * @param stepName - Step name + * @returns Stable idempotency key + */ +function buildSignalIdempotencyKey( + workflowRunId: string, + stepName: string, +): string { + return `__signal:${workflowRunId}:${stepName}`; +} + /** * Configures the options for a StepExecutor. */ @@ -713,7 +731,7 @@ class StepExecutor implements StepApi { throw new StepError({ stepName, stepFailedAttempts: this.failedCountsByStepName.get(stepName) ?? 1, - retryPolicy: WORKFLOW_STEP_FAILURE_RETRY_POLICY, + retryPolicy: TERMINAL_STEP_RETRY_POLICY, error: failedError, }); } @@ -729,7 +747,7 @@ class StepExecutor implements StepApi { } // First encounter — create the workflow step and child workflow run - const timeoutAt = resolveWorkflowTimeoutAt(request.timeout); + const timeoutAt = resolveWaitTimeoutAt(request.timeout); this.assertExecutionActive(); this.ensureStepLimitNotReached(); const attempt = await this.backend.createStepAttempt({ @@ -784,7 +802,7 @@ class StepExecutor implements StepApi { new Error( `Workflow step "${stepName}" could not find linked child workflow run`, ), - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } @@ -798,7 +816,7 @@ class StepExecutor implements StepApi { new Error( `Workflow step "${stepName}" could not find linked child workflow run "${childId}"`, ), - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } @@ -808,7 +826,7 @@ class StepExecutor implements StepApi { stepName, workflowAttempt.id, new Error("Timed out waiting for child workflow to complete"), - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } @@ -835,7 +853,7 @@ class StepExecutor implements StepApi { stepName, workflowAttempt.id, childError, - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } @@ -847,16 +865,16 @@ class StepExecutor implements StepApi { new Error( `Workflow step "${stepName}" failed because child workflow run "${childRun.id}" was canceled`, ), - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } // Child still running — sleep until timeout - const timeoutAt = getWorkflowTimeoutAt(workflowAttempt); + const timeoutAt = getContextTimeoutAt(workflowAttempt); const resumeAt = timeoutAt && Number.isFinite(timeoutAt.getTime()) ? timeoutAt - : defaultWorkflowTimeoutAt(workflowAttempt.createdAt); + : defaultWaitTimeoutAt(workflowAttempt.createdAt); throw new SleepSignal(this.resolveEarliestRunningWaitResumeAt(resumeAt)); } @@ -973,10 +991,201 @@ class StepExecutor implements StepApi { stepName, stepAttemptId, error, - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } + // ---- step.sendSignal ---------------------------------------------------- + + async sendSignal( + options: Readonly<{ + name?: string; + signal: string; + data?: JsonValue; + }>, + ): Promise<{ workflowRunIds: string[] }> { + const stepName = this.resolveStepName(options.name ?? options.signal); + + const existingAttempt = getCachedStepAttempt(this.cache, stepName); + if (existingAttempt) { + return existingAttempt.output as { workflowRunIds: string[] }; + } + + const runningAttempt = this.runningByStepName.get(stepName); + if (runningAttempt?.kind === "signal-send") { + return await this.resolveSignalSend(stepName, runningAttempt, options); + } + + this.assertExecutionActive(); + this.ensureStepLimitNotReached(); + const attempt = await this.backend.createStepAttempt({ + workflowRunId: this.workflowRunId, + workerId: this.workerId, + stepName, + kind: "signal-send", + config: {}, + context: null, + }); + this.stepCount += 1; + this.runningByStepName.set(stepName, attempt); + + return await this.resolveSignalSend(stepName, attempt, options); + } + + private async resolveSignalSend( + stepName: string, + attempt: Readonly, + options: Readonly<{ signal: string; data?: JsonValue }>, + ): Promise<{ workflowRunIds: string[] }> { + try { + const result = await this.backend.sendSignal({ + signal: options.signal, + data: options.data ?? null, + idempotencyKey: buildSignalIdempotencyKey(this.workflowRunId, stepName), + }); + + const completed = await this.backend.completeStepAttempt({ + workflowRunId: this.workflowRunId, + stepAttemptId: attempt.id, + workerId: this.workerId, + output: { ...result }, + }); + this.cache = addToStepAttemptCache(this.cache, completed); + this.runningByStepName.delete(stepName); + return completed.output as { workflowRunIds: string[] }; + } catch (error) { + return await this.failStepWithError( + stepName, + attempt.id, + error, + TERMINAL_STEP_RETRY_POLICY, + ); + } + } + + // ---- step.waitForSignal ------------------------------------------------ + + async waitForSignal( + options: Readonly<{ + name?: string; + signal: string; + timeout?: StepWaitTimeout; + schema?: StandardSchemaV1; + }>, + ): Promise<{ data: Output } | null> { + const stepName = this.resolveStepName(options.name ?? options.signal); + + const existingAttempt = getCachedStepAttempt(this.cache, stepName); + if (existingAttempt) { + return existingAttempt.output as { data: Output } | null; + } + + const runningAttempt = this.runningByStepName.get(stepName); + if (runningAttempt?.kind === "signal-wait") { + return await this.resolveSignalWait( + stepName, + runningAttempt, + options, + ); + } + + for (const [name, a] of this.runningByStepName) { + if ( + name !== stepName && + a.kind === "signal-wait" && + a.context?.kind === "signal-wait" && + a.context.signal === options.signal + ) { + throw new Error( + `Signal "${options.signal}" is already being waited on by step "${name}"`, + ); + } + } + + const timeoutAt = resolveWaitTimeoutAt(options.timeout); + this.assertExecutionActive(); + this.ensureStepLimitNotReached(); + const attempt = await this.backend.createStepAttempt({ + workflowRunId: this.workflowRunId, + workerId: this.workerId, + stepName, + kind: "signal-wait", + config: {}, + context: createSignalWaitContext(options.signal, timeoutAt), + }); + this.stepCount += 1; + this.runningByStepName.set(stepName, attempt); + + return await this.resolveSignalWait(stepName, attempt, options); + } + + private async resolveSignalWait( + stepName: string, + attempt: Readonly, + options: Readonly<{ + schema?: StandardSchemaV1; + }>, + ): Promise<{ data: Output } | null> { + const signalData = await this.backend.getSignalDelivery({ + stepAttemptId: attempt.id, + }); + + if (signalData !== undefined) { + let outputValue: unknown = signalData; + + if (options.schema) { + const validationResult = await validateInput( + options.schema, + signalData, + ); + if (!validationResult.success) { + return await this.failStepWithError( + stepName, + attempt.id, + new Error( + `Signal schema validation failed: ${validationResult.error}`, + ), + TERMINAL_STEP_RETRY_POLICY, + ); + } + outputValue = validationResult.value; + } + + return await this.completeSignalWaitStep(attempt, { + data: normalizeStepOutput(outputValue) as Output, + }); + } + + const timeoutAt = + getContextTimeoutAt(attempt) ?? defaultWaitTimeoutAt(attempt.createdAt); + if (Date.now() >= timeoutAt.getTime()) { + return await this.completeSignalWaitStep(attempt, null); + } + + throw new SleepSignal(this.resolveEarliestRunningWaitResumeAt(timeoutAt)); + } + + /** + * Complete a signal-wait step attempt and update internal maps. + * @param attempt - Step attempt being completed + * @param output - Envelope with data, or null for timeout + * @returns The completed step output + */ + private async completeSignalWaitStep( + attempt: Readonly, + output: { data: Output } | null, + ): Promise<{ data: Output } | null> { + const completed = await this.backend.completeStepAttempt({ + workflowRunId: this.workflowRunId, + stepAttemptId: attempt.id, + workerId: this.workerId, + output, + }); + this.cache = addToStepAttemptCache(this.cache, completed); + this.runningByStepName.delete(attempt.stepName); + return completed.output as { data: Output } | null; + } + private ensureStepLimitNotReached(): void { if (this.stepCount >= this.stepLimit) { throw new StepLimitExceededError(this.stepLimit, this.stepCount);