diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 133b32ee8..57a888f36 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -4,4 +4,5 @@ # someone opens a pull request. * @temporalio/sdk -/contrib/ai-sdk/ @temporalio/sdk @temporalio/ai-sdk +/contrib/ai-sdk/ @temporalio/sdk @temporalio/ai-sdk +/contrib/workflow-streams/ @temporalio/sdk @temporalio/ai-sdk diff --git a/.github/workflows/conventions.yml b/.github/workflows/conventions.yml index 438feb87a..54d5a4585 100644 --- a/.github/workflows/conventions.yml +++ b/.github/workflows/conventions.yml @@ -53,5 +53,7 @@ jobs: - run: pnpm run lint:check env: - NODE_OPTIONS: '--max-old-space-size=8192' + NODE_OPTIONS: --max-old-space-size=8192 - run: pnpm run lint:prune + env: + NODE_OPTIONS: --max-old-space-size=8192 diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 0fbfc0502..77cfdaf9a 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -51,6 +51,7 @@ jobs: run: pnpm run docs env: ALGOLIA_API_KEY: ${{ secrets.ALGOLIA_API_KEY }} + NODE_OPTIONS: --max-old-space-size=8192 - name: Publish production docs if: ${{ inputs.publish_target == 'prod' }} diff --git a/contrib/ai-sdk/package.json b/contrib/ai-sdk/package.json index 49094494c..bae2a1c71 100644 --- a/contrib/ai-sdk/package.json +++ b/contrib/ai-sdk/package.json @@ -24,9 +24,12 @@ "workerThreads": false }, "dependencies": { + "@temporalio/activity": "workspace:*", + "@temporalio/client": "workspace:*", "@temporalio/common": "workspace:*", "@temporalio/plugin": "workspace:*", "@temporalio/workflow": "workspace:*", + "@temporalio/workflow-streams": "workspace:*", "@ungap/structured-clone": "^1.3.0", "headers-polyfill": "^4.0.3", "web-streams-polyfill": "^4.2.0" diff --git a/contrib/ai-sdk/src/__tests__/test-ai-sdk.ts b/contrib/ai-sdk/src/__tests__/test-ai-sdk.ts index 8b63f4e75..1746df625 100644 --- a/contrib/ai-sdk/src/__tests__/test-ai-sdk.ts +++ b/contrib/ai-sdk/src/__tests__/test-ai-sdk.ts @@ -41,7 +41,13 @@ import { workflowInterceptorModules } from '@temporalio/testing'; import { bundleWorkflowCode, DefaultLogger, Runtime } from '@temporalio/worker'; import type { InjectedSinks } from '@temporalio/worker'; import type { BaseContext } from '@temporalio/test-helpers'; -import { test as anyTest, helpers, Worker, TestWorkflowEnvironment } from '@temporalio/test-helpers'; +import { + test as anyTest, + createBaseBundlerOptions, + helpers, + Worker, + TestWorkflowEnvironment, +} from '@temporalio/test-helpers'; import { AiSdkPlugin, createActivities } from '..'; import { embeddingWorkflow, @@ -234,6 +240,7 @@ const test = anyTest as TestFn; test.before(async (t) => { const env = await TestWorkflowEnvironment.createLocal(); const workflowBundle = await bundleWorkflowCode({ + ...createBaseBundlerOptions(), workflowsPath: require.resolve('./workflows/ai-sdk'), workflowInterceptorModules, logger: new DefaultLogger('WARN'), @@ -465,6 +472,7 @@ test('Telemetry', async (t) => { ], taskQueue: 'test-ai-telemetry', workflowsPath: require.resolve('./workflows/ai-sdk'), + bundlerOptions: createBaseBundlerOptions(), interceptors: { client: { diff --git a/contrib/ai-sdk/src/activities.ts b/contrib/ai-sdk/src/activities.ts index 87ea3631e..7c85e6556 100644 --- a/contrib/ai-sdk/src/activities.ts +++ b/contrib/ai-sdk/src/activities.ts @@ -1,15 +1,24 @@ import type { LanguageModelV3CallOptions, + LanguageModelV3Content, + LanguageModelV3FinishReason, LanguageModelV3GenerateResult, + LanguageModelV3Usage, EmbeddingModelV3Result, SharedV3ProviderOptions, SharedV3Headers, + SharedV3Warning, ProviderV3, } from '@ai-sdk/provider'; import { asSchema, type Schema, type ToolExecutionOptions } from 'ai'; import { ApplicationFailure } from '@temporalio/common'; +import { Context } from '@temporalio/activity'; +import { WorkflowStreamClient } from '@temporalio/workflow-streams/client'; +import type { Duration } from '@temporalio/common/lib/time'; import type { McpClientFactories, McpClientFactory } from './mcp'; +const encoder = new TextEncoder(); + /** * Arguments for invoking a language model activity. */ @@ -18,6 +27,16 @@ export interface InvokeModelArgs { options: LanguageModelV3CallOptions; } +/** + * Arguments for invoking a streaming language model activity. + */ +export interface InvokeModelStreamingArgs extends InvokeModelArgs { + modelId: string; + options: LanguageModelV3CallOptions; + streamingTopic: string; + streamingBatchInterval?: Duration; +} + /** * Result from a language model invocation. * This is an alias to the AI SDK's LanguageModelV3GenerateResult for type safety. @@ -75,6 +94,118 @@ export function createActivities(provider: ProviderV3, mcpClientFactories?: McpC const model = provider.languageModel(args.modelId); return await model.doGenerate(args.options); }, + + /** + * Streaming-aware model activity. + * + * Calls `model.doStream()`, publishes each yielded AI SDK stream part + * as JSON to the stream side channel, and returns the assembled + * `LanguageModelV3GenerateResult`. Consumers receive native AI SDK + * stream-part types (text-delta, reasoning-delta, tool-input-delta, + * response-metadata, finish, ...); no normalization happens here. + */ + async invokeModelStreaming(args: InvokeModelStreamingArgs): Promise { + await using stream = WorkflowStreamClient.fromWithinActivity({ + batchInterval: args.streamingBatchInterval ?? '100 milliseconds', + }); + const events = stream.topic(args.streamingTopic); + + const model = provider.languageModel(args.modelId); + const streamResult = await model.doStream(args.options); + + const content: LanguageModelV3Content[] = []; + let finishReason: LanguageModelV3FinishReason = { unified: 'other', raw: undefined }; + let usage: LanguageModelV3Usage = { + inputTokens: { total: undefined, noCache: undefined, cacheRead: undefined, cacheWrite: undefined }, + outputTokens: { total: undefined, text: undefined, reasoning: undefined }, + }; + const warnings: SharedV3Warning[] = []; + let responseMetadata: Record | undefined; + + const textBlocks = new Map(); + const reasoningBlocks = new Map(); + + const reader = streamResult.stream.getReader(); + + while (true) { + const { done, value: part } = await reader.read(); + if (done) break; + + Context.current().heartbeat(); + + // Publish the raw stream part as JSON so consumers can switch on + // the native AI SDK type. Accumulation below is for the final + // assembled result this activity returns. + events.publish(encoder.encode(JSON.stringify(part))); + + switch (part.type) { + case 'stream-start': + warnings.push(...part.warnings); + break; + case 'text-start': + textBlocks.set(part.id, ''); + break; + case 'text-delta': + textBlocks.set(part.id, (textBlocks.get(part.id) ?? '') + part.delta); + break; + case 'text-end': + content.push({ + type: 'text', + text: textBlocks.get(part.id) ?? '', + providerMetadata: part.providerMetadata, + }); + textBlocks.delete(part.id); + break; + case 'reasoning-start': + reasoningBlocks.set(part.id, ''); + break; + case 'reasoning-delta': + reasoningBlocks.set(part.id, (reasoningBlocks.get(part.id) ?? '') + part.delta); + break; + case 'reasoning-end': + content.push({ + type: 'reasoning', + text: reasoningBlocks.get(part.id) ?? '', + providerMetadata: part.providerMetadata, + }); + reasoningBlocks.delete(part.id); + break; + case 'response-metadata': + responseMetadata = { + id: part.id, + timestamp: part.timestamp, + modelId: part.modelId, + }; + break; + case 'finish': + finishReason = part.finishReason; + usage = part.usage; + break; + default: + // tool-call, tool-result, file, source — collect as content + if ( + 'type' in part && + (part.type === 'tool-call' || + part.type === 'tool-result' || + part.type === 'file' || + part.type === 'source') + ) { + content.push(part); + } + break; + } + } + + return { + content, + finishReason, + usage, + warnings, + request: streamResult.request, + response: responseMetadata ? { ...responseMetadata, ...streamResult.response } : streamResult.response, + }; + }, + async invokeEmbeddingModel(args: InvokeEmbeddingModelArgs): Promise { const model = provider.embeddingModel(args.modelId); return await model.doEmbed({ diff --git a/contrib/ai-sdk/src/mcp.ts b/contrib/ai-sdk/src/mcp.ts index 88cf6e32b..2248c0c13 100644 --- a/contrib/ai-sdk/src/mcp.ts +++ b/contrib/ai-sdk/src/mcp.ts @@ -44,7 +44,7 @@ export class TemporalMCPClient { toolName, { description: toolResult.description, - execute: async (input, options) => { + execute: async (input: unknown, options: unknown) => { const activities = workflow.proxyActivities({ summary: toolName, startToCloseTimeout: '10 minutes', diff --git a/contrib/ai-sdk/src/provider.ts b/contrib/ai-sdk/src/provider.ts index b3eaf34ff..e1b9fad6d 100644 --- a/contrib/ai-sdk/src/provider.ts +++ b/contrib/ai-sdk/src/provider.ts @@ -1,3 +1,4 @@ +import type { ReadableStreamDefaultController } from 'node:stream/web'; import type { EmbeddingModelV3, EmbeddingModelV3CallOptions, @@ -13,6 +14,11 @@ import type { import * as workflow from '@temporalio/workflow'; import type { ActivityOptions } from '@temporalio/workflow'; import { ApplicationFailure } from '@temporalio/common'; +import type { Duration } from '@temporalio/common/lib/time'; + +// `ReadableStream` is a sandbox global; type-only import keeps `node:stream/web` +// out of the workflow bundle (es2023 lib has no DOM types). +declare const ReadableStream: typeof import('node:stream/web').ReadableStream; /** * Options for configuring the TemporalProvider with per-model activity settings. @@ -28,7 +34,22 @@ export interface TemporalProviderOptions { * Activity options specific to language model calls. * Merged with default options, with these taking precedence. */ - languageModel?: ActivityOptions; + languageModel?: ActivityOptions & { + /** + * Topic name on the workflow's stream that streaming model calls publish + * raw stream parts to. When set, `doStream` is enabled and routes through + * the streaming activity; when unset, `doStream` throws. Pick a unique + * name per concurrent streaming call to keep event streams separable. + */ + streamingTopic?: string; + + /** + * Batch interval for the per-activity `WorkflowStreamClient` that + * publishes stream parts back to the workflow. Lower values reduce + * latency at the cost of more signal traffic. Defaults to 100ms. + */ + streamingBatchInterval?: Duration; + }; /** * Activity options specific to embedding model calls. @@ -46,11 +67,16 @@ export interface TemporalProviderOptions { export class TemporalLanguageModel implements LanguageModelV3 { readonly specificationVersion = 'v3'; readonly provider = 'temporal'; + private readonly streamingTopic: string | undefined; + private readonly streamingBatchInterval: Duration | undefined; constructor( readonly modelId: string, - readonly options?: ActivityOptions - ) {} + readonly options?: ActivityOptions & { streamingTopic?: string; streamingBatchInterval?: Duration } + ) { + this.streamingTopic = options?.streamingTopic; + this.streamingBatchInterval = options?.streamingBatchInterval; + } get supportedUrls(): Record { return {}; @@ -75,8 +101,59 @@ export class TemporalLanguageModel implements LanguageModelV3 { return result; } - doStream(_options: LanguageModelV3CallOptions): PromiseLike { - throw ApplicationFailure.nonRetryable('Streaming not supported.'); + async doStream(options: LanguageModelV3CallOptions): Promise { + if (this.streamingTopic === undefined) { + throw ApplicationFailure.nonRetryable( + 'Streaming not enabled. Set streamingTopic in languageModel provider options.' + ); + } + + // Call the streaming activity, which publishes tokens via stream + // and returns the accumulated result. + const activities = workflow.proxyActivities({ + startToCloseTimeout: '10 minutes', + ...this.options, + }); + const result = await activities.invokeModelStreaming!({ + modelId: this.modelId, + options, + streamingTopic: this.streamingTopic, + streamingBatchInterval: this.streamingBatchInterval, + }); + if (result === undefined) { + throw ApplicationFailure.nonRetryable('Received undefined response from streaming model activity.'); + } + + // Wrap the accumulated result as a ReadableStream that replays the content. + // Real-time token streaming already happened via stream in the activity. + const stream = new ReadableStream({ + start(controller: ReadableStreamDefaultController) { + controller.enqueue({ type: 'stream-start', warnings: result.warnings ?? [] }); + let partIndex = 0; + for (const item of result.content ?? []) { + const id = `part-${partIndex++}`; + if (item.type === 'text') { + controller.enqueue({ type: 'text-start', id }); + controller.enqueue({ type: 'text-delta', id, delta: item.text }); + controller.enqueue({ type: 'text-end', id }); + } else if (item.type === 'reasoning') { + controller.enqueue({ type: 'reasoning-start', id }); + controller.enqueue({ type: 'reasoning-delta', id, delta: item.text }); + controller.enqueue({ type: 'reasoning-end', id }); + } else { + controller.enqueue(item); + } + } + controller.enqueue({ + type: 'finish', + finishReason: result.finishReason, + usage: result.usage, + }); + controller.close(); + }, + }); + + return { stream, request: result.request, response: result.response }; } } diff --git a/contrib/workflow-streams/README.md b/contrib/workflow-streams/README.md new file mode 100644 index 000000000..790501f59 --- /dev/null +++ b/contrib/workflow-streams/README.md @@ -0,0 +1,207 @@ +# Temporal Workflow Streams + +**Workflow Streams** — a Temporal SDK contrib library that gives a workflow a +durable, offset-addressed event channel built from Signals and polling Updates +with an SSE bridge. Cost scales with durable batches, not tokens. Latency is +around 100ms per roundtrip; not for ultra-low-latency voice. + +Workflows sometimes need to push incremental updates to external observers. +Examples include providing customer updates during order processing, creating +interactive experiences with AI agents, or reporting progress from a +long-running data pipeline. Temporal's core primitives (workflows, signals, and +updates) already provide the building blocks, but wiring up batching, offset +tracking, topic filtering, and continue-as-new hand-off is non-trivial. + +This module packages that boilerplate into a reusable workflow-side stream +object and external client. The workflow holds an append-only log of +`(topic, data)` entries. Applications can interact directly from the workflow, +or from external clients such as activities, starters, and other workflows. +Under the hood, publishing uses signals (fire-and-forget) while subscribing +uses updates (long-poll). A configurable batching coalesces high-frequency +events, improving efficiency. + +Payloads are Temporal `Payload`s carrying the encoding metadata needed for +typed decode and cross-language interop. The codec chain (encryption, +PII-redaction, compression) runs once on the signal/update envelope that +carries each batch — **not** per item — so there is no double-encryption, and +codec behavior is symmetric between workflow-side and client-side publishing. + +## Quick Start + +### Workflow side + +Construct `new WorkflowStream()` at the start of your workflow function, then +get a typed handle for each topic via `stream.topic(name)` and call +`publish` on the handle: + +```typescript +import { WorkflowStream } from '@temporalio/workflow-streams/workflow'; + +interface StatusEvent { + state: 'started' | 'done'; +} + +export async function myWorkflow(input: MyInput): Promise { + const stream = new WorkflowStream(); + const status = stream.topic('status'); + + status.publish({ state: 'started' }); + await doWork(); + status.publish({ state: 'done' }); +} +``` + +The `WorkflowStream` constructor registers the `__temporal_workflow_stream_publish` signal, +`__temporal_workflow_stream_poll` update, and `__temporal_workflow_stream_offset` query handlers on your workflow. +Any value the default payload converter can serialize or +a pre-built `Payload` can be passed to `publish`. The type parameter `T` is +only a compile-time annotation. Repeated calls to `stream.topic('foo')` return the same +handle instance. The type parameter `T` is a compile-time annotation and doesn't affect handle identity. + +### Activity side (publishing) + +Use `WorkflowStreamClient.fromWithinActivity()` with `await using` for batched publishing +from inside an activity. The client and workflow ID are pulled from the +activity context. Bind a topic handle on the client and publish through it, +the same way as on the workflow side: + +```typescript +import { Context } from '@temporalio/activity'; +import { WorkflowStreamClient } from '@temporalio/workflow-streams/client'; + +export async function streamEvents(): Promise { + await using client = WorkflowStreamClient.fromWithinActivity({ batchInterval: '2 seconds' }); + const events = client.topic('events'); + + for await (const chunk of generateChunks()) { + events.publish(chunk); + Context.current().heartbeat(); + } + // Buffer is flushed automatically on scope exit. +} +``` + +The background flusher starts on the first `publish()` and stops on scope +exit (`await using`). Outside an activity (e.g., a starter or BFF), use +`WorkflowStreamClient.create(temporalClient, workflowId)` the same way. + +Use `forceFlush: true` to trigger an immediate flush for latency-sensitive +events: + +```typescript +events.publish(data, { forceFlush: true }); +``` + +Use `await client.flush()` as an explicit barrier — returns once everything +published before the call has been signaled and acknowledged by the server: + +```typescript +events.publish(phase1Data); +await client.flush(); // phase 1 is durable on the workflow side +events.publish(phase2Data); +``` + +### Subscribing + +Subscribe via the topic handle to get items decoded as `T`: + +```typescript +import { WorkflowStreamClient } from '@temporalio/workflow-streams/client'; + +const client = WorkflowStreamClient.create(temporalClient, workflowId); +const events = client.topic('events'); +for await (const item of events.subscribe(0)) { + // item.data is decoded to MyType via the default payload converter. + console.log(item.topic, item.offset, item.data); + if (isDone(item.data)) break; +} +``` + +For raw `Payload` access call +`WorkflowStreamClient.subscribe(topics?, fromOffset?)` directly. The yielded +items have `data: Payload` carrying encoding metadata; decode with +`defaultPayloadConverter.fromPayload(item.data)` per-topic. + +## Topics + +Topics allow subscribers to receive a subset of the messages in the workflow stream +system. Subscribers can request a list of specific topics, or provide an empty +list (or omit the argument) to receive messages from all topics. Publishing to +a topic implicitly creates it. + +## Continue-as-new + +Carry both your application state and workflow stream state across continue-as-new +boundaries: + +```typescript +import { workflowInfo } from '@temporalio/workflow'; +import { WorkflowStream, type WorkflowStreamState } from '@temporalio/workflow-streams/workflow'; + +interface WorkflowInput { + itemsProcessed: number; + streamState?: WorkflowStreamState; +} + +export async function myWorkflow(input: WorkflowInput): Promise { + let itemsProcessed = input.itemsProcessed; + const stream = new WorkflowStream(input.streamState); + + // ... do work, updating itemsProcessed ... + + if (workflowInfo().continueAsNewSuggested) { + await stream.continueAsNew((state) => [ + { + itemsProcessed, + streamState: state, + }, + ]); + } +} +``` + +`WorkflowStream.continueAsNew(buildArgs)` detaches waiting pollers, waits for +in-flight handlers to finish, then calls `continueAsNew` with the args +returned by `buildArgs(state)`. The lambda receives the post-detach +`WorkflowStreamState` as its only argument so the snapshot is guaranteed +to happen _after_ pollers detach. Subscribers created via +`WorkflowStreamClient.create()` automatically follow continue-as-new chains. + +If you need to pass other CAN options (search attributes, memo, +non-default `taskQueue`, etc.), fall back to the explicit recipe with +`makeContinueAsNewFunc`: + +```typescript +import { condition, allHandlersFinished, makeContinueAsNewFunc } from '@temporalio/workflow'; + +if (workflowInfo().continueAsNewSuggested) { + stream.detachPollers(); + await condition(allHandlersFinished); + const continueWithOptions = makeContinueAsNewFunc({ + taskQueue: 'other-tq', + }); + await continueWithOptions({ + itemsProcessed, + streamState: stream.getState(), + }); +} +``` + +## Cross-Language Protocol + +Any Temporal client can interact with a workflow stream workflow using these fixed +handler names: + +1. **Publish**: signal `__temporal_workflow_stream_publish` with `PublishInput` +2. **Subscribe**: update `__temporal_workflow_stream_poll` with `PollInput` -> `PollResult` +3. **Offset**: query `__temporal_workflow_stream_offset` -> `number` + +Each `PublishEntry.data` / `WorkflowStreamWireItem.data` is a base64-encoded +`temporal.api.common.v1.Payload` protobuf (`Payload.SerializeToString()` in +Python; equivalent `encodePayloadProto()` in this package). This keeps the +envelope JSON-serializable while preserving `Payload.metadata` for codec and +typed-decode paths. Cross-language clients can publish and subscribe by +following the same base64-of-serialized-`Payload` shape. The envelope types +(`PublishInput`, `PollResult`, `WorkflowStreamState`) require the default (JSON) data +converter — custom converters on the envelope layer break cross-language +interop. diff --git a/contrib/workflow-streams/package.json b/contrib/workflow-streams/package.json new file mode 100644 index 000000000..1217be994 --- /dev/null +++ b/contrib/workflow-streams/package.json @@ -0,0 +1,72 @@ +{ + "name": "@temporalio/workflow-streams", + "version": "1.17.0", + "description": "Temporal.io SDK Workflow Streams contrib module", + "exports": { + "./workflow": { + "types": "./lib/workflow.d.ts", + "default": "./lib/workflow.js" + }, + "./client": { + "types": "./lib/client.d.ts", + "default": "./lib/client.js" + }, + "./package.json": "./package.json" + }, + "typesVersions": { + "*": { + "workflow": ["./lib/workflow.d.ts"], + "client": ["./lib/client.d.ts"] + } + }, + "scripts": { + "build": "tsc --build", + "test": "ava ./lib/__tests__/test-*.js" + }, + "ava": { + "timeout": "120s", + "concurrency": 1, + "workerThreads": false + }, + "keywords": [ + "temporal", + "stream", + "streaming" + ], + "author": "Temporal Technologies Inc. ", + "license": "MIT", + "dependencies": { + "@temporalio/activity": "workspace:*", + "@temporalio/client": "workspace:*", + "@temporalio/common": "workspace:*", + "@temporalio/proto": "workspace:*", + "@temporalio/workflow": "workspace:*" + }, + "devDependencies": { + "@temporalio/test-helpers": "workspace:*", + "@temporalio/testing": "workspace:*", + "@temporalio/worker": "workspace:*", + "ava": "^5.3.1" + }, + "engines": { + "node": ">= 20.0.0" + }, + "bugs": { + "url": "https://github.com/temporalio/sdk-typescript/issues" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/temporalio/sdk-typescript.git", + "directory": "contrib/workflow-streams" + }, + "homepage": "https://github.com/temporalio/sdk-typescript/tree/main/contrib/workflow-streams", + "publishConfig": { + "access": "public" + }, + "files": [ + "src", + "lib", + "!src/__tests__", + "!lib/__tests__" + ] +} diff --git a/contrib/workflow-streams/src/__tests__/activities/workflow-streams.ts b/contrib/workflow-streams/src/__tests__/activities/workflow-streams.ts new file mode 100644 index 000000000..e54b0f5e2 --- /dev/null +++ b/contrib/workflow-streams/src/__tests__/activities/workflow-streams.ts @@ -0,0 +1,80 @@ +/** + * Test activities for @temporalio/workflow-streams. + * + * These activities use `WorkflowStreamClient.fromWithinActivity()` to target the + * current activity's parent workflow from the activity context. + */ + +import { Context } from '@temporalio/activity'; +import { WorkflowStreamClient } from '../../client'; + +export async function publishItems(count: number): Promise { + await using client = WorkflowStreamClient.fromWithinActivity({ batchInterval: '500 milliseconds' }); + const events = client.topic('events'); + for (let i = 0; i < count; i++) { + Context.current().heartbeat(); + events.publish(`item-${i}`); + } +} + +/** Publishes `count` items as `Uint8Array` — exercises the binary/plain encoding path. */ +export async function publishBinaryItems(count: number): Promise { + const encoder = new TextEncoder(); + await using client = WorkflowStreamClient.fromWithinActivity({ batchInterval: '500 milliseconds' }); + const events = client.topic('events'); + for (let i = 0; i < count; i++) { + Context.current().heartbeat(); + events.publish(encoder.encode(`item-${i}`)); + } +} + +export async function publishMultiTopic(count: number): Promise { + const topicNames = ['a', 'b', 'c']; + await using client = WorkflowStreamClient.fromWithinActivity({ batchInterval: '500 milliseconds' }); + const handles = topicNames.map((name) => client.topic(name)); + for (let i = 0; i < count; i++) { + Context.current().heartbeat(); + const idx = i % handles.length; + handles[idx]!.publish(`${topicNames[idx]}-${i}`); + } +} + +export async function publishWithForceFlush(): Promise { + // Long batchInterval AND long post-publish hold ensure that only a + // working forceFlush wakeup can deliver items before dispose flushes. + // The hold is deliberately much longer than the test's collect timeout + // so a regression (forceFlush no-op) surfaces as a missing item rather + // than flaking on slow CI. + await using client = WorkflowStreamClient.fromWithinActivity({ batchInterval: '60 seconds' }); + const events = client.topic('events'); + events.publish('normal-0'); + events.publish('normal-1'); + events.publish('force-flush', { forceFlush: true }); + for (let i = 0; i < 100; i++) { + Context.current().heartbeat(); + await new Promise((resolve) => setTimeout(resolve, 100)); + } +} + +export async function publishBatchTest(count: number): Promise { + await using client = WorkflowStreamClient.fromWithinActivity({ batchInterval: '60 seconds' }); + const events = client.topic('events'); + for (let i = 0; i < count; i++) { + Context.current().heartbeat(); + events.publish(`item-${i}`); + } + // Long batchInterval — only the dispose-driven drain will flush. +} + +export async function publishWithMaxBatch(count: number): Promise { + await using client = WorkflowStreamClient.fromWithinActivity({ + batchInterval: '60 seconds', + maxBatchSize: 3, + }); + const events = client.topic('events'); + for (let i = 0; i < count; i++) { + Context.current().heartbeat(); + events.publish(`item-${i}`); + } + // Long batchInterval — maxBatchSize and dispose-driven drain handle flushing. +} diff --git a/contrib/workflow-streams/src/__tests__/helpers-integration.ts b/contrib/workflow-streams/src/__tests__/helpers-integration.ts new file mode 100644 index 000000000..e2fa2928c --- /dev/null +++ b/contrib/workflow-streams/src/__tests__/helpers-integration.ts @@ -0,0 +1,42 @@ +/** + * Minimal integration-test harness for `@temporalio/workflow-streams` tests. + * + * Mirrors the slice of `packages/test/src/helpers-integration.ts` that these + * tests actually use: a `makeTestFunction` that wires up a TestWorkflowEnvironment + * and a prebuilt workflow bundle in `test.before`, and a `helpers(t)` that + * surfaces `createWorker` / `startWorkflow` per test. + */ + +import type { ExecutionContext, TestFn } from 'ava'; +import { + test as anyTest, + helpers as baseHelpers, + createTestWorkflowEnvironment, + createTestWorkflowBundle, + type BaseContext, + type BaseHelpers, + type TestWorkflowEnvironment, +} from '@temporalio/test-helpers'; + +export type Context = BaseContext; + +export interface TestFunctionOptions { + workflowsPath: string; +} + +export function makeTestFunction(opts: TestFunctionOptions): TestFn { + const test = anyTest as TestFn; + test.before(async (t) => { + const env = await createTestWorkflowEnvironment(); + const workflowBundle = await createTestWorkflowBundle({ workflowsPath: opts.workflowsPath }); + t.context = { env, workflowBundle }; + }); + test.after.always(async (t) => { + await t.context.env?.teardown(); + }); + return test; +} + +export function helpers(t: ExecutionContext): BaseHelpers { + return baseHelpers(t, t.context.env); +} diff --git a/contrib/workflow-streams/src/__tests__/test-workflow-streams-bundling.ts b/contrib/workflow-streams/src/__tests__/test-workflow-streams-bundling.ts new file mode 100644 index 000000000..a86bea8dd --- /dev/null +++ b/contrib/workflow-streams/src/__tests__/test-workflow-streams-bundling.ts @@ -0,0 +1,22 @@ +/** + * Bundling test for `@temporalio/workflow-streams`. + * + * Runs the real `bundleWorkflowCode` walker (no `ignoreModules` allowlist) against + * a workflow file that imports from `@temporalio/workflow-streams/workflow`. + * The workflow entrypoint must not transitively reach `crypto`, `@temporalio/activity`, + * or `@temporalio/client`, otherwise the workflow sandbox check fails. + */ + +import type { TestFn } from 'ava'; +import anyTest from 'ava'; +import { bundleWorkflowCode } from '@temporalio/worker'; + +const test = anyTest as TestFn; + +test('workflow streams workflow entrypoint can be bundled', async (t) => { + await t.notThrowsAsync( + bundleWorkflowCode({ + workflowsPath: require.resolve('./workflows/workflow-streams'), + }) + ); +}); diff --git a/contrib/workflow-streams/src/__tests__/test-workflow-streams-interop.ts b/contrib/workflow-streams/src/__tests__/test-workflow-streams-interop.ts new file mode 100644 index 000000000..30accbed0 --- /dev/null +++ b/contrib/workflow-streams/src/__tests__/test-workflow-streams-interop.ts @@ -0,0 +1,212 @@ +/** + * Wire-format interop tests for @temporalio/workflow-streams. + * + * These tests pin the exact byte layout produced by the TypeScript + * implementation so it stays compatible with the Python SDK, which + * uses `temporalio.api.common.v1.Payload` serialized via protobuf. + * + * Unlike `test-workflow-streams.ts`, these don't need a Temporal server — + * they are pure encode/decode unit tests. + */ + +import type { TestFn } from 'ava'; +import anyTest from 'ava'; +import { defaultPayloadConverter, type Payload } from '@temporalio/common'; +import { + decodeBase64, + decodePayloadProto, + decodePayloadWire, + encodeBase64, + encodePayloadProto, + encodePayloadWire, +} from '../workflow'; + +const test = anyTest as TestFn; +const encoder = new TextEncoder(); +const decoder = new TextDecoder(); + +test('payload_proto_round_trips_for_default_json_string', (t) => { + // The default JSON converter wraps "hello" as: + // metadata = {encoding: b"json/plain"}, data = b'"hello"' + // + // Proto encoding layout we're pinning: + // field 1 (metadata map entry): "encoding" -> b"json/plain" + // field 2 (data): b'"hello"' + const payload = defaultPayloadConverter.toPayload('hello'); + const wire = encodePayloadWire(payload); + + const expected = encodeBase64( + new Uint8Array([ + 0x0a, + 0x16, // metadata entry: field 1, wire-type 2, len 22 + 0x0a, + 0x08, // inner key: field 1, len 8 + ...encoder.encode('encoding'), + 0x12, + 0x0a, // inner value: field 2, len 10 + ...encoder.encode('json/plain'), + 0x12, + 0x07, // data: field 2, len 7 + ...encoder.encode('"hello"'), + ]) + ); + t.is(wire, expected); + + const decoded = decodePayloadWire(wire); + t.is(decoder.decode(decoded.metadata!['encoding']!), 'json/plain'); + t.deepEqual(decoded.data, encoder.encode('"hello"')); +}); + +test('binary_payload_round_trips_through_wire', (t) => { + const bytes = new Uint8Array([0x00, 0x01, 0x7f, 0x80, 0xff]); + const payload = defaultPayloadConverter.toPayload(bytes); + const wire = encodePayloadWire(payload); + const decoded = decodePayloadWire(wire); + + t.is(decoder.decode(decoded.metadata!['encoding']!), 'binary/plain'); + t.deepEqual(decoded.data, bytes); +}); + +test('decode_accepts_python_generated_bytes', (t) => { + // Construct a Payload exactly as Python's protobuf runtime would emit + // (metadata = {encoding: "binary/plain"}, data = b"ping") and verify our + // decoder round-trips it. + const data = encoder.encode('ping'); + const pythonLikeProto = new Uint8Array([ + 0x0a, + 0x18, // field 1, len 24 + 0x0a, + 0x08, // key tag, len 8 + ...encoder.encode('encoding'), + 0x12, + 0x0c, // value tag, len 12 + ...encoder.encode('binary/plain'), + 0x12, + 0x04, // data tag, len 4 + ...data, + ]); + const wire = encodeBase64(pythonLikeProto); + const decoded = decodePayloadWire(wire); + t.is(decoder.decode(decoded.metadata!['encoding']!), 'binary/plain'); + t.deepEqual(decoded.data, data); +}); + +test('empty_payload_encodes_to_empty_bytes', (t) => { + // Matches Python: Payload().SerializeToString() returns b"". + const wire = encodePayloadWire({ metadata: {}, data: new Uint8Array(0) }); + t.is(wire, ''); + + const decoded = decodePayloadWire(''); + t.deepEqual(decoded.metadata, {}); + t.is(decoded.data?.length ?? 0, 0); +}); + +test('varint_length_handles_multi_byte_prefix', (t) => { + // Sizes >= 128 require a multi-byte varint length prefix. Verify both + // encode and decode handle the boundary correctly (Python protobuf + // uses the same varint encoding). + const big = new Uint8Array(300).fill(0x42); // 300 bytes, varint = [0xac, 0x02] + const payload = defaultPayloadConverter.toPayload(big); + const wire = encodePayloadWire(payload); + const decoded = decodePayloadWire(wire); + t.deepEqual(decoded.data, big); +}); + +test('multiple_metadata_entries_round_trip', (t) => { + const payload: Payload = { + metadata: { + encoding: encoder.encode('json/plain'), + messageType: encoder.encode('MyType'), + }, + data: encoder.encode('{"x":1}'), + }; + const wire = encodePayloadWire(payload); + const decoded = decodePayloadWire(wire); + t.is(decoder.decode(decoded.metadata!['encoding']!), 'json/plain'); + t.is(decoder.decode(decoded.metadata!['messageType']!), 'MyType'); + t.deepEqual(decoded.data, encoder.encode('{"x":1}')); +}); + +test('base64_helpers_match_standard_encoding', (t) => { + // Standard base64 (with padding) must match what Python's base64.b64encode + // produces — RFC 4648 §4. Spot-check the canonical rfc4648 examples. + t.is(encodeBase64(encoder.encode('')), ''); + t.is(encodeBase64(encoder.encode('f')), 'Zg=='); + t.is(encodeBase64(encoder.encode('fo')), 'Zm8='); + t.is(encodeBase64(encoder.encode('foo')), 'Zm9v'); + t.is(encodeBase64(encoder.encode('foob')), 'Zm9vYg=='); + t.is(encodeBase64(encoder.encode('foobar')), 'Zm9vYmFy'); + + t.deepEqual(decodeBase64(''), new Uint8Array(0)); + t.deepEqual(decodeBase64('Zm9vYmFy'), encoder.encode('foobar')); + t.deepEqual(decodeBase64('Zg=='), encoder.encode('f')); +}); + +test('unknown_fields_are_skipped_on_decode', (t) => { + // Forward compatibility: if Payload proto grows a new field (e.g. + // externalPayloads = 3), our decoder skips it without aborting. The + // Python generated class behaves the same way. + const data = encoder.encode('hi'); + const bytesWithUnknownField = new Uint8Array([ + 0x12, + 0x02, + ...data, + // field 3 (unknown), wire-type 2, len 3 — skipped + 0x1a, + 0x03, + 0xaa, + 0xbb, + 0xcc, + ]); + const decoded = decodePayloadProto(bytesWithUnknownField); + t.deepEqual(decoded.data, data); +}); + +test('encode_produces_canonical_bytes', (t) => { + // Pin a known byte sequence. If encodePayloadProto ever produces + // different bytes for this input, Python consumers break — this test + // catches that before it ships. + const payload: Payload = { + metadata: { encoding: encoder.encode('binary/plain') }, + data: encoder.encode('abc'), + }; + const bytes = encodePayloadProto(payload); + const expected = new Uint8Array([ + 0x0a, + 0x18, // field 1, len 24 + 0x0a, + 0x08, // key tag, len 8 + ...encoder.encode('encoding'), + 0x12, + 0x0c, // value tag, len 12 + ...encoder.encode('binary/plain'), + 0x12, + 0x03, // data tag, len 3 + ...encoder.encode('abc'), + ]); + t.deepEqual(bytes, expected); +}); + +test('round_trip_preserves_all_fields', (t) => { + // Hermetic round-trip: every field survives encode -> decode. + const cases: Payload[] = [ + { metadata: { encoding: encoder.encode('json/plain') }, data: encoder.encode('true') }, + { metadata: { encoding: encoder.encode('binary/null') }, data: new Uint8Array(0) }, + { + metadata: { + encoding: encoder.encode('json/protobuf'), + messageType: encoder.encode('my.pkg.Thing'), + }, + data: new Uint8Array([1, 2, 3, 4, 5, 250, 251, 252, 253, 254, 255]), + }, + ]; + for (const input of cases) { + const wire = encodePayloadWire(input); + const out = decodePayloadWire(wire); + t.is(Object.keys(out.metadata ?? {}).length, Object.keys(input.metadata ?? {}).length); + for (const [k, v] of Object.entries(input.metadata ?? {})) { + t.deepEqual(out.metadata?.[k], v); + } + t.deepEqual(out.data ?? new Uint8Array(0), input.data ?? new Uint8Array(0)); + } +}); diff --git a/contrib/workflow-streams/src/__tests__/test-workflow-streams.ts b/contrib/workflow-streams/src/__tests__/test-workflow-streams.ts new file mode 100644 index 000000000..37285f432 --- /dev/null +++ b/contrib/workflow-streams/src/__tests__/test-workflow-streams.ts @@ -0,0 +1,789 @@ +/** + * E2E integration tests for @temporalio/workflow-streams. + * + * Ported from sdk-python tests/contrib/stream/test_stream.py. + */ + +import { randomUUID } from 'crypto'; +import { ApplicationFailure, defaultPayloadConverter, type Payload } from '@temporalio/common'; +import type { WorkflowHandle } from '@temporalio/client'; +import { WorkflowUpdateFailedError } from '@temporalio/client'; +import { + FlushTimeoutError, + WorkflowStreamClient, + type PollInput, + type PollResult, + type WorkflowStreamItem, + type PublishEntry, + type PublishInput, +} from '../client'; +import { + type WorkflowStreamState, + encodePayloadWire, + workflowStreamOffsetQuery, + workflowStreamPublishSignal, + workflowStreamPollUpdate, +} from '../workflow'; +import { helpers, makeTestFunction } from './helpers-integration'; +import { + activityPublishWorkflow, + basicWorkflowStreamWorkflow, + binaryPublishWorkflow, + continueAsNewHelperWorkflow, + continueAsNewTypedWorkflow, + flushOnExitWorkflow, + getStateWithTtlQuery, + maxBatchWorkflow, + multiTopicWorkflow, + forceFlushWorkflow, + publisherSequencesQuery, + truncateUpdate, + truncateWorkflow, + ttlTestWorkflow, + workflowSidePublishWorkflow, +} from './workflows/workflow-streams'; +import * as streamActivities from './activities/workflow-streams'; + +const test = makeTestFunction({ + workflowsPath: require.resolve('./workflows/workflow-streams'), +}); + +const encoder = new TextEncoder(); +const decoder = new TextDecoder(); + +/** + * Build a `PublishEntry` for a literal string. + * + * Mirrors what `WorkflowStreamClient` produces on the encode path: the default + * payload converter wraps the bytes into a `Payload`, which is then + * proto-serialized and base64-encoded for the wire. + */ +function entry(topic: string, data: string): PublishEntry { + const payload = defaultPayloadConverter.toPayload(encoder.encode(data)); + return { topic, data: encodePayloadWire(payload) }; +} + +/** Extract the raw bytes from a binary/plain `Payload`. */ +function payloadBytes(payload: Payload): Uint8Array { + return payload.data ?? new Uint8Array(0); +} + +function payloadString(payload: Payload): string { + // defaultPayloadConverter maps Uint8Array to encoding=binary/plain (raw + // bytes in `data`) and strings to encoding=json/plain. Handle both so + // tests can mix the two publish paths. + const encoding = payload.metadata?.['encoding']; + if (encoding && decoder.decode(encoding) === 'binary/plain') { + return decoder.decode(payloadBytes(payload)); + } + return defaultPayloadConverter.fromPayload(payload); +} + +async function collectItems( + handle: WorkflowHandle, + topics: string[] | undefined, + fromOffset: number, + expectedCount: number, + timeoutMs = 15_000 +): Promise { + const client = new WorkflowStreamClient(handle); + const items: WorkflowStreamItem[] = []; + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), timeoutMs); + try { + const gen = client.subscribe(topics, fromOffset, { pollCooldown: 0 }); + for await (const item of gen) { + if (controller.signal.aborted) break; + items.push(item); + if (items.length >= expectedCount) { + await gen.return(); + break; + } + } + } catch (err) { + if (!controller.signal.aborted) throw err; + } finally { + clearTimeout(timer); + } + return items; +} + +test('activity_publish_and_subscribe — activity publishes, client subscribes', async (t) => { + const count = 10; + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ activities: streamActivities }); + await worker.runUntil(async () => { + const handle = await startWorkflow(activityPublishWorkflow, { args: [count] }); + const items = await collectItems(handle, undefined, 0, count + 1); + t.is(items.length, count + 1); + for (let i = 0; i < count; i++) { + t.is(items[i]!.topic, 'events'); + t.is(payloadString(items[i]!.data), `item-${i}`); + } + t.is(items[count]!.topic, 'status'); + t.is(payloadString(items[count]!.data), 'activity_done'); + await handle.signal('close'); + }); +}); + +test('binary_publish — Uint8Array publishes round-trip as binary/plain', async (t) => { + const count = 5; + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ activities: streamActivities }); + await worker.runUntil(async () => { + const handle = await startWorkflow(binaryPublishWorkflow, { args: [count] }); + const items = await collectItems(handle, undefined, 0, count); + t.is(items.length, count); + for (let i = 0; i < count; i++) { + const encoding = items[i]!.data.metadata?.['encoding']; + t.is(encoding && decoder.decode(encoding), 'binary/plain'); + t.deepEqual(payloadBytes(items[i]!.data), encoder.encode(`item-${i}`)); + } + await handle.signal('close'); + }); +}); + +test('topic_filtering — subscriber gets only requested topics', async (t) => { + const count = 9; + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ activities: streamActivities }); + await worker.runUntil(async () => { + const handle = await startWorkflow(multiTopicWorkflow, { args: [count] }); + + const aItems = await collectItems(handle, ['a'], 0, 3); + t.is(aItems.length, 3); + t.true(aItems.every((it) => it.topic === 'a')); + + const acItems = await collectItems(handle, ['a', 'c'], 0, 6); + t.is(acItems.length, 6); + t.true(acItems.every((it) => it.topic === 'a' || it.topic === 'c')); + + const allItems = await collectItems(handle, undefined, 0, 9); + t.is(allItems.length, 9); + + await handle.signal('close'); + }); +}); + +test('subscribe_from_offset_and_per_item_offsets — non-zero starts and global offsets', async (t) => { + const count = 5; + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(workflowSidePublishWorkflow, { args: [count] }); + + // From offset 0 — all items, offsets 0..count-1. + const allItems = await collectItems(handle, undefined, 0, count); + t.is(allItems.length, count); + for (let i = 0; i < count; i++) { + t.is(allItems[i]!.offset, i); + t.is(payloadString(allItems[i]!.data), `item-${i}`); + } + + // From offset 3 — items 3, 4 with offsets 3, 4. + const laterItems = await collectItems(handle, undefined, 3, 2); + t.is(laterItems.length, 2); + t.is(laterItems[0]!.offset, 3); + t.is(payloadString(laterItems[0]!.data), 'item-3'); + t.is(laterItems[1]!.offset, 4); + t.is(payloadString(laterItems[1]!.data), 'item-4'); + + await handle.signal('close'); + }); +}); + +test('per_item_offsets_with_topic_filter — offsets are global, not per-topic', async (t) => { + const count = 9; + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ activities: streamActivities }); + await worker.runUntil(async () => { + const handle = await startWorkflow(multiTopicWorkflow, { args: [count] }); + + const aItems = await collectItems(handle, ['a'], 0, 3); + t.is(aItems[0]!.offset, 0); + t.is(aItems[1]!.offset, 3); + t.is(aItems[2]!.offset, 6); + + const bItems = await collectItems(handle, ['b'], 0, 3); + t.is(bItems[0]!.offset, 1); + t.is(bItems[1]!.offset, 4); + t.is(bItems[2]!.offset, 7); + + await handle.signal('close'); + }); +}); + +test('poll_truncated_offset_returns_application_failure', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const { env } = t.context; + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(truncateWorkflow, { args: [] }); + + const items: PublishEntry[] = []; + for (let i = 0; i < 5; i++) items.push(entry('events', `item-${i}`)); + await handle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items, + publisher_id: '', + sequence: 0, + }); + // Truncate via update — completion is explicit. + await handle.executeUpdate(truncateUpdate, { args: [3] }); + + // Poll from offset 1 (truncated) must raise WorkflowUpdateFailedError + // with ApplicationFailure cause of type 'TruncatedOffset'. + const rawHandle = env.client.workflow.getHandle(handle.workflowId); + const err = (await t.throwsAsync( + rawHandle.executeUpdate(workflowStreamPollUpdate, { + args: [{ topics: [], from_offset: 1 }], + }), + { instanceOf: WorkflowUpdateFailedError } + )) as WorkflowUpdateFailedError; + t.true(err.cause instanceof ApplicationFailure); + t.is((err.cause as ApplicationFailure).type, 'TruncatedOffset'); + + // Workflow is still usable. + const after = await collectItems(handle, undefined, 3, 2); + t.is(after.length, 2); + t.is(after[0]!.offset, 3); + + await handle.signal('close'); + }); +}); + +test('subscribe_recovers_from_truncation — client auto-restarts from 0', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(truncateWorkflow, { args: [] }); + + const items: PublishEntry[] = []; + for (let i = 0; i < 5; i++) items.push(entry('events', `item-${i}`)); + await handle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items, + publisher_id: '', + sequence: 0, + }); + // Truncate via update — explicit completion. + await handle.executeUpdate(truncateUpdate, { args: [3] }); + + // subscribe() from offset 1 (truncated) — client should recover and + // deliver items from baseOffset (3) onward. + const received = await collectItems(handle, undefined, 1, 2); + t.is(received.length, 2); + t.is(received[0]!.offset, 3); + + await handle.signal('close'); + }); +}); + +test('force_flush — forceFlush wakes flusher despite 60s interval', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ activities: streamActivities }); + await worker.runUntil(async () => { + const handle = await startWorkflow(forceFlushWorkflow, { args: [] }); + // The activity holds for ~10s after the forceFlush publish; 5s timeout + // gives plenty of margin for scheduling while staying well below the + // hold so a regression (no forceFlush wakeup) surfaces as a missing + // item, not a pass via the dispose-driven flush at activity exit. + const items = await collectItems(handle, undefined, 0, 3, 5_000); + t.is(items.length, 3); + t.is(payloadString(items[2]!.data), 'force-flush'); + await handle.signal('close'); + }); +}); + +test('dispose_flushes_on_exit — await using drains buffer', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const count = 5; + const worker = await createWorker({ activities: streamActivities }); + await worker.runUntil(async () => { + const handle = await startWorkflow(flushOnExitWorkflow, { args: [count] }); + const items = await collectItems(handle, undefined, 0, count, 15_000); + t.is(items.length, count); + for (let i = 0; i < count; i++) { + t.is(payloadString(items[i]!.data), `item-${i}`); + } + await handle.signal('close'); + }); +}); + +test('max_batch_size — triggers flush without waiting for timer', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const count = 7; + const worker = await createWorker({ activities: streamActivities }); + await worker.runUntil(async () => { + const handle = await startWorkflow(maxBatchWorkflow, { args: [count] }); + const items = await collectItems(handle, undefined, 0, count + 1, 15_000); + t.is(items.length, count + 1); + for (let i = 0; i < count; i++) { + t.is(payloadString(items[i]!.data), `item-${i}`); + } + await handle.signal('close'); + }); +}); + +test('dedup_rejects_duplicate_signal — same publisher+sequence is dropped', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(basicWorkflowStreamWorkflow, { args: [] }); + + await handle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items: [entry('events', 'item-0')], + publisher_id: 'test-pub', + sequence: 1, + }); + await handle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items: [entry('events', 'duplicate')], + publisher_id: 'test-pub', + sequence: 1, + }); + await handle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items: [entry('events', 'item-1')], + publisher_id: 'test-pub', + sequence: 2, + }); + + // collectItems' update call acts as a barrier — prior signals processed. + const items = await collectItems(handle, undefined, 0, 2); + t.is(items.length, 2); + t.is(payloadString(items[0]!.data), 'item-0'); + t.is(payloadString(items[1]!.data), 'item-1'); + + const offset = await handle.query(workflowStreamOffsetQuery); + t.is(offset, 2); + + await handle.signal('close'); + }); +}); + +test('truncate_stream — truncate discards prefix and adjusts base', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(truncateWorkflow, { args: [] }); + + const items: PublishEntry[] = []; + for (let i = 0; i < 5; i++) items.push(entry('events', `item-${i}`)); + await handle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items, + publisher_id: '', + sequence: 0, + }); + + const first = await collectItems(handle, undefined, 0, 5); + t.is(first.length, 5); + + // Truncate via update — returns after the handler completes. + await handle.executeUpdate(truncateUpdate, { args: [3] }); + + // Offset should still be 5 (truncation moves base_offset, not tail). + const offset = await handle.query(workflowStreamOffsetQuery); + t.is(offset, 5); + + const after = await collectItems(handle, undefined, 3, 2); + t.is(after.length, 2); + t.is(payloadString(after[0]!.data), 'item-3'); + t.is(payloadString(after[1]!.data), 'item-4'); + + await handle.signal('close'); + }); +}); + +test('truncate_past_end_raises_application_failure', async (t) => { + // truncate() with an offset past the end of the log surfaces as + // WorkflowUpdateFailedError with cause type 'TruncateOutOfRange'. + // The workflow task must not be poisoned: a follow-up poll still works. + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(truncateWorkflow, { args: [] }); + + const items: PublishEntry[] = []; + for (let i = 0; i < 2; i++) items.push(entry('events', `item-${i}`)); + await handle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items, + publisher_id: '', + sequence: 0, + }); + + // Only 2 items exist; asking to truncate to offset 5 is out of range. + const err = (await t.throwsAsync(handle.executeUpdate(truncateUpdate, { args: [5] }), { + instanceOf: WorkflowUpdateFailedError, + })) as WorkflowUpdateFailedError; + t.true(err.cause instanceof ApplicationFailure); + t.is((err.cause as ApplicationFailure).type, 'TruncateOutOfRange'); + + // Workflow task wasn't poisoned — a valid poll still completes. + const after = await collectItems(handle, undefined, 0, 2); + t.is(after.length, 2); + + await handle.signal('close'); + }); +}); + +test('explicit_flush_barrier — flush() returns once items are confirmed', async (t) => { + // flush() is a synchronization point. With a 60s batchInterval, a + // regression that silently relies on the background timer would hang + // (and miss the per-test timeout) rather than slow-pass. + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(basicWorkflowStreamWorkflow, { args: [] }); + + const stream = new WorkflowStreamClient(handle, { batchInterval: '60 seconds' }); + + // 1. Empty-buffer flush is a no-op (must not block). + t.is(await stream.getOffset(), 0); + await stream.flush(); + t.is(await stream.getOffset(), 0); + + // 2. Flush makes prior publishes visible without waiting on the + // 60s batch timer. + const events = stream.topic('events'); + events.publish(encoder.encode('a')); + events.publish(encoder.encode('b')); + events.publish(encoder.encode('c')); + await stream.flush(); + t.is(await stream.getOffset(), 3); + + // 3. Second flush with no new items is a no-op. + await stream.flush(); + t.is(await stream.getOffset(), 3); + + await handle.signal('close'); + }); +}); + +test('subscribe_accepts_string_topic — single-string convenience', async (t) => { + // subscribe(topics='a') is equivalent to subscribe(topics=['a']). + const count = 9; + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ activities: streamActivities }); + await worker.runUntil(async () => { + const handle = await startWorkflow(multiTopicWorkflow, { args: [count] }); + + const client = new WorkflowStreamClient(handle); + const items: WorkflowStreamItem[] = []; + const gen = client.subscribe('a', 0, { pollCooldown: 0 }); + for await (const item of gen) { + items.push(item); + if (items.length >= 3) { + await gen.return(); + break; + } + } + t.is(items.length, 3); + t.true(items.every((it) => it.topic === 'a')); + + await handle.signal('close'); + }); +}); + +test('ttl_pruning_in_get_state — old publisher pruned, new publisher kept', async (t) => { + // pub-old arrives first, then wall-clock gap, then pub-new. TTL=0.5s + // prunes pub-old (~1s old) but keeps pub-new (~0s). + // + // The gap is generous relative to TTL (1.0s / 0.5s) so the test + // tolerates multi-hundred-ms scheduling jitter in both directions. + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(ttlTestWorkflow, { args: [] }); + + await handle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items: [entry('events', 'old')], + publisher_id: 'pub-old', + sequence: 1, + }); + + // Sanity: pub-old is recorded (generous TTL retains it). + // Generous TTL: 9999 seconds, expressed in ms. + const before = await handle.query(getStateWithTtlQuery, 9999_000); + t.true('pub-old' in before.publisher_sequences); + + // Wall-clock gap so workflow.time() advances between the two signals. + await new Promise((r) => setTimeout(r, 1000)); + + await handle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items: [entry('events', 'new')], + publisher_id: 'pub-new', + sequence: 1, + }); + + // 500 ms TTL: pub-old (~1s old) is pruned, pub-new (~0s old) is kept. + const state = await handle.query(getStateWithTtlQuery, 500); + t.false('pub-old' in state.publisher_sequences); + t.true('pub-new' in state.publisher_sequences); + t.is(state.log.length, 2); + + await handle.signal('close'); + }); +}); + +test('continue_as_new_typed — log, offsets, AND dedup state survive CAN', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const { env } = t.context; + const worker = await createWorker(); + await worker.runUntil(async () => { + const workflowId = `stream-can-${randomUUID()}`; + const handle = await startWorkflow(continueAsNewTypedWorkflow, { + args: [{}], + workflowId, + }); + + // Seed publisher dedup state (pub / sequence=1) so we can verify it + // survives CAN. + await handle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items: [entry('events', 'item-0'), entry('events', 'item-1'), entry('events', 'item-2')], + publisher_id: 'pub', + sequence: 1, + }); + + const before = await collectItems(handle, undefined, 0, 3); + t.is(before.length, 3); + + await handle.signal('triggerContinue'); + + // Wait for CAN (new run-id on a fresh handle). + const deadline = Date.now() + 10_000; + let newRunId: string | undefined; + while (Date.now() < deadline) { + const fresh = env.client.workflow.getHandle(workflowId); + const desc = await fresh.describe(); + if (desc.runId !== handle.firstExecutionRunId) { + newRunId = desc.runId; + break; + } + await new Promise((r) => setTimeout(r, 200)); + } + t.truthy(newRunId, 'CAN should produce a new run id'); + + const newHandle = env.client.workflow.getHandle(workflowId); + + // Log contents and offsets preserved across CAN. + const afterItems = await collectItems(newHandle, undefined, 0, 3); + t.deepEqual( + afterItems.map((i) => payloadString(i.data)), + ['item-0', 'item-1', 'item-2'] + ); + t.deepEqual( + afterItems.map((i) => i.offset), + [0, 1, 2] + ); + + // Dedup state preserved: publisher_sequences carries {pub: 1} after CAN. + const seqsAfterCan = await newHandle.query>(publisherSequencesQuery); + t.deepEqual(seqsAfterCan, { pub: 1 }); + + // Re-sending publisher_id='pub', sequence=1 must be rejected — log and + // publisher_sequences unchanged. + await newHandle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items: [entry('events', 'dup')], + publisher_id: 'pub', + sequence: 1, + }); + const seqsAfterDup = await newHandle.query>(publisherSequencesQuery); + t.deepEqual(seqsAfterDup, { pub: 1 }); + + // Fresh sequence from same publisher accepted; item-3 lands at offset 3. + await newHandle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items: [entry('events', 'item-3')], + publisher_id: 'pub', + sequence: 2, + }); + const seqsAfterAccept = await newHandle.query>(publisherSequencesQuery); + t.deepEqual(seqsAfterAccept, { pub: 2 }); + + const finalItems = await collectItems(newHandle, undefined, 0, 4); + t.deepEqual( + finalItems.map((i) => payloadString(i.data)), + ['item-0', 'item-1', 'item-2', 'item-3'] + ); + t.is(finalItems[3]!.offset, 3); + + await newHandle.signal('close'); + }); +}); + +test('continue_as_new_helper — log and offsets survive CAN via WorkflowStream.continueAsNew', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const { env } = t.context; + const worker = await createWorker(); + await worker.runUntil(async () => { + const workflowId = `stream-can-helper-${randomUUID()}`; + const handle = await startWorkflow(continueAsNewHelperWorkflow, { + args: [{}], + workflowId, + }); + + await handle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items: [entry('events', 'item-0'), entry('events', 'item-1')], + publisher_id: 'pub', + sequence: 1, + }); + + const before = await collectItems(handle, undefined, 0, 2); + t.is(before.length, 2); + + await handle.signal('triggerContinue'); + + const deadline = Date.now() + 10_000; + let newRunId: string | undefined; + while (Date.now() < deadline) { + const fresh = env.client.workflow.getHandle(workflowId); + const desc = await fresh.describe(); + if (desc.runId !== handle.firstExecutionRunId) { + newRunId = desc.runId; + break; + } + await new Promise((r) => setTimeout(r, 200)); + } + t.truthy(newRunId, 'CAN should produce a new run id'); + + const newHandle = env.client.workflow.getHandle(workflowId); + const afterItems = await collectItems(newHandle, undefined, 0, 2); + t.deepEqual( + afterItems.map((i) => payloadString(i.data)), + ['item-0', 'item-1'] + ); + t.deepEqual( + afterItems.map((i) => i.offset), + [0, 1] + ); + + await newHandle.signal('close'); + }); +}); + +test('poll_more_ready_when_response_exceeds_size_limit — 1MB cap', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const { env } = t.context; + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(basicWorkflowStreamWorkflow, { args: [] }); + + const chunk = new Uint8Array(200_000).fill('x'.charCodeAt(0)); + const chunkPayload = defaultPayloadConverter.toPayload(chunk); + for (let i = 0; i < 8; i++) { + await handle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items: [{ topic: 'big', data: encodePayloadWire(chunkPayload) }], + publisher_id: '', + sequence: 0, + }); + } + + // The update acts as a barrier for all prior publish signals. + const rawHandle = env.client.workflow.getHandle(handle.workflowId); + const first = await rawHandle.executeUpdate(workflowStreamPollUpdate, { + args: [{ topics: [], from_offset: 0 }], + }); + t.is(first.more_ready, true); + t.true(first.items.length < 8); + t.true(first.next_offset < 8); + + // Drain the rest. + let gathered = first.items.length; + let offset = first.next_offset; + let last: PollResult = first; + while (gathered < 8) { + last = await rawHandle.executeUpdate(workflowStreamPollUpdate, { + args: [{ topics: [], from_offset: offset }], + }); + gathered += last.items.length; + offset = last.next_offset; + } + t.is(gathered, 8); + // The final poll that drained the log should set more_ready=false. + t.is(last.more_ready, false); + + await handle.signal('close'); + }); +}); + +test('subscribe_iterates_through_more_ready — caller sees all items', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(basicWorkflowStreamWorkflow, { args: [] }); + const chunk = new Uint8Array(200_000).fill('x'.charCodeAt(0)); + const chunkPayload = defaultPayloadConverter.toPayload(chunk); + for (let i = 0; i < 8; i++) { + await handle.signal<[PublishInput]>(workflowStreamPublishSignal, { + items: [{ topic: 'big', data: encodePayloadWire(chunkPayload) }], + publisher_id: '', + sequence: 0, + }); + } + const items = await collectItems(handle, undefined, 0, 8, 15_000); + t.is(items.length, 8); + for (const item of items) { + t.is(payloadBytes(item.data).length, chunk.length); + } + await handle.signal('close'); + }); +}); + +test('flush_retry_preserves_items_after_failures — behavioral retry coverage', async (t) => { + // Inject signal failures on the handle so the client exercises its retry + // path. Then let delivery succeed and verify items arrive in publish + // order, exactly once — no drops, no duplicates, no reorderings. + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(basicWorkflowStreamWorkflow, { args: [] }); + + const stream = new WorkflowStreamClient(handle); + const realSignal = handle.signal.bind(handle); + let failRemaining = 2; + (handle as unknown as { signal: typeof handle.signal }).signal = (async (...args: unknown[]) => { + if (failRemaining > 0) { + failRemaining -= 1; + throw new Error('simulated delivery failure'); + } + return realSignal(...(args as Parameters)); + }) as typeof handle.signal; + + const events = stream.topic('events'); + events.publish(encoder.encode('item-0')); + events.publish(encoder.encode('item-1')); + await t.throwsAsync((stream as unknown as { _doFlush(): Promise })._doFlush(), { + message: /simulated/, + }); + + // Publish more during the failed state — must not overtake the pending + // retry on eventual delivery. + events.publish(encoder.encode('item-2')); + await t.throwsAsync((stream as unknown as { _doFlush(): Promise })._doFlush(), { + message: /simulated/, + }); + + // Third flush delivers the pending retry batch. + await (stream as unknown as { _doFlush(): Promise })._doFlush(); + // Fourth flush delivers the buffered 'item-2'. + await (stream as unknown as { _doFlush(): Promise })._doFlush(); + + const items = await collectItems(handle, undefined, 0, 3); + t.deepEqual( + items.map((i) => payloadString(i.data)), + ['item-0', 'item-1', 'item-2'] + ); + + await handle.signal('close'); + }); +}); + +test('flush_raises_after_max_retry_duration — timeout surfaces, client resumes', async (t) => { + // When the retry window expires, stop() must rethrow FlushTimeoutError; + // the client stays usable and subsequent publishes succeed. + const { env } = t.context; + const bogus = env.client.workflow.getHandle(`no-such-workflow-${randomUUID()}`); + const client = new WorkflowStreamClient(bogus, { + batchInterval: '100 milliseconds', + maxRetryDuration: '200 milliseconds', + }); + client.topic('events').publish(encoder.encode('will-be-lost')); + await new Promise((r) => setTimeout(r, 1500)); + await t.throwsAsync(client[Symbol.asyncDispose](), { instanceOf: FlushTimeoutError }); +}); diff --git a/contrib/workflow-streams/src/__tests__/workflows/workflow-streams.ts b/contrib/workflow-streams/src/__tests__/workflows/workflow-streams.ts new file mode 100644 index 000000000..4aabe81c1 --- /dev/null +++ b/contrib/workflow-streams/src/__tests__/workflows/workflow-streams.ts @@ -0,0 +1,194 @@ +/** + * Test workflows for @temporalio/workflow-streams. + */ + +import { + condition, + continueAsNew, + defineQuery, + defineSignal, + defineUpdate, + proxyActivities, + setHandler, +} from '@temporalio/workflow'; +import { WorkflowStream, type WorkflowStreamState } from '../../workflow'; +import type * as activities from '../activities/workflow-streams'; + +const { + publishItems, + publishBinaryItems, + publishMultiTopic, + publishWithForceFlush, + publishBatchTest, + publishWithMaxBatch, +} = proxyActivities({ + startToCloseTimeout: '30 seconds', + heartbeatTimeout: '10 seconds', +}); + +export const closeSignal = defineSignal('close'); +export const triggerContinueSignal = defineSignal('triggerContinue'); +export const truncateUpdate = defineUpdate('truncate'); +export const getStateWithTtlQuery = defineQuery('getStateWithTtl'); +export const publisherSequencesQuery = defineQuery>('publisherSequences'); + +/** A minimal stream-host workflow — initializes WorkflowStream and waits for close. */ +export async function basicWorkflowStreamWorkflow(): Promise { + new WorkflowStream(); + let closed = false; + setHandler(closeSignal, () => { + closed = true; + }); + await condition(() => closed); +} + +/** Publishes `count` items directly from the workflow, then waits. */ +export async function workflowSidePublishWorkflow(count: number): Promise { + const stream = new WorkflowStream(); + const events = stream.topic('events'); + let closed = false; + setHandler(closeSignal, () => { + closed = true; + }); + for (let i = 0; i < count; i++) { + events.publish(`item-${i}`); + } + await condition(() => closed); +} + +/** Executes publishMultiTopic activity then waits. */ +export async function multiTopicWorkflow(count: number): Promise { + new WorkflowStream(); + let closed = false; + setHandler(closeSignal, () => { + closed = true; + }); + await publishMultiTopic(count); + await condition(() => closed); +} + +/** Executes publishItems activity then appends activity_done status. */ +export async function activityPublishWorkflow(count: number): Promise { + const stream = new WorkflowStream(); + const status = stream.topic('status'); + let closed = false; + setHandler(closeSignal, () => { + closed = true; + }); + await publishItems(count); + status.publish('activity_done'); + await condition(() => closed); +} + +/** Executes publishBinaryItems activity then waits — exercises the Uint8Array publish path. */ +export async function binaryPublishWorkflow(count: number): Promise { + new WorkflowStream(); + let closed = false; + setHandler(closeSignal, () => { + closed = true; + }); + await publishBinaryItems(count); + await condition(() => closed); +} + +/** Workflow that accepts a truncate update (explicit completion). */ +export async function truncateWorkflow(): Promise { + const stream = new WorkflowStream(); + let closed = false; + setHandler(closeSignal, () => { + closed = true; + }); + setHandler(truncateUpdate, (upToOffset: number) => { + stream.truncate(upToOffset); + }); + await condition(() => closed); +} + +/** Workflow that exposes getState via query for TTL testing. */ +export async function ttlTestWorkflow(): Promise { + const stream = new WorkflowStream(); + let closed = false; + setHandler(closeSignal, () => { + closed = true; + }); + setHandler(getStateWithTtlQuery, (ttl: number) => stream.getState(ttl)); + await condition(() => closed); +} + +/** Workflow that runs publishWithForceFlush activity. */ +export async function forceFlushWorkflow(): Promise { + new WorkflowStream(); + let closed = false; + setHandler(closeSignal, () => { + closed = true; + }); + await publishWithForceFlush(); + await condition(() => closed); +} + +/** Workflow that runs publishBatchTest activity. */ +export async function flushOnExitWorkflow(count: number): Promise { + new WorkflowStream(); + let closed = false; + setHandler(closeSignal, () => { + closed = true; + }); + await publishBatchTest(count); + await condition(() => closed); +} + +/** Workflow that runs publishWithMaxBatch activity. */ +export async function maxBatchWorkflow(count: number): Promise { + const stream = new WorkflowStream(); + const status = stream.topic('status'); + let closed = false; + setHandler(closeSignal, () => { + closed = true; + }); + await publishWithMaxBatch(count); + status.publish('activity_done'); + await condition(() => closed); +} + +/** Typed input for the continue-as-new workflow. */ +export interface CANWorkflowInput { + streamState?: WorkflowStreamState; +} + +/** CAN workflow using properly-typed streamState (explicit recipe). */ +export async function continueAsNewTypedWorkflow(input: CANWorkflowInput): Promise { + const stream = new WorkflowStream(input.streamState); + let closed = false; + let shouldContinue = false; + setHandler(closeSignal, () => { + closed = true; + }); + setHandler(triggerContinueSignal, () => { + shouldContinue = true; + }); + // Expose publisher_sequences for CAN dedup-survival test. Use a very + // large TTL so we read the current state without pruning. + setHandler(publisherSequencesQuery, () => stream.getState(Number.MAX_SAFE_INTEGER).publisher_sequences); + await condition(() => shouldContinue || closed); + if (closed) return; + stream.detachPollers(); + await continueAsNew({ + streamState: stream.getState(), + }); +} + +/** CAN workflow that uses the packaged `WorkflowStream.continueAsNew` helper. */ +export async function continueAsNewHelperWorkflow(input: CANWorkflowInput): Promise { + const stream = new WorkflowStream(input.streamState); + let closed = false; + let shouldContinue = false; + setHandler(closeSignal, () => { + closed = true; + }); + setHandler(triggerContinueSignal, () => { + shouldContinue = true; + }); + await condition(() => shouldContinue || closed); + if (closed) return; + await stream.continueAsNew((state) => [{ streamState: state }]); +} diff --git a/contrib/workflow-streams/src/client.ts b/contrib/workflow-streams/src/client.ts new file mode 100644 index 000000000..1f290d847 --- /dev/null +++ b/contrib/workflow-streams/src/client.ts @@ -0,0 +1,557 @@ +/** + * External-side workflow stream client. + * + * Used by activities, starters, and any code with a workflow handle to publish + * messages and subscribe to topics on a workflow stream workflow. + * + * Each published value is turned into a `Payload` via the client's payload + * converter. The codec chain (encryption, PII-redaction, compression) is + * NOT run per item — it runs once on the signal/update envelope that + * carries each batch. Running the codec per item would double-encrypt + * because the envelope path already covers the items. The per-item + * `Payload` still carries encoding metadata so consumers can decode + * with a payload converter. + */ + +import { randomUUID } from 'crypto'; +import { Context as ActivityContext } from '@temporalio/activity'; +import type { Client, WorkflowHandle } from '@temporalio/client'; +import { WorkflowUpdateFailedError, WorkflowUpdateRPCTimeoutOrCancelledError } from '@temporalio/client'; +import { + ApplicationFailure, + defaultPayloadConverter, + WorkflowNotFoundError, + type Payload, + type PayloadConverter, +} from '@temporalio/common'; +import type { Duration } from '@temporalio/common/lib/time'; +import { msToNumber } from '@temporalio/common/lib/time'; +import { decodePayloadWire, encodePayloadWire } from './codec'; +import type { PollInput, PollResult, WorkflowStreamItem, PublishEntry, PublishInput } from './types'; +import { TopicHandle } from './topic-handle'; + +/** Thrown when a flush retry exceeds maxRetryDuration. */ +export class FlushTimeoutError extends Error { + constructor(message: string) { + super(message); + this.name = 'FlushTimeoutError'; + } +} + +export interface WorkflowStreamClientOptions { + /** Interval between automatic flushes. Default: 2 seconds. */ + batchInterval?: Duration; + /** Auto-flush when buffer reaches this size. */ + maxBatchSize?: number; + /** + * Maximum time to retry a failed flush before throwing. Must be less + * than the workflow's `publisherTtl` (default 15 minutes) to preserve + * exactly-once delivery. Default: 10 minutes. + */ + maxRetryDuration?: Duration; +} + +export interface SubscribeOptions { + /** + * Minimum interval between polls to avoid overwhelming the workflow + * when items arrive faster than the poll round-trip. Default: 100ms. + */ + pollCooldown?: Duration; +} + +/** + * A resolvable event: multiple callers `await wait()` for the same promise, + * `set()` resolves it once, and `clear()` re-arms it for the next cycle. + */ +class ResolvableEvent { + private resolver: (() => void) | null = null; + private promise: Promise; + + constructor() { + this.promise = new Promise((resolve) => { + this.resolver = resolve; + }); + } + + wait(): Promise { + return this.promise; + } + + set(): void { + if (this.resolver) { + const r = this.resolver; + this.resolver = null; + r(); + } + } + + clear(): void { + if (this.resolver !== null) return; // already armed + this.promise = new Promise((resolve) => { + this.resolver = resolve; + }); + } +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** Type guard for a Payload (as opposed to an arbitrary user value). */ +function isPayload(value: unknown): value is Payload { + if (value === null || typeof value !== 'object') return false; + const v = value as { metadata?: unknown; data?: unknown }; + // Metadata must exist (at minimum with an encoding key) to be a Payload. + return ( + 'metadata' in v && + v.metadata != null && + typeof v.metadata === 'object' && + !Array.isArray(v.metadata) && + // Distinguish from plain `{ metadata, data }` user objects by requiring + // that values in the metadata map look like Uint8Array instances. + Object.values(v.metadata).every((x) => x instanceof Uint8Array) + ); +} + +export class WorkflowStreamClient { + private handle: WorkflowHandle; + private client: Client | undefined; + private readonly workflowId: string; + private readonly batchIntervalMs: number; + private readonly maxBatchSize: number | undefined; + private readonly maxRetryDurationMs: number; + private readonly payloadConverter: PayloadConverter; + private buffer: Array<{ topic: string; value: unknown }> = []; + private pending: PublishEntry[] | null = null; + private pendingSeq = 0; + private pendingStartedAt: number | null = null; + private publisherId: string = randomUUID().replace(/-/g, '').slice(0, 16); + private sequence = 0; + + private readonly flushEvent = new ResolvableEvent(); + private flusherTask: Promise | undefined; + private flusherStopped = false; + private flusherError: Error | undefined; + private currentFlush: Promise | null = null; + private readonly topicHandles = new Map>(); + + constructor(handle: WorkflowHandle, options?: WorkflowStreamClientOptions) { + this.handle = handle; + this.workflowId = handle.workflowId; + this.batchIntervalMs = msToNumber(options?.batchInterval ?? '2 seconds'); + this.maxBatchSize = options?.maxBatchSize; + this.maxRetryDurationMs = msToNumber(options?.maxRetryDuration ?? '10 minutes'); + this.payloadConverter = defaultPayloadConverter; + } + + /** + * Create a WorkflowStreamClient from an explicit Temporal client and workflow ID. + * + * Use this when the caller has an explicit `Client` and `workflowId` in + * hand (starters, BFFs, other workflows' activities). For code running + * inside an activity that targets its own parent workflow, use + * {@link WorkflowStreamClient.fromWithinActivity}. + * + * A client created through this method follows continue-as-new chains in + * `subscribe()` and uses the client's payload converter for per-item + * `Payload` construction. + */ + static create(client: Client, workflowId: string, options?: WorkflowStreamClientOptions): WorkflowStreamClient { + const handle = client.workflow.getHandle(workflowId); + const instance = new WorkflowStreamClient(handle, options); + instance.client = client; + // Prefer the Client's configured converter so custom converters flow + // through; fall back to the default if unset. + const clientConverter = client.options?.loadedDataConverter?.payloadConverter; + if (clientConverter) { + (instance as unknown as { payloadConverter: PayloadConverter }).payloadConverter = clientConverter; + } + return instance; + } + + /** + * Create a WorkflowStreamClient targeting the current activity's parent workflow. + * + * Must be called from within an activity. The Temporal client and + * parent workflow id are taken from the activity context. + */ + static fromWithinActivity(options?: WorkflowStreamClientOptions): WorkflowStreamClient { + const ctx = ActivityContext.current(); + const workflowExecution = ctx.info.workflowExecution; + if (workflowExecution === undefined) { + throw new Error( + 'fromWithinActivity requires an activity scheduled by a workflow; this ' + + 'activity has no parent workflow. From a standalone activity, use ' + + 'WorkflowStreamClient.create(client, workflowId) with the target ' + + 'workflow id passed in explicitly.' + ); + } + return WorkflowStreamClient.create(ctx.client, workflowExecution.workflowId, options); + } + + /** + * Flush buffered (and pending) items and wait for server confirmation. + * + * Returns once the items buffered at call time have been signaled to + * the workflow and acknowledged by the server. Returns immediately if + * there is nothing to send. + * + * In addition to the declarative `forceFlush=true` on {@link publish} + * and to the automatic flush on {@link [Symbol.asyncDispose]}, use + * this when the caller needs proof that prior publications reached + * the server at a moment that does not naturally correspond to a + * specific event. + * + * Safe to call concurrently with `publish()` and with the background + * flusher: the in-flight serialization on `flushOnce` makes signal + * sends sequential. Items added concurrently after entry may + * piggyback on this flush or be deferred to a subsequent one. + * + * Throws {@link FlushTimeoutError} if a pending batch cannot be sent + * within `maxRetryDuration`. Also surfaces any deferred timeout from a + * prior background flusher failure: without that, `flush()` could + * return success against an empty buffer while an earlier batch had + * already been dropped, hiding data loss. + */ + async flush(): Promise { + this.throwPendingFlusherError(); + // Snapshot the sequence number that the items present at entry will + // commit at. A concurrent producer that calls publish() during the + // awaits below adds to the buffer at a later sequence — those items + // belong to a future flush and must not extend this barrier. + if (this.pending === null && this.buffer.length === 0) { + return; + } + const baseSeq = this.pending !== null ? this.pendingSeq : this.sequence; + const targetSeq = this.buffer.length > 0 ? baseSeq + 1 : baseSeq; + // `sequence` only advances on a successful send, so reaching + // `targetSeq` proves the entry-time items were confirmed. A later + // batch (queued by a concurrent publisher and picked up by the + // background flusher) may leave `pending` non-null afterward — we + // do not wait on it. + while (this.sequence < targetSeq) { + await this.flushOnce(); + } + this.throwPendingFlusherError(); + } + + private throwPendingFlusherError(): void { + if (this.flusherError) { + const err = this.flusherError; + this.flusherError = undefined; + throw err; + } + } + + /** + * Dispose pattern: stop the flusher and drain remaining items. + * + * Use via `await using client = WorkflowStreamClient.create(...)` so the + * scope exit guarantees a final drain. For tests or call sites that + * cannot use `await using`, invoke this method directly: + * `await client[Symbol.asyncDispose]()`. + */ + async [Symbol.asyncDispose](): Promise { + if (!this.flusherTask) { + // Lazy-start path was never triggered (no publish, or only flush() + // was used). A single flushOnce() drains anything left in the buffer. + await this.flushOnce(); + this.throwPendingFlusherError(); + return; + } + this.flusherStopped = true; + this.flushEvent.set(); + try { + await this.flusherTask; + } finally { + this.flusherTask = undefined; + } + // Final drain after the flusher exits. Repeat while either pending OR the + // producer buffer has items: a single flushOnce() processes either + // `pending` OR the buffer, not both. + while (this.pending !== null || this.buffer.length > 0) { + await this.flushOnce(); + } + this.throwPendingFlusherError(); + } + + /** + * Get a typed handle for publishing to and subscribing from ``name``. + * + * Repeated calls with the same name return the same handle instance. + * The type parameter ``T`` is purely a compile-time annotation — see + * the module note in {@link TopicHandle} for the difference from + * sdk-python's runtime type-uniformity check. + */ + topic(name: string): TopicHandle { + let handle = this.topicHandles.get(name); + if (handle === undefined) { + handle = new TopicHandle(this, name, (topic, value, forceFlush) => + this.publishToTopic(topic, value, forceFlush) + ); + this.topicHandles.set(name, handle as TopicHandle); + } + return handle as TopicHandle; + } + + private publishToTopic(topic: string, value: unknown, forceFlush: boolean): void { + // Lazy-start the background flusher on first publish. Skipped if dispose + // already ran, so a publish-after-dispose surfaces as a buffered item that + // never flushes (which the next dispose would catch) rather than silently + // resurrecting the flusher. + if (this.flusherTask === undefined && !this.flusherStopped) { + this.flusherTask = this.runFlusher(); + } + this.buffer.push({ topic, value }); + if (forceFlush || (this.maxBatchSize !== undefined && this.buffer.length >= this.maxBatchSize)) { + this.flushEvent.set(); + } + } + + private encodeBuffer(entries: Array<{ topic: string; value: unknown }>): PublishEntry[] { + const out: PublishEntry[] = new Array(entries.length); + for (let i = 0; i < entries.length; i++) { + const { topic, value } = entries[i]!; + const payload: Payload = isPayload(value) ? value : this.payloadConverter.toPayload(value); + out[i] = { topic, data: encodePayloadWire(payload) }; + } + return out; + } + + private async runFlusher(): Promise { + while (!this.flusherStopped) { + await Promise.race([this.flushEvent.wait(), sleep(this.batchIntervalMs)]); + this.flushEvent.clear(); + if (this.flusherStopped) break; + try { + await this.flushOnce(); + } catch (err) { + if (err instanceof FlushTimeoutError) { + // Pending batch was dropped and can't be recovered. Stash the + // error and stop the loop; stop() will surface it so data loss + // is never silent. + this.flusherError = err; + break; + } + // Transient failures (network, signal rejection) leave `pending` + // set so the next tick retries with the same sequence. + } + } + } + + /** Serialize concurrent flush calls through a single in-flight promise. */ + private async flushOnce(): Promise { + while (this.currentFlush) { + await this.currentFlush; + } + const p = this._doFlush(); + this.currentFlush = p; + try { + await p; + } finally { + if (this.currentFlush === p) this.currentFlush = null; + } + } + + /** + * Send pending or buffered messages to the workflow via signal. + * + * On failure, the pending batch and sequence are kept for retry. + * Only advances the confirmed sequence on success. + */ + private async _doFlush(): Promise { + let batch: PublishEntry[]; + let seq: number; + + if (this.pending !== null) { + // Retry path: check max_retry_duration + if (this.pendingStartedAt !== null && Date.now() - this.pendingStartedAt > this.maxRetryDurationMs) { + // Advance confirmed sequence so the next batch gets a fresh sequence + // number. Without this, the next batch reuses pendingSeq, which the + // workflow may have already accepted — causing silent dedup (data + // loss). See `retry_timeout_sequence_reuse_causes_data_loss` test. + this.sequence = this.pendingSeq; + this.pending = null; + this.pendingSeq = 0; + this.pendingStartedAt = null; + throw new FlushTimeoutError( + `Flush retry exceeded maxRetryDuration (${this.maxRetryDurationMs}ms). ` + + 'Pending batch dropped. If the signal was delivered, items are in the log. ' + + 'If not, they are lost.' + ); + } + batch = this.pending; + seq = this.pendingSeq; + } else if (this.buffer.length > 0) { + // New batch path: encode at flush time so the payload converter is + // applied once per item. + const raw = this.buffer; + this.buffer = []; + batch = this.encodeBuffer(raw); + seq = this.sequence + 1; + this.pending = batch; + this.pendingSeq = seq; + this.pendingStartedAt = Date.now(); + } else { + return; + } + + // On failure, the signal throws and pending stays set for retry. + // On success, advance confirmed sequence and clear pending. + await this.handle.signal<[PublishInput]>('__temporal_workflow_stream_publish', { + items: batch, + publisher_id: this.publisherId, + sequence: seq, + }); + this.sequence = seq; + this.pending = null; + this.pendingSeq = 0; + this.pendingStartedAt = null; + } + + /** + * Async generator that polls for new items. + * + * Default — yields items with `data: Payload` (no decode). Use a payload + * converter such as `defaultPayloadConverter.fromPayload(item.data)` + * to decode at the call site. + * + * With `resultType: true` — each item is decoded via the client's + * configured payload converter and yielded as the generic `T`. + * + * Automatically follows continue-as-new chains when created via + * {@link WorkflowStreamClient.create}. + * + * @param topics - Topic filter. A single topic name, an array of topic + * names, or undefined. Undefined or an empty array means all topics. + */ + subscribe( + topics?: string | string[], + fromOffset?: number, + options?: SubscribeOptions + ): AsyncGenerator, void, unknown>; + subscribe( + topics: string | string[] | undefined, + fromOffset: number, + options: SubscribeOptions & { resultType: true } + ): AsyncGenerator, void, unknown>; + async *subscribe( + topics?: string | string[], + fromOffset = 0, + options?: SubscribeOptions & { resultType?: boolean } + ): AsyncGenerator, void, unknown> { + const pollCooldownMs = msToNumber(options?.pollCooldown ?? '100 milliseconds'); + const topicFilter: string[] = topics === undefined ? [] : typeof topics === 'string' ? [topics] : topics; + let offset = fromOffset; + + while (true) { + let result: PollResult; + try { + result = await this.handle.executeUpdate('__temporal_workflow_stream_poll', { + args: [{ topics: topicFilter, from_offset: offset }], + }); + } catch (err) { + if (err instanceof WorkflowUpdateFailedError) { + const cause = err.cause; + if (cause instanceof ApplicationFailure && cause.type === 'TruncatedOffset') { + // Subscriber fell behind truncation. Retry from offset 0 which + // the mixin treats as "from the beginning of whatever exists" + // (i.e., from baseOffset). + offset = 0; + continue; + } + if (cause instanceof ApplicationFailure && cause.type === 'AcceptedUpdateCompletedWorkflow') { + // Workflow returned (or continued-as-new) before this poll's + // update completed. Either follow the chain or exit cleanly. + if (await this.followContinueAsNew()) { + continue; + } + return; + } + throw err; + } + if (err instanceof WorkflowUpdateRPCTimeoutOrCancelledError) { + if (await this.followContinueAsNew()) { + continue; + } + return; + } + if (err instanceof WorkflowNotFoundError) { + // Workflow may have completed between polls. subscribe() exits + // cleanly on terminal status so callers don't have to wrap the + // iterator in error handling for the normal end-of-stream case. + if (await this.followContinueAsNew()) { + continue; + } + if (await this.isInTerminalState()) { + return; + } + throw err; + } + throw err; + } + + const decode = options?.resultType === true; + for (const wireItem of result.items) { + const payload = decodePayloadWire(wireItem.data); + yield { + topic: wireItem.topic, + data: decode ? this.payloadConverter.fromPayload(payload) : (payload as unknown as T), + offset: wireItem.offset, + }; + } + offset = result.next_offset; + + if (!result.more_ready && pollCooldownMs > 0) { + await sleep(pollCooldownMs); + } + } + } + + /** Query the current global offset. */ + async getOffset(): Promise { + return this.handle.query('__temporal_workflow_stream_offset'); + } + + /** + * Check if the workflow continued-as-new and re-target the handle. + * Returns true if the handle was updated (caller should retry). + */ + private async followContinueAsNew(): Promise { + if (!this.client) return false; + try { + const desc = await this.handle.describe(); + if (desc.status.name === 'CONTINUED_AS_NEW') { + this.handle = this.client.workflow.getHandle(this.workflowId); + return true; + } + } catch { + return false; + } + return false; + } + + /** + * Return true if the workflow has reached a terminal state. Used by + * `subscribe()` to distinguish "workflow finished — stream is done" from + * "wrong workflow id" when a poll surfaces `WorkflowNotFoundError`. + */ + private async isInTerminalState(): Promise { + try { + const desc = await this.handle.describe(); + const name = desc.status.name; + return ( + name === 'COMPLETED' || + name === 'FAILED' || + name === 'CANCELLED' || + name === 'TERMINATED' || + name === 'TIMED_OUT' + ); + } catch { + return false; + } + } +} + +export { TopicHandle } from './topic-handle'; +export type { WorkflowStreamItem, PublishEntry, PublishInput, PollInput, PollResult } from './types'; diff --git a/contrib/workflow-streams/src/codec.ts b/contrib/workflow-streams/src/codec.ts new file mode 100644 index 000000000..94b38d45b --- /dev/null +++ b/contrib/workflow-streams/src/codec.ts @@ -0,0 +1,195 @@ +/** + * Wire codec for workflow stream payloads. + * + * Two layers: + * + * 1. Base64 (no `Buffer` dependency, for workflow sandbox compat). + * 2. Hand-rolled protobuf encoder/decoder for `temporal.api.common.v1.Payload`. + * Avoids pulling the protobufjs runtime into the workflow sandbox. The + * schema is a fixed public API — the manual encoder cannot silently go + * out of sync with server-side expectations. + * + * Payload schema: + * message Payload { + * map metadata = 1; + * bytes data = 2; + * } + */ + +import type { Payload } from '@temporalio/common'; + +// --------------------------------------------------------------------------- +// Base64 helpers +// --------------------------------------------------------------------------- + +const B64 = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/'; + +/** Encode bytes to standard base64 (padded). */ +export function encodeBase64(data: Uint8Array): string { + let result = ''; + for (let i = 0; i < data.length; i += 3) { + const b0 = data[i]!; + const b1 = i + 1 < data.length ? data[i + 1]! : 0; + const b2 = i + 2 < data.length ? data[i + 2]! : 0; + result += B64[(b0 >> 2) & 0x3f]; + result += B64[((b0 << 4) | (b1 >> 4)) & 0x3f]; + result += i + 1 < data.length ? B64[((b1 << 2) | (b2 >> 6)) & 0x3f] : '='; + result += i + 2 < data.length ? B64[b2 & 0x3f] : '='; + } + return result; +} + +/** Decode standard base64 to bytes. Throws TypeError on malformed input. */ +export function decodeBase64(data: string): Uint8Array { + const clean = data.replace(/=+$/, ''); + if (data.length - clean.length > 2) { + throw new TypeError(`Invalid base64 input: too many '=' padding characters`); + } + if (clean.length % 4 === 1) { + throw new TypeError(`Invalid base64 input: length ${data.length} is not valid`); + } + const len = (clean.length * 3) >> 2; + const out = new Uint8Array(len); + let j = 0; + for (let i = 0; i < clean.length; i += 4) { + const a = B64.indexOf(clean.charAt(i)); + const b = i + 1 < clean.length ? B64.indexOf(clean.charAt(i + 1)) : 0; + const c = i + 2 < clean.length ? B64.indexOf(clean.charAt(i + 2)) : 0; + const d = i + 3 < clean.length ? B64.indexOf(clean.charAt(i + 3)) : 0; + if (a < 0 || b < 0 || c < 0 || d < 0) { + throw new TypeError(`Invalid base64 input: non-alphabet character at offset ${i}`); + } + out[j++] = (a << 2) | (b >> 4); + if (j < len) out[j++] = ((b << 4) | (c >> 2)) & 0xff; + if (j < len) out[j++] = ((c << 6) | d) & 0xff; + } + return out; +} + +// --------------------------------------------------------------------------- +// Protobuf codec for temporal.api.common.v1.Payload +// --------------------------------------------------------------------------- +// +// Map entries are encoded as embedded messages: +// message MapEntry { string key = 1; bytes value = 2; } + +function writeVarint(buf: number[], n: number): void { + while (n >= 0x80) { + buf.push((n & 0x7f) | 0x80); + n = Math.floor(n / 128); + } + buf.push(n & 0x7f); +} + +function readVarint(bytes: Uint8Array, pos: { i: number }): number { + let result = 0; + let shift = 0; + while (true) { + if (pos.i >= bytes.length) { + throw new Error('unexpected end of varint'); + } + const b = bytes[pos.i++]!; + result += (b & 0x7f) * Math.pow(2, shift); + if ((b & 0x80) === 0) break; + shift += 7; + if (shift > 35) throw new Error('varint too large'); + } + return result; +} + +function writeTagLenBytes(buf: number[], tag: number, bytes: Uint8Array): void { + buf.push(tag); + writeVarint(buf, bytes.length); + for (let i = 0; i < bytes.length; i++) buf.push(bytes[i]!); +} + +const textEncoder = new TextEncoder(); +const textDecoder = new TextDecoder(); + +function utf8Encode(s: string): Uint8Array { + return textEncoder.encode(s); +} + +function utf8Decode(b: Uint8Array): string { + return textDecoder.decode(b); +} + +/** Encode a Payload to its protobuf binary representation. */ +export function encodePayloadProto(payload: Payload): Uint8Array { + const buf: number[] = []; + const metadata = payload.metadata ?? {}; + for (const key of Object.keys(metadata)) { + const value = metadata[key]; + if (value == null) continue; + // Inner: key (tag 0x0A, wire-type 2) + value (tag 0x12, wire-type 2) + const entry: number[] = []; + writeTagLenBytes(entry, 0x0a, utf8Encode(key)); + writeTagLenBytes(entry, 0x12, value); + // Outer: metadata field 1, wire-type 2 + writeTagLenBytes(buf, 0x0a, new Uint8Array(entry)); + } + const data = payload.data; + if (data && data.length > 0) { + writeTagLenBytes(buf, 0x12, data); + } + return new Uint8Array(buf); +} + +/** Decode protobuf binary bytes to a Payload. */ +export function decodePayloadProto(bytes: Uint8Array): Payload { + const pos = { i: 0 }; + const metadata: Record = {}; + let data: Uint8Array = new Uint8Array(0); + while (pos.i < bytes.length) { + const tag = bytes[pos.i++]!; + const fieldNumber = tag >>> 3; + const wireType = tag & 0x07; + if (wireType === 2) { + const len = readVarint(bytes, pos); + const chunk = bytes.subarray(pos.i, pos.i + len); + pos.i += len; + if (fieldNumber === 1) { + // Parse inner map entry message + const p2 = { i: 0 }; + let key = ''; + let value = new Uint8Array(0); + while (p2.i < chunk.length) { + const itag = chunk[p2.i++]!; + const ifn = itag >>> 3; + const iwt = itag & 0x07; + if (iwt !== 2) { + throw new Error(`unsupported wire type ${iwt} in Payload metadata entry`); + } + const ilen = readVarint(chunk, p2); + const ival = chunk.subarray(p2.i, p2.i + ilen); + p2.i += ilen; + if (ifn === 1) key = utf8Decode(ival); + else if (ifn === 2) value = new Uint8Array(ival); + } + metadata[key] = value; + } else if (fieldNumber === 2) { + data = new Uint8Array(chunk); + } + // Other fields (e.g. externalPayloads = 3) are ignored. + } else if (wireType === 0) { + readVarint(bytes, pos); + } else if (wireType === 1) { + pos.i += 8; + } else if (wireType === 5) { + pos.i += 4; + } else { + throw new Error(`unsupported wire type ${wireType}`); + } + } + return { metadata, data }; +} + +/** Convenience: encode a Payload to the base64 wire format used by stream. */ +export function encodePayloadWire(payload: Payload): string { + return encodeBase64(encodePayloadProto(payload)); +} + +/** Convenience: decode the base64 wire format to a Payload. */ +export function decodePayloadWire(wire: string): Payload { + return decodePayloadProto(decodeBase64(wire)); +} diff --git a/contrib/workflow-streams/src/topic-handle.ts b/contrib/workflow-streams/src/topic-handle.ts new file mode 100644 index 000000000..a7f811c66 --- /dev/null +++ b/contrib/workflow-streams/src/topic-handle.ts @@ -0,0 +1,100 @@ +/** + * Typed topic handles for Workflow Streams. + * + * A topic handle is a thin typed view over an underlying publisher. It + * carries the topic name and the value type ``T`` so call sites do not + * have to repeat them on every publish, and so subscribers reading the + * same handle decode to the matching type. + * + * Unlike sdk-python, ``T`` here is purely a compile-time annotation: + * TypeScript has no runtime type representation, so per-topic type + * uniformity cannot be enforced by the runtime. {@link WorkflowStream} + * and {@link WorkflowStreamClient} do memoize handles by name, so two + * `topic('foo')` calls return the same handle instance — but the two + * calls' ``T`` parameters are erased and not compared. + */ + +import type { Payload } from '@temporalio/common'; +import type { SubscribeOptions, WorkflowStreamClient } from './client'; +import type { WorkflowStreamItem } from './types'; + +/** + * Client-side handle for publishing to and subscribing from a single topic. + * + * Constructed via {@link WorkflowStreamClient.topic}. Publishes share the + * underlying client's batching, dedup, and codec path; this object holds + * only the topic name and the type binding. + * + * @experimental + */ +export class TopicHandle { + /** @internal */ + constructor( + private readonly client: WorkflowStreamClient, + public readonly name: string, + private readonly publishFn: (topic: string, value: unknown, forceFlush: boolean) => void + ) {} + + /** + * Buffer ``value`` for publishing on this topic. + * + * Equivalent to the underlying client's publish path; the value flows + * through the same buffer, batch interval, and dedup sequence. + * + * @param value Value to publish. Goes through the client's payload + * converter at flush time. A pre-built {@link Payload} bypasses + * conversion (zero-copy fast path), regardless of the handle's + * bound type. + * @param options.forceFlush If true, wake the flusher to send + * immediately (fire-and-forget — does not block the caller). + */ + publish(value: T | Payload, options?: { forceFlush?: boolean }): void { + this.publishFn(this.name, value, options?.forceFlush ?? false); + } + + /** + * Async iterator over items on this topic, decoded as ``T`` via the + * client's configured payload converter (custom converter on the + * underlying {@link WorkflowStreamClient.create | Client} flows + * through; otherwise the default). + * + * For raw {@link Payload} access, or any other decode path that + * differs from the handle's bound ``T``, call + * {@link WorkflowStreamClient.subscribe} directly — it yields + * {@link WorkflowStreamItem | WorkflowStreamItem}. + * + * @param fromOffset Global offset to start reading from. + * @param options.pollCooldown Minimum interval between polls when + * there are no new items. Default 100ms. + */ + subscribe(fromOffset?: number, options?: SubscribeOptions): AsyncGenerator, void, unknown> { + return this.client.subscribe(this.name, fromOffset ?? 0, { ...options, resultType: true }); + } +} + +/** + * Workflow-side handle for publishing to a single topic. + * + * Constructed via {@link WorkflowStream.topic}. Has no ``subscribe``: + * workflows do not consume their own stream. + * + * @experimental + */ +export class WorkflowTopicHandle { + /** @internal */ + constructor( + public readonly name: string, + private readonly publishFn: (topic: string, value: unknown) => void + ) {} + + /** + * Append ``value`` to the workflow stream on this topic. + * + * @param value Value to publish. Goes through the workflow's default + * payload converter. A pre-built {@link Payload} bypasses + * conversion, regardless of the handle's bound type. + */ + publish(value: T | Payload): void { + this.publishFn(this.name, value); + } +} diff --git a/contrib/workflow-streams/src/types.ts b/contrib/workflow-streams/src/types.ts new file mode 100644 index 000000000..d6e5fdf13 --- /dev/null +++ b/contrib/workflow-streams/src/types.ts @@ -0,0 +1,92 @@ +/** + * Shared data types for the workflow stream contrib module. + * + * User-facing `data` fields on {@link WorkflowStreamItem} are Temporal + * {@link Payload}s so that per-item metadata (encoding, messageType) + * round-trips to consumers. See README §"Cross-Language Protocol". + * + * The wire representation (`PublishEntry`, `WorkflowStreamWireItem`) uses + * base64-encoded `Payload` protobuf bytes because the default JSON + * converter cannot serialize a `Payload` object embedded inside a + * plain (non-top-level) field. Using a base64 proto bytes string + * keeps the envelope JSON-serializable while preserving Payload + * metadata for codec and typed-decode paths. See `./codec` for the + * encoder/decoder. + */ + +import type { Payload } from '@temporalio/common'; + +/** + * A single item in the workflow stream log (user-facing). + * + * Generic on the decoded ``data`` type ``T``. Default ``T = Payload`` + * matches what {@link WorkflowStreamClient.subscribe} yields — the raw + * payload, with ``metadata.encoding`` available for downstream decode. + * Subscribing through a {@link TopicHandle} narrows ``T`` to the + * handle's bound type, with the default payload converter applied per + * item. + * + * The ``offset`` field is populated by the poll handler from the item's + * position in the global log. + */ +export interface WorkflowStreamItem { + topic: string; + data: T; + offset: number; +} + +/** A single entry to publish via signal (wire type). */ +export interface PublishEntry { + topic: string; + /** Base64-encoded Payload protobuf bytes. */ + data: string; +} + +/** + * Wire representation of a WorkflowStreamItem (base64 of serialized Payload). + * + * The `offset` field is populated by the poll handler from the item's + * position in the global log. It is unused in the `getState()` snapshot + * (offsets there are re-derivable from `base_offset + index`). + */ +export interface WorkflowStreamWireItem { + topic: string; + /** Base64-encoded Payload protobuf bytes. */ + data: string; + offset: number; +} + +/** Signal payload: batch of entries to publish with dedup fields. */ +export interface PublishInput { + items: PublishEntry[]; + publisher_id: string; + sequence: number; +} + +/** Update payload: request to poll for new items. */ +export interface PollInput { + topics: string[]; + from_offset: number; +} + +/** + * Update response: items matching the poll request (wire type). + * + * When `more_ready` is true, the response was truncated to stay within size + * limits and the subscriber should poll again immediately rather than applying + * a cooldown delay. + */ +export interface PollResult { + items: WorkflowStreamWireItem[]; + next_offset: number; + more_ready: boolean; +} + +/** Serializable snapshot of workflow stream state for continue-as-new. */ +export interface WorkflowStreamState { + log: WorkflowStreamWireItem[]; + base_offset: number; + publisher_sequences: Record; + /** Per-publisher last-seen timestamps (seconds) for TTL pruning. */ + publisher_last_seen: Record; +} diff --git a/contrib/workflow-streams/src/workflow.ts b/contrib/workflow-streams/src/workflow.ts new file mode 100644 index 000000000..90476ddd1 --- /dev/null +++ b/contrib/workflow-streams/src/workflow.ts @@ -0,0 +1,369 @@ +/** + * Workflow-side entrypoint for `@temporalio/workflow-streams`. + * + * Instantiate `WorkflowStream` once at the start of your workflow function; the + * constructor registers the workflow stream signal, update, and query handlers on + * the current workflow via `setHandler`. + * + * For workflows that support continue-as-new, include a + * `WorkflowStreamState | undefined` field on the workflow input and pass it as + * `priorState` — it is `undefined` on fresh starts and carries + * accumulated state on continue-as-new. + * + * Both workflow-side `WorkflowStream.publish` and client-side + * `WorkflowStreamClient.publish` use the default payload converter for per-item + * `Payload` construction. The codec chain (encryption, PII-redaction, + * compression) is NOT applied per item on either side — it runs once at + * the envelope level when Temporal's SDK encodes the signal/update that + * carries the batch. + * + * This entrypoint exports only the workflow-safe surface so that it can be + * pulled into a workflow bundle. The client-side surface lives at + * `@temporalio/workflow-streams/client` and pulls in `crypto`, + * `@temporalio/activity`, and `@temporalio/client` — none of which can be + * resolved in the workflow sandbox. + * + * @module + */ + +import { + allHandlersFinished, + condition, + continueAsNew as workflowContinueAsNew, + defineSignal, + defineUpdate, + defineQuery, + setHandler, + defaultPayloadConverter, +} from '@temporalio/workflow'; +import { ApplicationFailure, type Payload, type Workflow } from '@temporalio/common'; +import type { Duration } from '@temporalio/common/lib/time'; +import { msToNumber } from '@temporalio/common/lib/time'; +import { decodePayloadWire, encodePayloadProto, encodePayloadWire, encodeBase64 } from './codec'; +import type { PollInput, PollResult, WorkflowStreamState, PublishInput, WorkflowStreamWireItem } from './types'; +import { WorkflowTopicHandle } from './topic-handle'; + +export type { + WorkflowStreamItem, + PublishEntry, + PublishInput, + PollInput, + PollResult, + WorkflowStreamState, +} from './types'; +export { + encodeBase64, + decodeBase64, + encodePayloadProto, + decodePayloadProto, + encodePayloadWire, + decodePayloadWire, +} from './codec'; +export { WorkflowTopicHandle } from './topic-handle'; + +const BINARY_PLAIN_ENCODING = new TextEncoder().encode('binary/plain'); + +/** + * Cross-realm-safe Uint8Array check. + * + * The workflow sandbox gets its `TextEncoder` from `node:util`, which + * returns a `Uint8Array` tagged to the host realm — so `value instanceof + * Uint8Array` is false against the sandbox's own `Uint8Array` global. + * `Object.prototype.toString` crosses realm boundaries reliably. + */ +function isUint8ArrayLike(value: unknown): value is ArrayLike { + return value != null && typeof value === 'object' && Object.prototype.toString.call(value) === '[object Uint8Array]'; +} + +// Fixed handler names for cross-language interop +export const workflowStreamPublishSignal = defineSignal<[PublishInput]>('__temporal_workflow_stream_publish'); +export const workflowStreamPollUpdate = defineUpdate('__temporal_workflow_stream_poll'); +export const workflowStreamOffsetQuery = defineQuery('__temporal_workflow_stream_offset'); + +const MAX_POLL_RESPONSE_BYTES = 1_000_000; + +/** Approximate poll-response contribution of a single encoded payload. */ +function payloadWireSize(encoded: string, topic: string): number { + // `encoded` is already base64 (the on-wire representation). + return encoded.length + topic.length; +} + +/** Internal log entry: stores decoded Payload for user-facing APIs. */ +interface InternalLogEntry { + topic: string; + payload: Payload; +} + +/** Type guard for Payload — same logic as client.ts's isPayload. */ +function isPayload(value: unknown): value is Payload { + if (value === null || typeof value !== 'object') return false; + const v = value as { metadata?: unknown }; + return ( + 'metadata' in v && + v.metadata != null && + typeof v.metadata === 'object' && + !Array.isArray(v.metadata) && + Object.values(v.metadata).every((x) => isUint8ArrayLike(x)) + ); +} + +/** + * Workflow-side stream object — append-only log with publish/poll handlers. + * + * Construct once at the start of your workflow function; the constructor + * registers the workflow stream signal, update, and query handlers on the current + * workflow. + * + * Registered handlers: + * + * - `__temporal_workflow_stream_publish` signal — external publish with dedup + * - `__temporal_workflow_stream_poll` update — long-poll subscription + * - `__temporal_workflow_stream_offset` query — current log length + * + * For continue-as-new, thread a `WorkflowStreamState | undefined` field through + * the workflow input and pass it as `priorState`. + */ +export class WorkflowStream { + private log: InternalLogEntry[]; + private baseOffset: number; + private readonly publisherSequences: Record; + private readonly publisherLastSeen: Record; + private draining = false; + private readonly topicHandles = new Map>(); + + constructor(priorState?: WorkflowStreamState) { + // Note: sdk-python guards against a second `WorkflowStream(...)` call on the + // same workflow by checking `workflow.get_signal_handler(...)`. The + // TypeScript workflow runtime does not expose that inspection API, + // and `reuseV8Context` shares module-level state across workflow + // executions — so a naive module-level flag would either fire + // spuriously or miss real duplicates. Constructing `WorkflowStream` twice in + // the same workflow silently replaces the handlers; users should + // construct once at the top of the workflow function. + this.log = priorState?.log + ? priorState.log.map((item) => ({ + topic: item.topic, + payload: decodePayloadWire(item.data), + })) + : []; + this.baseOffset = priorState?.base_offset ?? 0; + this.publisherSequences = priorState?.publisher_sequences ? { ...priorState.publisher_sequences } : {}; + this.publisherLastSeen = priorState?.publisher_last_seen ? { ...priorState.publisher_last_seen } : {}; + + setHandler(workflowStreamPublishSignal, (input: PublishInput) => this.onPublish(input)); + setHandler(workflowStreamPollUpdate, (input: PollInput) => this.onPoll(input), { + validator: (_input: PollInput) => { + if (this.draining) { + throw new Error('Workflow is draining for continue-as-new'); + } + }, + }); + setHandler(workflowStreamOffsetQuery, () => this.baseOffset + this.log.length); + } + + /** + * Get a typed handle for publishing to ``name``. + * + * Repeated calls with the same name return the same handle instance. + * The type parameter ``T`` is purely a compile-time annotation — see + * the module note in {@link TopicHandle} for the difference from + * sdk-python's runtime type-uniformity check. + */ + topic(name: string): WorkflowTopicHandle { + let handle = this.topicHandles.get(name); + if (handle === undefined) { + handle = new WorkflowTopicHandle(name, (topic, value) => this.publishToTopic(topic, value)); + this.topicHandles.set(name, handle as WorkflowTopicHandle); + } + return handle as WorkflowTopicHandle; + } + + private publishToTopic(topic: string, value: unknown): void { + let payload: Payload; + if (isPayload(value)) { + payload = value; + } else if (isUint8ArrayLike(value)) { + // Bypass defaultPayloadConverter for cross-realm Uint8Arrays: the + // BinaryPayloadConverter uses `instanceof Uint8Array`, which fails + // against a Uint8Array produced by Node's built-in `TextEncoder` + // (host realm) when evaluated in the workflow sandbox. Construct + // the equivalent binary/plain Payload directly. + payload = { + metadata: { encoding: BINARY_PLAIN_ENCODING }, + data: new Uint8Array(value), + }; + } else { + payload = defaultPayloadConverter.toPayload(value); + } + this.log.push({ topic, payload }); + } + + /** Unblock all waiting poll handlers and reject new polls for CAN. */ + detachPollers(): void { + this.draining = true; + } + + /** + * Return a serializable snapshot of workflow stream state for continue-as-new. + * Prunes publisher dedup entries older than `publisherTtl`. Defaults + * to 15 minutes. + */ + getState(publisherTtl: Duration = '15 minutes'): WorkflowStreamState { + const ttlSeconds = msToNumber(publisherTtl) / 1000; + const now = Date.now() / 1000; + const activeSeqs: Record = {}; + const activeSeen: Record = {}; + for (const pid of Object.keys(this.publisherSequences)) { + // Missing timestamps are pruned (matches sdk-python). The signal + // handler always sets both maps together, so absence indicates a + // malformed snapshot rather than a supported upgrade path. + const ts = this.publisherLastSeen[pid] ?? 0; + if (now - ts < ttlSeconds) { + activeSeqs[pid] = this.publisherSequences[pid] ?? 0; + activeSeen[pid] = ts; + } + } + return { + // Per-item offset is re-derivable from base_offset + index on reload, + // so we leave it at 0 here. + log: this.log.map((entry) => ({ + topic: entry.topic, + data: encodePayloadWire(entry.payload), + offset: 0, + })), + base_offset: this.baseOffset, + publisher_sequences: activeSeqs, + publisher_last_seen: activeSeen, + }; + } + + /** + * Drain, wait for in-flight handlers, then `continueAsNew` with built args. + * + * Replaces the recipe `detachPollers()` → `condition(allHandlersFinished)` → + * `continueAsNew(...)` for the common case where the only thing that + * varies across CAN boundaries is the workflow's own arguments. + * + * `buildArgs` is invoked *after* pollers detach, with the resulting + * `WorkflowStreamState` as its single argument, and must return the positional + * argument tuple for the new run. + * + * @example + * ```typescript + * await stream.continueAsNew((state) => [{ + * itemsProcessed, + * streamState: state, + * }]); + * ``` + * + * @param buildArgs Receives the post-detach workflow stream state and returns the + * positional args for the new run. + * @param options.publisherTtl Forwarded to `getState`. + * + * Does not return; `continueAsNew` rejects with an internal exception + * that the SDK uses to close the run. + */ + async continueAsNew( + buildArgs: (state: WorkflowStreamState) => Parameters, + options?: { publisherTtl?: Duration } + ): Promise { + this.detachPollers(); + await condition(allHandlersFinished); + return workflowContinueAsNew(...buildArgs(this.getState(options?.publisherTtl))); + } + + /** + * Discard log entries before upToOffset. + * After truncation, polls requesting an offset before the new base + * will receive an error. + * + * Raises `ApplicationFailure` with type `'TruncateOutOfRange'` and + * `nonRetryable: true` when the requested offset is past the end of + * the log. Mirrors how `onPoll` reports `'TruncatedOffset'`: an update + * handler invoking `truncate` surfaces the error to the caller without + * failing the workflow task. + */ + truncate(upToOffset: number): void { + const logIndex = upToOffset - this.baseOffset; + if (logIndex <= 0) return; + if (logIndex > this.log.length) { + throw ApplicationFailure.create({ + message: `Cannot truncate to offset ${upToOffset}: only ${this.baseOffset + this.log.length} items exist`, + type: 'TruncateOutOfRange', + nonRetryable: true, + }); + } + this.log.splice(0, logIndex); + this.baseOffset = upToOffset; + } + + private onPublish(input: PublishInput): void { + if (input.publisher_id) { + const lastSeq = this.publisherSequences[input.publisher_id] ?? 0; + if (input.sequence <= lastSeq) { + return; // duplicate — skip + } + this.publisherSequences[input.publisher_id] = input.sequence; + this.publisherLastSeen[input.publisher_id] = Date.now() / 1000; // seconds + } + for (const entry of input.items) { + this.log.push({ topic: entry.topic, payload: decodePayloadWire(entry.data) }); + } + } + + private async onPoll(input: PollInput): Promise { + let logOffset = input.from_offset - this.baseOffset; + if (logOffset < 0) { + if (input.from_offset === 0) { + // "From the beginning" — start at whatever is available. + logOffset = 0; + } else { + // Subscriber had a specific position that's been truncated. + // ApplicationFailure fails this update (client gets the error) + // without crashing the workflow task — avoids a poison pill + // during replay. + throw ApplicationFailure.create({ + message: + `Requested offset ${input.from_offset} has been truncated. ` + `Current base offset is ${this.baseOffset}.`, + type: 'TruncatedOffset', + nonRetryable: true, + }); + } + } + await condition(() => this.log.length > logOffset || this.draining); + const allNew = this.log.slice(logOffset); + + // Build [globalOffset, entry] candidates, filtering by topic if requested. + const topicSet = input.topics.length > 0 ? new Set(input.topics) : null; + const candidates: Array<[number, InternalLogEntry]> = []; + for (let i = 0; i < allNew.length; i++) { + const entry = allNew[i]!; + if (topicSet !== null && !topicSet.has(entry.topic)) continue; + candidates.push([this.baseOffset + logOffset + i, entry]); + } + + // Cap response size to ~1MB of estimated wire bytes. + const wireItems: WorkflowStreamWireItem[] = []; + let size = 0; + let moreReady = false; + let nextOffset = this.baseOffset + this.log.length; + for (const [off, entry] of candidates) { + const encoded = encodeBase64(encodePayloadProto(entry.payload)); + const itemSize = payloadWireSize(encoded, entry.topic); + if (size + itemSize > MAX_POLL_RESPONSE_BYTES && wireItems.length > 0) { + // Resume from this item on the next poll. + nextOffset = off; + moreReady = true; + break; + } + size += itemSize; + wireItems.push({ topic: entry.topic, data: encoded, offset: off }); + } + + return { + items: wireItems, + next_offset: nextOffset, + more_ready: moreReady, + }; + } +} diff --git a/contrib/workflow-streams/tsconfig.json b/contrib/workflow-streams/tsconfig.json new file mode 100644 index 000000000..876a844c6 --- /dev/null +++ b/contrib/workflow-streams/tsconfig.json @@ -0,0 +1,19 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "./lib", + "rootDir": "./src", + "skipLibCheck": true + }, + "references": [ + { "path": "../../packages/activity" }, + { "path": "../../packages/client" }, + { "path": "../../packages/common" }, + { "path": "../../packages/proto" }, + { "path": "../../packages/test-helpers" }, + { "path": "../../packages/testing" }, + { "path": "../../packages/worker" }, + { "path": "../../packages/workflow" } + ], + "include": ["./src/**/*.ts"] +} diff --git a/packages/common/src/reserved.ts b/packages/common/src/reserved.ts index 8e43be3cc..6d87efaf0 100644 --- a/packages/common/src/reserved.ts +++ b/packages/common/src/reserved.ts @@ -7,6 +7,19 @@ export const ENHANCED_STACK_TRACE_QUERY_NAME = '__enhanced_stack_trace'; */ export type ReservedNameEntityType = 'query' | 'signal' | 'update' | 'activity' | 'task queue' | 'sink' | 'workflow'; +/** + * Wire identifiers used by first-party SDK contrib packages. Each entry pairs + * a name with the entity type it's allowed to register as; that pair bypasses + * the {@link TEMPORAL_RESERVED_PREFIX} check at registration time. Registering + * the same name as a different entity type is still rejected. + */ +const INTERNAL_HANDLER_NAME_ALLOWLIST: ReadonlyMap = new Map([ + // @temporalio/workflow-streams + ['__temporal_workflow_stream_publish', 'signal'], + ['__temporal_workflow_stream_poll', 'update'], + ['__temporal_workflow_stream_offset', 'query'], +]); + /** * Validates if the provided name contains any reserved prefixes or matches any reserved names. * Throws a TypeError if validation fails, with a specific message indicating whether the issue @@ -16,7 +29,7 @@ export type ReservedNameEntityType = 'query' | 'signal' | 'update' | 'activity' * @param name The name to check against reserved prefixes/names */ export function throwIfReservedName(type: ReservedNameEntityType, name: string): void { - if (name.startsWith(TEMPORAL_RESERVED_PREFIX)) { + if (name.startsWith(TEMPORAL_RESERVED_PREFIX) && INTERNAL_HANDLER_NAME_ALLOWLIST.get(name) !== type) { throw new TypeError(`Cannot use ${type} name: '${name}', with reserved prefix: '${TEMPORAL_RESERVED_PREFIX}'`); } diff --git a/packages/common/tsconfig.json b/packages/common/tsconfig.json index 1e09513a4..d6c63dc33 100644 --- a/packages/common/tsconfig.json +++ b/packages/common/tsconfig.json @@ -4,5 +4,6 @@ "outDir": "./lib", "rootDir": "./src" }, + "references": [{ "path": "../proto" }], "include": ["./src/**/*.ts"] } diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 68fe45d77..36ad7e9f5 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -773,8 +773,14 @@ export class Activator implements ActivationHandler { throw new TypeError('Missing query activation attributes'); } - // If query has __temporal_ prefix but no handler exists, throw error - if (queryType.startsWith(TEMPORAL_RESERVED_PREFIX) && !this.queryHandlers.has(queryType)) { + // Reject __temporal_-prefixed queries that would otherwise be routed to the + // user's default handler. A specific registered handler (e.g. from a + // contrib package) is allowed through. + if ( + queryType.startsWith(TEMPORAL_RESERVED_PREFIX) && + !this.queryHandlers.has(queryType) && + this.defaultQueryHandler !== undefined + ) { throw new TypeError(`Cannot use query name: '${queryType}', with reserved prefix: '${TEMPORAL_RESERVED_PREFIX}'`); } @@ -809,8 +815,15 @@ export class Activator implements ActivationHandler { throw new TypeError('Missing activation update protocolInstanceId'); } - // If update has __temporal_ prefix but no handler exists, throw error - if (name.startsWith(TEMPORAL_RESERVED_PREFIX) && !this.updateHandlers.get(name)) { + // Reject __temporal_-prefixed updates that would otherwise be routed to the + // user's default handler. A specific registered handler (e.g. from a + // contrib package) is allowed through, and unregistered names without a + // default handler fall through to the buffer-then-reject path below. + if ( + name.startsWith(TEMPORAL_RESERVED_PREFIX) && + !this.updateHandlers.has(name) && + this.defaultUpdateHandler !== undefined + ) { throw new TypeError(`Cannot use update name: '${name}', with reserved prefix: '${TEMPORAL_RESERVED_PREFIX}'`); } @@ -989,8 +1002,15 @@ export class Activator implements ActivationHandler { throw new TypeError('Missing activation signalName'); } - // If signal has __temporal_ prefix but no handler exists, throw error - if (signalName.startsWith(TEMPORAL_RESERVED_PREFIX) && !this.signalHandlers.has(signalName)) { + // Reject __temporal_-prefixed signals that would otherwise be routed to the + // user's default handler. A specific registered handler (e.g. from a + // contrib package) is allowed through, and unregistered names without a + // default handler fall through to the buffer-then-reject path below. + if ( + signalName.startsWith(TEMPORAL_RESERVED_PREFIX) && + !this.signalHandlers.has(signalName) && + this.defaultSignalHandler !== undefined + ) { throw new TypeError( `Cannot use signal name: '${signalName}', with reserved prefix: '${TEMPORAL_RESERVED_PREFIX}'` ); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index efb38e8fe..084c6cf92 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -150,10 +150,16 @@ importers: version: 8.55.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.6.3) verdaccio: specifier: ^6.5.0 - version: 6.5.0(encoding@0.1.13)(typanion@3.14.0) + version: 6.5.0(typanion@3.14.0) contrib/ai-sdk: dependencies: + '@temporalio/activity': + specifier: workspace:* + version: link:../../packages/activity + '@temporalio/client': + specifier: workspace:* + version: link:../../packages/client '@temporalio/common': specifier: workspace:* version: link:../../packages/common @@ -163,6 +169,9 @@ importers: '@temporalio/workflow': specifier: workspace:* version: link:../../packages/workflow + '@temporalio/workflow-streams': + specifier: workspace:* + version: link:../workflow-streams '@ungap/structured-clone': specifier: ^1.3.0 version: 1.3.0 @@ -197,9 +206,6 @@ importers: '@opentelemetry/semantic-conventions': specifier: ^1.25.1 version: 1.25.1 - '@temporalio/client': - specifier: workspace:* - version: link:../../packages/client '@temporalio/interceptors-opentelemetry': specifier: workspace:* version: link:../interceptors-opentelemetry @@ -292,6 +298,37 @@ importers: specifier: ^11.1.0 version: 11.1.0 + contrib/workflow-streams: + dependencies: + '@temporalio/activity': + specifier: workspace:* + version: link:../../packages/activity + '@temporalio/client': + specifier: workspace:* + version: link:../../packages/client + '@temporalio/common': + specifier: workspace:* + version: link:../../packages/common + '@temporalio/proto': + specifier: workspace:* + version: link:../../packages/proto + '@temporalio/workflow': + specifier: workspace:* + version: link:../../packages/workflow + devDependencies: + '@temporalio/test-helpers': + specifier: workspace:* + version: link:../../packages/test-helpers + '@temporalio/testing': + specifier: workspace:* + version: link:../../packages/testing + '@temporalio/worker': + specifier: workspace:* + version: link:../../packages/worker + ava: + specifier: ^5.3.1 + version: 5.3.1 + packages/activity: dependencies: '@temporalio/client': @@ -2810,9 +2847,6 @@ packages: resolution: {integrity: sha512-Q0n9HRi4m6JuGIV1eFlmvJB7ZEVxu93IrMyiMsGC0lrMJMWzRgx6WGquyfQgZVb31vhGgXnfmPNNXmxnOkRBrg==} engines: {node: '>= 0.8'} - encoding@0.1.13: - resolution: {integrity: sha512-ETBauow1T35Y/WZMkio9jiM0Z5xjHHmJ4XmjZOq1l/dXz3lr2sRn87nJy20RupqSh1F2m3HHPSp8ShIPQJrJ3A==} - end-of-stream@1.4.5: resolution: {integrity: sha512-ooEGc6HP26xXq/N+GCGOT0JKCLDGrq2bQUZrQ7gyrJiZANJ/8YDTxTpQBXGMn+WbIQXNVpyWymm7KYVICQnyOg==} @@ -7277,11 +7311,6 @@ snapshots: encodeurl@2.0.0: {} - encoding@0.1.13: - dependencies: - iconv-lite: 0.6.3 - optional: true - end-of-stream@1.4.5: dependencies: once: 1.4.0 @@ -8671,11 +8700,9 @@ snapshots: nexus-rpc@0.0.2: {} - node-fetch@2.6.7(encoding@0.1.13): + node-fetch@2.6.7: dependencies: whatwg-url: 5.0.0 - optionalDependencies: - encoding: 0.1.13 node-releases@2.0.14: {} @@ -9755,13 +9782,13 @@ snapshots: vary@1.1.2: {} - verdaccio-audit@13.0.0-next-8.37(encoding@0.1.13): + verdaccio-audit@13.0.0-next-8.37: dependencies: '@verdaccio/config': 8.0.0-next-8.37 '@verdaccio/core': 8.0.0-next-8.37 express: 4.22.1 https-proxy-agent: 5.0.1 - node-fetch: 2.6.7(encoding@0.1.13) + node-fetch: 2.6.7 transitivePeerDependencies: - encoding - supports-color @@ -9778,7 +9805,7 @@ snapshots: transitivePeerDependencies: - supports-color - verdaccio@6.5.0(encoding@0.1.13)(typanion@3.14.0): + verdaccio@6.5.0(typanion@3.14.0): dependencies: '@cypress/request': 3.0.10 '@verdaccio/auth': 8.0.0-next-8.37 @@ -9809,7 +9836,7 @@ snapshots: lru-cache: 7.18.3 mime: 3.0.0 semver: 7.7.4 - verdaccio-audit: 13.0.0-next-8.37(encoding@0.1.13) + verdaccio-audit: 13.0.0-next-8.37 verdaccio-htpasswd: 13.0.0-next-8.37 transitivePeerDependencies: - bare-abort-controller diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index cc4e64db4..cf3bd049e 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -19,6 +19,7 @@ packages: - packages/workflow - contrib/ai-sdk - contrib/interceptors-opentelemetry + - contrib/workflow-streams - scripts ignoreScripts: true