Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ with an external configuration file, like TOML, decoupling connection settings f
server to decode payloads for display in Temporal CLI and Temporal Web. This setup can be used for any kind of codec, common
examples are compression or encryption.

- [**External Storage**](./external-storage): Offload large payloads to
S3-compatible object storage plus a codec server built on
the SDK's payload HTTP handler so the Web UI and CLI can decode and download the externally-stored payloads.

- [**Query Example**](./query): Demonstrates how to Query the state
of a single Workflow Execution using the `QueryWorkflow` and `SetQueryHandler` APIs. Additional
documentation: [How to Query a Workflow Execution in Go](https://docs.temporal.io/application-development/features/#queries).
Expand Down
133 changes: 133 additions & 0 deletions external-storage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# External Storage Sample

This sample demonstrates how to offload large workflow payloads to S3-compatible
object storage using the Temporal Go SDK's built-in `ExternalStorage` system,
combined with the SDK's zlib `PayloadCodec` so the payloads stored both inline
in Temporal and in S3 are compressed.

**Scenario:** A fulfillment center processes batches of shipping orders. The
workflow receives a small request (a batch ID and order count), then internally
calls a `FetchOrders` activity that returns the full list of orders with
customer records, line items, and handling notes. That list — several hundred
kilobytes even after compression — is passed to a second `ProcessOrders`
activity. Finally the workflow returns a small `BatchSummary` with totals.

Each payload is first compressed by the SDK's `NewZlibCodec`. The SDK then
checks the compressed size against the default 256 KiB threshold; payloads
Comment thread
jmaeagle99 marked this conversation as resolved.
still above it are stored in S3 and replaced inline with compact claim-check
references. The workflow's own input (`OrderBatchRequest`) and result
(`BatchSummary`) compress to a few hundred bytes and remain inline.

A mock S3 service ([s3-mock](./s3-mock)) is included so you can run the sample
locally without an AWS account or Docker. A codec server
([codec-server](./codec-server)) is included to retrieve and decompress payloads
on demand for the Temporal Web UI.

## Steps to run this sample

1. Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
Comment thread
jmaeagle99 marked this conversation as resolved.
2. In a separate terminal, run the mock S3 server. It listens on `:5000` and
creates the `temporal-payloads` bucket. Leave it running.
```
go run ./external-storage/s3-mock
```
3. In a separate terminal, run the worker:
```
go run ./external-storage/worker
```
4. In a separate terminal, run the starter:
```
go run ./external-storage/starter
```
Example output:
```
Starting workflow external-storage-20260515-120000 (batch_id=BATCH-20260515-120000, order_count=200)

Batch BATCH-20260515-120000: 200 orders processed
Total shipping cost: $28512.40
Total weight: 19684.2 kg
Avg delivery: 4.4 days
```
5. (Optional) Run the codec server in a fourth terminal:
Comment thread
jmaeagle99 marked this conversation as resolved.
```
go run ./external-storage/codec-server
```
In the Temporal Web UI (http://localhost:8233), open Settings → Data Encoder
and set the Remote Codec Endpoint to `http://localhost:8081`. Reload the
workflow page; the inline compressed payloads will be shown as readable
JSON, and externally-stored payloads can be downloaded to fetch their
actual content from the mock S3.

The Web UI sends the namespace as the `X-Namespace` header on each request,
so multi-namespace setups can dispatch by reading that header.

| Endpoint | Behavior |
| --- | --- |
| `POST /encode` | Compress the payload, then offload to S3 if it exceeds the threshold. |
| `POST /decode` | Retrieve any external storage references from S3, then decompress. Pass `?preserveStorageRefs=true` to leave references as-is. |
| `POST /download` | All inputs must be storage references. Retrieves them from S3 and decompresses. |
6. Run `temporal workflow show` to see how payloads are stored:
```
temporal workflow show --workflow-id external-storage-<timestamp>
Comment thread
jmaeagle99 marked this conversation as resolved.
```
The workflow's input (`OrderBatchRequest`) and result (`BatchSummary`) are
zlib-encoded and stored inline in Temporal — small enough to compress to a
few hundred bytes. The two activity payloads carrying the order list — the
output of `FetchOrders` and the input to `ProcessOrders` — exceed 256 KiB
even after compression, so they appear as external-storage references,
confirming the SDK offloaded them to S3.

## How it works

The client's `DataConverter` is wrapped with the SDK's zlib codec, and
`client.Options.ExternalStorage` is set with the SDK's S3 driver
(`go.temporal.io/sdk/contrib/aws/s3driver`):

```go
driver, _ := s3driver.NewDriver(s3driver.Options{
Client: awssdkv2.NewClient(s3Client),
Bucket: s3driver.StaticBucket(externalstorage.S3Bucket),
})
client.Dial(client.Options{
DataConverter: converter.NewCodecDataConverter(
converter.GetDefaultDataConverter(),
converter.NewZlibCodec(converter.ZlibCodecOptions{AlwaysEncode: true}),
),
ExternalStorage: converter.ExternalStorage{
Drivers: []converter.StorageDriver{driver},
},
})
```

On the encode path the SDK:
Comment thread
jmaeagle99 marked this conversation as resolved.

1. Serializes the Go value to a `Payload`.
2. Runs the zlib codec to compress the payload bytes.
3. Checks the compressed size against `PayloadSizeThreshold` (default: 256 KiB).
4. If still above the threshold, stores the compressed bytes in S3 via
the SDK's `s3driver` and replaces the inline payload with a claim-check
reference.

On the decode path the SDK reverses these steps, transparently retrieving from
S3 and decompressing as needed.

The worker, the starter, and the codec server must use the **same** codec and
external storage configuration so each side can read what the other wrote. In
this sample the shared wiring lives in
Comment thread
jmaeagle99 marked this conversation as resolved.
[data_converter.go](./data_converter.go) for the worker and starter, and is
mirrored in [codec-server/main.go](./codec-server/main.go) for the codec
server.

## Codec server

The codec server is built directly on top of the SDK's
`converter.NewPayloadHTTPHandler`, which implements the `/encode`, `/decode`,
and `/download` endpoints with full external storage support. The sample adds
two thin layers around it:

- A **namespace dispatcher** that picks a per-namespace handler by inspecting
the `X-Namespace` header sent by the Temporal Web UI and CLI. Only `"default"`
is configured here, but the same map can host other namespaces with their own
codec chains and storage backends.
- A **CORS middleware** that allows the Web UI origin to call the codec
server.
141 changes: 141 additions & 0 deletions external-storage/codec-server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// codec-server hosts the payload HTTP handler from the Temporal Go SDK so the
// Web UI and CLI can transform external-storage payloads on demand.
//
// Deliberately left out for sample simplicity: authentication (slot a
// middleware between the CORS handler and the dispatcher) and configurable
// listen address. For an example of enabling authentication in a codec
// server, look at ../../codec-server.
package main

import (
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"time"

externalstorage "github.com/temporalio/samples-go/external-storage"
"go.temporal.io/sdk/converter"
)

const webUIOrigin = "http://localhost:8233"

// newPayloadNamespacesHTTPHandler returns an http.Handler that dispatches each
// request to a per-namespace handler chosen by the X-Namespace header. The
// Temporal Web UI and CLI send that header on every codec server request, so one
// process can host different codec/storage configurations per namespace
// without per-namespace URL prefixes.
func newPayloadNamespacesHTTPHandler(handlers map[string]http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
namespace := r.Header.Get("X-Namespace")
h, ok := handlers[namespace]
if !ok {
http.NotFound(w, r)
return
}
h.ServeHTTP(w, r)
})
}

// statusRecorder captures the response status code so the logging middleware
// can include it. WriteHeader is only called once per request by the SDK's
// payload handler; subsequent writes go through ResponseWriter directly.
type statusRecorder struct {
http.ResponseWriter
status int
}

func (s *statusRecorder) WriteHeader(code int) {
s.status = code
s.ResponseWriter.WriteHeader(code)
}

// newLoggingHTTPHandler prints a one-line summary of each request: method,
// path, namespace, response status, and how long the handler took.
func newLoggingHTTPHandler(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
rec := &statusRecorder{ResponseWriter: w, status: http.StatusOK}
next.ServeHTTP(rec, r)
log.Printf("%s %s namespace=%q status=%d duration=%s",
r.Method, r.URL.Path, r.Header.Get("X-Namespace"), rec.status, time.Since(start))
})
}

// newCORSHTTPHandler lets the Temporal Web UI call the codec server from its own
// origin. The X-Namespace header is allowlisted so the dispatcher can read it.
func newCORSHTTPHandler(origin string, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Origin") == origin {
w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Access-Control-Allow-Methods", "POST,OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type,X-Namespace")
}
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusOK)
return
}
next.ServeHTTP(w, r)
})
}

func main() {
var port int
flag.IntVar(&port, "port", 8081, "Port to listen on")
flag.Parse()

ctx := context.Background()
driver, err := externalstorage.NewS3Driver(ctx)
if err != nil {
log.Fatalf("new s3 driver: %v", err)
}

// Build the payload handler with the same codec + external storage that
// the worker and starter use. PreStorageCodecs runs before storage on
// encode and after retrieval on decode, mirroring what the client-side
// DataConverter does.
defaultNamespaceHandler, err := converter.NewPayloadHTTPHandler(converter.PayloadHTTPHandlerOptions{
PreStorageCodecs: []converter.PayloadCodec{
converter.NewZlibCodec(converter.ZlibCodecOptions{AlwaysEncode: true}),
},
ExternalStorage: converter.ExternalStorage{
Drivers: []converter.StorageDriver{driver},
},
})
if err != nil {
log.Fatalf("new payload handler: %v", err)
}

// Per-namespace map: extend this to host additional namespaces with their
// own codec chain and/or storage backend.
handler := newPayloadNamespacesHTTPHandler(map[string]http.Handler{
"default": defaultNamespaceHandler,
})
handler = newCORSHTTPHandler(webUIOrigin, handler)
handler = newLoggingHTTPHandler(handler)

srv := &http.Server{
Addr: "localhost:" + strconv.Itoa(port),
Handler: handler,
}

errCh := make(chan error, 1)
go func() { errCh <- srv.ListenAndServe() }()

fmt.Printf("Codec server running at http://%s, ctrl+c to exit\n", srv.Addr)

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
select {
case <-sigCh:
_ = srv.Close()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a production codec server, we would want to drain current requests/tasks before shutting down right? I don't know if we offer guidance on this or it's just a "yea you should know that" kind of thing. I don't have strong feelings about mentioning that here or not because it might be too in the weeds.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These samples tend to be about how to use features, not necessarily about production best practices. However, in the case of codec servers, it might be warranted since these are our main demonstrations of these types of servers. I can do a separate follow up for this.

case err := <-errCh:
if err != nil && err != http.ErrServerClosed {
log.Fatal(err)
}
}
}
38 changes: 38 additions & 0 deletions external-storage/data_converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package externalstorage

import (
"context"
"fmt"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
)

// NewClient dials Temporal with the data converter and external storage
// configuration shared by every process in this sample (worker, starter,
// codec server). They must all agree on:
//
// - the codec chain that wraps each payload (zlib here), and
// - the external storage driver and threshold that decides when a payload
// is offloaded instead of stored inline.
//
// If any one of them diverges, payloads written by one side will not be
// readable by the other.
func NewClient(ctx context.Context, options client.Options) (client.Client, error) {
driver, err := NewS3Driver(ctx)
if err != nil {
return nil, fmt.Errorf("new s3 driver: %w", err)
}

if options.DataConverter == nil {
options.DataConverter = converter.NewCodecDataConverter(
converter.GetDefaultDataConverter(),
converter.NewZlibCodec(converter.ZlibCodecOptions{AlwaysEncode: true}),
)
}
options.ExternalStorage = converter.ExternalStorage{
Drivers: []converter.StorageDriver{driver},
}

return client.Dial(options)
}
Loading
Loading