contrib: add @temporalio/workflow-streams#2039
Conversation
TypeScript implementation of temporalio.contrib.pubsub, matching the Python SDK's wire protocol exactly for cross-language interoperability. Workflow side (mixin.ts): - initPubSub() returns a PubSubHandle for publish/drain/getState - Signal __pubsub_publish with publisher_id + sequence dedup - Update __pubsub_poll with long-poll via condition() + drain validator - Query __pubsub_offset for global offset - Continue-as-new via PubSubState serialization Client side (client.ts): - PubSubClient with batching (start/stop lifecycle) - Flush with asyncio lock equivalent + buffer swap + dedup sequence - subscribe() async generator with CAN following via describe() - Configurable poll interval (default 0.1s) - forWorkflow() factory for CAN-aware handles Wire format (types.ts): - data fields use number[] (not Uint8Array) to match Python's JSON serialization of bytes as numeric arrays - snake_case field names match Python dataclass field names - toWireBytes()/fromWireBytes() helpers for string conversion - All handler names match Python: __pubsub_publish, __pubsub_poll, __pubsub_offset Verified via cross-language interop test: TypeScript client successfully publishes to, subscribes from, queries, and dedup-tests a Python PubSubMixin workflow. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Mirror all Python pub/sub changes to TypeScript: - Dedup rewrite: separate pending batch, retry with same sequence - TTL pruning in getState() with legacy state preservation - max_retry_duration (default 600s) with FlushTimeoutError - truncate() method on PubSubHandle - publisher_last_seen timestamps via Date.now() - forWorkflow→create, flush removed, pollInterval→pollCooldown - Publisher ID shortened to 16 hex chars Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Wire types (PublishEntry, _WireItem, PollResult, PubSubState) encode data as base64 strings for cross-language compatibility. User-facing PubSubItem uses Uint8Array. Base64 encode/decode uses pure JS (no Buffer dependency) for workflow sandbox compatibility. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove the bounded poll wait from PubSubMixin and fix minor whitespace in client and types. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add opt-in streaming to the AI SDK integration. When enabled, the invokeModelStreaming activity calls model.doStream(), publishes TEXT_DELTA/THINKING_DELTA/TOOL_CALL_START events via PubSubClient, and returns the accumulated LanguageModelV3GenerateResult. - New invokeModelStreaming activity in createActivities() - doStream() implemented on TemporalLanguageModel (returns replay stream) - streaming option on TemporalProviderOptions.languageModel - temporalClient option on AiSdkPluginOptions (required for pubsub) - Validates prerequisites before making LLM call - Unique part IDs via incrementing counter in replay stream Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Brings the TypeScript pubsub contrib module up to parity with
temporalio.contrib.pubsub. Major changes grouped by concern:
Correctness (matching sdk-python commit 97be29c0):
- Client _flush(): advance `this.sequence = this.pendingSeq` before
clearing pending on retry timeout. Without this, the next batch
reuses the expired sequence, the workflow may have already accepted
it, and the batch is silently deduped — causing data loss.
Truncation safety (sdk-python commit 7bc830ae):
- Mixin poll update throws ApplicationFailure with type
'TruncatedOffset' (nonRetryable: true) instead of plain Error.
Plain errors would fail the update handler and risk a poison pill
on replay.
- `from_offset === 0` is treated as "from the beginning of whatever
exists" (starts at log head regardless of base offset).
- Client subscribe() catches WorkflowUpdateFailedError whose cause is
ApplicationFailure of type 'TruncatedOffset' and restarts from
offset 0.
Per-item offsets (sdk-python commit 5a8716ce):
- PubSubItem and _WireItem gain an `offset: number` field. The mixin
populates it from the global log position at poll time; the client
yields it to subscribers.
Response size cap (sdk-python commit 90d753ed):
- MAX_POLL_RESPONSE_BYTES = 1_000_000 constant in the mixin. Poll
responses truncate to stay under the cap, set more_ready: true, and
report the next offset. Subscribers skip the pollCooldown sleep
when more_ready is true, draining the backlog quickly.
Flusher:
- Replace setInterval-based flusher with an event-driven loop using
a ResolvableEvent + in-flight mutex. Priority publishes reset the
batch interval (via event.set()), matching sdk-python's asyncio
behavior. A single mutex serializes concurrent _flush() calls,
fixing a race where timer tick + priority flush could issue
redundant signals.
- FlushTimeoutError from a background tick is stashed and rethrown
from stop()/[Symbol.asyncDispose]() so dropped batches are never
silent.
Activity ergonomics:
- PubSubClient.create(client?, workflowId?, options?) — both args
now optional. When omitted, the activity Context is used
(Context.current().client / info.workflowExecution.workflowId),
matching Python's activity.client() / activity.info() pattern.
Dispose ergonomics:
- [Symbol.asyncDispose] enables `await using client = PubSubClient.create(...)`.
package.json bumps engines.node to >= 20.4.0 for runtime support.
Types:
- PollResult gains `more_ready: boolean`.
- PubSubState.publisher_last_seen becomes required (the field is
always emitted by getState()).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adapts the sdk-python README to TypeScript idioms:
- initPubSub() handle returned to the workflow function (in place of
Python's PubSubMixin class).
- `await using client = PubSubClient.create(...)` as the preferred
activity-side pattern (with start()/stop() as the fallback).
- for-await async generator for subscribe().
- Uint8Array payload type called out in the Cross-Language Protocol
section, with base64 on the wire.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Ports 21 tests from sdk-python tests/contrib/pubsub/test_pubsub.py.
Coverage:
Baseline:
- activity_publish_and_subscribe
- topic_filtering
- subscribe_from_offset
- dispose_flushes_on_exit (await using drains buffer)
- max_batch_size
- dedup_rejects_duplicate_signal
- priority_flush
- truncate_pubsub
- ttl_pruning_in_get_state
- continue_as_new_typed
Per-item offset feature:
- per_item_offsets
- per_item_offsets_with_topic_filter
- per_item_offsets_after_truncation
Response size cap:
- poll_more_ready_when_response_exceeds_size_limit
- subscribe_iterates_through_more_ready
- small_response_more_ready_false
Truncation recovery:
- poll_truncated_offset_returns_application_failure
- poll_offset_zero_after_truncation
- subscribe_recovers_from_truncation
Regression tests for correctness fixes:
- retry_timeout_sequence_reuse_causes_data_loss — exercises the
workflow-side dedup behavior that the client fix protects against.
Without the fix, the next batch after a retry timeout reuses the
expired sequence and is silently dropped.
- flush_timeout_surfaces_on_stop — stop() rethrows a
FlushTimeoutError raised on a background tick, so dropped batches
are never silent.
packages/test gains @temporalio/contrib-pubsub as a workspace dep.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Follow-up to the initial test port. Brings the TS suite in line with
the Python test-quality pass at sdk-python commits 3a710281,
4ab7ce48, fdbb3394, 35417903, 68ad53d2.
De-flake barriers:
- Remove `await new Promise(r => setTimeout(r, ...))` after signals
throughout. A subsequent update/query call acts as the barrier —
waits for prior signals to be processed before running.
- Switch `truncate` from signal to update (workflow: truncateUpdate
in place of truncateSignal, renamed truncateSignalWorkflow →
truncateWorkflow). Update completion is explicit; no barrier
comment needed.
Delete redundant tests (covered by stronger cases):
- per_item_offsets_after_truncation — covered by truncate_pubsub +
subscribe_recovers_from_truncation.
- poll_offset_zero_after_truncation — covered by truncate_pubsub.
- small_response_more_ready_false — fold a single more_ready=false
assertion into poll_more_ready_when_response_exceeds_size_limit.
- retry_timeout_sequence_reuse_causes_data_loss — asserted the BUG
(silent dedup) rather than the FIX, so it would fail if dedup
got stricter. Its purpose is better served by
flush_retry_preserves_items_after_failures +
flush_raises_after_max_retry_duration.
Merge related tests:
- subscribe_from_offset + per_item_offsets →
subscribe_from_offset_and_per_item_offsets.
Strengthen existing tests:
- priority_flush: activity now holds 10s via heartbeat loop (was 500ms
sleep). A regression in priority wakeup now surfaces as a missing
item rather than passing via the dispose-driven flush at activity
exit. Test timeout tightened to 5s — well below the hold.
- poll_more_ready_when_response_exceeds_size_limit: assert
more_ready=false on the final drain poll.
- continue_as_new_typed: seed publisher dedup state (pub / seq=1),
add publisherSequencesQuery on the workflow, assert dedup state
survives CAN + duplicate publish is rejected + fresh sequence is
accepted. Previously only verified log contents and offsets.
- ttl_pruning_in_get_state: rewrite to verify real TTL semantics.
Wall-clock gap (1.0s) between two publishers, then TTL=0.5s
query — old pruned, new kept. Previously used 0/9999 extremes
which exercised the code path but not the semantics.
New behavioral tests:
- flush_retry_preserves_items_after_failures: injects signal
failures via handle.signal monkey-patch, verifies items arrive in
publish order, exactly once, after retry. Replaces the
white-box `_buffer`/`_pending` inspection approach with behavioral
assertions.
- flush_raises_after_max_retry_duration: renamed from
flush_timeout_surfaces_on_stop; same semantics (stop() rethrows
FlushTimeoutError after retry window expires).
Result: 21 → 17 tests, all passing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Aligns the TypeScript contrib-pubsub module with the sdk-python
bytes-to-Payload migration (commit 900a9812b on sdk-python
contrib/pubsub). `PubSubItem.data` is now a Temporal `Payload`,
and `publish(topic, value)` accepts any payload-convertible value
or a pre-built `Payload`.
Wire format:
PublishEntry.data / _WireItem.data are now base64-encoded protobuf
bytes of temporal.api.common.v1.Payload (previously a raw base64
byte string). A nested `Payload` cannot be JSON-serialized by the
default converter because the JSON converter only handles
top-level Payload on signal/update args, so we hand-roll the proto
encode/decode in types.ts to avoid pulling protobufjs into the
workflow sandbox. Cross-SDK clients can publish and subscribe by
following the same base64-of-serialized-Payload shape.
Codec boundary:
The codec chain (encryption, PII-redaction, compression) runs
once on the signal/update envelope, not per item. Per-item codec
would double-encrypt because the envelope already covers items;
keeping it envelope-level also makes behavior symmetric between
workflow-side and client-side publishing.
API surface:
- types.ts: PubSubItem.data: Payload; adds encodePayloadProto /
decodePayloadProto / encodePayloadWire / decodePayloadWire;
renames encodeData/decodeData -> encodeBase64/decodeBase64.
- client.ts: publish(topic, value) accepts unknown or Payload,
uses defaultPayloadConverter; adds isPayload type guard and
SubscribeOptions type.
- mixin.ts: workflow-side handlers updated to the Payload shape.
- index.ts: re-export the new Payload wire helpers.
- README.md: documents the Payload API and the envelope-level
codec rule.
- test-contrib-pubsub.ts: tests updated to the new API using
defaultPayloadConverter.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pure encode/decode unit tests (no Temporal server) that pin the
exact byte layout produced by encodePayloadProto /
decodePayloadProto, so the TypeScript module stays byte-compatible
with the Python SDK's use of
temporal.api.common.v1.Payload.SerializeToString().
Covers:
- Default-converter round trip for JSON string payloads.
- Binary payload round trip.
- Decoding a protobuf byte sequence shaped exactly like Python's
output.
- Empty-payload corner case (Payload().SerializeToString() -> b"").
- Multi-byte varint length prefix (payloads >= 128 bytes).
- Multiple metadata entries.
- Base64 helpers against the canonical RFC 4648 examples.
- Forward-compat: decoder skips unknown proto fields.
- Canonical byte sequence for a fixed input.
- Hermetic round trip across several Payload shapes.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors sdk-python commit 56789ed4 and 68c719ea:
- PubSubClient.publish() parameter renamed from `priority` to
`forceFlush`. The kwarg never implied ordering — it just forces an
immediate flush of the buffer — so the new name is accurate.
- PubSubClient.create() now requires both `client` and `workflowId`.
The silent auto-detect path ("omit args and pull from activity
context") is gone because it produced a confusing runtime error when
called from outside an activity.
- New PubSubClient.fromActivity(options?) classmethod pulls the client
and parent workflow id from Context.current() — the replacement for
the auto-detect path in create().
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors sdk-python commit 72d296ea (Replace PubSubMixin with PubSub dynamic handler registration). The workflow-side API changes from: const pubsub = initPubSub(priorState); to: const pubsub = new PubSub(priorState); The constructor registers the `__pubsub_publish` signal, `__pubsub_poll` update (with validator), and `__pubsub_offset` query handlers via `setHandler`. The wire contract (handler names, payload shapes, offset semantics) is unchanged. Renames: packages/contrib-pubsub/src/mixin.ts -> broker.ts initPubSub(priorState) -> new PubSub(priorState) PubSubHandle interface (removed) sdk-python additionally guards against a second `PubSub(...)` call on the same workflow by checking `workflow.get_signal_handler(...)`. The TypeScript workflow runtime does not expose that inspection API, and `reuseV8Context` shares module-level state across workflow executions in the same worker thread — so a naive module-level flag would either fire spuriously or miss real duplicates. The guard is omitted in TS with an in-code note; constructing `PubSub` twice in the same workflow silently replaces the handlers. Test callers (workflows, activities, test-contrib-pubsub) and the README are updated. `priorityWorkflow` / `publishWithPriority` are renamed to `forceFlushWorkflow` / `publishWithForceFlush` to match the client-side rename from the previous commit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors sdk-python commit 99a7a8ab (openai_agents: publish raw stream events, drop normalization layer), applied to the Vercel AI SDK plugin per user request. The streaming activity previously maintained a ~50-line normalization layer: a switch over AI SDK V3 stream-part types that mapped them to custom app event names (TEXT_DELTA, THINKING_*, LLM_CALL_START / LLM_CALL_COMPLETE, TOOL_CALL_START, TOOL_INPUT_DELTA), plus a synthesized TEXT_COMPLETE after text-end. That normalization made sense when a shared UI consumed events from multiple providers, but each provider-plugin should expose its native event stream and let consumers render idiomatically. The activity now publishes each yielded AI SDK stream part as JSON — one line inside the stream loop — and still accumulates `currentText` / `currentReasoning` to build the final `LanguageModelV3GenerateResult`. Also switched the streaming activity to `PubSubClient.fromActivity()` so it no longer needs the `temporalClient` plumbed through plugin options. `AiSdkPluginOptions.temporalClient` and the `CreateActivitiesOptions` plumbing are removed. Downstream impact: consumers that depend on the normalized event names (temporal-streaming-agents-samples frontend, shared-frontend hooks) need to switch on native AI SDK stream-part types (`text-delta`, `reasoning-delta`, `tool-input-delta`, `response-metadata`, `finish`, ...). Not touched in this commit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rename the wire-level handler identifiers to follow the existing __temporal_ convention so they are clearly recognizable as Temporal-internal and won't collide with user-defined handlers: __pubsub_publish -> __temporal_pubsub_publish __pubsub_poll -> __temporal_pubsub_poll __pubsub_offset -> __temporal_pubsub_offset Mirrors the same rename in sdk-python. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… truncate Mirrors sdk-python 736b5701. PubSub.truncate() now throws an ApplicationFailure with type 'TruncateOutOfRange' and nonRetryable=true when the requested offset is past the end of the log, instead of a generic Error. Matches how onPoll already reports 'TruncatedOffset': an update handler that calls truncate surfaces the error to the caller as WorkflowUpdateFailedError without poisoning the workflow task. A plain Error inside an update handler would fail the activation instead of the update. New test truncate_past_end_raises_application_failure verifies the documented behavior end-to-end and that the workflow remains usable after the failure. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors sdk-python 2d768771. flush() is an explicit synchronization point: it returns once items buffered at call time have been signaled to the workflow and acknowledged by the server, and returns immediately when there is nothing to send. Complements the declarative forceFlush on publish() and the dispose-driven flush at scope exit, for the case where the caller needs proof that prior publications landed but the moment doesn't naturally correspond to a specific event. Implementation snapshots the target sequence at entry rather than looping while pending/buffer are non-empty: a concurrent publisher that calls publish() during the awaits adds at a later sequence and must not extend this barrier (otherwise sustained traffic could keep flush() blocking indefinitely). `sequence` only advances on a successful send, so reaching the snapshotted target proves entry-time items were confirmed. Surfaces a deferred FlushTimeoutError stashed by the background flusher both on entry and after drain, so flush() never returns success while an earlier dropped batch sits unreported. Test explicit_flush_barrier exercises the contract: empty-buffer no-op, flush as a barrier with batchInterval=60s so a regression hangs rather than passing on the timer, and idempotent second flush. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors sdk-python 9274670b. Convenience for the common single-topic subscriber. Previous signature required wrapping a single topic in an array, which is noisy at every call site. Internally we normalize to an array before the poll update; behavior for undefined / empty array / multi-topic array is unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors the Python helper landed in sdk-python: packages drain + condition(allHandlersFinished) + continueAsNew behind `await pubsub.continueAsNew<F>(buildArgs)`. The builder is typed `(state: PubSubState) => Parameters<F>` and runs after drain stabilizes, with the post-drain state as its single argument — the snapshot ordering is structural rather than documented-by-prose. The helper deliberately does not mirror ContinueAsNewOptions; workflows that need to set taskQueue / searchAttributes / memo / etc. fall back to the explicit recipe with `makeContinueAsNewFunc`. The README's explicit-recipe example is also corrected to include the `condition(allHandlersFinished)` step that was previously missing between drain and continueAsNew. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The pubsub options previously took raw seconds-as-number, diverging from the SDK's standard convention where public-facing duration parameters take a `Duration` (the `ms` package's `StringValue | number`, where number is milliseconds). Align with the rest of the SDK: - PubSubClient: `batchInterval`, `maxRetryDuration`, and `pollCooldown` are now `Duration`. Defaults are unchanged in absolute terms but expressed as strings (`'2 seconds'`, `'10 minutes'`, `'100 milliseconds'`). - PubSub.getState/continueAsNew: `publisherTtl` is now `Duration`, default `'15 minutes'`. Internal storage in PubSubClient is renamed `*Ms` and converted at the boundary via `msToNumber`. This eliminates the previous `* 1000` / `/ 1000` conversion math at every use site (setTimeout takes ms, Date.now() returns ms). Numeric Duration inputs are now interpreted as milliseconds (was seconds). This is a breaking change for any external caller still on the old seconds-based numeric form; the contrib module is unreleased so no compatibility shim is provided. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Selected feature name is "Workflow Streams" (see
docs/rename-to-workflow-streams.md and docs/naming-analysis.md in the
streaming-comparisons superrepo). This mirrors the sdk-python rename
landed in temporalio/sdk-python on the same branch (commit 5890c589 in
that repo).
Package: @temporalio/contrib-pubsub -> @temporalio/contrib-workflow-stream
Directory: packages/contrib-pubsub/ -> packages/contrib-workflow-stream/
Classes: PubSub -> WorkflowStream
PubSubClient -> WorkflowStreamClient
PubSubState -> WorkflowStreamState
PubSubItem -> WorkflowStreamItem
_WireItem -> _WorkflowStreamWireItem
Constants: pubsubPublishSignal -> workflowStreamPublishSignal
pubsubPollUpdate -> workflowStreamPollUpdate
pubsubOffsetQuery -> workflowStreamOffsetQuery
Wire handlers: __temporal_pubsub_publish -> __temporal_workflow_stream_publish
__temporal_pubsub_poll -> __temporal_workflow_stream_poll
__temporal_pubsub_offset -> __temporal_workflow_stream_offset
File rename: src/broker.ts -> src/stream.ts (the class is the stream
itself, not a workflow; "broker" carried pub/sub framing)
Method names publish/subscribe stay literal per the rename doc. The
operation-level interfaces PublishEntry/PublishInput/PollInput/
PollResult/PublisherState are kept bare for parity with the verbs.
Package directory uses singular "contrib-workflow-stream" to match
every other single-feature package in this workspace (activity, client,
worker, workflow, etc.); plurals are reserved for genuine collections.
Cross-package callers updated:
- @temporalio/ai-sdk (depends on contrib-workflow-stream)
- @temporalio/test (test workflows, activities, e2e + interop tests)
- pnpm-workspace.yaml, pnpm-lock.yaml regenerated
The wire-handler rename does break compatibility with any in-flight
workflow; per the rename doc that is acceptable since this contrib has
not been publicly released.
Note: `pnpm --recursive run build` fails on a pre-existing
ReadableStream type error in packages/ai-sdk/src/provider.ts
(introduced in ca800ab, before this rename). The renamed
contrib-workflow-stream package itself builds clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The streaming language model wraps results in a ReadableStream, which TypeScript could not resolve under the package's default lib config (es2023 only — no DOM types). Importing the type from node:stream/web is a localized fix that avoids pulling the full DOM lib into the package's type surface, which would conflict with name resolution elsewhere (notably mcp.ts). This unblocks the test package build, which transitively type-checks ai-sdk via project references. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The execute callback returned from TemporalMCPClient.tools() was implicitly typed as `any`. Strict mode (noImplicitAny) on the package's lib config rejects this — explicit `unknown` parameters match the existing runtime behavior (the values are passed straight through to the call activity without inspection) and pass type checking. Surfaced after the previous commit unblocked provider.ts type checking; the build aborts on the first failure, so this error was hidden until ReadableStream was resolved. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Matches the sdk-python contrib/pubsub final naming. Also updates the README and the CAN test workflow to the new name. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirror sdk-python's API: publish and subscribe via typed handles returned by `stream.topic<T>(name)` / `client.topic<T>(name)`. Direct `WorkflowStream.publish` and `WorkflowStreamClient.publish` are removed. - New `TopicHandle<T>` (client-side: publish + decoded subscribe) - New `WorkflowTopicHandle<T>` (workflow-side: publish only) - `WorkflowStreamItem<T = Payload>` is now generic on decoded data type - `topic(name)` memoizes per-name handles; T is compile-time only (TypeScript has no runtime type representation, so per-topic uniformity isn't enforced at runtime — sdk-python remains strict) - `client.subscribe()` retained for raw / multi-topic decode-yourself paths; yields `WorkflowStreamItem<Payload>` Updates README, ai-sdk activity, test workflows/activities/tests. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Codex review caught that TopicHandle.subscribe called defaultPayloadConverter unconditionally, breaking clients that configured a custom converter via WorkflowStreamClient.create. Read the client's payloadConverter (already set up by the create() factory to honor the Client's loadedDataConverter) and decode through it. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
# Conflicts: # .github/workflows/conventions.yml # contrib/ai-sdk/src/__tests__/test-ai-sdk.ts # pnpm-lock.yaml # pnpm-workspace.yaml
…-streams Match the layout of the other contrib packages (ai-sdk, interceptors-opentelemetry): move packages/contrib-workflow-streams to contrib/workflow-streams, drop the redundant "contrib-" prefix from the package name, and add bugs/repository/homepage fields to package.json. Add a CODEOWNERS entry making @temporalio/ai-sdk a joint owner of contrib/workflow-streams alongside @temporalio/sdk, since the ai-sdk plugin is the primary consumer. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
chris-olszewski
left a comment
There was a problem hiding this comment.
Partial review. Apologies, lots to go through in this PR.
| // sandbox. The schema is a fixed public API — the manual encoder cannot | ||
| // silently go out of sync with server-side expectations. |
There was a problem hiding this comment.
This is slightly concerning, but I understand it is necessary. It would be good to have some features tests to verify that the SDK/server implementations stay in sync.
There was a problem hiding this comment.
Will include in a follow-up PR!
# Conflicts: # .github/CODEOWNERS
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
# Conflicts: # .github/CODEOWNERS
Each reserved-prefix wire name is now allowlisted only as the specific entity type it's intended for (signal/update/query). Registering the same name as a different entity type is still rejected. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Stream parts carry an id and may be interleaved across concurrent blocks, so a single accumulator could clobber data. Track each open block separately in a Map keyed by id. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
chris-olszewski
left a comment
There was a problem hiding this comment.
I think we'll need to switch over to having a split workflow and client entrypoints for this package to avoid adding the client code to workflow bundles.
The package's single root entrypoint pulled crypto, @temporalio/activity, and @temporalio/client into anything that touched WorkflowStream, breaking bundleWorkflowCode for workflow files (the existing tests hid it via the test-helper's bundlerOptions.ignoreModules allowlist). Move the workflow-side class and protocol constants into src/workflow.ts, expose the client surface through src/client.ts, drop the root entrypoint so bare imports fail at resolution time, and add a regression test that runs the real bundler without the allowlist. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add `exports` and `typesVersions` to package.json so consumers import from `@temporalio/workflow-streams/workflow` and `@temporalio/workflow-streams/client` instead of reaching into the compiled `./lib/` directory. The package has no root entrypoint — bare `@temporalio/workflow-streams` resolves to `ERR_PACKAGE_PATH_NOT_EXPORTED`, forcing callers onto the workflow- or client-safe subpath. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
398c43a to
b62f816
Compare
|
|
||
| ### Activity side (publishing) | ||
|
|
||
| Use `WorkflowStreamClient.fromActivity()` with `await using` for batched publishing |
There was a problem hiding this comment.
await using syntax support was only added in Node 24, but can be used in Typescript 5.2+ as the symbols existed in Node 20+.
(This is a note to myself and any other reviewers as I was concerned that this would cause issues with Node 20/22 support)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Default to plain-string publishes across the workflow- and activity-side test fixtures, with a dedicated publishBinaryItems activity + binary_publish test exercising the Uint8Array (binary/plain) encoding path. payloadString helper now handles both encodings. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the underscore-prefixed pseudo-private _publishToTopic on WorkflowStream and WorkflowStreamClient with a real `private` publishToTopic. The topic() factories now capture it in a closure and pass that to the handle constructors, so TopicHandle/WorkflowTopicHandle no longer reach across class boundaries. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
Introduces
@temporalio/workflow-streams— a durable, offset-addressedevent channel for Temporal workflows, built on Signals (publish) and polling
Updates (subscribe). Mirrors the equivalent contrib package in sdk-python so
the same wire format and conventions work cross-SDK.
A workflow holds an append-only log of
(topic, data)entries; externalclients (activities, starters, other workflows) publish and subscribe via
the workflow handle. Cost scales with durable batches, not tokens; ~100ms
per round-trip. Suited for chat tokens, agent reasoning streams, progress
events. Not for ultra-low-latency voice.
What's in the package
WorkflowStream— workflow-side. Registers publish/poll/offsethandlers, append-only log with optional
truncate(),continueAsNew()helper, dedup by(publisher_id, sequence).WorkflowStreamClient— external-side. Buffered publishing withbatchInterval/maxBatchSize/forceFlush, asyncflush()barrier,retry with
maxRetryDurationand explicit data-loss surfacing,subscribe()that follows continue-as-new and exits cleanly onworkflow completion,
await usingdisposal.TopicHandle/WorkflowTopicHandle— bind a topic to a value typeonce; subsequent
publish/subscribedon't repeat the name.__temporal_workflow_stream_*)and base64-of-protobuf-Payload wire format match sdk-python; tests in
test-contrib-workflow-stream-interop.tscover the round-trip.Side changes
bundler.tsstrips thenode:URI scheme sonode:stream/webandsimilar imports route through the existing alias map.
ReadableStream,WritableStream,TransformStream, queuing strategies) — needed by@temporalio/ai-sdk'sReadableStreamuse; letsai-sdkdrop theweb-streams-polyfillworkaround.throwIfReservedNameallowlists the three__temporal_workflow_streams_*handler names so contrib packages can register under that namespace; the
dispatch-time prefix check is narrowed to only fire when a default
handler would otherwise catch the call.
Test plan
pnpm -C packages/test exec ava lib/test-contrib-workflow-streams.js— 21 functional testspnpm -C packages/test exec ava lib/test-contrib-workflow-streams-interop.js— 10 cross-SDK wiretests
pnpm -C packages/test exec ava lib/test-ai-sdk.js— 8 ai-sdk tests covering the bundler/sandboxchanges
pnpm -C packages/test exec ava lib/test-integration-workflows.js --match "*reserved prefix*"—confirm reserved-prefix guard still rejects user-defined
__temporal_*handlers