-
Notifications
You must be signed in to change notification settings - Fork 238
Add sample for stand alone nexus operations #456
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| This sample demonstrates how to use Standalone Nexus Operations (executing Nexus operations directly from client code without wrapping them in a Workflow). | ||
| It shows both sync and async (workflow-backed) operations, and how to use the `ListNexusOperations` and `CountNexusOperations` APIs. | ||
|
|
||
| ## Note: Standalone Nexus operations require a server version that supports this feature. Use the dev server build at https://github.com/temporalio/cli/releases/tag/v1.7.1-standalone-nexus-operations. | ||
|
|
||
| ### Steps to run this sample (with expected output): | ||
| 1) Run the [Temporal dev server build that supports standalone Nexus operations](https://github.com/temporalio/cli/releases/tag/v1.7.1-standalone-nexus-operations). (If you are going to run locally, you will want to start it in another terminal; this command is blocking and runs until it receives a SIGINT (Ctrl + C) command.) | ||
|
|
||
| Start the dev server with the dynamic config flags required for standalone Nexus operations: | ||
|
|
||
| ```bash | ||
| temporal server start-dev \ | ||
| --dynamic-config-value "nexusoperation.enableStandalone=true" \ | ||
| --dynamic-config-value "history.enableChasmCallbacks=true" | ||
| ``` | ||
|
|
||
| You should see a line about the CLI, Server and UI versions, and one line each for the Server URL, UI URL and Metrics endpoint. It should look something like this: | ||
|
|
||
| ```bash | ||
| Temporal CLI 1.7.1-standalone-nexus-operations (Server 1.32.0-155.0, UI 2.49.1) | ||
|
|
||
| Temporal Server: localhost:7233 | ||
| Temporal UI: http://localhost:8233 | ||
| Temporal Metrics: http://localhost:61951/metrics | ||
| ``` | ||
|
|
||
| 2) Create a Nexus endpoint that routes to the worker's task queue. In a second terminal, run: | ||
| ```bash | ||
| temporal operator nexus endpoint create \ | ||
| --name nexus-standalone-operations-endpoint \ | ||
| --target-namespace default \ | ||
| --target-task-queue nexus-standalone-operations | ||
| ``` | ||
|
|
||
| 1) Then run the following command to start the worker. The worker is a blocking process that runs until it receives a SIGINT (Ctrl + C) command. | ||
| ```bash | ||
| go run nexus-standalone-operations/worker/main.go | ||
| ``` | ||
|
|
||
| You should see the following log line: | ||
| 1. Starting the Worker with Namespace `default`, and TaskQueue `nexus-standalone-operations` and it will list the WorkerID for the created worker. | ||
|
|
||
| For example: | ||
| ```bash | ||
| 2026/05/21 08:59:49 INFO Started Worker Namespace default TaskQueue nexus-standalone-operations WorkerID 71172 | ||
| ``` | ||
|
|
||
| > [!NOTE] | ||
| > Timestamps and IDs will differ on your machine. | ||
|
|
||
| 4) In a third terminal, run the following command to start the example: | ||
| ```bash | ||
| go run nexus-standalone-operations/starter/main.go | ||
| ``` | ||
|
|
||
| You should see something similar to the following output: | ||
|
|
||
| ```bash | ||
| 2026/05/21 09:00:30 Started Echo operation OperationID nexus-standalone-echo-op | ||
| 2026/05/21 09:00:30 Echo result: hello | ||
| 2026/05/21 09:00:30 Started Hello operation OperationID nexus-standalone-hello-op | ||
| 2026/05/21 09:00:30 Hello result: Hello Temporal 👋 | ||
| 2026/05/21 09:00:30 ListNexusOperations results: | ||
| 2026/05/21 09:00:30 OperationID: nexus-standalone-hello-op, Operation: say-hello, Status: Completed | ||
| 2026/05/21 09:00:30 OperationID: nexus-standalone-echo-op, Operation: echo, Status: Completed | ||
| 2026/05/21 09:00:30 Total Nexus operations: 2 | ||
| ``` | ||
|
|
||
| If you run the starter code multiple times, you should see additional `ListNexusOperations` results, as more operations are run. | ||
| The same goes for the number from `CountNexusOperations`. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,161 @@ | ||
| package nexus_standalone_operations_test | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/google/uuid" | ||
| "github.com/stretchr/testify/require" | ||
|
|
||
| "github.com/nexus-rpc/sdk-go/nexus" | ||
|
|
||
| nexuspb "go.temporal.io/api/nexus/v1" | ||
| operatorservice "go.temporal.io/api/operatorservice/v1" | ||
| "go.temporal.io/sdk/client" | ||
| "go.temporal.io/sdk/testsuite" | ||
| "go.temporal.io/sdk/worker" | ||
|
|
||
| "github.com/temporalio/samples-go/nexus/handler" | ||
| "github.com/temporalio/samples-go/nexus/service" | ||
| ) | ||
|
|
||
| const ( | ||
| taskQueue = "nexus-standalone-operations-test" | ||
| endpointName = "nexus-standalone-operations-test-endpoint" | ||
| ) | ||
|
|
||
| func Test_StandaloneNexusOperations_Using_DevServer(t *testing.T) { | ||
| ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) | ||
| defer cancel() | ||
|
|
||
| // Start the dev server with standalone Nexus support. | ||
| server, err := testsuite.StartDevServer(ctx, testsuite.DevServerOptions{ | ||
| CachedDownload: testsuite.CachedDownload{ | ||
| Version: "v1.7.1-standalone-nexus-operations", | ||
| }, | ||
| ExtraArgs: []string{ | ||
| "--dynamic-config-value", "nexusoperation.enableStandalone=true", | ||
| "--dynamic-config-value", "history.enableChasmCallbacks=true", | ||
| }, | ||
| }) | ||
| require.NoError(t, err) | ||
| defer func() { _ = server.Stop() }() | ||
|
|
||
| c := server.Client() | ||
|
|
||
| // Create a Nexus endpoint targeting our task queue. | ||
| _, err = c.OperatorService().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ | ||
| Spec: &nexuspb.EndpointSpec{ | ||
| Name: endpointName, | ||
| Target: &nexuspb.EndpointTarget{ | ||
| Variant: &nexuspb.EndpointTarget_Worker_{ | ||
| Worker: &nexuspb.EndpointTarget_Worker{ | ||
| Namespace: "default", | ||
| TaskQueue: taskQueue, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }) | ||
| require.NoError(t, err) | ||
|
|
||
| // Register Nexus operations on the worker, reusing the handler from the nexus sample. | ||
| w := worker.New(c, taskQueue, worker.Options{}) | ||
|
|
||
| svc := nexus.NewService(service.HelloServiceName) | ||
| require.NoError(t, svc.Register(handler.EchoOperation, handler.HelloOperation)) | ||
| w.RegisterNexusService(svc) | ||
| w.RegisterWorkflow(handler.HelloHandlerWorkflow) | ||
| require.NoError(t, w.Start()) | ||
| defer w.Stop() | ||
|
|
||
| // Create a standalone NexusClient. | ||
| nexusClient, err := c.NewNexusClient(client.NexusClientOptions{ | ||
| Endpoint: endpointName, | ||
| Service: service.HelloServiceName, | ||
| }) | ||
| require.NoError(t, err) | ||
|
|
||
| // executeWithRetry retries ExecuteOperation until the endpoint has propagated. | ||
| // The endpoint registry is eventually consistent. | ||
| executeWithRetry := func( | ||
| t *testing.T, | ||
| opName string, | ||
| input any, | ||
| options client.StartNexusOperationOptions, | ||
| ) client.NexusOperationHandle { | ||
| t.Helper() | ||
| var handle client.NexusOperationHandle | ||
| require.Eventually(t, func() bool { | ||
| var execErr error | ||
| handle, execErr = nexusClient.ExecuteOperation(ctx, opName, input, options) | ||
| return execErr == nil | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should fix the need to retry for tests before we launch SANO. It's going to be annoying to use. |
||
| }, 10*time.Second, 100*time.Millisecond, "timed out waiting for endpoint to propagate") | ||
| return handle | ||
| } | ||
|
|
||
| // Test sync operation (Echo). | ||
| t.Run("Echo sync operation", func(t *testing.T) { | ||
| input := service.EchoInput{Message: "hello-nexus"} | ||
| handle := executeWithRetry(t, service.EchoOperationName, input, client.StartNexusOperationOptions{ | ||
| ID: uuid.NewString(), | ||
| ScheduleToCloseTimeout: 10 * time.Second, | ||
| }) | ||
| require.NotEmpty(t, handle.GetID()) | ||
|
|
||
| var result service.EchoOutput | ||
| err := handle.Get(ctx, &result) | ||
| require.NoError(t, err) | ||
| require.Equal(t, "hello-nexus", result.Message) | ||
| }) | ||
|
|
||
| // Test async operation (Hello). | ||
| t.Run("Hello async operation", func(t *testing.T) { | ||
| input := service.HelloInput{Name: "Temporal", Language: service.EN} | ||
| handle := executeWithRetry(t, service.HelloOperationName, input, client.StartNexusOperationOptions{ | ||
| ID: uuid.NewString(), | ||
| ScheduleToCloseTimeout: 10 * time.Second, | ||
| }) | ||
| require.NotEmpty(t, handle.GetID()) | ||
|
|
||
| var result service.HelloOutput | ||
| err := handle.Get(ctx, &result) | ||
| require.NoError(t, err) | ||
| require.Equal(t, "Hello Temporal 👋", result.Message) | ||
| }) | ||
|
|
||
| // Test ListNexusOperations (on client.Client, not NexusClient). | ||
| t.Run("List operations", func(t *testing.T) { | ||
| require.Eventually(t, func() bool { | ||
| resp, listErr := c.ListNexusOperations(ctx, client.ListNexusOperationsOptions{ | ||
| Query: fmt.Sprintf("Endpoint = '%s'", endpointName), | ||
| }) | ||
| if listErr != nil { | ||
| return false | ||
| } | ||
| count := 0 | ||
| for metadata, iterErr := range resp.Results { | ||
| if iterErr != nil { | ||
| return false | ||
| } | ||
| if metadata.OperationID == "" || metadata.Endpoint != endpointName { | ||
| return false | ||
| } | ||
| count++ | ||
| } | ||
| return count > 0 | ||
| }, 10*time.Second, 500*time.Millisecond, "timed out waiting for operations to appear in list") | ||
| }) | ||
|
|
||
| // Test CountNexusOperations (on client.Client, not NexusClient). | ||
| t.Run("Count operations", func(t *testing.T) { | ||
| require.Eventually(t, func() bool { | ||
| resp, countErr := c.CountNexusOperations(ctx, client.CountNexusOperationsOptions{ | ||
| Query: fmt.Sprintf("Endpoint = '%s'", endpointName), | ||
| }) | ||
| return countErr == nil && resp.Count > 0 | ||
| }, 10*time.Second, 500*time.Millisecond, "timed out waiting for count to reflect operations") | ||
| }) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| // @@@SNIPSTART samples-go-nexus-standalone-operations-starter | ||
| package main | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "log" | ||
| "time" | ||
|
|
||
| "go.temporal.io/sdk/client" | ||
| "go.temporal.io/sdk/contrib/envconfig" | ||
|
|
||
| "github.com/temporalio/samples-go/nexus/service" | ||
| ) | ||
|
|
||
| // This sample demonstrates standalone Nexus operations — executing Nexus operations | ||
| // directly from client code without wrapping them in a workflow. | ||
|
|
||
| const endpointName = "nexus-standalone-operations-endpoint" | ||
|
|
||
| func main() { | ||
| // The client is a heavyweight object that should be created once per process. | ||
| c, err := client.Dial(envconfig.MustLoadDefaultClientOptions()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The other Nexus samples are using "options.ParseClientOptionFlags(os.Args[1:])" before client.Dial, should this as well for consistent behavior?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah bit on the fence about this since the |
||
| if err != nil { | ||
| log.Fatalln("Unable to create client", err) | ||
| } | ||
| defer c.Close() | ||
|
|
||
| // Create a NexusClient bound to the endpoint and service. | ||
| // The endpoint must be pre-created on the server (see README). | ||
| nexusClient, err := c.NewNexusClient(client.NexusClientOptions{ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an odd one when looking at other Nexus sample code in here. I went to the equivalent spot and there is no NewNexusClient, so I thought maybe that's new? But then I found it in workflows.go at the top level, which isn't in this sample. Short version - I am reading the code to see what is different, and so get thrown off - and mostly I was reading this to learn how to use Nexus for standalone operations, and I can't tell what makes this standalone as opposed to self hosted!
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Other Nexus samples starter code is starting a workflow not a Nexus operation, the comparison isn't between standalone and self hosted, the comparison is between inside a workflow and outside a workflow |
||
| Endpoint: endpointName, | ||
| Service: service.HelloServiceName, | ||
| }) | ||
| if err != nil { | ||
| log.Fatalln("Unable to create Nexus client", err) | ||
| } | ||
|
|
||
| // Execute the sync Echo operation. | ||
| echoHandle, err := nexusClient.ExecuteOperation(context.Background(), service.EchoOperationName, service.EchoInput{Message: "hello"}, client.StartNexusOperationOptions{ | ||
| ID: "nexus-standalone-echo-op", | ||
| ScheduleToCloseTimeout: 10 * time.Second, | ||
| }) | ||
| if err != nil { | ||
| log.Fatalln("Unable to execute Echo operation", err) | ||
| } | ||
| log.Println("Started Echo operation", "OperationID", echoHandle.GetID()) | ||
|
|
||
| var echoResult service.EchoOutput | ||
| err = echoHandle.Get(context.Background(), &echoResult) | ||
| if err != nil { | ||
| log.Fatalln("Unable to get Echo operation result", err) | ||
| } | ||
| log.Println("Echo result:", echoResult.Message) | ||
|
|
||
| // Execute the async (workflow-backed) Hello operation. | ||
| helloHandle, err := nexusClient.ExecuteOperation(context.Background(), service.HelloOperationName, service.HelloInput{Name: "Temporal", Language: service.EN}, client.StartNexusOperationOptions{ | ||
| ID: "nexus-standalone-hello-op", | ||
| ScheduleToCloseTimeout: 10 * time.Second, | ||
| }) | ||
| if err != nil { | ||
| log.Fatalln("Unable to execute Hello operation", err) | ||
| } | ||
| log.Println("Started Hello operation", "OperationID", helloHandle.GetID()) | ||
|
|
||
| var helloResult service.HelloOutput | ||
| err = helloHandle.Get(context.Background(), &helloResult) | ||
| if err != nil { | ||
| log.Fatalln("Unable to get Hello operation result", err) | ||
| } | ||
| log.Println("Hello result:", helloResult.Message) | ||
|
|
||
| // List Nexus operations using client.Client (not client.NexusClient). | ||
| listResp, err := c.ListNexusOperations(context.Background(), client.ListNexusOperationsOptions{ | ||
| Query: fmt.Sprintf("Endpoint = '%s'", endpointName), | ||
| }) | ||
| if err != nil { | ||
| log.Fatalln("Unable to list Nexus operations", err) | ||
| } | ||
|
|
||
| log.Println("ListNexusOperations results:") | ||
| for metadata, err := range listResp.Results { | ||
| if err != nil { | ||
| log.Fatalln("Error iterating operations", err) | ||
| } | ||
| log.Printf("\tOperationID: %s, Operation: %s, Status: %v\n", | ||
| metadata.OperationID, metadata.Operation, metadata.Status) | ||
| } | ||
|
|
||
| // Count Nexus operations using the base client (not NexusClient). | ||
| countResp, err := c.CountNexusOperations(context.Background(), client.CountNexusOperationsOptions{ | ||
| Query: fmt.Sprintf("Endpoint = '%s'", endpointName), | ||
| }) | ||
| if err != nil { | ||
| log.Fatalln("Unable to count Nexus operations", err) | ||
| } | ||
| log.Println("Total Nexus operations:", countResp.Count) | ||
| } | ||
|
|
||
| // @@@SNIPEND | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be good to have a link to an issue or some other way to track when we can remove this once the server side changes are in.