Merged
Changes from all commits
47 commits
71cff0d
External payload storage
jmaeagle99 Feb 25, 2026
086640b
Remove external converter in favor of payload codec
jmaeagle99 Feb 27, 2026
8c7b418
Replace DriverError and DriverNotFoundError with RuntimeError
jmaeagle99 Feb 27, 2026
e4883d1
Import exstore module instead of types
jmaeagle99 Feb 27, 2026
c39be13
Remove DriverSelector
jmaeagle99 Feb 27, 2026
354888e
Remove PayloadNotFoundError in favor of non-retryable ApplicationError
jmaeagle99 Feb 27, 2026
9da8504
Renames and doc updates
jmaeagle99 Feb 27, 2026
cab33e2
Remove unused import
jmaeagle99 Feb 27, 2026
3fffe91
Formatting
jmaeagle99 Feb 27, 2026
734088a
Undo breaking changes
jmaeagle99 Feb 27, 2026
a0ddf52
Rename field
jmaeagle99 Feb 27, 2026
89ac04f
Use mapping instead of dict
jmaeagle99 Feb 27, 2026
f983904
Merge branch 'main' into extstore
jmaeagle99 Mar 10, 2026
f993741
Raise exception for duplicate drivers
jmaeagle99 Mar 10, 2026
62200f2
Fix storage impl initialization
jmaeagle99 Mar 10, 2026
790dc57
Reformat and add doc string
jmaeagle99 Mar 10, 2026
fb562c9
Remove WithSerializationContext for storage types that receive a Stor…
jmaeagle99 Mar 11, 2026
e50a8e6
Update driver selector to return driver name and update README sample.
jmaeagle99 Mar 11, 2026
2e03402
Raise ValueError instead of RuntimeError
jmaeagle99 Mar 11, 2026
c6609ab
Remove asserts
jmaeagle99 Mar 11, 2026
9539f59
Move extstore to non-public submodule of converter
jmaeagle99 Mar 12, 2026
b7a331b
Rename StorageConfig to ExternalStorage
jmaeagle99 Mar 12, 2026
28b6afb
Merge _StorageImpl into ExternalStorage
jmaeagle99 Mar 12, 2026
9206d66
Must have atleast one driver
jmaeagle99 Mar 12, 2026
a16cd76
Cleanup customer warning and remove unnecessary fields
jmaeagle99 Mar 12, 2026
ab7fc43
Move size check to driver selector wrapper method
jmaeagle99 Mar 12, 2026
8bf9f8d
Separate context to store and retrieve contexts
jmaeagle99 Mar 12, 2026
3a9507a
Make methods non-public
jmaeagle99 Mar 12, 2026
debe723
Fix linting violations
jmaeagle99 Mar 12, 2026
fe8e253
Cancel in-flight operations if a driver raises exception
jmaeagle99 Mar 12, 2026
63943d0
Fix doc strings
jmaeagle99 Mar 13, 2026
d384f2e
Update readme
jmaeagle99 Mar 13, 2026
0230957
Merge branch 'main' into extstore
jmaeagle99 Mar 13, 2026
f3b8aec
Add test to validate driver concurrency
jmaeagle99 Mar 13, 2026
86a6514
Change driver selector back to returning driver instance
jmaeagle99 Mar 13, 2026
82dbddb
Add test demonstrating a basic customer scenario
jmaeagle99 Mar 13, 2026
18cce91
Remove unused class
jmaeagle99 Mar 16, 2026
7a6b36b
Fix test comments
jmaeagle99 Mar 16, 2026
7dbd512
Doc updates
jmaeagle99 Mar 16, 2026
aa7cfba
Reverse the steps of codecs and external storage
jmaeagle99 Mar 16, 2026
3f4c3af
Remove ExternalStorage.payload_codec
jmaeagle99 Mar 16, 2026
ba7ef89
Reformat
jmaeagle99 Mar 16, 2026
7573092
Update tests with fuller message validation
jmaeagle99 Mar 16, 2026
953727e
Merge branch 'main' into extstore
jmaeagle99 Mar 16, 2026
f83fd4f
Merge branch 'main' into extstore
jmaeagle99 Mar 16, 2026
9ea70df
Rename StorageDriverClaim data to claim_data
jmaeagle99 Mar 16, 2026
28d2996
Require driver_selector if multiple drivers are registered
jmaeagle99 Mar 16, 2026
132 changes: 130 additions & 2 deletions README.md
@@ -54,6 +54,9 @@ informal introduction to the features and their implementation.
- [Data Conversion](#data-conversion)
- [Pydantic Support](#pydantic-support)
- [Custom Type Data Conversion](#custom-type-data-conversion)
- [External Storage](#external-storage)
- [Driver Selection](#driver-selection)
- [Custom Drivers](#custom-drivers)
- [Workers](#workers)
- [Workflows](#workflows)
- [Definition](#definition)
@@ -309,8 +312,9 @@ other_ns_client = Client(**config)

Data converters are used to convert raw Temporal payloads to/from actual Python types. A custom data converter of type
`temporalio.converter.DataConverter` can be set via the `data_converter` parameter of the `Client` constructor. Data
- converters are a combination of payload converters, payload codecs, and failure converters. Payload converters convert
- Python values to/from serialized bytes. Payload codecs convert bytes to bytes (e.g. for compression or encryption).
+ converters are a combination of payload converters, external storage, payload codecs, and failure converters. Payload
+ converters convert Python values to/from serialized bytes. External payload storage optionally stores and retrieves payloads
+ to/from external storage services using drivers. Payload codecs convert bytes to bytes (e.g. for compression or encryption).
Failure converters convert exceptions to/from serialized failures.

The default data converter supports converting multiple types including:
@@ -455,6 +459,130 @@ my_data_converter = dataclasses.replace(

Now `IPv4Address` can be used in type hints including collections, optionals, etc.

##### External Storage

⚠️ **External storage support is currently at an experimental release stage.** ⚠️

External storage allows large payloads to be offloaded to an external storage service (such as Amazon S3) rather than stored inline in workflow history. This is useful when workflows or activities work with data that would otherwise exceed Temporal's payload size limits.

External storage is configured via the `external_storage` parameter on `DataConverter`. Configure it on every `Client` that may upload or download large payloads: the clients that start or interact with your workflows as well as the client used by the worker.

A `StorageDriver` handles uploading and downloading payloads. Temporal provides built-in drivers for common storage solutions, or you can implement your own. Here's an example using the provided `InMemoryTestDriver`.

```python
import dataclasses
from temporalio.client import Client
from temporalio.converter import DataConverter, ExternalStorage

# NOTE: InMemoryTestDriver must also be imported; its module path is not
# shown in this diff.
driver = InMemoryTestDriver()

client = await Client.connect(
    "localhost:7233",
    data_converter=dataclasses.replace(
        DataConverter.default,
        external_storage=ExternalStorage(drivers=[driver]),
    ),
)
```

Some things to note about external storage:

* Only payloads that meet or exceed `ExternalStorage.payload_size_threshold` (default 256 KiB) are offloaded. Smaller payloads are stored inline as normal.
* External storage applies transparently to all payloads, whether they are workflow inputs/outputs, activity inputs/outputs, signal inputs, query outputs, update inputs/outputs, or failure details.
* The `DataConverter`'s `payload_codec` (if configured) is applied to the payload *before* it is handed to the storage driver, so the driver always stores encoded bytes. The reference payload written to workflow history is not encoded by the `DataConverter` codec.
* Setting `ExternalStorage.payload_size_threshold` to `None` causes every payload to be considered for external storage regardless of size.
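
The size rule described in the notes above can be modelled in a few lines. This is only a sketch of the documented behaviour, not the SDK's implementation: `is_offload_candidate` and `DEFAULT_THRESHOLD_BYTES` are hypothetical names, and the 256 KiB default is taken from the text above.

```python
from __future__ import annotations

# 256 KiB, the default threshold stated in the notes above.
DEFAULT_THRESHOLD_BYTES = 256 * 1024


def is_offload_candidate(
    payload_size: int, threshold: int | None = DEFAULT_THRESHOLD_BYTES
) -> bool:
    """Hypothetical model of the threshold rule.

    A payload is a candidate when its size meets or exceeds the threshold.
    A threshold of None makes every payload a candidate.
    """
    if threshold is None:
        return True
    return payload_size >= threshold
```

Being a candidate does not by itself mean the payload is offloaded: a driver selector may still decide to keep it inline.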

###### Driver Selection

When multiple storage backends are needed, list all drivers in `ExternalStorage.drivers` and provide a `driver_selector` to control which driver stores new payloads. Any driver in the list not chosen for storing is still available for retrieval, which is useful when migrating between storage backends.

```python
from temporalio.converter import ExternalStorage

options = ExternalStorage(
    drivers=[hot_driver, cold_driver],
    driver_selector=lambda context, payload: (
        hot_driver if payload.ByteSize() < 5 * 1024 * 1024 else cold_driver
    ),
)
```

For more complex selection logic, use a plain callable that reads from the `StorageDriverStoreContext`:

```python
import temporalio.converter
from temporalio.api.common.v1 import Payload

def feature_flag_is_on(workflow_id: str | None) -> bool:
    """Check whether external storage is enabled for this workflow via a feature flag service."""
    return workflow_id is not None and len(workflow_id) % 2 == 0

def feature_flag_selector(
    context: temporalio.converter.StorageDriverStoreContext, _payload: Payload
) -> temporalio.converter.StorageDriver | None:
    workflow_id = None
    if isinstance(context.serialization_context, temporalio.converter.WorkflowSerializationContext):
        workflow_id = context.serialization_context.workflow_id
    elif isinstance(context.serialization_context, temporalio.converter.ActivitySerializationContext):
        workflow_id = context.serialization_context.workflow_id
    # Returning None keeps the payload inline in workflow history.
    return my_driver if feature_flag_is_on(workflow_id) else None

options = temporalio.converter.ExternalStorage(
    drivers=[my_driver],
    driver_selector=feature_flag_selector,
)
```

Some things to note about driver selection:

* A `driver_selector` is required when more than one driver is registered. With a single driver, `driver_selector` may be omitted and that driver is used for all store operations.
* Returning `None` from a selector leaves the payload stored inline in workflow history rather than offloading it.
* The driver instance returned by the selector must be one of the instances registered in `ExternalStorage.drivers`. If it is not, an error is raised.
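
The three rules above can be summarised as a small decision function. This is a stand-alone model written for illustration, not the SDK's code: `FakeDriver` and `pick_store_driver` are hypothetical names, the checks are collapsed into one function for brevity, and the use of `ValueError` is an assumption.

```python
from __future__ import annotations

from typing import Callable, Optional, Sequence


class FakeDriver:
    """Hypothetical stand-in for a StorageDriver; only the name is modelled."""

    def __init__(self, name: str) -> None:
        self._name = name

    def name(self) -> str:
        return self._name


Selector = Callable[[object, bytes], Optional[FakeDriver]]


def pick_store_driver(
    drivers: Sequence[FakeDriver],
    selector: Selector | None,
    context: object,
    payload: bytes,
) -> FakeDriver | None:
    """Return the driver that should store the payload, or None to keep it inline."""
    if not drivers:
        raise ValueError("at least one driver is required")
    if selector is None:
        # A selector may only be omitted when exactly one driver is registered.
        if len(drivers) > 1:
            raise ValueError("driver_selector is required with multiple drivers")
        return drivers[0]
    chosen = selector(context, payload)
    if chosen is None:
        return None  # the payload stays inline in workflow history
    # The selector must return one of the registered instances.
    if not any(chosen is driver for driver in drivers):
        raise ValueError("selector returned a driver that is not registered")
    return chosen
```

Comparing by identity (`is`) mirrors the note that the selector must return one of the registered instances rather than merely an equivalent driver.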

###### Custom Drivers

Implement `temporalio.converter.StorageDriver` to integrate with an external storage system:

```python
from collections.abc import Sequence
from temporalio.converter import StorageDriver, StorageDriverClaim, StorageDriverRetrieveContext, StorageDriverStoreContext
from temporalio.api.common.v1 import Payload

# `my_storage` is a placeholder for your own storage client.
class MyDriver(StorageDriver):
    def __init__(self, driver_name: str | None = None):
        self._driver_name = driver_name or "my-org:driver:my-driver"

    def name(self) -> str:
        return self._driver_name

    async def store(
        self, context: StorageDriverStoreContext, payloads: Sequence[Payload]
    ) -> list[StorageDriverClaim]:
        claims = []
        for payload in payloads:
            key = await my_storage.put(payload.SerializeToString())
            # Field renamed from `data` to `claim_data` in commit 9ea70df.
            claims.append(StorageDriverClaim(claim_data={"key": key}))
        return claims

    async def retrieve(
        self, context: StorageDriverRetrieveContext, claims: Sequence[StorageDriverClaim]
    ) -> list[Payload]:
        payloads = []
        for claim in claims:
            data = await my_storage.get(claim.claim_data["key"])
            p = Payload()
            p.ParseFromString(data)
            payloads.append(p)
        return payloads
```

Some things to note about implementing a custom driver:

* `StorageDriver.name()` must return a string that is unique among all drivers in `ExternalStorage.drivers`. This name is embedded in the reference payload stored in workflow history and is used to look up the correct driver during retrieval, so changing it after payloads have been stored will break retrieval.
* `StorageDriver.type()` is automatically implemented to return the name of the class. This can be overridden in subclasses but must remain consistent across all instances of the subclass.
* Implement `temporalio.converter.WithSerializationContext` on your driver to receive workflow or activity context (namespace, workflow ID, activity ID, etc.) at serialization time.
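
To see why driver names must be unique and stable, consider a simplified model of name-based lookup. Everything here is hypothetical (`NamedDriver`, `build_driver_index`, `driver_for_reference`) and the exception types are assumptions; it only illustrates the idea that a stored reference records a driver name which is later resolved against the registered drivers.

```python
class NamedDriver:
    """Hypothetical minimal driver: only name() is modelled."""

    def __init__(self, name):
        self._name = name

    def name(self):
        return self._name


def build_driver_index(drivers):
    """Map each driver name to its driver, rejecting duplicate names."""
    index = {}
    for driver in drivers:
        name = driver.name()
        if name in index:
            raise ValueError(f"duplicate driver name: {name!r}")
        index[name] = driver
    return index


def driver_for_reference(index, reference_driver_name):
    """Resolve the driver whose name was recorded when the payload was stored."""
    try:
        return index[reference_driver_name]
    except KeyError:
        raise LookupError(
            f"no registered driver named {reference_driver_name!r}"
        ) from None
```

If a driver is renamed after payloads were stored, the recorded name no longer resolves and the lookup fails, which is the breakage the first note warns about.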

### Workers

Workers host workflows and/or activities. Here's how to run a worker:
7 changes: 3 additions & 4 deletions temporalio/bridge/worker.py
@@ -303,10 +303,9 @@ async def decode_activation(
decode_headers: bool,
) -> None:
"""Decode all payloads in the activation."""
-    if data_converter._decode_payload_has_effect:
-        await CommandAwarePayloadVisitor(
-            skip_search_attributes=True, skip_headers=not decode_headers
-        ).visit(_Visitor(data_converter._decode_payload_sequence), activation)
+    await CommandAwarePayloadVisitor(
+        skip_search_attributes=True, skip_headers=not decode_headers
+    ).visit(_Visitor(data_converter._decode_payload_sequence), activation)


async def encode_completion(
14 changes: 14 additions & 0 deletions temporalio/converter/__init__.py
@@ -4,6 +4,14 @@
DataConverter,
default,
)
from temporalio.converter._extstore import (
ExternalStorage,
StorageDriver,
StorageDriverClaim,
StorageDriverRetrieveContext,
StorageDriverStoreContext,
StorageWarning,
)
from temporalio.converter._failure_converter import (
DefaultFailureConverter,
DefaultFailureConverterWithEncodedAttributes,
@@ -44,6 +52,12 @@

__all__ = [
"ActivitySerializationContext",
"ExternalStorage",
"StorageDriver",
"StorageDriverClaim",
"StorageDriverRetrieveContext",
"StorageDriverStoreContext",
"StorageWarning",
"AdvancedJSONEncoder",
"BinaryNullPayloadConverter",
"BinaryPlainPayloadConverter",
60 changes: 56 additions & 4 deletions temporalio/converter/_data_converter.py
@@ -14,6 +14,11 @@
import temporalio.api.common.v1
import temporalio.api.failure.v1
import temporalio.common
from temporalio.converter._extstore import (
_REFERENCE_ENCODING,
ExternalStorage,
StorageWarning,
)
from temporalio.converter._failure_converter import (
FailureConverter,
)
@@ -72,6 +77,13 @@ class DataConverter(WithSerializationContext):
payload_limits: PayloadLimitsConfig = PayloadLimitsConfig()
"""Settings for payload size limits."""

external_storage: ExternalStorage | None = None
"""Options for external storage. If None, external storage is disabled.

.. warning::
This API is experimental.
"""

default: ClassVar[DataConverter]
"""Singleton default data converter."""

@@ -158,25 +170,30 @@ def with_context(self, context: SerializationContext) -> Self:
payload_converter = self.payload_converter
payload_codec = self.payload_codec
failure_converter = self.failure_converter
external_storage = self.external_storage
if isinstance(payload_converter, WithSerializationContext):
payload_converter = payload_converter.with_context(context)
if isinstance(payload_codec, WithSerializationContext):
payload_codec = payload_codec.with_context(context)
if isinstance(failure_converter, WithSerializationContext):
failure_converter = failure_converter.with_context(context)
if isinstance(external_storage, WithSerializationContext):
external_storage = external_storage.with_context(context)
if all(
new is orig
for new, orig in [
(payload_converter, self.payload_converter),
(payload_codec, self.payload_codec),
(failure_converter, self.failure_converter),
(external_storage, self.external_storage),
]
):
return self
cloned = dataclasses.replace(self)
object.__setattr__(cloned, "payload_converter", payload_converter)
object.__setattr__(cloned, "payload_codec", payload_codec)
object.__setattr__(cloned, "failure_converter", failure_converter)
object.__setattr__(cloned, "external_storage", external_storage)
return cloned
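
The `with_context` hunk above follows a "return `self` unless something changed" pattern. A stand-alone sketch of that pattern, with hypothetical `Part` and `Holder` classes standing in for the context-aware components and the data converter, might look like this:

```python
from __future__ import annotations

import dataclasses


@dataclasses.dataclass(frozen=True)
class Part:
    """Hypothetical context-aware component."""

    label: str

    def with_context(self, context: str) -> Part:
        return Part(f"{self.label}@{context}")


@dataclasses.dataclass(frozen=True)
class Holder:
    """Hypothetical container holding optional context-aware components."""

    codec: object | None = None
    storage: object | None = None

    def with_context(self, context: str) -> Holder:
        codec, storage = self.codec, self.storage
        if isinstance(codec, Part):
            codec = codec.with_context(context)
        if isinstance(storage, Part):
            storage = storage.with_context(context)
        # Identity comparison: skip the copy when no component was rebound.
        if codec is self.codec and storage is self.storage:
            return self
        return dataclasses.replace(self, codec=codec, storage=storage)
```

Returning `self` when every component is unchanged avoids allocating a new object for each serialization context when none of the components is context-aware.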

def _with_payload_error_limits(
@@ -238,12 +255,16 @@ async def _encode_payload(
) -> temporalio.api.common.v1.Payload:
if self.payload_codec:
payload = (await self.payload_codec.encode([payload]))[0]
if self.external_storage:
payload = await self.external_storage._store_payload(payload)
self._validate_payload_limits([payload])
return payload

async def _encode_payloads(self, payloads: temporalio.api.common.v1.Payloads):
if self.payload_codec:
await self.payload_codec.encode_wrapper(payloads)
if self.external_storage:
await self.external_storage._store_payloads(payloads)
self._validate_payload_limits(payloads.payloads)

async def _encode_payload_sequence(
@@ -252,32 +273,63 @@ async def _encode_payload_sequence(
encoded_payloads = list(payloads)
if self.payload_codec:
encoded_payloads = await self.payload_codec.encode(encoded_payloads)
if self.external_storage:
encoded_payloads = await self.external_storage._store_payload_sequence(
encoded_payloads
)
self._validate_payload_limits(encoded_payloads)
return encoded_payloads

async def _decode_payload(
self, payload: temporalio.api.common.v1.Payload
) -> temporalio.api.common.v1.Payload:
if self.external_storage:
payload = await self.external_storage._retrieve_payload(payload)
if self.payload_codec:
payload = (await self.payload_codec.decode([payload]))[0]
return payload
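
The ordering in `_encode_payload` and `_decode_payload` above (codec first, then storage on the way out; storage first, then codec on the way in) can be demonstrated with a toy pipeline. This sketch uses plain dicts instead of protobuf payloads; `ToyStorage`, `codec_encode`, `codec_decode`, and the `"example/..."` encoding names are all hypothetical and are not the SDK's values.

```python
import zlib

REFERENCE_ENCODING = "example/reference"  # hypothetical marker, not the SDK's value


def codec_encode(payload):
    """Toy codec: compress the data and remember the inner encoding."""
    return {
        "metadata": {
            "encoding": "example/zlib",
            "inner": payload["metadata"]["encoding"],
        },
        "data": zlib.compress(payload["data"]),
    }


def codec_decode(payload):
    """Reverse codec_encode; pass through payloads it did not produce."""
    if payload["metadata"].get("encoding") != "example/zlib":
        return payload
    return {
        "metadata": {"encoding": payload["metadata"]["inner"]},
        "data": zlib.decompress(payload["data"]),
    }


class ToyStorage:
    """Toy external storage keeping blobs in a dict."""

    def __init__(self, threshold):
        self.threshold = threshold
        self.blobs = {}

    def store(self, payload):
        if len(payload["data"]) < self.threshold:
            return payload  # small payloads stay inline
        key = f"blob-{len(self.blobs)}"
        self.blobs[key] = payload
        return {"metadata": {"encoding": REFERENCE_ENCODING}, "data": key.encode()}

    def retrieve(self, payload):
        if payload["metadata"].get("encoding") != REFERENCE_ENCODING:
            return payload
        return self.blobs[payload["data"].decode()]


def encode(payload, storage):
    # Codec first, then storage: the stored blob holds codec-encoded bytes.
    return storage.store(codec_encode(payload))


def decode(payload, storage):
    # Mirror image: resolve the reference first, then undo the codec.
    return codec_decode(storage.retrieve(payload))
```

In this model the reference written in place of a large payload never passes through the codec, which matches the README note that the reference payload is not encoded by the `DataConverter` codec.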

async def _decode_payloads(self, payloads: temporalio.api.common.v1.Payloads):
if self.external_storage:
await self.external_storage._retrieve_payloads(payloads)
else:
if any(
p.metadata.get("encoding") == _REFERENCE_ENCODING
for p in payloads.payloads
):
warnings.warn(
"[TMPRL1105] Detected externally stored payload(s) but external storage is not configured.",
StorageWarning,
)
if self.payload_codec:
await self.payload_codec.decode_wrapper(payloads)

async def _decode_payload_sequence(
self, payloads: Sequence[temporalio.api.common.v1.Payload]
) -> list[temporalio.api.common.v1.Payload]:
-        if not self.payload_codec:
-            return list(payloads)
-        return await self.payload_codec.decode(payloads)
+        decoded_payloads = list(payloads)
+        if self.external_storage:
+            decoded_payloads = await self.external_storage._retrieve_payload_sequence(
+                decoded_payloads
+            )
+        else:
+            if any(
+                p.metadata.get("encoding") == _REFERENCE_ENCODING
+                for p in decoded_payloads
+            ):
+                warnings.warn(
+                    "[TMPRL1105] Detected externally stored payload(s) but external storage is not configured.",
+                    StorageWarning,
+                )
+        if self.payload_codec:
+            decoded_payloads = await self.payload_codec.decode(decoded_payloads)
+        return decoded_payloads

# Temporary shortcircuit detection while the _decode_* methods may no-op if
# a payload codec is not configured. Remove once those paths have more to them.
@property
def _decode_payload_has_effect(self) -> bool:
-        return self.payload_codec is not None
+        return self.payload_codec is not None or self.external_storage is not None

def _validate_payload_limits(
self,