External storage and codec server sample #474
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,133 @@ | ||
| # External Storage Sample | ||
|
|
||
| This sample demonstrates how to offload large workflow payloads to S3-compatible | ||
| object storage using the Temporal Go SDK's built-in `ExternalStorage` system, | ||
| combined with the SDK's zlib `PayloadCodec` so the payloads stored both inline | ||
| in Temporal and in S3 are compressed. | ||
|
|
||
| **Scenario:** A fulfillment center processes batches of shipping orders. The | ||
| workflow receives a small request (a batch ID and order count), then internally | ||
| calls a `FetchOrders` activity that returns the full list of orders with | ||
| customer records, line items, and handling notes. That list — several hundred | ||
| kilobytes even after compression — is passed to a second `ProcessOrders` | ||
| activity. Finally the workflow returns a small `BatchSummary` with totals. | ||
|
|
||
| Each payload is first compressed by the SDK's `NewZlibCodec`. The SDK then | ||
| checks the compressed size against the default 256 KiB threshold; payloads | ||
| still above it are stored in S3 and replaced inline with compact claim-check | ||
| references. The workflow's own input (`OrderBatchRequest`) and result | ||
| (`BatchSummary`) compress to a few hundred bytes and remain inline. | ||
|
|
||
| A mock S3 service ([s3-mock](./s3-mock)) is included so you can run the sample | ||
| locally without an AWS account or Docker. A codec server | ||
| ([codec-server](./codec-server)) is included to retrieve and decompress payloads | ||
| on demand for the Temporal Web UI. | ||
|
|
||
| ## Steps to run this sample | ||
|
|
||
| 1. Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). | ||
|
|
||
| 2. In a separate terminal, run the mock S3 server. It listens on `:5000` and | ||
| creates the `temporal-payloads` bucket. Leave it running. | ||
| ``` | ||
| go run ./external-storage/s3-mock | ||
| ``` | ||
| 3. In a separate terminal, run the worker: | ||
| ``` | ||
| go run ./external-storage/worker | ||
| ``` | ||
| 4. In a separate terminal, run the starter: | ||
| ``` | ||
| go run ./external-storage/starter | ||
| ``` | ||
| Example output: | ||
| ``` | ||
| Starting workflow external-storage-20260515-120000 (batch_id=BATCH-20260515-120000, order_count=200) | ||
|
|
||
| Batch BATCH-20260515-120000: 200 orders processed | ||
| Total shipping cost: $28512.40 | ||
| Total weight: 19684.2 kg | ||
| Avg delivery: 4.4 days | ||
| ``` | ||
| 5. (Optional) Run the codec server in a separate terminal: | ||
|
|
||
| ``` | ||
| go run ./external-storage/codec-server | ||
| ``` | ||
| In the Temporal Web UI (http://localhost:8233), open Settings → Data Encoder | ||
| and set the Remote Codec Endpoint to `http://localhost:8081`. Reload the | ||
| workflow page; the inline compressed payloads will be shown as readable | ||
| JSON, and externally stored payloads can be downloaded to fetch their | ||
| actual content from the mock S3. | ||
|
|
||
| The Web UI sends the namespace as the `X-Namespace` header on each request, | ||
| so multi-namespace setups can dispatch by reading that header. | ||
|
|
||
| | Endpoint | Behavior | | ||
| | --- | --- | | ||
| | `POST /encode` | Compress the payload, then offload to S3 if it exceeds the threshold. | | ||
| | `POST /decode` | Retrieve any external storage references from S3, then decompress. Pass `?preserveStorageRefs=true` to leave references as-is. | | ||
| | `POST /download` | All inputs must be storage references. Retrieves them from S3 and decompresses. | | ||
| 6. Run `temporal workflow show` to see how payloads are stored: | ||
| ``` | ||
| temporal workflow show --workflow-id external-storage-<timestamp> | ||
|
|
||
| ``` | ||
| The workflow's input (`OrderBatchRequest`) and result (`BatchSummary`) are | ||
| zlib-encoded and stored inline in Temporal — small enough to compress to a | ||
| few hundred bytes. The two activity payloads carrying the order list — the | ||
| output of `FetchOrders` and the input to `ProcessOrders` — exceed 256 KiB | ||
| even after compression, so they appear as external-storage references, | ||
| confirming the SDK offloaded them to S3. | ||
|
|
||
| ## How it works | ||
|
|
||
| The client's `DataConverter` is wrapped with the SDK's zlib codec, and | ||
| `client.Options.ExternalStorage` is set with the SDK's S3 driver | ||
| (`go.temporal.io/sdk/contrib/aws/s3driver`): | ||
|
|
||
| ```go | ||
| driver, err := s3driver.NewDriver(s3driver.Options{ | ||
| Client: awssdkv2.NewClient(s3Client), | ||
| Bucket: s3driver.StaticBucket(externalstorage.S3Bucket), | ||
| }) | ||
| if err != nil { | ||
| log.Fatalf("new s3 driver: %v", err) | ||
| } | ||
| c, err := client.Dial(client.Options{ | ||
| DataConverter: converter.NewCodecDataConverter( | ||
| converter.GetDefaultDataConverter(), | ||
| converter.NewZlibCodec(converter.ZlibCodecOptions{AlwaysEncode: true}), | ||
| ), | ||
| ExternalStorage: converter.ExternalStorage{ | ||
| Drivers: []converter.StorageDriver{driver}, | ||
| }, | ||
| }) | ||
| if err != nil { | ||
| log.Fatalf("dial temporal: %v", err) | ||
| } | ||
| defer c.Close() | ||
| ``` | ||
|
|
||
| On the encode path the SDK: | ||
|
|
||
|
|
||
| 1. Serializes the Go value to a `Payload`. | ||
| 2. Runs the zlib codec to compress the payload bytes. | ||
| 3. Checks the compressed size against `PayloadSizeThreshold` (default: 256 KiB). | ||
| 4. If still above the threshold, stores the compressed bytes in S3 via | ||
| the SDK's `s3driver` and replaces the inline payload with a claim-check | ||
| reference. | ||
|
|
||
| On the decode path the SDK reverses these steps, transparently retrieving from | ||
| S3 and decompressing as needed. | ||
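
The encode and decode steps above can be illustrated with a small, standard-library-only sketch. It is not the SDK's implementation: the `bucket` map stands in for S3, the `stored` type and the SHA-256 key scheme are hypothetical, and only the ordering (compress first, then compare against a 256 KiB threshold, then offload) is taken from the description above.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"math/rand"
)

// thresholdBytes mirrors the 256 KiB default described in this README.
const thresholdBytes = 256 * 1024

// stored is a hypothetical stand-in for what ends up inline: either the
// compressed bytes or a claim-check key pointing at the bucket.
type stored struct {
	Inline []byte
	Ref    string
}

// bucket is an in-memory stand-in for the S3 bucket (an assumption).
type bucket map[string][]byte

// compress returns the zlib-compressed form of data.
func compress(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := zlib.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompress reverses compress.
func decompress(data []byte) ([]byte, error) {
	r, err := zlib.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

// encode compresses first, then offloads only if the result is still
// above the threshold.
func encode(b bucket, data []byte) (stored, error) {
	c, err := compress(data)
	if err != nil {
		return stored{}, err
	}
	if len(c) <= thresholdBytes {
		return stored{Inline: c}, nil
	}
	sum := sha256.Sum256(c)
	key := hex.EncodeToString(sum[:]) // hypothetical key scheme
	b[key] = c
	return stored{Ref: key}, nil
}

// decode fetches from the bucket when given a reference, then decompresses.
func decode(b bucket, s stored) ([]byte, error) {
	c := s.Inline
	if s.Ref != "" {
		var ok bool
		if c, ok = b[s.Ref]; !ok {
			return nil, fmt.Errorf("missing object %q", s.Ref)
		}
	}
	return decompress(c)
}

// repetitive returns n highly compressible bytes.
func repetitive(n int) []byte { return bytes.Repeat([]byte("a"), n) }

// incompressible returns n pseudo-random bytes that zlib cannot shrink.
func incompressible(n int) []byte {
	buf := make([]byte, n)
	rand.New(rand.NewSource(1)).Read(buf)
	return buf
}

func main() {
	b := bucket{}
	for _, in := range [][]byte{repetitive(512 * 1024), incompressible(512 * 1024)} {
		s, err := encode(b, in)
		if err != nil {
			panic(err)
		}
		out, err := decode(b, s)
		if err != nil {
			panic(err)
		}
		fmt.Printf("offloaded=%t roundTripOK=%t\n", s.Ref != "", bytes.Equal(in, out))
	}
}
```

Both inputs are 512 KiB before compression, but only the incompressible one is offloaded, which is why the sample's order list has to stay large after compression to end up in S3.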
|
|
||
| The worker, the starter, and the codec server must use the **same** codec and | ||
| external storage configuration so each side can read what the other wrote. In | ||
| this sample the shared wiring lives in | ||
|
|
||
| [data_converter.go](./data_converter.go) for the worker and starter, and is | ||
| mirrored in [codec-server/main.go](./codec-server/main.go) for the codec | ||
| server. | ||
|
|
||
| ## Codec server | ||
|
|
||
| The codec server is built directly on top of the SDK's | ||
| `converter.NewPayloadHTTPHandler`, which implements the `/encode`, `/decode`, | ||
| and `/download` endpoints with full external storage support. The sample adds | ||
| two thin layers around it: | ||
|
|
||
| - A **namespace dispatcher** that picks a per-namespace handler by inspecting | ||
| the `X-Namespace` header sent by the Temporal Web UI and CLI. Only `"default"` | ||
| is configured here, but the same map can host other namespaces with their own | ||
| codec chains and storage backends. | ||
| - A **CORS middleware** that allows the Web UI origin to call the codec | ||
| server. | ||
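
To make the dispatch behavior concrete, here is a minimal sketch that exercises an `X-Namespace` dispatcher with `net/http/httptest`. The stub handler stands in for the SDK's payload handler, and the names `newNamespaceDispatcher`, `newDemoHandler`, and `statusFor` are hypothetical helpers for this illustration only.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newNamespaceDispatcher routes each request to the handler registered for
// the namespace named in the X-Namespace header, or replies 404.
func newNamespaceDispatcher(handlers map[string]http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		h, ok := handlers[r.Header.Get("X-Namespace")]
		if !ok {
			http.NotFound(w, r)
			return
		}
		h.ServeHTTP(w, r)
	})
}

// newDemoHandler registers a stub for the "default" namespace only. The
// stub is a placeholder for a real payload handler.
func newDemoHandler() http.Handler {
	stub := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	return newNamespaceDispatcher(map[string]http.Handler{"default": stub})
}

// statusFor sends POST /decode with the given namespace header (omitted
// when empty) and returns the response status code.
func statusFor(h http.Handler, namespace string) int {
	req := httptest.NewRequest(http.MethodPost, "/decode", nil)
	if namespace != "" {
		req.Header.Set("X-Namespace", namespace)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	h := newDemoHandler()
	fmt.Println(statusFor(h, "default")) // 200
	fmt.Println(statusFor(h, "other"))   // 404
	fmt.Println(statusFor(h, ""))        // 404: header missing
}
```

Requests for an unconfigured namespace, or with no header at all, get a 404 rather than falling through to some default handler, so a misconfigured client fails visibly instead of being decoded with the wrong codec chain.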
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,141 @@ | ||
| // codec-server hosts the payload HTTP handler from the Temporal Go SDK so the | ||
| // Web UI and CLI can transform external-storage payloads on demand. | ||
| // | ||
| // Deliberately left out for sample simplicity: authentication (slot a | ||
| // middleware between the CORS handler and the dispatcher) and configurable | ||
| // listen address. For an example of enabling authentication in a codec | ||
| // server, look at ../../codec-server. | ||
| package main | ||
|
|
||
| import ( | ||
| "context" | ||
| "flag" | ||
| "fmt" | ||
| "log" | ||
| "net/http" | ||
| "os" | ||
| "os/signal" | ||
| "strconv" | ||
| "time" | ||
|
|
||
| externalstorage "github.com/temporalio/samples-go/external-storage" | ||
| "go.temporal.io/sdk/converter" | ||
| ) | ||
|
|
||
| const webUIOrigin = "http://localhost:8233" | ||
|
|
||
| // newPayloadNamespacesHTTPHandler returns an http.Handler that dispatches each | ||
| // request to a per-namespace handler chosen by the X-Namespace header. The | ||
| // Temporal Web UI and CLI send that header on every codec server request, so one | ||
| // process can host different codec/storage configurations per namespace | ||
| // without per-namespace URL prefixes. | ||
| func newPayloadNamespacesHTTPHandler(handlers map[string]http.Handler) http.Handler { | ||
| return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| namespace := r.Header.Get("X-Namespace") | ||
| h, ok := handlers[namespace] | ||
| if !ok { | ||
| http.NotFound(w, r) | ||
| return | ||
| } | ||
| h.ServeHTTP(w, r) | ||
| }) | ||
| } | ||
|
|
||
| // statusRecorder captures the response status code so the logging middleware | ||
| // can include it. WriteHeader is only called once per request by the SDK's | ||
| // payload handler; subsequent writes go through ResponseWriter directly. | ||
| type statusRecorder struct { | ||
| http.ResponseWriter | ||
| status int | ||
| } | ||
|
|
||
| func (s *statusRecorder) WriteHeader(code int) { | ||
| s.status = code | ||
| s.ResponseWriter.WriteHeader(code) | ||
| } | ||
|
|
||
| // newLoggingHTTPHandler prints a one-line summary of each request: method, | ||
| // path, namespace, response status, and how long the handler took. | ||
| func newLoggingHTTPHandler(next http.Handler) http.Handler { | ||
| return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| start := time.Now() | ||
| rec := &statusRecorder{ResponseWriter: w, status: http.StatusOK} | ||
| next.ServeHTTP(rec, r) | ||
| log.Printf("%s %s namespace=%q status=%d duration=%s", | ||
| r.Method, r.URL.Path, r.Header.Get("X-Namespace"), rec.status, time.Since(start)) | ||
| }) | ||
| } | ||
|
|
||
| // newCORSHTTPHandler lets the Temporal Web UI call the codec server from its own | ||
| // origin. The X-Namespace header is allowlisted so the dispatcher can read it. | ||
| func newCORSHTTPHandler(origin string, next http.Handler) http.Handler { | ||
| return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| if r.Header.Get("Origin") == origin { | ||
| w.Header().Set("Access-Control-Allow-Origin", origin) | ||
| w.Header().Set("Access-Control-Allow-Methods", "POST,OPTIONS") | ||
| w.Header().Set("Access-Control-Allow-Headers", "Content-Type,X-Namespace") | ||
| } | ||
| if r.Method == http.MethodOptions { | ||
| w.WriteHeader(http.StatusOK) | ||
| return | ||
| } | ||
| next.ServeHTTP(w, r) | ||
| }) | ||
| } | ||
|
|
||
| func main() { | ||
| var port int | ||
| flag.IntVar(&port, "port", 8081, "Port to listen on") | ||
| flag.Parse() | ||
|
|
||
| ctx := context.Background() | ||
| driver, err := externalstorage.NewS3Driver(ctx) | ||
| if err != nil { | ||
| log.Fatalf("new s3 driver: %v", err) | ||
| } | ||
|
|
||
| // Build the payload handler with the same codec + external storage that | ||
| // the worker and starter use. PreStorageCodecs runs before storage on | ||
| // encode and after retrieval on decode, mirroring what the client-side | ||
| // DataConverter does. | ||
| defaultNamespaceHandler, err := converter.NewPayloadHTTPHandler(converter.PayloadHTTPHandlerOptions{ | ||
| PreStorageCodecs: []converter.PayloadCodec{ | ||
| converter.NewZlibCodec(converter.ZlibCodecOptions{AlwaysEncode: true}), | ||
| }, | ||
| ExternalStorage: converter.ExternalStorage{ | ||
| Drivers: []converter.StorageDriver{driver}, | ||
| }, | ||
| }) | ||
| if err != nil { | ||
| log.Fatalf("new payload handler: %v", err) | ||
| } | ||
|
|
||
| // Per-namespace map: extend this to host additional namespaces with their | ||
| // own codec chain and/or storage backend. | ||
| handler := newPayloadNamespacesHTTPHandler(map[string]http.Handler{ | ||
| "default": defaultNamespaceHandler, | ||
| }) | ||
| handler = newCORSHTTPHandler(webUIOrigin, handler) | ||
| handler = newLoggingHTTPHandler(handler) | ||
|
|
||
| srv := &http.Server{ | ||
| Addr: "localhost:" + strconv.Itoa(port), | ||
| Handler: handler, | ||
| } | ||
|
|
||
| errCh := make(chan error, 1) | ||
| go func() { errCh <- srv.ListenAndServe() }() | ||
|
|
||
| fmt.Printf("Codec server running at http://%s, ctrl+c to exit\n", srv.Addr) | ||
|
|
||
| sigCh := make(chan os.Signal, 1) | ||
| signal.Notify(sigCh, os.Interrupt) | ||
| select { | ||
| case <-sigCh: | ||
| _ = srv.Close() | ||
|
Review comment: In a production codec server, we would want to drain current requests/tasks before shutting down right? I don't know if we offer guidance on this or it's just a "yea you should know that" kind of thing. I don't have strong feelings about mentioning that here or not because it might be too in the weeds.
Author reply: These samples tend to be about how to use features, not necessarily about production best practices. However, in the case of codec servers, it might be warranted since these are our main demonstrations of these types of servers. I can do a separate follow-up for this. |
||
| case err := <-errCh: | ||
| if err != nil && err != http.ErrServerClosed { | ||
| log.Fatal(err) | ||
| } | ||
| } | ||
| } | ||
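
The review thread on `srv.Close()` raises draining in-flight requests before exit. The following is a minimal sketch of one way to do that with the standard library's `http.Server.Shutdown`; it is not part of this PR. The helper names `serveUntilSignal` and `runDemo`, the ephemeral port, the 5-second drain timeout, and the simulated interrupt are all assumptions chosen so the example terminates on its own.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"os"
	"time"
)

// serveUntilSignal runs srv until a value arrives on stop, then gives
// in-flight requests up to drainTimeout to finish before returning.
func serveUntilSignal(srv *http.Server, stop <-chan os.Signal, drainTimeout time.Duration) error {
	errCh := make(chan error, 1)
	go func() { errCh <- srv.ListenAndServe() }()

	select {
	case err := <-errCh:
		// The server stopped before any shutdown was requested.
		return err
	case <-stop:
	}

	ctx, cancel := context.WithTimeout(context.Background(), drainTimeout)
	defer cancel()
	// Shutdown stops accepting new connections and waits for active ones.
	if err := srv.Shutdown(ctx); err != nil {
		return err
	}
	// After Shutdown, ListenAndServe reports http.ErrServerClosed.
	if err := <-errCh; !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}

// runDemo starts a server on an ephemeral port and simulates Ctrl+C after a
// short delay. A real server would instead call
// signal.Notify(stop, os.Interrupt) and wait for the user.
func runDemo() error {
	srv := &http.Server{Addr: "127.0.0.1:0", Handler: http.NotFoundHandler()}
	stop := make(chan os.Signal, 1)
	go func() {
		time.Sleep(100 * time.Millisecond)
		stop <- os.Interrupt
	}()
	return serveUntilSignal(srv, stop, 5*time.Second)
}

func main() {
	if err := runDemo(); err != nil {
		fmt.Println("shutdown error:", err)
		return
	}
	fmt.Println("server drained and stopped")
}
```

Unlike `Close`, which drops active connections immediately, `Shutdown` lets a `/decode` or `/download` call that is mid-retrieval finish, bounded by the context deadline.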
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| package externalstorage | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "go.temporal.io/sdk/client" | ||
| "go.temporal.io/sdk/converter" | ||
| ) | ||
|
|
||
| // NewClient dials Temporal with the data converter and external storage | ||
| // configuration shared by every process in this sample (worker, starter, | ||
| // codec server). They must all agree on: | ||
| // | ||
| // - the codec chain that wraps each payload (zlib here), and | ||
| // - the external storage driver and threshold that decide when a payload | ||
| // is offloaded instead of stored inline. | ||
| // | ||
| // If any one of them diverges, payloads written by one side will not be | ||
| // readable by the other. | ||
| func NewClient(ctx context.Context, options client.Options) (client.Client, error) { | ||
| driver, err := NewS3Driver(ctx) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("new s3 driver: %w", err) | ||
| } | ||
|
|
||
| if options.DataConverter == nil { | ||
| options.DataConverter = converter.NewCodecDataConverter( | ||
| converter.GetDefaultDataConverter(), | ||
| converter.NewZlibCodec(converter.ZlibCodecOptions{AlwaysEncode: true}), | ||
| ) | ||
| } | ||
| options.ExternalStorage = converter.ExternalStorage{ | ||
| Drivers: []converter.StorageDriver{driver}, | ||
| } | ||
|
|
||
| return client.Dial(options) | ||
| } |