Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .scripts/list-of-samples.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"vscode-debugger",
"worker-specific-task-queues",
"worker-versioning",
"workflow-streams",
"message-passing/execute-update",
"message-passing/introduction",
"message-passing/safe-message-handlers",
Expand Down
3 changes: 3 additions & 0 deletions workflow-streams/.eslintignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
node_modules
lib
.eslintrc.js
48 changes: 48 additions & 0 deletions workflow-streams/.eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
const { builtinModules } = require('module');

const ALLOWED_NODE_BUILTINS = new Set(['assert']);

module.exports = {
root: true,
parser: '@typescript-eslint/parser',
parserOptions: {
project: './tsconfig.json',
tsconfigRootDir: __dirname,
},
plugins: ['@typescript-eslint', 'deprecation'],
extends: [
'eslint:recommended',
'plugin:@typescript-eslint/eslint-recommended',
'plugin:@typescript-eslint/recommended',
'prettier',
],
rules: {
// recommended for safety
'@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad
'deprecation/deprecation': 'warn',

// code style preference
'object-shorthand': ['error', 'always'],

// relaxed rules, for convenience
'@typescript-eslint/no-unused-vars': [
'warn',
{
argsIgnorePattern: '^_',
varsIgnorePattern: '^_',
},
],
'@typescript-eslint/no-explicit-any': 'off',
},
overrides: [
{
files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts', 'src/llm-workflows.ts'],
rules: {
'no-restricted-imports': [
'error',
...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, `node:${m}`]),
],
},
},
],
};
2 changes: 2 additions & 0 deletions workflow-streams/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
lib
node_modules
1 change: 1 addition & 0 deletions workflow-streams/.npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package-lock=false
1 change: 1 addition & 0 deletions workflow-streams/.nvmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
22
1 change: 1 addition & 0 deletions workflow-streams/.prettierignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
lib
2 changes: 2 additions & 0 deletions workflow-streams/.prettierrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
printWidth: 120
singleQuote: true
120 changes: 120 additions & 0 deletions workflow-streams/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Workflow Streams

> **Experimental.** These samples use `@temporalio/workflow-streams`, which is
> currently distributed on the `contrib/pubsub` branch of `sdk-typescript`. The
> module is experimental and its API may change.

`@temporalio/workflow-streams` lets a workflow host a durable,
offset-addressed event channel. The workflow holds an append-only log;
external clients (activities, starters, web backends) publish to topics via
signals and subscribe via long-poll updates. This packages the
boilerplate — batching, offset tracking, topic filtering, continue-as-new
hand-off — into a reusable stream.

The package has no root entrypoint; import from the two subpaths instead:

- `@temporalio/workflow-streams/workflow` — workflow-safe surface
(`WorkflowStream`, `WorkflowStreamState`, …). Safe to bundle into workflow
code.
- `@temporalio/workflow-streams/client` — client surface
(`WorkflowStreamClient`, …). Pulls in `crypto` and `@temporalio/client`; do
not import from a workflow file.

This sample has five scenarios. The first four share one worker; the fifth
has its own worker because it needs the `openai` package and an
`OPENAI_API_KEY`.

**Scenario 1 — basic publish/subscribe with heterogeneous topics:**

- `src/workflows.ts` (`order`) — a workflow that hosts a `WorkflowStream` and
publishes status events as it processes an order.
- `src/activities.ts` (`chargeCard`) — an activity that publishes intermediate
progress to the stream via `WorkflowStreamClient.fromActivity()`.
- `src/run-publisher.ts` — starts the workflow, subscribes to both topics,
decodes each by `item.topic`, and prints events as they arrive.

**Scenario 2 — reconnecting subscriber:**

- `src/workflows.ts` (`pipeline`) — a multi-stage pipeline that publishes
stage transitions over ~10 seconds, leaving room for a consumer to
disconnect and reconnect mid-run.
- `src/run-reconnecting-subscriber.ts` — connects, reads a couple of events,
"disconnects," then reopens a fresh client and resumes via
`subscribe(fromOffset)`. This is the central Workflow Streams use case: a
consumer can disappear (page refresh, server restart, laptop closed) and
resume later without missing events or seeing duplicates.

**Scenario 3 — external (non-Activity) publisher:**

- `src/workflows.ts` (`hub`) — a passive workflow that does no work of its
own; it exists only to host a `WorkflowStream` and shut down when signaled.
- `src/run-external-publisher.ts` — starts the hub, then publishes events
into it from a plain Node async function using
`WorkflowStreamClient.create(client, workflowId)`. A subscriber runs
alongside; when the publisher is done it emits a sentinel event and signals
`close`. The shape that fits a backend service or scheduled job pushing
events into a workflow it didn't itself start.

**Scenario 4 — bounded log via `truncate()`:**

- `src/workflows.ts` (`ticker`) — a long-running workflow that publishes
events at a fixed cadence and calls `stream.truncate(...)` periodically to
bound log growth, keeping only the most recent N entries.
- `src/run-truncating-ticker.ts` — runs a fast subscriber and a slow
subscriber side by side. The fast one keeps up and sees every offset in
order; the slow one falls behind a truncation and silently jumps forward to
the new base offset. The output makes the trade visible: bounded log size
in exchange for intermediate events being invisible to slow consumers.

**Scenario 5 — LLM streaming:**

- `src/llm-workflows.ts` (`llmStreaming`) — hosts a `WorkflowStream` and runs
`streamCompletion` as a single activity. The workflow itself does no
streaming; the activity owns the non-deterministic OpenAI call.
- `src/llm-activities.ts` (`streamCompletion`) — calls
`openai.chat.completions.create({ stream: true })`, publishes each token
chunk on the `delta` topic, the final accumulated text on `complete`, and a
`RetryEvent` on `retry` when running on attempt > 1.
- `src/run-llm.ts` — subscribes to all three topics, renders deltas to the
terminal as they arrive, and on a `retry` event uses ANSI escapes to rewind
the printed output before the retried attempt re-publishes.

Scenario 5 runs on its own worker (`src/llm-worker.ts`, on
`workflow-stream-llm-task-queue`) because it needs the `openai` dependency
and an `OPENAI_API_KEY`, and because killing this worker mid-stream is the
easiest way to demonstrate retry handling without disrupting the other four
scenarios.

## Run it

1. `temporal server start-dev` to start [Temporal
Server](https://github.com/temporalio/cli/#installation).
2. `npm install` to install dependencies.

For scenarios 1–4, start the shared worker:

```bash
npm run start
```

For scenario 5, export the API key and start the LLM worker:

```bash
export OPENAI_API_KEY=...
npm run start.llm
```

Then in another terminal, pick a scenario:

```bash
npm run workflow.publisher # scenario 1
npm run workflow.reconnecting # scenario 2
npm run workflow.external # scenario 3
npm run workflow.ticker # scenario 4
npm run workflow.llm # scenario 5
```

To exercise scenario 5's retry path, kill the LLM worker (Ctrl-C) while
output is streaming and start it again. The activity's next attempt sends a
retry event first; the consumer clears its on-screen output via ANSI escapes
and re-renders from scratch.
53 changes: 53 additions & 0 deletions workflow-streams/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{
"name": "temporal-workflow-streams",
"version": "0.1.0",
"private": true,
"scripts": {
"build": "tsc --build",
"build.watch": "tsc --build --watch",
"format": "prettier --write .",
"format:check": "prettier --check .",
"lint": "eslint .",
"start": "ts-node src/worker.ts",
"start.watch": "nodemon src/worker.ts",
"start.llm": "ts-node src/llm-worker.ts",
"start.llm.watch": "nodemon src/llm-worker.ts",
"workflow.publisher": "ts-node src/run-publisher.ts",
"workflow.reconnecting": "ts-node src/run-reconnecting-subscriber.ts",
"workflow.external": "ts-node src/run-external-publisher.ts",
"workflow.ticker": "ts-node src/run-truncating-ticker.ts",
"workflow.llm": "ts-node src/run-llm.ts"
},
"nodemonConfig": {
"execMap": {
"ts": "ts-node"
},
"ext": "ts",
"watch": [
"src"
]
},
"dependencies": {
"@temporalio/activity": "^1.17.2",
"@temporalio/client": "^1.17.2",
"@temporalio/common": "^1.17.2",
"@temporalio/worker": "^1.17.2",
"@temporalio/workflow": "^1.17.2",
"@temporalio/workflow-streams": "file:../../sdk-typescript/contrib/workflow-streams",
"nanoid": "3.x",
"openai": "^4.77.0"
},
"devDependencies": {
"@tsconfig/node22": "^22.0.0",
"@types/node": "^22.9.1",
"@typescript-eslint/eslint-plugin": "^8.18.0",
"@typescript-eslint/parser": "^8.18.0",
"eslint": "^8.57.1",
"eslint-config-prettier": "^9.1.0",
"eslint-plugin-deprecation": "^3.0.0",
"nodemon": "^3.1.7",
"prettier": "^3.4.2",
"ts-node": "^10.9.2",
"typescript": "^5.6.3"
}
}
20 changes: 20 additions & 0 deletions workflow-streams/src/activities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';
import { TOPIC_PROGRESS, type ProgressEvent } from './shared';

/**
* Pretend to charge a card, publishing progress to the parent workflow.
*
* `WorkflowStreamClient.fromActivity()` reads the parent workflow id and the
* Temporal client from the activity context, so this activity can push events
* back without any wiring.
*/
export async function chargeCard(orderId: string): Promise<string> {
await using client = WorkflowStreamClient.fromActivity({ batchInterval: '200 milliseconds' });
const progress = client.topic<ProgressEvent>(TOPIC_PROGRESS);

progress.publish({ message: 'charging card...' });
await new Promise((resolve) => setTimeout(resolve, 1000));
progress.publish({ message: 'card charged' });

return `charge-${orderId}`;
}
62 changes: 62 additions & 0 deletions workflow-streams/src/llm-activities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { Context } from '@temporalio/activity';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';
import OpenAI from 'openai';
import {
DEFAULT_LLM_MODEL,
TOPIC_COMPLETE,
TOPIC_DELTA,
TOPIC_RETRY,
type LLMInput,
type RetryEvent,
type TextComplete,
type TextDelta,
} from './llm-shared';

/**
* Stream an LLM completion to the parent workflow's stream.
*
* Activity-as-publisher: each delta from the OpenAI streaming API is pushed to
* the workflow's stream as a `TextDelta` event on the `delta` topic. The
* accumulated full text returns as the activity's result and is also published
* on the `complete` topic as a terminator. On retry attempts
* (`Context.current().info.attempt > 1`) a `RetryEvent` lands on the `retry`
* topic before the new attempt's deltas, so consumers can reset their
* accumulated state instead of concatenating the failed attempt's partial
* output with the retried attempt's full output.
*
* No `forceFlush: true`: the 200ms `batchInterval` is fast enough for an
* interactive feel, and the WorkflowStreamClient's asyncDispose flushes
* cleanly on scope exit.
*/
export async function streamCompletion(input: LLMInput): Promise<string> {
await using client = WorkflowStreamClient.fromActivity({ batchInterval: '200 milliseconds' });
// Disable provider-side retries; let Temporal own retry policy at the
// activity layer.
const openai = new OpenAI({ maxRetries: 0 });

const deltas = client.topic<TextDelta>(TOPIC_DELTA);
const complete = client.topic<TextComplete>(TOPIC_COMPLETE);
const retry = client.topic<RetryEvent>(TOPIC_RETRY);

const attempt = Context.current().info.attempt;
if (attempt > 1) {
retry.publish({ attempt });
}

const full: string[] = [];
const stream = await openai.chat.completions.create({
model: input.model ?? DEFAULT_LLM_MODEL,
messages: [{ role: 'user', content: input.prompt }],
stream: true,
});
for await (const chunk of stream) {
const text = chunk.choices[0]?.delta?.content;
if (!text) continue;
deltas.publish({ text });
full.push(text);
}

const fullText = full.join('');
complete.publish({ fullText });
return fullText;
}
38 changes: 38 additions & 0 deletions workflow-streams/src/llm-shared.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Types and constants for the LLM-streaming scenario.
*
* Kept separate from `shared.ts` because the other scenarios don't use these,
* and this scenario runs on its own worker and task queue so the `openai`
* dependency stays out of everyone else's path.
*/

import type { WorkflowStreamState } from '@temporalio/workflow-streams/workflow';

// Scenario 5 runs on its own worker so the openai dependency only matters
// for that scenario.
export const LLM_TASK_QUEUE = 'workflow-stream-llm-task-queue';

// Topics published by the activity.
export const TOPIC_DELTA = 'delta';
export const TOPIC_COMPLETE = 'complete';
export const TOPIC_RETRY = 'retry';

export const DEFAULT_LLM_MODEL = 'gpt-5-mini';

export interface LLMInput {
prompt: string;
model?: string;
streamState?: WorkflowStreamState;
}

export interface TextDelta {
text: string;
}

export interface TextComplete {
fullText: string;
}

export interface RetryEvent {
attempt: number;
}
Loading
Loading