Skip to content

contrib: add @temporalio/workflow-streams#2039

Open
brianstrauch wants to merge 82 commits into
mainfrom
contrib/pubsub
Open

contrib: add @temporalio/workflow-streams#2039
brianstrauch wants to merge 82 commits into
mainfrom
contrib/pubsub

Conversation

@brianstrauch
Copy link
Copy Markdown
Member

@brianstrauch brianstrauch commented May 6, 2026

Summary

Introduces @temporalio/workflow-streams — a durable, offset-addressed
event channel for Temporal workflows, built on Signals (publish) and polling
Updates (subscribe). Mirrors the equivalent contrib package in sdk-python so
the same wire format and conventions work cross-SDK.

A workflow holds an append-only log of (topic, data) entries; external
clients (activities, starters, other workflows) publish and subscribe via
the workflow handle. Cost scales with durable batches, not tokens; ~100ms
per round-trip. Suited for chat tokens, agent reasoning streams, progress
events. Not for ultra-low-latency voice.

What's in the package

  • WorkflowStream — workflow-side. Registers publish/poll/offset
    handlers, append-only log with optional truncate(),
    continueAsNew() helper, dedup by (publisher_id, sequence).
  • WorkflowStreamClient — external-side. Buffered publishing with
    batchInterval / maxBatchSize / forceFlush, async flush() barrier,
    retry with maxRetryDuration and explicit data-loss surfacing,
    subscribe() that follows continue-as-new and exits cleanly on
    workflow completion, await using disposal.
  • TopicHandle / WorkflowTopicHandle — bind a topic to a value type
    once; subsequent publish / subscribe don't repeat the name.
  • Cross-SDK interop — handler names (__temporal_workflow_stream_*)
    and base64-of-protobuf-Payload wire format match sdk-python; tests in
    test-contrib-workflow-stream-interop.ts cover the round-trip.

Side changes

  • bundler.ts strips the node: URI scheme so node:stream/web and
    similar imports route through the existing alias map.
  • Workflow sandbox injects Web Streams globals (ReadableStream,
    WritableStream, TransformStream, queuing strategies) — needed by
    @temporalio/ai-sdk's ReadableStream use; lets ai-sdk drop the
    web-streams-polyfill workaround.
  • throwIfReservedName allowlists the three __temporal_workflow_streams_*
    handler names so contrib packages can register under that namespace; the
    dispatch-time prefix check is narrowed to only fire when a default
    handler would otherwise catch the call.

Test plan

  • pnpm -C packages/test exec ava lib/test-contrib-workflow-streams.js — 21 functional tests
  • pnpm -C packages/test exec ava lib/test-contrib-workflow-streams-interop.js — 10 cross-SDK wire
    tests
  • pnpm -C packages/test exec ava lib/test-ai-sdk.js — 8 ai-sdk tests covering the bundler/sandbox
    changes
  • pnpm -C packages/test exec ava lib/test-integration-workflows.js --match "*reserved prefix*"
    confirm reserved-prefix guard still rejects user-defined __temporal_* handlers

jssmith and others added 30 commits April 6, 2026 20:17
TypeScript implementation of temporalio.contrib.pubsub, matching the
Python SDK's wire protocol exactly for cross-language interoperability.

Workflow side (mixin.ts):
- initPubSub() returns a PubSubHandle for publish/drain/getState
- Signal __pubsub_publish with publisher_id + sequence dedup
- Update __pubsub_poll with long-poll via condition() + drain validator
- Query __pubsub_offset for global offset
- Continue-as-new via PubSubState serialization

Client side (client.ts):
- PubSubClient with batching (start/stop lifecycle)
- Flush with asyncio lock equivalent + buffer swap + dedup sequence
- subscribe() async generator with CAN following via describe()
- Configurable poll interval (default 0.1s)
- forWorkflow() factory for CAN-aware handles

Wire format (types.ts):
- data fields use number[] (not Uint8Array) to match Python's JSON
  serialization of bytes as numeric arrays
- snake_case field names match Python dataclass field names
- toWireBytes()/fromWireBytes() helpers for string conversion
- All handler names match Python: __pubsub_publish, __pubsub_poll,
  __pubsub_offset

Verified via cross-language interop test: TypeScript client successfully
publishes to, subscribes from, queries, and dedup-tests a Python
PubSubMixin workflow.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Mirror all Python pub/sub changes to TypeScript:
- Dedup rewrite: separate pending batch, retry with same sequence
- TTL pruning in getState() with legacy state preservation
- max_retry_duration (default 600s) with FlushTimeoutError
- truncate() method on PubSubHandle
- publisher_last_seen timestamps via Date.now()
- forWorkflow→create, flush removed, pollInterval→pollCooldown
- Publisher ID shortened to 16 hex chars

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Wire types (PublishEntry, _WireItem, PollResult, PubSubState) encode
data as base64 strings for cross-language compatibility. User-facing
PubSubItem uses Uint8Array.

Base64 encode/decode uses pure JS (no Buffer dependency) for workflow
sandbox compatibility.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove the bounded poll wait from PubSubMixin and fix minor
whitespace in client and types.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add opt-in streaming to the AI SDK integration. When enabled, the
invokeModelStreaming activity calls model.doStream(), publishes
TEXT_DELTA/THINKING_DELTA/TOOL_CALL_START events via PubSubClient,
and returns the accumulated LanguageModelV3GenerateResult.

- New invokeModelStreaming activity in createActivities()
- doStream() implemented on TemporalLanguageModel (returns replay stream)
- streaming option on TemporalProviderOptions.languageModel
- temporalClient option on AiSdkPluginOptions (required for pubsub)
- Validates prerequisites before making LLM call
- Unique part IDs via incrementing counter in replay stream

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Brings the TypeScript pubsub contrib module up to parity with
temporalio.contrib.pubsub. Major changes grouped by concern:

Correctness (matching sdk-python commit 97be29c0):
  - Client _flush(): advance `this.sequence = this.pendingSeq` before
    clearing pending on retry timeout. Without this, the next batch
    reuses the expired sequence, the workflow may have already accepted
    it, and the batch is silently deduped — causing data loss.

Truncation safety (sdk-python commit 7bc830ae):
  - Mixin poll update throws ApplicationFailure with type
    'TruncatedOffset' (nonRetryable: true) instead of plain Error.
    Plain errors would fail the update handler and risk a poison pill
    on replay.
  - `from_offset === 0` is treated as "from the beginning of whatever
    exists" (starts at log head regardless of base offset).
  - Client subscribe() catches WorkflowUpdateFailedError whose cause is
    ApplicationFailure of type 'TruncatedOffset' and restarts from
    offset 0.

Per-item offsets (sdk-python commit 5a8716ce):
  - PubSubItem and _WireItem gain an `offset: number` field. The mixin
    populates it from the global log position at poll time; the client
    yields it to subscribers.

Response size cap (sdk-python commit 90d753ed):
  - MAX_POLL_RESPONSE_BYTES = 1_000_000 constant in the mixin. Poll
    responses truncate to stay under the cap, set more_ready: true, and
    report the next offset. Subscribers skip the pollCooldown sleep
    when more_ready is true, draining the backlog quickly.

Flusher:
  - Replace setInterval-based flusher with an event-driven loop using
    a ResolvableEvent + in-flight mutex. Priority publishes reset the
    batch interval (via event.set()), matching sdk-python's asyncio
    behavior. A single mutex serializes concurrent _flush() calls,
    fixing a race where timer tick + priority flush could issue
    redundant signals.
  - FlushTimeoutError from a background tick is stashed and rethrown
    from stop()/[Symbol.asyncDispose]() so dropped batches are never
    silent.

Activity ergonomics:
  - PubSubClient.create(client?, workflowId?, options?) — both args
    now optional. When omitted, the activity Context is used
    (Context.current().client / info.workflowExecution.workflowId),
    matching Python's activity.client() / activity.info() pattern.

Dispose ergonomics:
  - [Symbol.asyncDispose] enables `await using client = PubSubClient.create(...)`.
    package.json bumps engines.node to >= 20.4.0 for runtime support.

Types:
  - PollResult gains `more_ready: boolean`.
  - PubSubState.publisher_last_seen becomes required (the field is
    always emitted by getState()).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adapts the sdk-python README to TypeScript idioms:
  - initPubSub() handle returned to the workflow function (in place of
    Python's PubSubMixin class).
  - `await using client = PubSubClient.create(...)` as the preferred
    activity-side pattern (with start()/stop() as the fallback).
  - for-await async generator for subscribe().
  - Uint8Array payload type called out in the Cross-Language Protocol
    section, with base64 on the wire.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Ports 21 tests from sdk-python tests/contrib/pubsub/test_pubsub.py.
Coverage:

Baseline:
  - activity_publish_and_subscribe
  - topic_filtering
  - subscribe_from_offset
  - dispose_flushes_on_exit (await using drains buffer)
  - max_batch_size
  - dedup_rejects_duplicate_signal
  - priority_flush
  - truncate_pubsub
  - ttl_pruning_in_get_state
  - continue_as_new_typed

Per-item offset feature:
  - per_item_offsets
  - per_item_offsets_with_topic_filter
  - per_item_offsets_after_truncation

Response size cap:
  - poll_more_ready_when_response_exceeds_size_limit
  - subscribe_iterates_through_more_ready
  - small_response_more_ready_false

Truncation recovery:
  - poll_truncated_offset_returns_application_failure
  - poll_offset_zero_after_truncation
  - subscribe_recovers_from_truncation

Regression tests for correctness fixes:
  - retry_timeout_sequence_reuse_causes_data_loss — exercises the
    workflow-side dedup behavior that the client fix protects against.
    Without the fix, the next batch after a retry timeout reuses the
    expired sequence and is silently dropped.
  - flush_timeout_surfaces_on_stop — stop() rethrows a
    FlushTimeoutError raised on a background tick, so dropped batches
    are never silent.

packages/test gains @temporalio/contrib-pubsub as a workspace dep.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Follow-up to the initial test port. Brings the TS suite in line with
the Python test-quality pass at sdk-python commits 3a710281,
4ab7ce48, fdbb3394, 35417903, 68ad53d2.

De-flake barriers:
  - Remove `await new Promise(r => setTimeout(r, ...))` after signals
    throughout. A subsequent update/query call acts as the barrier —
    waits for prior signals to be processed before running.
  - Switch `truncate` from signal to update (workflow: truncateUpdate
    in place of truncateSignal, renamed truncateSignalWorkflow →
    truncateWorkflow). Update completion is explicit; no barrier
    comment needed.

Delete redundant tests (covered by stronger cases):
  - per_item_offsets_after_truncation — covered by truncate_pubsub +
    subscribe_recovers_from_truncation.
  - poll_offset_zero_after_truncation — covered by truncate_pubsub.
  - small_response_more_ready_false — fold a single more_ready=false
    assertion into poll_more_ready_when_response_exceeds_size_limit.
  - retry_timeout_sequence_reuse_causes_data_loss — asserted the BUG
    (silent dedup) rather than the FIX, so it would fail if dedup
    got stricter. Its purpose is better served by
    flush_retry_preserves_items_after_failures +
    flush_raises_after_max_retry_duration.

Merge related tests:
  - subscribe_from_offset + per_item_offsets →
    subscribe_from_offset_and_per_item_offsets.

Strengthen existing tests:
  - priority_flush: activity now holds 10s via heartbeat loop (was 500ms
    sleep). A regression in priority wakeup now surfaces as a missing
    item rather than passing via the dispose-driven flush at activity
    exit. Test timeout tightened to 5s — well below the hold.
  - poll_more_ready_when_response_exceeds_size_limit: assert
    more_ready=false on the final drain poll.
  - continue_as_new_typed: seed publisher dedup state (pub / seq=1),
    add publisherSequencesQuery on the workflow, assert dedup state
    survives CAN + duplicate publish is rejected + fresh sequence is
    accepted. Previously only verified log contents and offsets.
  - ttl_pruning_in_get_state: rewrite to verify real TTL semantics.
    Wall-clock gap (1.0s) between two publishers, then TTL=0.5s
    query — old pruned, new kept. Previously used 0/9999 extremes
    which exercised the code path but not the semantics.

New behavioral tests:
  - flush_retry_preserves_items_after_failures: injects signal
    failures via handle.signal monkey-patch, verifies items arrive in
    publish order, exactly once, after retry. Replaces the
    white-box `_buffer`/`_pending` inspection approach with behavioral
    assertions.
  - flush_raises_after_max_retry_duration: renamed from
    flush_timeout_surfaces_on_stop; same semantics (stop() rethrows
    FlushTimeoutError after retry window expires).

Result: 21 → 17 tests, all passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Aligns the TypeScript contrib-pubsub module with the sdk-python
bytes-to-Payload migration (commit 900a9812b on sdk-python
contrib/pubsub). `PubSubItem.data` is now a Temporal `Payload`,
and `publish(topic, value)` accepts any payload-convertible value
or a pre-built `Payload`.

Wire format:
  PublishEntry.data / _WireItem.data are now base64-encoded protobuf
  bytes of temporal.api.common.v1.Payload (previously a raw base64
  byte string). A nested `Payload` cannot be JSON-serialized by the
  default converter because the JSON converter only handles
  top-level Payload on signal/update args, so we hand-roll the proto
  encode/decode in types.ts to avoid pulling protobufjs into the
  workflow sandbox. Cross-SDK clients can publish and subscribe by
  following the same base64-of-serialized-Payload shape.

Codec boundary:
  The codec chain (encryption, PII-redaction, compression) runs
  once on the signal/update envelope, not per item. Per-item codec
  would double-encrypt because the envelope already covers items;
  keeping it envelope-level also makes behavior symmetric between
  workflow-side and client-side publishing.

API surface:
  - types.ts: PubSubItem.data: Payload; adds encodePayloadProto /
    decodePayloadProto / encodePayloadWire / decodePayloadWire;
    renames encodeData/decodeData -> encodeBase64/decodeBase64.
  - client.ts: publish(topic, value) accepts unknown or Payload,
    uses defaultPayloadConverter; adds isPayload type guard and
    SubscribeOptions type.
  - mixin.ts: workflow-side handlers updated to the Payload shape.
  - index.ts: re-export the new Payload wire helpers.
  - README.md: documents the Payload API and the envelope-level
    codec rule.
  - test-contrib-pubsub.ts: tests updated to the new API using
    defaultPayloadConverter.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pure encode/decode unit tests (no Temporal server) that pin the
exact byte layout produced by encodePayloadProto /
decodePayloadProto, so the TypeScript module stays byte-compatible
with the Python SDK's use of
temporal.api.common.v1.Payload.SerializeToString().

Covers:
  - Default-converter round trip for JSON string payloads.
  - Binary payload round trip.
  - Decoding a protobuf byte sequence shaped exactly like Python's
    output.
  - Empty-payload corner case (Payload().SerializeToString() -> b"").
  - Multi-byte varint length prefix (payloads >= 128 bytes).
  - Multiple metadata entries.
  - Base64 helpers against the canonical RFC 4648 examples.
  - Forward-compat: decoder skips unknown proto fields.
  - Canonical byte sequence for a fixed input.
  - Hermetic round trip across several Payload shapes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors sdk-python commit 56789ed4 and 68c719ea:

- PubSubClient.publish() parameter renamed from `priority` to
  `forceFlush`. The kwarg never implied ordering — it just forces an
  immediate flush of the buffer — so the new name is accurate.
- PubSubClient.create() now requires both `client` and `workflowId`.
  The silent auto-detect path ("omit args and pull from activity
  context") is gone because it produced a confusing runtime error when
  called from outside an activity.
- New PubSubClient.fromActivity(options?) classmethod pulls the client
  and parent workflow id from Context.current() — the replacement for
  the auto-detect path in create().

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors sdk-python commit 72d296ea (Replace PubSubMixin with PubSub
dynamic handler registration).

The workflow-side API changes from:

  const pubsub = initPubSub(priorState);

to:

  const pubsub = new PubSub(priorState);

The constructor registers the `__pubsub_publish` signal, `__pubsub_poll`
update (with validator), and `__pubsub_offset` query handlers via
`setHandler`. The wire contract (handler names, payload shapes, offset
semantics) is unchanged.

Renames:

  packages/contrib-pubsub/src/mixin.ts    -> broker.ts
  initPubSub(priorState)                  -> new PubSub(priorState)
  PubSubHandle interface                   (removed)

sdk-python additionally guards against a second `PubSub(...)` call on
the same workflow by checking `workflow.get_signal_handler(...)`. The
TypeScript workflow runtime does not expose that inspection API, and
`reuseV8Context` shares module-level state across workflow executions
in the same worker thread — so a naive module-level flag would either
fire spuriously or miss real duplicates. The guard is omitted in TS
with an in-code note; constructing `PubSub` twice in the same workflow
silently replaces the handlers.

Test callers (workflows, activities, test-contrib-pubsub) and the
README are updated. `priorityWorkflow` / `publishWithPriority` are
renamed to `forceFlushWorkflow` / `publishWithForceFlush` to match the
client-side rename from the previous commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors sdk-python commit 99a7a8ab (openai_agents: publish raw stream
events, drop normalization layer), applied to the Vercel AI SDK plugin
per user request.

The streaming activity previously maintained a ~50-line normalization
layer: a switch over AI SDK V3 stream-part types that mapped them to
custom app event names (TEXT_DELTA, THINKING_*, LLM_CALL_START /
LLM_CALL_COMPLETE, TOOL_CALL_START, TOOL_INPUT_DELTA), plus a
synthesized TEXT_COMPLETE after text-end.

That normalization made sense when a shared UI consumed events from
multiple providers, but each provider-plugin should expose its native
event stream and let consumers render idiomatically. The activity now
publishes each yielded AI SDK stream part as JSON — one line inside
the stream loop — and still accumulates `currentText` / `currentReasoning`
to build the final `LanguageModelV3GenerateResult`.

Also switched the streaming activity to `PubSubClient.fromActivity()`
so it no longer needs the `temporalClient` plumbed through plugin
options. `AiSdkPluginOptions.temporalClient` and the
`CreateActivitiesOptions` plumbing are removed.

Downstream impact: consumers that depend on the normalized event
names (temporal-streaming-agents-samples frontend, shared-frontend
hooks) need to switch on native AI SDK stream-part types
(`text-delta`, `reasoning-delta`, `tool-input-delta`,
`response-metadata`, `finish`, ...). Not touched in this commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rename the wire-level handler identifiers to follow the existing
__temporal_ convention so they are clearly recognizable as
Temporal-internal and won't collide with user-defined handlers:

  __pubsub_publish -> __temporal_pubsub_publish
  __pubsub_poll    -> __temporal_pubsub_poll
  __pubsub_offset  -> __temporal_pubsub_offset

Mirrors the same rename in sdk-python.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… truncate

Mirrors sdk-python 736b5701. PubSub.truncate() now throws an
ApplicationFailure with type 'TruncateOutOfRange' and nonRetryable=true
when the requested offset is past the end of the log, instead of a
generic Error.

Matches how onPoll already reports 'TruncatedOffset': an update handler
that calls truncate surfaces the error to the caller as
WorkflowUpdateFailedError without poisoning the workflow task. A plain
Error inside an update handler would fail the activation instead of
the update.

New test truncate_past_end_raises_application_failure verifies the
documented behavior end-to-end and that the workflow remains usable
after the failure.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors sdk-python 2d768771. flush() is an explicit synchronization
point: it returns once items buffered at call time have been signaled
to the workflow and acknowledged by the server, and returns immediately
when there is nothing to send. Complements the declarative forceFlush
on publish() and the dispose-driven flush at scope exit, for the case
where the caller needs proof that prior publications landed but the
moment doesn't naturally correspond to a specific event.

Implementation snapshots the target sequence at entry rather than
looping while pending/buffer are non-empty: a concurrent publisher
that calls publish() during the awaits adds at a later sequence and
must not extend this barrier (otherwise sustained traffic could keep
flush() blocking indefinitely). `sequence` only advances on a
successful send, so reaching the snapshotted target proves entry-time
items were confirmed.

Surfaces a deferred FlushTimeoutError stashed by the background
flusher both on entry and after drain, so flush() never returns
success while an earlier dropped batch sits unreported.

Test explicit_flush_barrier exercises the contract: empty-buffer no-op,
flush as a barrier with batchInterval=60s so a regression hangs rather
than passing on the timer, and idempotent second flush.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors sdk-python 9274670b. Convenience for the common single-topic
subscriber. Previous signature required wrapping a single topic in an
array, which is noisy at every call site. Internally we normalize to
an array before the poll update; behavior for undefined / empty array
/ multi-topic array is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors the Python helper landed in sdk-python: packages drain +
condition(allHandlersFinished) + continueAsNew behind
`await pubsub.continueAsNew<F>(buildArgs)`. The builder is typed
`(state: PubSubState) => Parameters<F>` and runs after drain stabilizes,
with the post-drain state as its single argument — the snapshot
ordering is structural rather than documented-by-prose.

The helper deliberately does not mirror ContinueAsNewOptions; workflows
that need to set taskQueue / searchAttributes / memo / etc. fall back
to the explicit recipe with `makeContinueAsNewFunc`. The README's
explicit-recipe example is also corrected to include the
`condition(allHandlersFinished)` step that was previously missing
between drain and continueAsNew.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The pubsub options previously took raw seconds-as-number, diverging from
the SDK's standard convention where public-facing duration parameters
take a `Duration` (the `ms` package's `StringValue | number`, where number
is milliseconds). Align with the rest of the SDK:

- PubSubClient: `batchInterval`, `maxRetryDuration`, and `pollCooldown`
  are now `Duration`. Defaults are unchanged in absolute terms but
  expressed as strings (`'2 seconds'`, `'10 minutes'`, `'100 milliseconds'`).
- PubSub.getState/continueAsNew: `publisherTtl` is now `Duration`,
  default `'15 minutes'`.

Internal storage in PubSubClient is renamed `*Ms` and converted at the
boundary via `msToNumber`. This eliminates the previous `* 1000` /
`/ 1000` conversion math at every use site (setTimeout takes ms,
Date.now() returns ms).

Numeric Duration inputs are now interpreted as milliseconds (was
seconds). This is a breaking change for any external caller still on
the old seconds-based numeric form; the contrib module is unreleased
so no compatibility shim is provided.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Selected feature name is "Workflow Streams" (see
docs/rename-to-workflow-streams.md and docs/naming-analysis.md in the
streaming-comparisons superrepo). This mirrors the sdk-python rename
landed in temporalio/sdk-python on the same branch (commit 5890c589 in
that repo).

Package:        @temporalio/contrib-pubsub -> @temporalio/contrib-workflow-stream
Directory:      packages/contrib-pubsub/ -> packages/contrib-workflow-stream/
Classes:        PubSub -> WorkflowStream
                PubSubClient -> WorkflowStreamClient
                PubSubState -> WorkflowStreamState
                PubSubItem -> WorkflowStreamItem
                _WireItem -> _WorkflowStreamWireItem
Constants:      pubsubPublishSignal -> workflowStreamPublishSignal
                pubsubPollUpdate -> workflowStreamPollUpdate
                pubsubOffsetQuery -> workflowStreamOffsetQuery
Wire handlers:  __temporal_pubsub_publish -> __temporal_workflow_stream_publish
                __temporal_pubsub_poll -> __temporal_workflow_stream_poll
                __temporal_pubsub_offset -> __temporal_workflow_stream_offset
File rename:    src/broker.ts -> src/stream.ts (the class is the stream
                itself, not a workflow; "broker" carried pub/sub framing)

Method names publish/subscribe stay literal per the rename doc. The
operation-level interfaces PublishEntry/PublishInput/PollInput/
PollResult/PublisherState are kept bare for parity with the verbs.

Package directory uses singular "contrib-workflow-stream" to match
every other single-feature package in this workspace (activity, client,
worker, workflow, etc.); plurals are reserved for genuine collections.

Cross-package callers updated:
  - @temporalio/ai-sdk (depends on contrib-workflow-stream)
  - @temporalio/test (test workflows, activities, e2e + interop tests)
  - pnpm-workspace.yaml, pnpm-lock.yaml regenerated

The wire-handler rename does break compatibility with any in-flight
workflow; per the rename doc that is acceptable since this contrib has
not been publicly released.

Note: `pnpm --recursive run build` fails on a pre-existing
ReadableStream type error in packages/ai-sdk/src/provider.ts
(introduced in ca800ab, before this rename). The renamed
contrib-workflow-stream package itself builds clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The streaming language model wraps results in a ReadableStream, which
TypeScript could not resolve under the package's default lib config
(es2023 only — no DOM types). Importing the type from node:stream/web
is a localized fix that avoids pulling the full DOM lib into the
package's type surface, which would conflict with name resolution
elsewhere (notably mcp.ts).

This unblocks the test package build, which transitively type-checks
ai-sdk via project references.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The execute callback returned from TemporalMCPClient.tools() was
implicitly typed as `any`. Strict mode (noImplicitAny) on the
package's lib config rejects this — explicit `unknown` parameters
match the existing runtime behavior (the values are passed straight
through to the call activity without inspection) and pass type
checking.

Surfaced after the previous commit unblocked provider.ts type
checking; the build aborts on the first failure, so this error was
hidden until ReadableStream was resolved.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Matches the sdk-python contrib/pubsub final naming. Also updates the
README and the CAN test workflow to the new name.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirror sdk-python's API: publish and subscribe via typed handles
returned by `stream.topic<T>(name)` / `client.topic<T>(name)`. Direct
`WorkflowStream.publish` and `WorkflowStreamClient.publish` are removed.

- New `TopicHandle<T>` (client-side: publish + decoded subscribe)
- New `WorkflowTopicHandle<T>` (workflow-side: publish only)
- `WorkflowStreamItem<T = Payload>` is now generic on decoded data type
- `topic(name)` memoizes per-name handles; T is compile-time only
  (TypeScript has no runtime type representation, so per-topic
  uniformity isn't enforced at runtime — sdk-python remains strict)
- `client.subscribe()` retained for raw / multi-topic decode-yourself
  paths; yields `WorkflowStreamItem<Payload>`

Updates README, ai-sdk activity, test workflows/activities/tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Codex review caught that TopicHandle.subscribe called
defaultPayloadConverter unconditionally, breaking clients that
configured a custom converter via WorkflowStreamClient.create. Read
the client's payloadConverter (already set up by the create() factory
to honor the Client's loadedDataConverter) and decode through it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
brianstrauch and others added 4 commits May 14, 2026 12:41
# Conflicts:
#	.github/workflows/conventions.yml
#	contrib/ai-sdk/src/__tests__/test-ai-sdk.ts
#	pnpm-lock.yaml
#	pnpm-workspace.yaml
…-streams

Match the layout of the other contrib packages (ai-sdk,
interceptors-opentelemetry): move packages/contrib-workflow-streams
to contrib/workflow-streams, drop the redundant "contrib-" prefix
from the package name, and add bugs/repository/homepage fields to
package.json. Add a CODEOWNERS entry making @temporalio/ai-sdk a
joint owner of contrib/workflow-streams alongside @temporalio/sdk,
since the ai-sdk plugin is the primary consumer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@brianstrauch brianstrauch changed the title contrib: add @temporalio/contrib-workflow-streams contrib: add @temporalio/workflow-streams May 14, 2026
Copy link
Copy Markdown
Member

@chris-olszewski chris-olszewski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial review. Apologies, lots to go through in this PR.

Comment thread contrib/ai-sdk/src/load-polyfills.ts
Comment thread contrib/ai-sdk/src/load-polyfills.ts
Comment thread packages/core-bridge/Cargo.lock
Comment thread packages/common/src/reserved.ts Outdated
Comment thread contrib/ai-sdk/src/activities.ts Outdated
Comment thread contrib/workflow-streams/src/types.ts Outdated
Comment thread contrib/workflow-streams/src/types.ts Outdated
Comment on lines +161 to +162
// sandbox. The schema is a fixed public API — the manual encoder cannot
// silently go out of sync with server-side expectations.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is slightly concerning, but I understand it is necessary. It would be good to have some features tests to verify that the SDK/server implementations stay in sync.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will include in a follow-up PR!

Comment thread contrib/ai-sdk/src/activities.ts Outdated
Comment thread contrib/workflow-streams/src/types.ts Outdated
brianstrauch and others added 6 commits May 15, 2026 10:57
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Each reserved-prefix wire name is now allowlisted only as the specific
entity type it's intended for (signal/update/query). Registering the
same name as a different entity type is still rejected.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Stream parts carry an id and may be interleaved across concurrent
blocks, so a single accumulator could clobber data. Track each open
block separately in a Map keyed by id.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
brianstrauch and others added 3 commits May 20, 2026 15:28
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@brianstrauch brianstrauch requested a review from mjameswh May 21, 2026 16:54
Copy link
Copy Markdown
Member

@chris-olszewski chris-olszewski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll need to switch over to having a split workflow and client entrypoints for this package to avoid adding the client code to workflow bundles.

https://nodejs.org/api/packages.html#package-entry-points

Comment thread contrib/workflow-streams/README.md Outdated
brianstrauch and others added 2 commits May 22, 2026 09:49
The package's single root entrypoint pulled crypto, @temporalio/activity, and
@temporalio/client into anything that touched WorkflowStream, breaking
bundleWorkflowCode for workflow files (the existing tests hid it via the
test-helper's bundlerOptions.ignoreModules allowlist). Move the workflow-side
class and protocol constants into src/workflow.ts, expose the client surface
through src/client.ts, drop the root entrypoint so bare imports fail at
resolution time, and add a regression test that runs the real bundler without
the allowlist.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
brianstrauch and others added 2 commits May 22, 2026 10:06
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add `exports` and `typesVersions` to package.json so consumers import from
`@temporalio/workflow-streams/workflow` and
`@temporalio/workflow-streams/client` instead of reaching into the compiled
`./lib/` directory. The package has no root entrypoint — bare
`@temporalio/workflow-streams` resolves to `ERR_PACKAGE_PATH_NOT_EXPORTED`,
forcing callers onto the workflow- or client-safe subpath.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment thread contrib/workflow-streams/README.md Outdated
Comment thread contrib/workflow-streams/README.md Outdated

### Activity side (publishing)

Use `WorkflowStreamClient.fromActivity()` with `await using` for batched publishing
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await using syntax support was only added in Node 24, but can be used in Typescript 5.2+ as the symbols existed in Node 20+.

(This is a note to myself and any other reviewers as I was concerned that this would cause issues with Node 20/22 support)

Comment thread contrib/workflow-streams/src/client.ts Outdated
Comment thread contrib/workflow-streams/README.md Outdated
Comment thread contrib/workflow-streams/README.md Outdated
Comment thread contrib/workflow-streams/src/__tests__/workflows/workflow-streams.ts Outdated
Comment thread contrib/workflow-streams/src/topic-handle.ts Outdated
Comment thread contrib/workflow-streams/src/client.ts Outdated
Comment thread contrib/workflow-streams/src/client.ts Outdated
Comment thread contrib/workflow-streams/src/workflow.ts Outdated
brianstrauch and others added 3 commits May 22, 2026 14:17
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Default to plain-string publishes across the workflow- and activity-side
test fixtures, with a dedicated publishBinaryItems activity +
binary_publish test exercising the Uint8Array (binary/plain) encoding
path. payloadString helper now handles both encodings.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the underscore-prefixed pseudo-private _publishToTopic on
WorkflowStream and WorkflowStreamClient with a real `private`
publishToTopic. The topic() factories now capture it in a closure and
pass that to the handle constructors, so TopicHandle/WorkflowTopicHandle
no longer reach across class boundaries.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants