diff --git a/.scripts/list-of-samples.json b/.scripts/list-of-samples.json index 80d55b91..1bb3c811 100644 --- a/.scripts/list-of-samples.json +++ b/.scripts/list-of-samples.json @@ -51,6 +51,7 @@ "vscode-debugger", "worker-specific-task-queues", "worker-versioning", + "workflow-streams", "message-passing/execute-update", "message-passing/introduction", "message-passing/safe-message-handlers", diff --git a/workflow-streams/.eslintignore b/workflow-streams/.eslintignore new file mode 100644 index 00000000..699794d0 --- /dev/null +++ b/workflow-streams/.eslintignore @@ -0,0 +1,3 @@ +node_modules +lib +.eslintrc.js diff --git a/workflow-streams/.eslintrc.js b/workflow-streams/.eslintrc.js new file mode 100644 index 00000000..b35cad39 --- /dev/null +++ b/workflow-streams/.eslintrc.js @@ -0,0 +1,48 @@ +const { builtinModules } = require('module'); + +const ALLOWED_NODE_BUILTINS = new Set(['assert']); + +module.exports = { + root: true, + parser: '@typescript-eslint/parser', + parserOptions: { + project: './tsconfig.json', + tsconfigRootDir: __dirname, + }, + plugins: ['@typescript-eslint', 'deprecation'], + extends: [ + 'eslint:recommended', + 'plugin:@typescript-eslint/eslint-recommended', + 'plugin:@typescript-eslint/recommended', + 'prettier', + ], + rules: { + // recommended for safety + '@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad + 'deprecation/deprecation': 'warn', + + // code style preference + 'object-shorthand': ['error', 'always'], + + // relaxed rules, for convenience + '@typescript-eslint/no-unused-vars': [ + 'warn', + { + argsIgnorePattern: '^_', + varsIgnorePattern: '^_', + }, + ], + '@typescript-eslint/no-explicit-any': 'off', + }, + overrides: [ + { + files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts', 'src/llm-workflows.ts'], + rules: { + 'no-restricted-imports': [ + 'error', + ...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, 
`node:${m}`]), + ], + }, + }, + ], +}; diff --git a/workflow-streams/.gitignore b/workflow-streams/.gitignore new file mode 100644 index 00000000..3063f07d --- /dev/null +++ b/workflow-streams/.gitignore @@ -0,0 +1,2 @@ +lib +node_modules diff --git a/workflow-streams/.npmrc b/workflow-streams/.npmrc new file mode 100644 index 00000000..43c97e71 --- /dev/null +++ b/workflow-streams/.npmrc @@ -0,0 +1 @@ +package-lock=false diff --git a/workflow-streams/.nvmrc b/workflow-streams/.nvmrc new file mode 100644 index 00000000..2bd5a0a9 --- /dev/null +++ b/workflow-streams/.nvmrc @@ -0,0 +1 @@ +22 diff --git a/workflow-streams/.prettierignore b/workflow-streams/.prettierignore new file mode 100644 index 00000000..a65b4177 --- /dev/null +++ b/workflow-streams/.prettierignore @@ -0,0 +1 @@ +lib diff --git a/workflow-streams/.prettierrc b/workflow-streams/.prettierrc new file mode 100644 index 00000000..965d50bf --- /dev/null +++ b/workflow-streams/.prettierrc @@ -0,0 +1,2 @@ +printWidth: 120 +singleQuote: true diff --git a/workflow-streams/README.md b/workflow-streams/README.md new file mode 100644 index 00000000..3e595d36 --- /dev/null +++ b/workflow-streams/README.md @@ -0,0 +1,120 @@ +# Workflow Streams + +> **Experimental.** These samples use `@temporalio/workflow-streams`, which is +> currently distributed on the `contrib/pubsub` branch of `sdk-typescript`. The +> module is experimental and its API may change. + +`@temporalio/workflow-streams` lets a workflow host a durable, +offset-addressed event channel. The workflow holds an append-only log; +external clients (activities, starters, web backends) publish to topics via +signals and subscribe via long-poll updates. This packages the +boilerplate — batching, offset tracking, topic filtering, continue-as-new +hand-off — into a reusable stream. 
+ +The package has no root entrypoint; import from the two subpaths instead: + +- `@temporalio/workflow-streams/workflow` — workflow-safe surface + (`WorkflowStream`, `WorkflowStreamState`, …). Safe to bundle into workflow + code. +- `@temporalio/workflow-streams/client` — client surface + (`WorkflowStreamClient`, …). Pulls in `crypto` and `@temporalio/client`; do + not import from a workflow file. + +This sample has five scenarios. The first four share one worker; the fifth +has its own worker because it needs the `openai` package and an +`OPENAI_API_KEY`. + +**Scenario 1 — basic publish/subscribe with heterogeneous topics:** + +- `src/workflows.ts` (`order`) — a workflow that hosts a `WorkflowStream` and + publishes status events as it processes an order. +- `src/activities.ts` (`chargeCard`) — an activity that publishes intermediate + progress to the stream via `WorkflowStreamClient.fromActivity()`. +- `src/run-publisher.ts` — starts the workflow, subscribes to both topics, + decodes each by `item.topic`, and prints events as they arrive. + +**Scenario 2 — reconnecting subscriber:** + +- `src/workflows.ts` (`pipeline`) — a multi-stage pipeline that publishes + stage transitions over ~10 seconds, leaving room for a consumer to + disconnect and reconnect mid-run. +- `src/run-reconnecting-subscriber.ts` — connects, reads a couple of events, + "disconnects," then reopens a fresh client and resumes via + `subscribe(fromOffset)`. This is the central Workflow Streams use case: a + consumer can disappear (page refresh, server restart, laptop closed) and + resume later without missing events or seeing duplicates. + +**Scenario 3 — external (non-Activity) publisher:** + +- `src/workflows.ts` (`hub`) — a passive workflow that does no work of its + own; it exists only to host a `WorkflowStream` and shut down when signaled. 
+- `src/run-external-publisher.ts` — starts the hub, then publishes events + into it from a plain Node async function using + `WorkflowStreamClient.create(client, workflowId)`. A subscriber runs + alongside; when the publisher is done it emits a sentinel event and signals + `close`. This is the shape that fits a backend service or scheduled job pushing + events into a workflow it didn't itself start. + +**Scenario 4 — bounded log via `truncate()`:** + +- `src/workflows.ts` (`ticker`) — a long-running workflow that publishes + events at a fixed cadence and calls `stream.truncate(...)` periodically to + bound log growth, keeping only the most recent N entries. +- `src/run-truncating-ticker.ts` — runs a fast subscriber and a slow + subscriber side by side. The fast one keeps up and sees every offset in + order; the slow one falls behind a truncation and silently jumps forward to + the new base offset. The output makes the trade-off visible: bounded log size + in exchange for intermediate events being invisible to slow consumers. + +**Scenario 5 — LLM streaming:** + +- `src/llm-workflows.ts` (`llmStreaming`) — hosts a `WorkflowStream` and runs + `streamCompletion` as a single activity. The workflow itself does no + streaming; the activity owns the non-deterministic OpenAI call. +- `src/llm-activities.ts` (`streamCompletion`) — calls + `openai.chat.completions.create({ stream: true })`, publishes each token + chunk on the `delta` topic, the final accumulated text on `complete`, and a + `RetryEvent` on `retry` when running on attempt > 1. +- `src/run-llm.ts` — subscribes to all three topics, renders deltas to the + terminal as they arrive, and on a `retry` event uses ANSI escapes to rewind + the printed output before the retried attempt re-publishes. 
+ +Scenario 5 runs on its own worker (`src/llm-worker.ts`, on +`workflow-stream-llm-task-queue`) because it needs the `openai` dependency +and an `OPENAI_API_KEY`, and because killing this worker mid-stream is the +easiest way to demonstrate retry handling without disrupting the other four +scenarios. + +## Run it + +1. `temporal server start-dev` to start [Temporal + Server](https://github.com/temporalio/cli/#installation). +2. `npm install` to install dependencies. `package.json` resolves `@temporalio/workflow-streams` from `file:../../sdk-typescript/contrib/workflow-streams`, so a checkout of `sdk-typescript` that contains `contrib/workflow-streams` must exist at that relative path before installing. + +For scenarios 1–4, start the shared worker: + +```bash +npm run start +``` + +For scenario 5, export the API key and start the LLM worker: + +```bash +export OPENAI_API_KEY=... +npm run start.llm +``` + +Then in another terminal, pick a scenario: + +```bash +npm run workflow.publisher # scenario 1 +npm run workflow.reconnecting # scenario 2 +npm run workflow.external # scenario 3 +npm run workflow.ticker # scenario 4 +npm run workflow.llm # scenario 5 +``` + +To exercise scenario 5's retry path, kill the LLM worker (Ctrl-C) while +output is streaming and start it again. The activity's next attempt sends a +retry event first; the consumer clears its on-screen output via ANSI escapes +and re-renders from scratch. 
diff --git a/workflow-streams/package.json b/workflow-streams/package.json new file mode 100644 index 00000000..e333535a --- /dev/null +++ b/workflow-streams/package.json @@ -0,0 +1,53 @@ +{ + "name": "temporal-workflow-streams", + "version": "0.1.0", + "private": true, + "scripts": { + "build": "tsc --build", + "build.watch": "tsc --build --watch", + "format": "prettier --write .", + "format:check": "prettier --check .", + "lint": "eslint .", + "start": "ts-node src/worker.ts", + "start.watch": "nodemon src/worker.ts", + "start.llm": "ts-node src/llm-worker.ts", + "start.llm.watch": "nodemon src/llm-worker.ts", + "workflow.publisher": "ts-node src/run-publisher.ts", + "workflow.reconnecting": "ts-node src/run-reconnecting-subscriber.ts", + "workflow.external": "ts-node src/run-external-publisher.ts", + "workflow.ticker": "ts-node src/run-truncating-ticker.ts", + "workflow.llm": "ts-node src/run-llm.ts" + }, + "nodemonConfig": { + "execMap": { + "ts": "ts-node" + }, + "ext": "ts", + "watch": [ + "src" + ] + }, + "dependencies": { + "@temporalio/activity": "^1.17.2", + "@temporalio/client": "^1.17.2", + "@temporalio/common": "^1.17.2", + "@temporalio/worker": "^1.17.2", + "@temporalio/workflow": "^1.17.2", + "@temporalio/workflow-streams": "file:../../sdk-typescript/contrib/workflow-streams", + "nanoid": "3.x", + "openai": "^4.77.0" + }, + "devDependencies": { + "@tsconfig/node22": "^22.0.0", + "@types/node": "^22.9.1", + "@typescript-eslint/eslint-plugin": "^8.18.0", + "@typescript-eslint/parser": "^8.18.0", + "eslint": "^8.57.1", + "eslint-config-prettier": "^9.1.0", + "eslint-plugin-deprecation": "^3.0.0", + "nodemon": "^3.1.7", + "prettier": "^3.4.2", + "ts-node": "^10.9.2", + "typescript": "^5.6.3" + } +} diff --git a/workflow-streams/src/activities.ts b/workflow-streams/src/activities.ts new file mode 100644 index 00000000..c695847d --- /dev/null +++ b/workflow-streams/src/activities.ts @@ -0,0 +1,20 @@ +import { WorkflowStreamClient } from 
'@temporalio/workflow-streams/client'; +import { TOPIC_PROGRESS, type ProgressEvent } from './shared'; + +/** + * Pretend to charge a card, publishing progress to the parent workflow. + * + * `WorkflowStreamClient.fromActivity()` reads the parent workflow id and the + * Temporal client from the activity context, so this activity can push events + * back without any wiring. + */ +export async function chargeCard(orderId: string): Promise<string> { + await using client = WorkflowStreamClient.fromActivity({ batchInterval: '200 milliseconds' }); + const progress = client.topic<ProgressEvent>(TOPIC_PROGRESS); + + progress.publish({ message: 'charging card...' }); + await new Promise((resolve) => setTimeout(resolve, 1000)); + progress.publish({ message: 'card charged' }); + + return `charge-${orderId}`; +} diff --git a/workflow-streams/src/llm-activities.ts b/workflow-streams/src/llm-activities.ts new file mode 100644 index 00000000..cdf0a8df --- /dev/null +++ b/workflow-streams/src/llm-activities.ts @@ -0,0 +1,62 @@ +import { Context } from '@temporalio/activity'; +import { WorkflowStreamClient } from '@temporalio/workflow-streams/client'; +import OpenAI from 'openai'; +import { + DEFAULT_LLM_MODEL, + TOPIC_COMPLETE, + TOPIC_DELTA, + TOPIC_RETRY, + type LLMInput, + type RetryEvent, + type TextComplete, + type TextDelta, +} from './llm-shared'; + +/** + * Stream an LLM completion to the parent workflow's stream. + * + * Activity-as-publisher: each delta from the OpenAI streaming API is pushed to + * the workflow's stream as a `TextDelta` event on the `delta` topic. The + * accumulated full text returns as the activity's result and is also published + * on the `complete` topic as a terminator. On retry attempts + * (`Context.current().info.attempt > 1`) a `RetryEvent` lands on the `retry` + * topic before the new attempt's deltas, so consumers can reset their + * accumulated state instead of concatenating the failed attempt's partial + * output with the retried attempt's full output. 
+ * + * No `forceFlush: true`: the 200ms `batchInterval` is fast enough for an + * interactive feel, and the WorkflowStreamClient's asyncDispose flushes + * cleanly on scope exit. + */ +export async function streamCompletion(input: LLMInput): Promise<string> { + await using client = WorkflowStreamClient.fromActivity({ batchInterval: '200 milliseconds' }); + // Disable provider-side retries; let Temporal own retry policy at the + // activity layer. + const openai = new OpenAI({ maxRetries: 0 }); + + const deltas = client.topic<TextDelta>(TOPIC_DELTA); + const complete = client.topic<TextComplete>(TOPIC_COMPLETE); + const retry = client.topic<RetryEvent>(TOPIC_RETRY); + + const attempt = Context.current().info.attempt; + if (attempt > 1) { + retry.publish({ attempt }); + } + + const full: string[] = []; + const stream = await openai.chat.completions.create({ + model: input.model ?? DEFAULT_LLM_MODEL, + messages: [{ role: 'user', content: input.prompt }], + stream: true, + }); + for await (const chunk of stream) { + const text = chunk.choices[0]?.delta?.content; + if (!text) continue; + deltas.publish({ text }); + full.push(text); + } + + const fullText = full.join(''); + complete.publish({ fullText }); + return fullText; +} diff --git a/workflow-streams/src/llm-shared.ts b/workflow-streams/src/llm-shared.ts new file mode 100644 index 00000000..b1443b79 --- /dev/null +++ b/workflow-streams/src/llm-shared.ts @@ -0,0 +1,38 @@ +/** + * Types and constants for the LLM-streaming scenario. + * + * Kept separate from `shared.ts` because the other scenarios don't use these, + * and this scenario runs on its own worker and task queue so the `openai` + * dependency stays out of everyone else's path. + */ + +import type { WorkflowStreamState } from '@temporalio/workflow-streams/workflow'; + +// Scenario 5 runs on its own worker so the openai dependency only matters +// for that scenario. +export const LLM_TASK_QUEUE = 'workflow-stream-llm-task-queue'; + +// Topics published by the activity. 
+export const TOPIC_DELTA = 'delta'; +export const TOPIC_COMPLETE = 'complete'; +export const TOPIC_RETRY = 'retry'; + +export const DEFAULT_LLM_MODEL = 'gpt-5-mini'; + +export interface LLMInput { + prompt: string; + model?: string; + streamState?: WorkflowStreamState; +} + +export interface TextDelta { + text: string; +} + +export interface TextComplete { + fullText: string; +} + +export interface RetryEvent { + attempt: number; +} diff --git a/workflow-streams/src/llm-worker.ts b/workflow-streams/src/llm-worker.ts new file mode 100644 index 00000000..d9f34cb0 --- /dev/null +++ b/workflow-streams/src/llm-worker.ts @@ -0,0 +1,38 @@ +/** + * Worker for the LLM-streaming scenario. + * + * Runs separately from `worker.ts` so the `openai` dependency and the + * `OPENAI_API_KEY` requirement stay isolated to this one scenario. Different + * task queue too — the other four samples won't route work to this worker. + * + * Kill this worker mid-stream while `run-llm.ts` is running to trigger a + * retry: Temporal restarts the activity on the next worker to come up, the + * activity publishes a retry event on its second attempt, and the consumer + * resets its rendered output. 
+ */ +import { NativeConnection, Worker } from '@temporalio/worker'; +import * as activities from './llm-activities'; +import { LLM_TASK_QUEUE } from './llm-shared'; + +async function run() { + const connection = await NativeConnection.connect({ + address: 'localhost:7233', + }); + try { + const worker = await Worker.create({ + connection, + namespace: 'default', + taskQueue: LLM_TASK_QUEUE, + workflowsPath: require.resolve('./llm-workflows'), + activities, + }); + await worker.run(); + } finally { + await connection.close(); + } +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/workflow-streams/src/llm-workflows.ts b/workflow-streams/src/llm-workflows.ts new file mode 100644 index 00000000..a018b75e --- /dev/null +++ b/workflow-streams/src/llm-workflows.ts @@ -0,0 +1,38 @@ +import { proxyActivities, sleep } from '@temporalio/workflow'; +import { WorkflowStream } from '@temporalio/workflow-streams/workflow'; +import type * as activities from './llm-activities'; +import { type LLMInput } from './llm-shared'; + +const { streamCompletion } = proxyActivities<typeof activities>({ + startToCloseTimeout: '5 minutes', + retry: { + maximumAttempts: 3, + }, +}); + +/** + * Wrapper for an LLM-streaming activity. + * + * The workflow does no streaming of its own; it hosts the `WorkflowStream` so + * external subscribers can attach by workflow id, kicks off the streaming + * activity, and returns the full text the activity produced. + * + * Streaming is delegated to the activity because the OpenAI call is + * non-deterministic. If the activity fails partway through, Temporal retries + * it (up to `maximumAttempts`); the retried attempt re-publishes from the + * start, so the consumer must reset on the activity's retry event. See + * `llm-activities.ts` and `run-llm.ts`. 
+ */ +export async function llmStreaming(input: LLMInput): Promise<string> { + // Construct the stream as the first statement of the workflow function so + // the publish-signal handler is registered before any external publisher + // (the activity, here) tries to publish. + new WorkflowStream(input.streamState); + + const result = await streamCompletion(input); + // Hold the run open briefly so the consumer's next poll delivers the + // activity's terminal `complete` event before the workflow exits and the + // in-memory log is gone. + await sleep('500 milliseconds'); + return result; +} diff --git a/workflow-streams/src/run-external-publisher.ts b/workflow-streams/src/run-external-publisher.ts new file mode 100644 index 00000000..bf45bd07 --- /dev/null +++ b/workflow-streams/src/run-external-publisher.ts @@ -0,0 +1,85 @@ +/** + * External publisher: a non-Activity process pushes events into a workflow. + * + * The two earlier scenarios publish from inside the workflow itself + * (`order`, `pipeline`) or from an Activity it runs (`chargeCard`). This + * scenario shows the third shape: a backend service, scheduled job, or + * anything else with a Temporal `Client` publishing into a *running* workflow + * it didn't start. Same factory as the subscribe path — + * `WorkflowStreamClient.create` — used for publishing instead. + * + * The script starts a `hub` workflow (which does no work of its own — it + * exists only to host the stream), then runs a publisher and a subscriber + * concurrently. When the publisher is done it signals `close`, the workflow's + * run finishes, and the subscriber's iterator exits normally. 
+ */ +import { Client, Connection } from '@temporalio/client'; +import { WorkflowStreamClient } from '@temporalio/workflow-streams/client'; +import { nanoid } from 'nanoid'; +import { TASK_QUEUE, TOPIC_NEWS, type NewsEvent } from './shared'; +import { closeSignal, hub } from './workflows'; + +const HEADLINES = ['rates held', 'merger announced', 'outage resolved', 'earnings beat', 'regulator opens probe']; + +// In-band terminator the publisher emits before signaling close. The +// subscriber recognizes this value and stops polling — without an explicit +// terminator the consumer would have to rely on the workflow returning to +// break the iterator, which means racing the last item delivery against +// workflow completion. +const DONE_HEADLINE = '__done__'; + +function sleep(ms: number): Promise<void> { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function run() { + const connection = await Connection.connect({ address: 'localhost:7233' }); + const client = new Client({ connection }); + + const workflowId = `workflow-stream-hub-${nanoid(8)}`; + const handle = await client.workflow.start(hub, { + args: [{ hubId: workflowId }], + taskQueue: TASK_QUEUE, + workflowId, + }); + + async function publishNews(): Promise<void> { + // `WorkflowStreamClient.create` takes a Temporal client and a workflow id + // — the same factory used elsewhere for subscribing. `await using` + // batches publishes and flushes on scope exit; we additionally call + // `flush()` before signaling close so we know the events landed before + // the workflow shuts down. + await using producer = WorkflowStreamClient.create(client, workflowId); + const news = producer.topic<NewsEvent>(TOPIC_NEWS); + for (const headline of HEADLINES) { + news.publish({ headline }); + console.log(`[publisher] sent: ${headline}`); + await sleep(500); + } + news.publish({ headline: DONE_HEADLINE }, { forceFlush: true }); + await producer.flush(); + // Tell the hub it can stop. 
The subscriber has already broken out of its + // for-await loop on the sentinel above. + await handle.signal(closeSignal); + console.log('[publisher] signaled close'); + } + + async function consumeNews(): Promise<void> { + const consumer = WorkflowStreamClient.create(client, workflowId); + for await (const item of consumer.topic<NewsEvent>(TOPIC_NEWS).subscribe(0)) { + if (item.data.headline === DONE_HEADLINE) return; + console.log(`[subscriber] offset=${item.offset}: ${item.data.headline}`); + } + } + + await Promise.all([publishNews(), consumeNews()]); + + const result = await handle.result(); + console.log(`\nworkflow result: ${result}`); + await connection.close(); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/workflow-streams/src/run-llm.ts b/workflow-streams/src/run-llm.ts new file mode 100644 index 00000000..485f5ebf --- /dev/null +++ b/workflow-streams/src/run-llm.ts @@ -0,0 +1,112 @@ +/** + * Stream LLM output to the terminal, handling retries. + * + * Starts an `llmStreaming` workflow, subscribes to its delta / complete / + * retry topics, and renders the model's output to stdout as it arrives. On a + * retry event (the activity is on attempt > 1), the consumer rewinds its + * rendered output with ANSI escapes and starts fresh — so a killed worker + * doesn't leave a half-finished response stuck on screen followed by the + * retried attempt's full output. + * + * Requires `OPENAI_API_KEY` in the environment. + * + * Run the LLM worker first (`npm run start.llm`), then `npm run workflow.llm`. + * + * To see retry handling in action, kill the LLM worker mid-stream (Ctrl-C in + * its terminal) and start it again. The consumer will clear its accumulated + * output on the retry event and re-render the retried attempt's output from + * scratch. 
+ */ +import { Client, Connection } from '@temporalio/client'; +import { defaultPayloadConverter } from '@temporalio/common'; +import { WorkflowStreamClient } from '@temporalio/workflow-streams/client'; +import { nanoid } from 'nanoid'; +import { + DEFAULT_LLM_MODEL, + LLM_TASK_QUEUE, + TOPIC_COMPLETE, + TOPIC_DELTA, + TOPIC_RETRY, + type RetryEvent, + type TextComplete, + type TextDelta, +} from './llm-shared'; +import { llmStreaming } from './llm-workflows'; + +// Long enough that you can comfortably kill the worker mid-stream and watch +// the retry render. Adjust to taste. +const DEFAULT_PROMPT = + 'Write a 500-word comparison of Paxos, Raft, and Viewstamped Replication for ' + + 'a new distributed-systems engineer. Cover the core ideas, leader election, ' + + 'normal-case operation, reconfiguration, and the practical tradeoffs that ' + + 'show up when implementing each. Use short paragraphs.'; + +// ANSI cursor save / restore. `\x1b[s` saves the current cursor position, +// `\x1b[u` restores it, `\x1b[J` clears from the cursor to the end of the +// screen. Save once before the first delta, and on retry restore + clear-to- +// end so the failed attempt's rendered output disappears regardless of how it +// was wrapped by the terminal. Save again afterwards so a second retry can +// rewind to the same point. +const ANSI_SAVE = '\x1b[s'; +const ANSI_RESTORE_AND_CLEAR = '\x1b[u\x1b[J'; + +async function run() { + const connection = await Connection.connect({ address: 'localhost:7233' }); + const client = new Client({ connection }); + + const workflowId = `workflow-stream-llm-${nanoid(8)}`; + const input = { prompt: DEFAULT_PROMPT, model: DEFAULT_LLM_MODEL }; + const handle = await client.workflow.start(llmStreaming, { + args: [input], + taskQueue: LLM_TASK_QUEUE, + workflowId, + }); + + // Print a header so the user sees something immediately. 
The response will + // start streaming below it once the first delta arrives — until then this + // is the only line on screen. + console.log(`[llm ${workflowId}] streaming response from ${input.model}, awaiting first token...`); + console.log(); + process.stdout.write(ANSI_SAVE); + + const stream = WorkflowStreamClient.create(client, workflowId); + + // Subscribe without `resultType` so `item.data` is the raw `Payload`, then + // dispatch on `item.topic` and decode against the right type per topic. + // The loop ends either on the `complete` terminator (break) or because the + // iterator naturally exhausts when the workflow reaches a terminal state + // without one (activity exhausted retries, etc.). Either way the + // `handle.result()` below either returns the full text or throws the + // workflow's failure. + let done = false; + for await (const item of stream.subscribe([TOPIC_DELTA, TOPIC_RETRY, TOPIC_COMPLETE])) { + if (item.topic === TOPIC_RETRY) { + const evt = defaultPayloadConverter.fromPayload(item.data); + process.stdout.write(ANSI_RESTORE_AND_CLEAR); + console.log(`[retry attempt ${evt.attempt}] resetting output`); + console.log(); + process.stdout.write(ANSI_SAVE); + } else if (item.topic === TOPIC_DELTA) { + const delta = defaultPayloadConverter.fromPayload(item.data); + process.stdout.write(delta.text); + } else if (item.topic === TOPIC_COMPLETE) { + // The full text is also in the payload (and returned by the workflow), + // but the consumer has already rendered it incrementally. Just + // terminate the line. 
+ defaultPayloadConverter.fromPayload(item.data); + console.log(); + done = true; + break; + } + } + + const result = await handle.result(); + if (!done) console.log(); + console.log(`[workflow result: ${result.length} chars]`); + await connection.close(); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/workflow-streams/src/run-publisher.ts b/workflow-streams/src/run-publisher.ts new file mode 100644 index 00000000..6f886331 --- /dev/null +++ b/workflow-streams/src/run-publisher.ts @@ -0,0 +1,47 @@ +import { Client, Connection } from '@temporalio/client'; +import { defaultPayloadConverter } from '@temporalio/common'; +import { WorkflowStreamClient } from '@temporalio/workflow-streams/client'; +import { nanoid } from 'nanoid'; +import { TASK_QUEUE, TOPIC_PROGRESS, TOPIC_STATUS, type ProgressEvent, type StatusEvent } from './shared'; +import { order } from './workflows'; + +async function run() { + const connection = await Connection.connect({ address: 'localhost:7233' }); + const client = new Client({ connection }); + + const workflowId = `workflow-stream-order-${nanoid(8)}`; + const handle = await client.workflow.start(order, { + args: [{ orderId: 'order-1' }], + taskQueue: TASK_QUEUE, + workflowId, + }); + + const stream = WorkflowStreamClient.create(client, workflowId); + + // Single iterator over both topics — avoids a cancellation race between two + // concurrent subscribers. Without `resultType`, `item.data` is the + // underlying `Payload`, which we decode at the call site so we can dispatch + // heterogeneous events on `item.topic`. The loop ends either on the in-band + // `complete` terminator (break) or because the iterator exhausts when the + // workflow reaches a terminal state without one (e.g. on failure). Either + // way we then await `handle.result()`, which raises if the workflow failed. 
+ for await (const item of stream.subscribe([TOPIC_STATUS, TOPIC_PROGRESS])) { + if (item.topic === TOPIC_STATUS) { + const evt = defaultPayloadConverter.fromPayload(item.data); + console.log(`[status] ${evt.kind}: order=${evt.orderId}`); + if (evt.kind === 'complete') break; + } else if (item.topic === TOPIC_PROGRESS) { + const evt = defaultPayloadConverter.fromPayload(item.data); + console.log(`[progress] ${evt.message}`); + } + } + + const result = await handle.result(); + console.log(`workflow result: ${result}`); + await connection.close(); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/workflow-streams/src/run-reconnecting-subscriber.ts b/workflow-streams/src/run-reconnecting-subscriber.ts new file mode 100644 index 00000000..6d568d17 --- /dev/null +++ b/workflow-streams/src/run-reconnecting-subscriber.ts @@ -0,0 +1,140 @@ +/** + * Reconnecting subscriber: read a few events, disconnect, resume. + * + * Demonstrates the central Workflow Streams use case: a consumer can + * disappear mid-stream — page refresh, server restart, laptop closed — and + * resume later without missing events or seeing duplicates. The event log + * lives in the Workflow, so the consumer just remembers where it stopped. + * + * The script runs the pattern in two phases inside one process to keep the + * demo short. The same code shape works across actual process restarts + * because the resume offset is durable in the workflow, not in the consumer. + * + * Output is one line per emit, with current stream stats in a left column and + * a phase / event message in a right column. A background poller calls + * `WorkflowStreamClient.getOffset()` for the whole demo and emits a heartbeat + * line once a second so you can watch `pend` (`available - processed`) grow + * while the consumer is disconnected and shrink as phase 2 catches up. 
/**
 * run-reconnecting-subscriber.ts (module body).
 *
 * Starts the `pipeline` workflow, reads a couple of status events, drops the
 * subscription for a while, then resumes from the remembered offset over a
 * brand-new connection. A background poller calls `getOffset()` for the whole
 * demo and prints a heartbeat line so the backlog is visible while nobody is
 * reading.
 */
import { Client, Connection } from '@temporalio/client';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';
import { nanoid } from 'nanoid';
import { TASK_QUEUE, TOPIC_STATUS, type StageEvent } from './shared';
import { pipeline } from './workflows';

// Number of events read in phase 1 before simulating a disconnect. Picked
// small enough that the workflow is still running after.
const PHASE_1_EVENTS = 2;

// How long to stay disconnected.
const DISCONNECT_MS = 3000;

// Background poller cadence. The poller refreshes `available` this often and
// emits a heartbeat line once per HEARTBEAT_MS.
const POLL_INTERVAL_MS = 250;
const HEARTBEAT_MS = 1000;

// Width of the stats column. Picked to fit the longest stats string.
const LEFT_WIDTH = 30;

/** Counters shown in the left-hand stats column. */
interface State {
  /** One past the offset of the last event this consumer handled (the resume offset). */
  processed: number;
  /** Latest value returned by `stream.getOffset()`. */
  available: number;
}

/** Events published but not yet consumed; clamped so a stale `available` never shows negative. */
const pending = (s: State): number => Math.max(0, s.available - s.processed);

/** Print one output line: stats column on the left, `message` on the right. */
function emit(state: State, message: string): void {
  const left = `proc=${String(state.processed).padStart(2)} avail=${String(state.available).padStart(2)} pend=${String(pending(state)).padStart(2)}`;
  console.log(`${left.padEnd(LEFT_WIDTH)}│ ${message}`);
}

/** Resolve after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Run the two-phase demo: subscribe, "disconnect", then resume from the saved
 * offset with a fresh client.
 *
 * The connection is closed in a `finally` that also covers workflow start, so
 * a failure anywhere after connecting does not leave it open.
 */
async function run(): Promise<void> {
  const connection = await Connection.connect({ address: 'localhost:7233' });
  try {
    const client = new Client({ connection });

    const workflowId = `workflow-stream-pipeline-${nanoid(8)}`;
    const handle = await client.workflow.start(pipeline, {
      args: [{ pipelineId: workflowId }],
      taskQueue: TASK_QUEUE,
      workflowId,
    });

    // In a production web backend the resume offset would live in durable
    // storage keyed by (user_id, run_id) — a database row, a Redis key, etc.
    // For an in-process demo a `state.processed` field works the same way.
    const state: State = { processed: 0, available: 0 };
    const stream = WorkflowStreamClient.create(client, workflowId);
    emit(state, `started ${workflowId}`);

    let stopped = false;
    const pollerDone = (async (): Promise<void> => {
      let lastEmit = Date.now();
      while (!stopped) {
        try {
          state.available = await stream.getOffset();
        } catch {
          // Deliberately best-effort: a failed poll leaves `available` at its
          // previous value and the next iteration simply tries again.
        }
        const now = Date.now();
        if (now - lastEmit >= HEARTBEAT_MS) {
          emit(state, '·');
          lastEmit = now;
        }
        await sleep(POLL_INTERVAL_MS);
      }
    })();

    try {
      // ---- Phase 1: connect, read a couple of events, "disconnect".
      emit(state, '[phase 1] connecting');
      let seen = 0;
      for await (const item of stream.topic<StageEvent>(TOPIC_STATUS).subscribe(0)) {
        // Remember *one past* the offset just consumed: on resume we want the
        // next unseen event, not the one we already showed.
        state.processed = item.offset + 1;
        emit(state, ` offset=${String(item.offset).padStart(2)} stage=${item.data.stage}`);
        seen += 1;
        if (seen >= PHASE_1_EVENTS) break;
      }
      emit(state, '[phase 1] disconnecting');

      // ---- Disconnect window: nobody reads. The workflow keeps publishing —
      // `pend` grows on the heartbeat lines as the offset advances past
      // `processed`.
      await sleep(DISCONNECT_MS);

      // ---- Phase 2: brand-new client + stream, resume from saved offset.
      // Same shape as a different process picking up where the first one left
      // off.
      emit(state, '[phase 2] reconnecting');
      const connection2 = await Connection.connect({ address: 'localhost:7233' });
      const client2 = new Client({ connection: connection2 });
      const stream2 = WorkflowStreamClient.create(client2, workflowId);
      try {
        for await (const item of stream2.topic<StageEvent>(TOPIC_STATUS).subscribe(state.processed)) {
          state.processed = item.offset + 1;
          emit(state, ` offset=${String(item.offset).padStart(2)} stage=${item.data.stage}`);
          // 'complete' is the in-band terminator published by the workflow.
          if (item.data.stage === 'complete') break;
        }
      } finally {
        await connection2.close();
      }

      const result = await handle.result();
      emit(state, `workflow result: ${result}`);
    } finally {
      // Stop the poller and wait for its current iteration before closing
      // the connection it is using.
      stopped = true;
      await pollerDone;
    }
  } finally {
    await connection.close();
  }
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});
// ---- workflow-streams/src/run-truncating-ticker.ts ----
/**
 * Truncating ticker: bounded log + slow vs. fast subscribers.
 *
 * The `ticker` workflow publishes `count` events at a fixed interval, calling
 * `stream.truncate(...)` periodically to bound log growth. This script
 * subscribes twice — once fast, once slow — and prints them in two lanes so
 * the trade is visible at a glance:
 *
 * - **Fast lane** (left). Does no work between events, so it is expected to
 *   keep up and see every published offset.
 * - **Slow lane** (right). Sleeps between iterations. When a truncation has
 *   dropped its position by the time it polls again, the iterator silently
 *   jumps forward to the new base offset; the slow lane prints a
 *   `↪ jumped offset=N → M (K dropped)` marker for each gap and resumes at
 *   the new offset.
 *
 * `truncate()` is unilateral: the workflow does not know who is subscribed
 * and does not wait for them. The implicit alternative — never truncating —
 * keeps every event around forever, lets slow consumers eventually catch up
 * without losses, and pays for it in unbounded workflow history. The
 * truncation model is the opposite trade: bounded log, best-effort delivery
 * to slow consumers, no backpressure on the publisher. Pair it with
 * set-semantic events where each event carries enough state to make missing
 * the prior ones recoverable.
 */
import { Client, Connection } from '@temporalio/client';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';
import { nanoid } from 'nanoid';
import { TASK_QUEUE, TOPIC_TICK, type TickEvent } from './shared';
import { ticker } from './workflows';

// Aggressive truncation so the log stays at most KEEP_LAST entries right
// after each truncation, which keeps the slow subscriber's per-poll batch
// tiny. Small batches + a slow per-event sleep mean the slow subscriber
// re-polls often, and most of those polls land after a truncation that has
// passed its position — so it sees several jumps during the run rather than
// one batched at the end.
const TICKER_COUNT = 30;
const INTERVAL_MS = 200;
const TRUNCATE_EVERY = 2;
const KEEP_LAST = 1;
const SLOW_SUBSCRIBER_DELAY_MS = 1500;

// Width of each output lane and the separator drawn between them.
const LANE_WIDTH = 32;
const SEP = '│';

/** Print `message` in the left (fast) lane. */
function emitFast(message: string): void {
  console.log(`${message.padEnd(LANE_WIDTH)} ${SEP}`);
}

/** Print `message` in the right (slow) lane, leaving the left lane blank. */
function emitSlow(message: string): void {
  console.log(`${' '.repeat(LANE_WIDTH)} ${SEP} ${message}`);
}

/** Print the two-lane column header and the rule beneath it. */
function emitHeader(): void {
  const rule = '─'.repeat(LANE_WIDTH);
  console.log(
    `${'fast (every event)'.padEnd(LANE_WIDTH)} ${SEP} slow (sleeps ${SLOW_SUBSCRIBER_DELAY_MS / 1000}s between events)`,
  );
  console.log(`${rule} ${SEP} ${rule}`);
}

/** Resolve after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Start the `ticker` workflow and run a fast and a slow subscriber side by
 * side until each has seen the final tick, then print the workflow result.
 *
 * The connection is closed in a `finally` so a subscriber or workflow failure
 * does not leave it open.
 */
async function run(): Promise<void> {
  const connection = await Connection.connect({ address: 'localhost:7233' });
  try {
    const client = new Client({ connection });

    const workflowId = `workflow-stream-ticker-${nanoid(8)}`;
    const handle = await client.workflow.start(ticker, {
      args: [{ count: TICKER_COUNT, keepLast: KEEP_LAST, truncateEvery: TRUNCATE_EVERY, intervalMs: INTERVAL_MS }],
      taskQueue: TASK_QUEUE,
      workflowId,
    });
    const stream = WorkflowStreamClient.create(client, workflowId);
    // The tick carrying this `n` is the in-band terminator both lanes stop on.
    const lastN = TICKER_COUNT - 1;

    emitHeader();

    const fastSubscriber = async (): Promise<void> => {
      for await (const item of stream.topic<TickEvent>(TOPIC_TICK).subscribe(0)) {
        emitFast(`offset=${String(item.offset).padStart(3)} n=${item.data.n}`);
        if (item.data.n === lastN) return;
      }
    };

    const slowSubscriber = async (): Promise<void> => {
      // -1 means "nothing seen yet", so the first event never reports a jump.
      let lastOffset = -1;
      for await (const item of stream.topic<TickEvent>(TOPIC_TICK).subscribe(0)) {
        // A hole between consecutive offsets means the intervening entries
        // were not delivered to this lane.
        if (lastOffset >= 0 && item.offset > lastOffset + 1) {
          const gap = item.offset - lastOffset - 1;
          emitSlow(`↪ jumped offset=${lastOffset} → ${item.offset} (${gap} dropped)`);
        }
        emitSlow(`offset=${String(item.offset).padStart(3)} n=${item.data.n}`);
        lastOffset = item.offset;
        if (item.data.n === lastN) return;
        await sleep(SLOW_SUBSCRIBER_DELAY_MS);
      }
    };

    await Promise.all([fastSubscriber(), slowSubscriber()]);

    const result = await handle.result();
    console.log();
    console.log(`workflow result: ${result}`);
  } finally {
    await connection.close();
  }
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});

// ---- workflow-streams/src/shared.ts ----
import type { WorkflowStreamState } from '@temporalio/workflow-streams/workflow';

/** Task queue shared by the worker and every starter script. */
export const TASK_QUEUE = 'workflow-stream-sample-task-queue';

// Topics published by the workflow / activity.
export const TOPIC_STATUS = 'status';
export const TOPIC_PROGRESS = 'progress';
export const TOPIC_NEWS = 'news';
export const TOPIC_TICK = 'tick';

/** Input of the `order` workflow. */
export interface OrderInput {
  orderId: string;
  // Carries stream state across continue-as-new. undefined on a fresh start.
  streamState?: WorkflowStreamState;
}

/** Payload published on `TOPIC_STATUS` by the `order` workflow. */
export interface StatusEvent {
  kind: string;
  orderId: string;
}

/** Payload published on `TOPIC_PROGRESS`. */
export interface ProgressEvent {
  message: string;
}

/** Input of the `pipeline` workflow. */
export interface PipelineInput {
  pipelineId: string;
  streamState?: WorkflowStreamState;
}

/** Payload published on `TOPIC_STATUS` by the `pipeline` workflow. */
export interface StageEvent {
  stage: string;
}

/** Input of the `hub` workflow. */
export interface HubInput {
  hubId: string;
  streamState?: WorkflowStreamState;
}

/** Payload for `TOPIC_NEWS`. */
export interface NewsEvent {
  headline: string;
}

/** Input of the `ticker` workflow; omitted fields fall back to `TICKER_DEFAULTS`. */
export interface TickerInput {
  count?: number;
  keepLast?: number;
  truncateEvery?: number;
  intervalMs?: number;
  streamState?: WorkflowStreamState;
}

/** Defaults the `ticker` workflow applies to missing `TickerInput` fields. */
export const TICKER_DEFAULTS = {
  count: 20,
  keepLast: 3,
  truncateEvery: 5,
  intervalMs: 400,
};

/** Payload published on `TOPIC_TICK`; `n` is the zero-based tick index. */
export interface TickEvent {
  n: number;
}
// ---- workflow-streams/src/worker.ts ----
import { NativeConnection, Worker } from '@temporalio/worker';
import * as activities from './activities';
import { TASK_QUEUE } from './shared';

/**
 * Connect to the local Temporal server and run a worker on `TASK_QUEUE`
 * that hosts this sample's workflows and activities.
 *
 * `worker.run()` resolves only when the worker stops; the connection is
 * closed afterwards, including when worker creation or the run itself throws.
 */
async function run(): Promise<void> {
  const connection = await NativeConnection.connect({
    address: 'localhost:7233',
  });
  try {
    const worker = await Worker.create({
      connection,
      namespace: 'default',
      taskQueue: TASK_QUEUE,
      workflowsPath: require.resolve('./workflows'),
      activities,
    });
    await worker.run();
  } finally {
    await connection.close();
  }
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});

// ---- workflow-streams/src/workflows.ts ----
import { condition, defineSignal, proxyActivities, setHandler, sleep } from '@temporalio/workflow';
import { WorkflowStream } from '@temporalio/workflow-streams/workflow';
import type * as activities from './activities';
import {
  TICKER_DEFAULTS,
  TOPIC_PROGRESS,
  TOPIC_STATUS,
  TOPIC_TICK,
  type HubInput,
  type OrderInput,
  type PipelineInput,
  type ProgressEvent,
  type StageEvent,
  type StatusEvent,
  type TickerInput,
  type TickEvent,
} from './shared';

// Typed activity proxy: the `typeof activities` argument is what gives
// `chargeCard` its real signature. Only the type is imported, so activity
// code is never pulled into the workflow bundle.
const { chargeCard } = proxyActivities<typeof activities>({
  startToCloseTimeout: '30 seconds',
});
/**
 * Process a fake order, publishing status and progress events.
 *
 * The workflow publishes on two topics of a single stream: status changes on
 * `TOPIC_STATUS` and a progress note on `TOPIC_PROGRESS`. Subscribers can
 * filter on the topic(s) they care about.
 *
 * The stream is constructed as the first statement so its handlers are
 * registered before the workflow does anything else. Passing
 * `input.streamState` lets a continued-as-new run pick up the prior stream
 * state.
 *
 * @param input Order id plus optional carried-over stream state.
 * @returns The charge id returned by the `chargeCard` activity.
 */
export async function order(input: OrderInput): Promise<string> {
  const stream = new WorkflowStream(input.streamState);
  const status = stream.topic<StatusEvent>(TOPIC_STATUS);
  const progress = stream.topic<ProgressEvent>(TOPIC_PROGRESS);

  status.publish({ kind: 'received', orderId: input.orderId });

  const chargeId = await chargeCard(input.orderId);

  status.publish({ kind: 'shipped', orderId: input.orderId });
  progress.publish({ message: `charge id: ${chargeId}` });
  status.publish({ kind: 'complete', orderId: input.orderId });
  // The "complete" status event above is the in-band terminator subscribers
  // break on (see run-publisher.ts). Hold the run open briefly so subscribers'
  // next poll delivers it before this task returns and the in-memory log is
  // gone.
  await sleep('500 milliseconds');
  return chargeId;
}

/**
 * Multi-stage pipeline that publishes stage transitions over time.
 *
 * Stages are spaced out with `sleep` so a subscriber can realistically
 * disconnect partway through and reconnect without the pipeline finishing in
 * the meantime — the shape needed to demo the "show up late and still see
 * what happened" pattern.
 *
 * @param input Pipeline id plus optional carried-over stream state.
 * @returns A human-readable completion message.
 */
export async function pipeline(input: PipelineInput): Promise<string> {
  const stream = new WorkflowStream(input.streamState);
  const status = stream.topic<StageEvent>(TOPIC_STATUS);

  const stages = ['validating', 'loading data', 'transforming', 'writing output', 'verifying', 'complete'];
  for (const stage of stages) {
    status.publish({ stage });
    // No inter-stage pause after the terminator; the short hold below covers it.
    if (stage !== 'complete') {
      await sleep('2 seconds');
    }
  }
  // The "complete" stage above is the in-band terminator subscribers break
  // on. Hold the run open briefly so the final poll delivers it.
  await sleep('500 milliseconds');
  return `pipeline ${input.pipelineId} done`;
}

/** Signal that tells the `hub` workflow to shut down. Takes no arguments. */
export const closeSignal = defineSignal('close');
/**
 * Passive stream host: starts up, waits, closes when told.
 *
 * Unlike `order` or `pipeline`, this workflow does no work of its own — it
 * exists only to host a `WorkflowStream` that external publishers push events
 * into and external subscribers read from. The shape that fits a backend
 * service or "event bus" pattern, where the workflow owns durable state but
 * the events come from outside.
 *
 * @param input Hub id plus optional carried-over stream state.
 * @returns A human-readable message once `closeSignal` has been received.
 */
export async function hub(input: HubInput): Promise<string> {
  // Constructed only for its side effect of hosting the stream; the instance
  // itself is never used by this workflow.
  new WorkflowStream(input.streamState);
  let closed = false;
  setHandler(closeSignal, () => {
    closed = true;
  });
  await condition(() => closed);
  // The publisher publishes its own terminator into the stream before
  // signaling close (see run-external-publisher.ts). Hold the run open
  // briefly so subscribers' final poll delivers any items still in the log.
  await sleep('500 milliseconds');
  return `hub ${input.hubId} closed`;
}

/**
 * Long-running ticker that bounds its event log via `truncate`.
 *
 * Long-running workflows that publish high volumes of events would otherwise
 * grow their event log unboundedly. This workflow shows the truncation
 * pattern: every `truncateEvery` events, drop everything except the last
 * `keepLast` entries by calling `stream.truncate(safeOffset)`.
 *
 * Subscribers that fall behind a truncation jump forward to the new base
 * offset transparently (the iterator handles the `TruncatedOffset` error
 * internally), so consumers stay live but may not see every intermediate
 * event. That is the trade: bounded log size in exchange for best-effort
 * delivery to slow consumers.
 *
 * To compute the truncation offset the workflow tracks its own published
 * count. `WorkflowStream` does not expose a workflow-side head-offset
 * accessor, but the running count plus the carried `base_offset` (in
 * continue-as-new chains) is sufficient.
 *
 * @param input Optional overrides for `TICKER_DEFAULTS`, plus optional
 *   carried-over stream state.
 * @returns A message stating how many events this run published.
 */
export async function ticker(input: TickerInput): Promise<string> {
  const count = input.count ?? TICKER_DEFAULTS.count;
  const keepLast = input.keepLast ?? TICKER_DEFAULTS.keepLast;
  const truncateEvery = input.truncateEvery ?? TICKER_DEFAULTS.truncateEvery;
  const intervalMs = input.intervalMs ?? TICKER_DEFAULTS.intervalMs;

  const stream = new WorkflowStream(input.streamState);
  const tick = stream.topic<TickEvent>(TOPIC_TICK);
  // Running count of events published by THIS run. To compute a global
  // offset, add the priorState's base_offset (omitted here — this sample
  // doesn't continue-as-new).
  // NOTE(review): using this count directly as the truncate offset assumes
  // the run starts at offset 0 and nothing else publishes to the stream.
  let published = 0;

  for (let n = 0; n < count; n++) {
    tick.publish({ n });
    published += 1;
    await sleep(intervalMs);

    // Subscribers stop on the final tick, so the truncation that follows it
    // must leave at least that one entry in the log even when the caller
    // passes `keepLast <= 0`. Negative values are treated as 0 elsewhere so
    // the offset never goes past what has been published.
    const isFinalTick = n === count - 1;
    const keep = Math.max(keepLast, isFinalTick ? 1 : 0);
    if (published % truncateEvery === 0 && published > keep) {
      // Drop everything except the last `keep` entries.
      stream.truncate(published - keep);
    }
  }
  // The final tick (n === count - 1) is the in-band terminator subscribers
  // break on, and the loop above never truncates it away. Hold the run open
  // briefly so the final poll delivers it.
  await sleep('500 milliseconds');
  return `ticker emitted ${published} events`;
}

// ---- workflow-streams/tsconfig.json (separate file in the same patch; kept verbatim as comments) ----
// {
//   "extends": "@tsconfig/node22/tsconfig.json",
//   "version": "5.6.3",
//   "compilerOptions": {
//     "lib": ["es2021"],
//     "declaration": true,
//     "declarationMap": true,
//     "sourceMap": true,
//     "rootDir": "./src",
//     "outDir": "./lib"
//   },
//   "include": ["src/**/*.ts"]
// }