diff --git a/codec-server/go.mod b/codec-server/go.mod index cbb5a4cd..60bde66c 100644 --- a/codec-server/go.mod +++ b/codec-server/go.mod @@ -6,7 +6,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/uuid v1.6.0 go.temporal.io/api v1.62.12 - go.temporal.io/sdk v1.43.1 + go.temporal.io/sdk v1.44.0 go.temporal.io/server v1.31.0 ) diff --git a/codec-server/go.sum b/codec-server/go.sum index e77a7922..f266a8f2 100644 --- a/codec-server/go.sum +++ b/codec-server/go.sum @@ -272,8 +272,8 @@ go.opentelemetry.io/proto/otlp v1.7.1 h1:gTOMpGDb0WTBOP8JaO72iL3auEZhVmAQg4ipjOV go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE= go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= -go.temporal.io/sdk v1.43.1 h1:Cy06+uVZ/MmXBjfNBhC3hNErifEqxrYAif01cjHIaUc= -go.temporal.io/sdk v1.43.1/go.mod h1:vkApR12F9/Y8OR+hkxe7WyXQFuCX6clhzqnAk6rzDAM= +go.temporal.io/sdk v1.44.0 h1:suitPDukX74rW3/N1FqvEbZTZVJJsxMKhv0KMa/j7pU= +go.temporal.io/sdk v1.44.0/go.mod h1:vkApR12F9/Y8OR+hkxe7WyXQFuCX6clhzqnAk6rzDAM= go.temporal.io/server v1.31.0 h1:FKLodreaMXUxYc3zr6xxwxtpGz1WH/t7O0IWxV1d1x0= go.temporal.io/server v1.31.0/go.mod h1:MTQAw8uMU3ooSHyg/62JsNu/j8lK34SfKMTXkexYcw8= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= diff --git a/go.mod b/go.mod index f7b666c4..710c8157 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( go.opentelemetry.io/otel/sdk v1.42.0 go.opentelemetry.io/otel/trace v1.42.0 go.temporal.io/api v1.62.12 - go.temporal.io/sdk v1.43.1 + go.temporal.io/sdk v1.44.0 go.temporal.io/sdk/contrib/aws/lambdaworker v0.1.1 go.temporal.io/sdk/contrib/aws/lambdaworker/otel v0.1.1 go.temporal.io/sdk/contrib/aws/s3driver v0.2.0 diff --git a/go.sum b/go.sum index 29d3957c..4664f9a3 100644 --- a/go.sum +++ b/go.sum @@ -468,8 +468,8 @@ go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA= go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o= -go.temporal.io/sdk v1.43.1 h1:Cy06+uVZ/MmXBjfNBhC3hNErifEqxrYAif01cjHIaUc= -go.temporal.io/sdk v1.43.1/go.mod h1:vkApR12F9/Y8OR+hkxe7WyXQFuCX6clhzqnAk6rzDAM= +go.temporal.io/sdk v1.44.0 h1:suitPDukX74rW3/N1FqvEbZTZVJJsxMKhv0KMa/j7pU= +go.temporal.io/sdk v1.44.0/go.mod h1:vkApR12F9/Y8OR+hkxe7WyXQFuCX6clhzqnAk6rzDAM= go.temporal.io/sdk/contrib/aws/lambdaworker v0.1.1 h1:AQBa7CN+EOWhZaf4vr46TfxTZM7yCsvrzAsKqrExxmQ= go.temporal.io/sdk/contrib/aws/lambdaworker v0.1.1/go.mod h1:Rgn/tlb4MDNAAjnXKNgHui4IY+MogMCk4Y4c2YA6Dcc= go.temporal.io/sdk/contrib/aws/lambdaworker/otel v0.1.1 h1:sMTtpD5jsb4FeJadYkoOOzb84oWYK2/g0keb3IRO6xY= diff --git a/grpc-proxy/go.mod b/grpc-proxy/go.mod index 67843a4b..dcdd8bb0 100644 --- a/grpc-proxy/go.mod +++ b/grpc-proxy/go.mod @@ -5,7 +5,7 @@ go 1.26.2 require ( github.com/golang/snappy v0.0.4 go.temporal.io/api v1.62.12 - go.temporal.io/sdk v1.43.1 + go.temporal.io/sdk v1.44.0 go.temporal.io/server v1.31.0 google.golang.org/grpc v1.79.3 ) diff --git a/grpc-proxy/go.sum b/grpc-proxy/go.sum index e77a7922..f266a8f2 100644 --- a/grpc-proxy/go.sum +++ b/grpc-proxy/go.sum @@ -272,8 +272,8 @@ go.opentelemetry.io/proto/otlp v1.7.1 h1:gTOMpGDb0WTBOP8JaO72iL3auEZhVmAQg4ipjOV go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE= go.temporal.io/api v1.62.12 h1:627rVnItegQmrszg1bH4vfyc/1uNo5qCereCNkvZefw= go.temporal.io/api v1.62.12/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= -go.temporal.io/sdk v1.43.1 h1:Cy06+uVZ/MmXBjfNBhC3hNErifEqxrYAif01cjHIaUc= -go.temporal.io/sdk v1.43.1/go.mod h1:vkApR12F9/Y8OR+hkxe7WyXQFuCX6clhzqnAk6rzDAM= +go.temporal.io/sdk v1.44.0 h1:suitPDukX74rW3/N1FqvEbZTZVJJsxMKhv0KMa/j7pU= +go.temporal.io/sdk v1.44.0/go.mod h1:vkApR12F9/Y8OR+hkxe7WyXQFuCX6clhzqnAk6rzDAM= go.temporal.io/server v1.31.0 h1:FKLodreaMXUxYc3zr6xxwxtpGz1WH/t7O0IWxV1d1x0= go.temporal.io/server v1.31.0/go.mod h1:MTQAw8uMU3ooSHyg/62JsNu/j8lK34SfKMTXkexYcw8= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= diff --git a/nexus-standalone-operations/README.md b/nexus-standalone-operations/README.md new file mode 100644 index 00000000..0306e1ee --- /dev/null +++ b/nexus-standalone-operations/README.md @@ -0,0 +1,70 @@ +This sample demonstrates how to use Standalone Nexus Operations (executing Nexus operations directly from client code without wrapping them in a Workflow). +It shows both sync and async (workflow-backed) operations, and how to use the `ListNexusOperations` and `CountNexusOperations` APIs. + +## Note: Standalone Nexus operations require a server version that supports this feature. Use the dev server build at https://github.com/temporalio/cli/releases/tag/v1.7.1-standalone-nexus-operations. + +### Steps to run this sample (with expected output): +1) Run the [Temporal dev server build that supports standalone Nexus operations](https://github.com/temporalio/cli/releases/tag/v1.7.1-standalone-nexus-operations). (If you are going to run locally, you will want to start it in another terminal; this command is blocking and runs until it receives a SIGINT (Ctrl + C) command.) + +Start the dev server with the dynamic config flags required for standalone Nexus operations: + +```bash +temporal server start-dev \ + --dynamic-config-value "nexusoperation.enableStandalone=true" \ + --dynamic-config-value "history.enableChasmCallbacks=true" +``` + +You should see a line about the CLI, Server and UI versions, and one line each for the Server URL, UI URL and Metrics endpoint. It should look something like this: + +```bash +Temporal CLI 1.7.1-standalone-nexus-operations (Server 1.32.0-155.0, UI 2.49.1) + +Temporal Server: localhost:7233 +Temporal UI: http://localhost:8233 +Temporal Metrics: http://localhost:61951/metrics +``` + +2) Create a Nexus endpoint that routes to the worker's task queue. In a second terminal, run: +```bash +temporal operator nexus endpoint create \ + --name nexus-standalone-operations-endpoint \ + --target-namespace default \ + --target-task-queue nexus-standalone-operations +``` + +1) Then run the following command to start the worker. The worker is a blocking process that runs until it receives a SIGINT (Ctrl + C) command. +```bash +go run nexus-standalone-operations/worker/main.go +``` + +You should see the following log line: +1. Starting the Worker with Namespace `default`, and TaskQueue `nexus-standalone-operations` and it will list the WorkerID for the created worker. + +For example: +```bash +2026/05/21 08:59:49 INFO Started Worker Namespace default TaskQueue nexus-standalone-operations WorkerID 71172 +``` + +> [!NOTE] +> Timestamps and IDs will differ on your machine. + +4) In a third terminal, run the following command to start the example: +```bash +go run nexus-standalone-operations/starter/main.go +``` + +You should see something similar to the following output: + +```bash +2026/05/21 09:00:30 Started Echo operation OperationID nexus-standalone-echo-op +2026/05/21 09:00:30 Echo result: hello +2026/05/21 09:00:30 Started Hello operation OperationID nexus-standalone-hello-op +2026/05/21 09:00:30 Hello result: Hello Temporal 👋 +2026/05/21 09:00:30 ListNexusOperations results: +2026/05/21 09:00:30 OperationID: nexus-standalone-hello-op, Operation: say-hello, Status: Completed +2026/05/21 09:00:30 OperationID: nexus-standalone-echo-op, Operation: echo, Status: Completed +2026/05/21 09:00:30 Total Nexus operations: 2 +``` + +If you run the starter code multiple times, you should see additional `ListNexusOperations` results, as more operations are run. +The same goes for the number from `CountNexusOperations`. diff --git a/nexus-standalone-operations/handler_test.go b/nexus-standalone-operations/handler_test.go new file mode 100644 index 00000000..c4b20ac9 --- /dev/null +++ b/nexus-standalone-operations/handler_test.go @@ -0,0 +1,161 @@ +package nexus_standalone_operations_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + "github.com/nexus-rpc/sdk-go/nexus" + + nexuspb "go.temporal.io/api/nexus/v1" + operatorservice "go.temporal.io/api/operatorservice/v1" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/testsuite" + "go.temporal.io/sdk/worker" + + "github.com/temporalio/samples-go/nexus/handler" + "github.com/temporalio/samples-go/nexus/service" +) + +const ( + taskQueue = "nexus-standalone-operations-test" + endpointName = "nexus-standalone-operations-test-endpoint" +) + +func Test_StandaloneNexusOperations_Using_DevServer(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // Start the dev server with standalone Nexus support. + server, err := testsuite.StartDevServer(ctx, testsuite.DevServerOptions{ + CachedDownload: testsuite.CachedDownload{ + Version: "v1.7.1-standalone-nexus-operations", + }, + ExtraArgs: []string{ + "--dynamic-config-value", "nexusoperation.enableStandalone=true", + "--dynamic-config-value", "history.enableChasmCallbacks=true", + }, + }) + require.NoError(t, err) + defer func() { _ = server.Stop() }() + + c := server.Client() + + // Create a Nexus endpoint targeting our task queue. + _, err = c.OperatorService().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ + Spec: &nexuspb.EndpointSpec{ + Name: endpointName, + Target: &nexuspb.EndpointTarget{ + Variant: &nexuspb.EndpointTarget_Worker_{ + Worker: &nexuspb.EndpointTarget_Worker{ + Namespace: "default", + TaskQueue: taskQueue, + }, + }, + }, + }, + }) + require.NoError(t, err) + + // Register Nexus operations on the worker, reusing the handler from the nexus sample. + w := worker.New(c, taskQueue, worker.Options{}) + + svc := nexus.NewService(service.HelloServiceName) + require.NoError(t, svc.Register(handler.EchoOperation, handler.HelloOperation)) + w.RegisterNexusService(svc) + w.RegisterWorkflow(handler.HelloHandlerWorkflow) + require.NoError(t, w.Start()) + defer w.Stop() + + // Create a standalone NexusClient. + nexusClient, err := c.NewNexusClient(client.NexusClientOptions{ + Endpoint: endpointName, + Service: service.HelloServiceName, + }) + require.NoError(t, err) + + // executeWithRetry retries ExecuteOperation until the endpoint has propagated. + // The endpoint registry is eventually consistent. + executeWithRetry := func( + t *testing.T, + opName string, + input any, + options client.StartNexusOperationOptions, + ) client.NexusOperationHandle { + t.Helper() + var handle client.NexusOperationHandle + require.Eventually(t, func() bool { + var execErr error + handle, execErr = nexusClient.ExecuteOperation(ctx, opName, input, options) + return execErr == nil + }, 10*time.Second, 100*time.Millisecond, "timed out waiting for endpoint to propagate") + return handle + } + + // Test sync operation (Echo). + t.Run("Echo sync operation", func(t *testing.T) { + input := service.EchoInput{Message: "hello-nexus"} + handle := executeWithRetry(t, service.EchoOperationName, input, client.StartNexusOperationOptions{ + ID: uuid.NewString(), + ScheduleToCloseTimeout: 10 * time.Second, + }) + require.NotEmpty(t, handle.GetID()) + + var result service.EchoOutput + err := handle.Get(ctx, &result) + require.NoError(t, err) + require.Equal(t, "hello-nexus", result.Message) + }) + + // Test async operation (Hello). + t.Run("Hello async operation", func(t *testing.T) { + input := service.HelloInput{Name: "Temporal", Language: service.EN} + handle := executeWithRetry(t, service.HelloOperationName, input, client.StartNexusOperationOptions{ + ID: uuid.NewString(), + ScheduleToCloseTimeout: 10 * time.Second, + }) + require.NotEmpty(t, handle.GetID()) + + var result service.HelloOutput + err := handle.Get(ctx, &result) + require.NoError(t, err) + require.Equal(t, "Hello Temporal 👋", result.Message) + }) + + // Test ListNexusOperations (on client.Client, not NexusClient). + t.Run("List operations", func(t *testing.T) { + require.Eventually(t, func() bool { + resp, listErr := c.ListNexusOperations(ctx, client.ListNexusOperationsOptions{ + Query: fmt.Sprintf("Endpoint = '%s'", endpointName), + }) + if listErr != nil { + return false + } + count := 0 + for metadata, iterErr := range resp.Results { + if iterErr != nil { + return false + } + if metadata.OperationID == "" || metadata.Endpoint != endpointName { + return false + } + count++ + } + return count > 0 + }, 10*time.Second, 500*time.Millisecond, "timed out waiting for operations to appear in list") + }) + + // Test CountNexusOperations (on client.Client, not NexusClient). + t.Run("Count operations", func(t *testing.T) { + require.Eventually(t, func() bool { + resp, countErr := c.CountNexusOperations(ctx, client.CountNexusOperationsOptions{ + Query: fmt.Sprintf("Endpoint = '%s'", endpointName), + }) + return countErr == nil && resp.Count > 0 + }, 10*time.Second, 500*time.Millisecond, "timed out waiting for count to reflect operations") + }) +} diff --git a/nexus-standalone-operations/starter/main.go b/nexus-standalone-operations/starter/main.go new file mode 100644 index 00000000..dda3e51b --- /dev/null +++ b/nexus-standalone-operations/starter/main.go @@ -0,0 +1,100 @@ +// @@@SNIPSTART samples-go-nexus-standalone-operations-starter +package main + +import ( + "context" + "fmt" + "log" + "time" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/contrib/envconfig" + + "github.com/temporalio/samples-go/nexus/service" +) + +// This sample demonstrates standalone Nexus operations — executing Nexus operations +// directly from client code without wrapping them in a workflow. + +const endpointName = "nexus-standalone-operations-endpoint" + +func main() { + // The client is a heavyweight object that should be created once per process. + c, err := client.Dial(envconfig.MustLoadDefaultClientOptions()) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + // Create a NexusClient bound to the endpoint and service. + // The endpoint must be pre-created on the server (see README). + nexusClient, err := c.NewNexusClient(client.NexusClientOptions{ + Endpoint: endpointName, + Service: service.HelloServiceName, + }) + if err != nil { + log.Fatalln("Unable to create Nexus client", err) + } + + // Execute the sync Echo operation. + echoHandle, err := nexusClient.ExecuteOperation(context.Background(), service.EchoOperationName, service.EchoInput{Message: "hello"}, client.StartNexusOperationOptions{ + ID: "nexus-standalone-echo-op", + ScheduleToCloseTimeout: 10 * time.Second, + }) + if err != nil { + log.Fatalln("Unable to execute Echo operation", err) + } + log.Println("Started Echo operation", "OperationID", echoHandle.GetID()) + + var echoResult service.EchoOutput + err = echoHandle.Get(context.Background(), &echoResult) + if err != nil { + log.Fatalln("Unable to get Echo operation result", err) + } + log.Println("Echo result:", echoResult.Message) + + // Execute the async (workflow-backed) Hello operation. + helloHandle, err := nexusClient.ExecuteOperation(context.Background(), service.HelloOperationName, service.HelloInput{Name: "Temporal", Language: service.EN}, client.StartNexusOperationOptions{ + ID: "nexus-standalone-hello-op", + ScheduleToCloseTimeout: 10 * time.Second, + }) + if err != nil { + log.Fatalln("Unable to execute Hello operation", err) + } + log.Println("Started Hello operation", "OperationID", helloHandle.GetID()) + + var helloResult service.HelloOutput + err = helloHandle.Get(context.Background(), &helloResult) + if err != nil { + log.Fatalln("Unable to get Hello operation result", err) + } + log.Println("Hello result:", helloResult.Message) + + // List Nexus operations using client.Client (not client.NexusClient). + listResp, err := c.ListNexusOperations(context.Background(), client.ListNexusOperationsOptions{ + Query: fmt.Sprintf("Endpoint = '%s'", endpointName), + }) + if err != nil { + log.Fatalln("Unable to list Nexus operations", err) + } + + log.Println("ListNexusOperations results:") + for metadata, err := range listResp.Results { + if err != nil { + log.Fatalln("Error iterating operations", err) + } + log.Printf("\tOperationID: %s, Operation: %s, Status: %v\n", + metadata.OperationID, metadata.Operation, metadata.Status) + } + + // Count Nexus operations using the base client (not NexusClient). + countResp, err := c.CountNexusOperations(context.Background(), client.CountNexusOperationsOptions{ + Query: fmt.Sprintf("Endpoint = '%s'", endpointName), + }) + if err != nil { + log.Fatalln("Unable to count Nexus operations", err) + } + log.Println("Total Nexus operations:", countResp.Count) +} + +// @@@SNIPEND diff --git a/nexus-standalone-operations/worker/main.go b/nexus-standalone-operations/worker/main.go new file mode 100644 index 00000000..687227d9 --- /dev/null +++ b/nexus-standalone-operations/worker/main.go @@ -0,0 +1,43 @@ +// @@@SNIPSTART samples-go-nexus-standalone-operations-worker +package main + +import ( + "log" + + "github.com/nexus-rpc/sdk-go/nexus" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/contrib/envconfig" + "go.temporal.io/sdk/worker" + + "github.com/temporalio/samples-go/nexus/handler" + "github.com/temporalio/samples-go/nexus/service" +) + +const taskQueue = "nexus-standalone-operations" + +func main() { + // The client and worker are heavyweight objects that should be created once per process. + c, err := client.Dial(envconfig.MustLoadDefaultClientOptions()) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + w := worker.New(c, taskQueue, worker.Options{}) + + svc := nexus.NewService(service.HelloServiceName) + err = svc.Register(handler.EchoOperation, handler.HelloOperation) + if err != nil { + log.Fatalln("Unable to register operations", err) + } + w.RegisterNexusService(svc) + w.RegisterWorkflow(handler.HelloHandlerWorkflow) + + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start worker", err) + } +} + +// @@@SNIPEND