Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
ca07765
Add pub/sub contrib module for cross-language interop
jssmith Apr 7, 2026
3732492
TLA+-verified dedup rewrite, TTL pruning, truncation, API improvements
jssmith Apr 7, 2026
4a3f7c0
Fix ESLint: remove useless try/catch in _flush
jssmith Apr 7, 2026
9f25e5c
Remove TLA+ proof references from implementation code
jssmith Apr 7, 2026
2f68b15
Update pnpm-lock.yaml
jssmith Apr 7, 2026
efb2820
pubsub: use base64 wire format with native Uint8Array API
jssmith Apr 7, 2026
1f37436
pubsub: remove poll timeout and trim trailing whitespace
jssmith Apr 8, 2026
ca800ab
Add token-level streaming to Vercel AI SDK Temporal plugin
jssmith Apr 8, 2026
0b3d0e8
pubsub: port correctness fixes and features from sdk-python
jssmith Apr 23, 2026
e983a28
pubsub: add README for @temporalio/contrib-pubsub
jssmith Apr 23, 2026
7fb2633
pubsub: add integration tests for @temporalio/contrib-pubsub
jssmith Apr 23, 2026
5056807
pubsub: port test improvements from sdk-python
jssmith Apr 23, 2026
bab3128
pubsub: migrate API from Uint8Array to Payload
jssmith Apr 24, 2026
f4b5e3c
pubsub: add cross-language wire-format interop tests
jssmith Apr 24, 2026
10bb562
pubsub: rename publish priority to forceFlush, split create/fromActivity
jssmith Apr 24, 2026
e559a4e
pubsub: replace initPubSub function with PubSub class
jssmith Apr 24, 2026
c80ee54
ai-sdk: publish raw stream parts, drop normalization layer
jssmith Apr 24, 2026
aa07831
pubsub: prefix internal handler names with __temporal_
jssmith Apr 25, 2026
60492ca
pubsub: raise ApplicationFailure(TruncateOutOfRange) for out-of-range…
jssmith Apr 25, 2026
de6ec1c
pubsub: add public async flush() barrier on PubSubClient
jssmith Apr 25, 2026
a9e7dbf
pubsub: accept a single string for subscribe(topics=...)
jssmith Apr 25, 2026
6727fd5
pubsub: add PubSub.continueAsNew helper
jssmith Apr 28, 2026
6a5294b
pubsub: take Duration on public APIs, store ms internally
jssmith Apr 28, 2026
5a2b990
contrib: rename contrib-pubsub package to contrib-workflow-stream
jssmith Apr 29, 2026
6544751
ai-sdk: import ReadableStream from node:stream/web
jssmith Apr 29, 2026
d2c1c07
ai-sdk: annotate mcp tool execute parameters as unknown
jssmith Apr 29, 2026
e8e62f4
contrib-workflow-stream: rename drain() → detachPollers()
jssmith Apr 30, 2026
d56ac31
contrib-workflow-stream: introduce typed topic handles
jssmith Apr 30, 2026
92767ae
contrib-workflow-stream: TopicHandle.subscribe uses client's converter
jssmith Apr 30, 2026
01de057
sync cargo.lock
brianstrauch May 4, 2026
4e9c481
contrib-workflow-stream: allow __temporal_ wire names for handlers
brianstrauch May 6, 2026
0de0f4f
Merge remote-tracking branch 'origin/main' into contrib/pubsub
brianstrauch May 6, 2026
ddbcabc
ai-sdk: enable Web Streams in workflow sandbox, drop polyfill
brianstrauch May 6, 2026
3671bc5
sync cargo.lock
brianstrauch May 6, 2026
0cfa474
contrib-workflow-stream: handle stream-end races in subscribe
brianstrauch May 6, 2026
9332c0e
contrib-workflow-stream: typed subscribe via resultType
brianstrauch May 6, 2026
40ef218
ai-sdk: configurable streamingTopic in TemporalProviderOptions
brianstrauch May 6, 2026
f37b817
Potential fix for pull request finding
brianstrauch May 6, 2026
822e8cf
Potential fix for pull request finding
brianstrauch May 6, 2026
4f4608e
contrib-workflow-stream: validate input in decodeBase64
brianstrauch May 6, 2026
cb22129
Potential fix for pull request finding
brianstrauch May 6, 2026
f56fd33
Potential fix for pull request finding
brianstrauch May 6, 2026
1a3dd1d
ai-sdk: use await using for the streaming activity's WorkflowStreamCl…
brianstrauch May 7, 2026
ee88044
contrib-workflow-stream: add project references to workspace deps
brianstrauch May 7, 2026
d87a475
contrib-workflow-stream: declare @temporalio/proto as devDependency
brianstrauch May 7, 2026
f5de882
contrib-workflow-stream: move @temporalio/proto back to dependencies
brianstrauch May 7, 2026
0f77d71
common: declare project reference to @temporalio/proto
brianstrauch May 7, 2026
914ff35
ci(docs): bump Node heap to 8GB for docusaurus build
brianstrauch May 7, 2026
340a240
fix lint errors + bump Node heap for lint workflow
brianstrauch May 7, 2026
1fd550b
pin protobufjs to exact 7.5.5
brianstrauch May 7, 2026
9a74d70
Revert "pin protobufjs to exact 7.5.5"
brianstrauch May 7, 2026
0bcc3a7
add pnpm override forcing protobufjs to 7.5.5
brianstrauch May 7, 2026
74c5502
pin protobufjs to exact 7.5.5 across workspace packages
brianstrauch May 7, 2026
52dc79e
contrib-workflow-stream: drop unused @temporalio/proto dependency
brianstrauch May 7, 2026
35edd2a
Revert protobufjs pinning workarounds
brianstrauch May 7, 2026
8a0ddd2
contrib-workflow-stream: throw on unexpected wire type in Payload map…
brianstrauch May 7, 2026
aa67a7d
contrib-workflow-streams: rename package and wire handlers to plural
brianstrauch May 7, 2026
8d71c9e
contrib-workflow-streams: fix README table column alignment
brianstrauch May 7, 2026
1daa154
revert handler names
brianstrauch May 7, 2026
a2d73f7
ai-sdk: drop unused web-streams-polyfill dependency
brianstrauch May 10, 2026
9e199de
contrib-workflow-streams: drop public start()/stop() on client
brianstrauch May 10, 2026
7be68f6
contrib-workflow-streams: fix README code comment spacing
brianstrauch May 10, 2026
b08cf11
Merge branch 'main' into contrib/pubsub
brianstrauch May 14, 2026
23ff62b
workflow-streams: move to contrib/ and rename to @temporalio/workflow…
brianstrauch May 14, 2026
27f8c86
chore: update lockfile for pnpm 10 patchedDependencies format
brianstrauch May 14, 2026
e90c42d
Merge branch 'main' into contrib/pubsub
brianstrauch May 14, 2026
d3747ea
Merge remote-tracking branch 'origin/main' into contrib/pubsub
brianstrauch May 15, 2026
abb93bd
code review first pass
brianstrauch May 15, 2026
e17ff7c
Merge remote-tracking branch 'origin/contrib/pubsub' into contrib/pubsub
brianstrauch May 15, 2026
48cc106
fix lint
brianstrauch May 15, 2026
0be9f4e
pair INTERNAL_HANDLER_NAME_ALLOWLIST names with entity type
brianstrauch May 18, 2026
9c74b98
key text and reasoning accumulators by stream part id
brianstrauch May 18, 2026
23decc2
revert web-streams sandbox injection, restore polyfill
brianstrauch May 20, 2026
e40b663
restore pnpm 10 patchedDependencies lockfile format
brianstrauch May 20, 2026
0848531
Merge branch 'main' into contrib/pubsub
brianstrauch May 21, 2026
ce2a133
split workflow-streams into workflow and client entrypoints
brianstrauch May 22, 2026
cb6d036
Merge remote-tracking branch 'origin/contrib/pubsub' into contrib/pubsub
brianstrauch May 22, 2026
54ea111
fix lint
brianstrauch May 22, 2026
b62f816
expose workflow-streams subpaths without /lib/
brianstrauch May 22, 2026
79eabda
rename fromActivity to fromWithinActivity
brianstrauch May 22, 2026
dd720a2
cover both string and Uint8Array publish paths in tests
brianstrauch May 22, 2026
fc9d3b2
make publishToTopic private; inject as closure to topic handles
brianstrauch May 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
# someone opens a pull request.
* @temporalio/sdk

/contrib/ai-sdk/ @temporalio/sdk @temporalio/ai-sdk
/contrib/ai-sdk/ @temporalio/sdk @temporalio/ai-sdk
/contrib/workflow-streams/ @temporalio/sdk @temporalio/ai-sdk
4 changes: 3 additions & 1 deletion .github/workflows/conventions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,7 @@ jobs:

- run: pnpm run lint:check
env:
NODE_OPTIONS: '--max-old-space-size=8192'
NODE_OPTIONS: --max-old-space-size=8192
- run: pnpm run lint:prune
env:
NODE_OPTIONS: --max-old-space-size=8192
1 change: 1 addition & 0 deletions .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ jobs:
run: pnpm run docs
env:
ALGOLIA_API_KEY: ${{ secrets.ALGOLIA_API_KEY }}
NODE_OPTIONS: --max-old-space-size=8192

- name: Publish production docs
if: ${{ inputs.publish_target == 'prod' }}
Expand Down
3 changes: 3 additions & 0 deletions contrib/ai-sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
"workerThreads": false
},
"dependencies": {
"@temporalio/activity": "workspace:*",
"@temporalio/client": "workspace:*",
"@temporalio/common": "workspace:*",
"@temporalio/plugin": "workspace:*",
"@temporalio/workflow": "workspace:*",
"@temporalio/workflow-streams": "workspace:*",
"@ungap/structured-clone": "^1.3.0",
"headers-polyfill": "^4.0.3",
"web-streams-polyfill": "^4.2.0"
Expand Down
10 changes: 9 additions & 1 deletion contrib/ai-sdk/src/__tests__/test-ai-sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ import { workflowInterceptorModules } from '@temporalio/testing';
import { bundleWorkflowCode, DefaultLogger, Runtime } from '@temporalio/worker';
import type { InjectedSinks } from '@temporalio/worker';
import type { BaseContext } from '@temporalio/test-helpers';
import { test as anyTest, helpers, Worker, TestWorkflowEnvironment } from '@temporalio/test-helpers';
import {
test as anyTest,
createBaseBundlerOptions,
helpers,
Worker,
TestWorkflowEnvironment,
} from '@temporalio/test-helpers';
import { AiSdkPlugin, createActivities } from '..';
import {
embeddingWorkflow,
Expand Down Expand Up @@ -234,6 +240,7 @@ const test = anyTest as TestFn<BaseContext>;
test.before(async (t) => {
const env = await TestWorkflowEnvironment.createLocal();
const workflowBundle = await bundleWorkflowCode({
...createBaseBundlerOptions(),
workflowsPath: require.resolve('./workflows/ai-sdk'),
workflowInterceptorModules,
logger: new DefaultLogger('WARN'),
Expand Down Expand Up @@ -465,6 +472,7 @@ test('Telemetry', async (t) => {
],
taskQueue: 'test-ai-telemetry',
workflowsPath: require.resolve('./workflows/ai-sdk'),
bundlerOptions: createBaseBundlerOptions(),

interceptors: {
client: {
Expand Down
131 changes: 131 additions & 0 deletions contrib/ai-sdk/src/activities.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
import type {
LanguageModelV3CallOptions,
LanguageModelV3Content,
LanguageModelV3FinishReason,
LanguageModelV3GenerateResult,
LanguageModelV3Usage,
EmbeddingModelV3Result,
SharedV3ProviderOptions,
SharedV3Headers,
SharedV3Warning,
ProviderV3,
} from '@ai-sdk/provider';
import { asSchema, type Schema, type ToolExecutionOptions } from 'ai';
import { ApplicationFailure } from '@temporalio/common';
import { Context } from '@temporalio/activity';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';
import type { Duration } from '@temporalio/common/lib/time';
import type { McpClientFactories, McpClientFactory } from './mcp';

const encoder = new TextEncoder();

/**
* Arguments for invoking a language model activity.
*/
Expand All @@ -18,6 +27,16 @@ export interface InvokeModelArgs {
options: LanguageModelV3CallOptions;
}

/**
* Arguments for invoking a streaming language model activity.
*/
export interface InvokeModelStreamingArgs extends InvokeModelArgs {
modelId: string;
options: LanguageModelV3CallOptions;
streamingTopic: string;
streamingBatchInterval?: Duration;
}

/**
* Result from a language model invocation.
* This is an alias to the AI SDK's LanguageModelV3GenerateResult for type safety.
Expand Down Expand Up @@ -75,6 +94,118 @@ export function createActivities(provider: ProviderV3, mcpClientFactories?: McpC
const model = provider.languageModel(args.modelId);
return await model.doGenerate(args.options);
},

/**
* Streaming-aware model activity.
*
* Calls `model.doStream()`, publishes each yielded AI SDK stream part
* as JSON to the stream side channel, and returns the assembled
* `LanguageModelV3GenerateResult`. Consumers receive native AI SDK
* stream-part types (text-delta, reasoning-delta, tool-input-delta,
* response-metadata, finish, ...); no normalization happens here.
*/
async invokeModelStreaming(args: InvokeModelStreamingArgs): Promise<InvokeModelResult> {
await using stream = WorkflowStreamClient.fromWithinActivity({
batchInterval: args.streamingBatchInterval ?? '100 milliseconds',
});
const events = stream.topic(args.streamingTopic);

const model = provider.languageModel(args.modelId);
const streamResult = await model.doStream(args.options);

const content: LanguageModelV3Content[] = [];
let finishReason: LanguageModelV3FinishReason = { unified: 'other', raw: undefined };
let usage: LanguageModelV3Usage = {
inputTokens: { total: undefined, noCache: undefined, cacheRead: undefined, cacheWrite: undefined },
outputTokens: { total: undefined, text: undefined, reasoning: undefined },
};
const warnings: SharedV3Warning[] = [];
let responseMetadata: Record<string, unknown> | undefined;

const textBlocks = new Map<string, string>();
const reasoningBlocks = new Map<string, string>();

const reader = streamResult.stream.getReader();

while (true) {
const { done, value: part } = await reader.read();
if (done) break;

Context.current().heartbeat();

// Publish the raw stream part as JSON so consumers can switch on
// the native AI SDK type. Accumulation below is for the final
// assembled result this activity returns.
events.publish(encoder.encode(JSON.stringify(part)));

switch (part.type) {
case 'stream-start':
warnings.push(...part.warnings);
break;
case 'text-start':
textBlocks.set(part.id, '');
break;
case 'text-delta':
textBlocks.set(part.id, (textBlocks.get(part.id) ?? '') + part.delta);
break;
case 'text-end':
content.push({
type: 'text',
text: textBlocks.get(part.id) ?? '',
providerMetadata: part.providerMetadata,
});
textBlocks.delete(part.id);
break;
case 'reasoning-start':
reasoningBlocks.set(part.id, '');
break;
case 'reasoning-delta':
reasoningBlocks.set(part.id, (reasoningBlocks.get(part.id) ?? '') + part.delta);
break;
case 'reasoning-end':
content.push({
type: 'reasoning',
text: reasoningBlocks.get(part.id) ?? '',
providerMetadata: part.providerMetadata,
});
reasoningBlocks.delete(part.id);
break;
case 'response-metadata':
responseMetadata = {
id: part.id,
timestamp: part.timestamp,
modelId: part.modelId,
};
break;
case 'finish':
finishReason = part.finishReason;
usage = part.usage;
break;
default:
// tool-call, tool-result, file, source — collect as content
if (
'type' in part &&
(part.type === 'tool-call' ||
part.type === 'tool-result' ||
part.type === 'file' ||
part.type === 'source')
) {
content.push(part);
}
break;
}
}

return {
content,
finishReason,
usage,
warnings,
request: streamResult.request,
response: responseMetadata ? { ...responseMetadata, ...streamResult.response } : streamResult.response,
};
},

async invokeEmbeddingModel(args: InvokeEmbeddingModelArgs): Promise<InvokeEmbeddingModelResult> {
const model = provider.embeddingModel(args.modelId);
return await model.doEmbed({
Expand Down
2 changes: 1 addition & 1 deletion contrib/ai-sdk/src/mcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class TemporalMCPClient {
toolName,
{
description: toolResult.description,
execute: async (input, options) => {
execute: async (input: unknown, options: unknown) => {
const activities = workflow.proxyActivities({
summary: toolName,
startToCloseTimeout: '10 minutes',
Expand Down
87 changes: 82 additions & 5 deletions contrib/ai-sdk/src/provider.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { ReadableStreamDefaultController } from 'node:stream/web';
import type {
EmbeddingModelV3,
EmbeddingModelV3CallOptions,
Expand All @@ -13,6 +14,11 @@ import type {
import * as workflow from '@temporalio/workflow';
import type { ActivityOptions } from '@temporalio/workflow';
import { ApplicationFailure } from '@temporalio/common';
import type { Duration } from '@temporalio/common/lib/time';

// `ReadableStream` is a sandbox global; type-only import keeps `node:stream/web`
// out of the workflow bundle (es2023 lib has no DOM types).
declare const ReadableStream: typeof import('node:stream/web').ReadableStream;

/**
* Options for configuring the TemporalProvider with per-model activity settings.
Expand All @@ -28,7 +34,22 @@ export interface TemporalProviderOptions {
* Activity options specific to language model calls.
* Merged with default options, with these taking precedence.
*/
languageModel?: ActivityOptions;
languageModel?: ActivityOptions & {
/**
* Topic name on the workflow's stream that streaming model calls publish
* raw stream parts to. When set, `doStream` is enabled and routes through
* the streaming activity; when unset, `doStream` throws. Pick a unique
* name per concurrent streaming call to keep event streams separable.
*/
streamingTopic?: string;

/**
* Batch interval for the per-activity `WorkflowStreamClient` that
* publishes stream parts back to the workflow. Lower values reduce
* latency at the cost of more signal traffic. Defaults to 100ms.
*/
streamingBatchInterval?: Duration;
};

/**
* Activity options specific to embedding model calls.
Expand All @@ -46,11 +67,16 @@ export interface TemporalProviderOptions {
export class TemporalLanguageModel implements LanguageModelV3 {
readonly specificationVersion = 'v3';
readonly provider = 'temporal';
private readonly streamingTopic: string | undefined;
private readonly streamingBatchInterval: Duration | undefined;

constructor(
readonly modelId: string,
readonly options?: ActivityOptions
) {}
readonly options?: ActivityOptions & { streamingTopic?: string; streamingBatchInterval?: Duration }
) {
this.streamingTopic = options?.streamingTopic;
this.streamingBatchInterval = options?.streamingBatchInterval;
}

get supportedUrls(): Record<string, RegExp[]> {
return {};
Expand All @@ -75,8 +101,59 @@ export class TemporalLanguageModel implements LanguageModelV3 {
return result;
}

doStream(_options: LanguageModelV3CallOptions): PromiseLike<LanguageModelV3StreamResult> {
throw ApplicationFailure.nonRetryable('Streaming not supported.');
async doStream(options: LanguageModelV3CallOptions): Promise<LanguageModelV3StreamResult> {
if (this.streamingTopic === undefined) {
throw ApplicationFailure.nonRetryable(
'Streaming not enabled. Set streamingTopic in languageModel provider options.'
);
}

// Call the streaming activity, which publishes tokens via stream
// and returns the accumulated result.
const activities = workflow.proxyActivities({
startToCloseTimeout: '10 minutes',
...this.options,
});
const result = await activities.invokeModelStreaming!({
modelId: this.modelId,
options,
streamingTopic: this.streamingTopic,
streamingBatchInterval: this.streamingBatchInterval,
});
if (result === undefined) {
throw ApplicationFailure.nonRetryable('Received undefined response from streaming model activity.');
}

// Wrap the accumulated result as a ReadableStream that replays the content.
// Real-time token streaming already happened via stream in the activity.
const stream = new ReadableStream({
start(controller: ReadableStreamDefaultController) {
controller.enqueue({ type: 'stream-start', warnings: result.warnings ?? [] });
let partIndex = 0;
for (const item of result.content ?? []) {
const id = `part-${partIndex++}`;
if (item.type === 'text') {
controller.enqueue({ type: 'text-start', id });
controller.enqueue({ type: 'text-delta', id, delta: item.text });
controller.enqueue({ type: 'text-end', id });
} else if (item.type === 'reasoning') {
controller.enqueue({ type: 'reasoning-start', id });
controller.enqueue({ type: 'reasoning-delta', id, delta: item.text });
controller.enqueue({ type: 'reasoning-end', id });
} else {
controller.enqueue(item);
}
}
controller.enqueue({
type: 'finish',
finishReason: result.finishReason,
usage: result.usage,
});
controller.close();
},
});

return { stream, request: result.request, response: result.response };
}
}

Expand Down
Loading
Loading