diff --git a/src/Temporalio/Nexus/ITemporalNexusClient.cs b/src/Temporalio/Nexus/ITemporalNexusClient.cs new file mode 100644 index 00000000..21279f00 --- /dev/null +++ b/src/Temporalio/Nexus/ITemporalNexusClient.cs @@ -0,0 +1,76 @@ +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Threading.Tasks; +using NexusRpc.Handlers; +using Temporalio.Client; + +namespace Temporalio.Nexus +{ + /// + /// Nexus-aware client wrapping the Temporal client. Provides methods for starting workflows + /// from within a Nexus operation handler. + /// + /// + /// WARNING: Nexus support is experimental. + /// Obtained via the + /// start function parameter. + /// Example usage — starting a workflow from an operation handler: + /// + /// await client.StartWorkflowAsync<MyWorkflow, MyResult>( + /// wf => wf.RunAsync(input), + /// new(id: "my-workflow-id", taskQueue: "my-task-queue")); + /// + /// To perform a synchronous operation (e.g., sending a signal), use the underlying + /// and return a sync result: + /// + /// await client.TemporalClient + /// .GetWorkflowHandle($"order-{input.OrderId}") + /// .SignalAsync("requestCancellation", new[] { input }); + /// return TemporalOperationResult<NoValue>.Sync(default); + /// + /// + public interface ITemporalNexusClient + { + /// + /// Gets the underlying Temporal client for advanced use cases such as sending signals + /// or queries. + /// + ITemporalClient TemporalClient { get; } + + /// + /// Start a workflow via a lambda invoking the run method. Always returns an async result + /// with a workflow-run operation token. + /// + /// Workflow class type. + /// Workflow result type. + /// Invocation of workflow run method with a result. + /// Start workflow options. ID and TaskQueue are required. + /// An async operation result containing the workflow-run token. + Task> StartWorkflowAsync( + Expression>> workflowRunCall, WorkflowOptions options); + + /// + /// Start a workflow via a lambda invoking the run method with no return value. Always + /// returns an async result with a workflow-run operation token. + /// + /// Workflow class type. + /// Invocation of workflow run method with no result. + /// Start workflow options. ID and TaskQueue are required. + /// An async operation result containing the workflow-run token. + Task> StartWorkflowAsync( + Expression> workflowRunCall, WorkflowOptions options); + + /// + /// Start a workflow by name. Always returns an async result with a workflow-run operation + /// token. + /// + /// Workflow result type. + /// Workflow type name. + /// Arguments for the workflow. + /// Start workflow options. ID and TaskQueue are required. + /// An async operation result containing the workflow-run token. + Task> StartWorkflowAsync( + string workflow, IReadOnlyCollection args, WorkflowOptions options); + } +} diff --git a/src/Temporalio/Nexus/NexusWorkflowRunHandle.cs b/src/Temporalio/Nexus/NexusWorkflowRunHandle.cs index 1e8a3e0c..c6274622 100644 --- a/src/Temporalio/Nexus/NexusWorkflowRunHandle.cs +++ b/src/Temporalio/Nexus/NexusWorkflowRunHandle.cs @@ -86,6 +86,19 @@ internal static byte[] Base64UrlDecode(string s) /// Created handle. /// If the token is invalid. internal static NexusWorkflowRunHandle FromToken(string token) + { + var data = ParseToken(token); + return new(data.Namespace, data.WorkflowId, data.Version ?? 0); + } + + /// + /// Parse an operation token to its underlying fields. Validates encoding, JSON shape, and + /// version (but not type — callers decide which token types they support). + /// + /// Base64url-encoded token string. + /// Parsed token fields. + /// If the token is invalid. + internal static OperationToken ParseToken(string token) { byte[] bytes; try @@ -96,10 +109,10 @@ internal static NexusWorkflowRunHandle FromToken(string token) { throw new ArgumentException("Token invalid"); } - Token? tokenObj; + OperationToken? tokenObj; try { - tokenObj = JsonSerializer.Deserialize(bytes, TokenSerializerOptions); + tokenObj = JsonSerializer.Deserialize(bytes, TokenSerializerOptions); } catch (JsonException e) { @@ -113,7 +126,7 @@ internal static NexusWorkflowRunHandle FromToken(string token) { throw new ArgumentException($"Unsupported token version: {tokenObj.Version}"); } - return new(tokenObj.Namespace, tokenObj.WorkflowId, tokenObj.Version ?? 0); + return tokenObj; } /// @@ -121,10 +134,13 @@ internal static NexusWorkflowRunHandle FromToken(string token) /// /// Operation token. internal string ToToken() => Base64UrlEncode(JsonSerializer.SerializeToUtf8Bytes( - new Token(Namespace, WorkflowId, Version == 0 ? null : Version), + new OperationToken(Namespace, WorkflowId, Version == 0 ? null : Version), TokenSerializerOptions)); - private record Token( + /// + /// Represents the fields of a Nexus operation token. + /// + internal record OperationToken( [property: JsonPropertyName("ns")] string Namespace, [property: JsonPropertyName("wid")] diff --git a/src/Temporalio/Nexus/NexusWorkflowStartHelper.cs b/src/Temporalio/Nexus/NexusWorkflowStartHelper.cs new file mode 100644 index 00000000..395dd342 --- /dev/null +++ b/src/Temporalio/Nexus/NexusWorkflowStartHelper.cs @@ -0,0 +1,121 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using NexusRpc.Handlers; +using Temporalio.Api.Common.V1; +using Temporalio.Api.Enums.V1; +using Temporalio.Client; + +namespace Temporalio.Nexus +{ + /// + /// Internal helper for starting workflows from Nexus operations and managing operation tokens. + /// Shared by both and . + /// + internal static class NexusWorkflowStartHelper + { + private const string NexusOperationTokenHeader = "Nexus-Operation-Token"; + + /// + /// Start a workflow and return the operation token. This handles all Nexus plumbing: + /// cloning options, setting task queue, processing links, injecting callbacks, and + /// adding outbound links. + /// + /// Temporal client. + /// Nexus start context for callbacks and links. + /// Temporal operation context for info and logging. + /// Workflow type name. + /// Workflow arguments. + /// Workflow start options. ID and TaskQueue are required. + /// Base64url-encoded operation token. + internal static async Task StartWorkflowAndGetTokenAsync( + ITemporalClient client, + OperationStartContext nexusStartContext, + NexusOperationExecutionContext temporalContext, + string workflow, + IReadOnlyCollection args, + WorkflowOptions options) + { + var namespace_ = client.Options.Namespace; + var workflowId = options.Id ?? string.Empty; + + // Generate the token before starting the workflow (needed for callback header) + var token = new NexusWorkflowRunHandle(namespace_, workflowId, 0).ToToken(); + + // Shallow clone the options so we can mutate them. We just overwrite any of these + // internal options since they cannot be user set at this time. + options = (WorkflowOptions)options.Clone(); + options.TaskQueue ??= temporalContext.Info.TaskQueue; + if (options.IdConflictPolicy == WorkflowIdConflictPolicy.UseExisting) + { + options.OnConflictOptions = new() + { + AttachLinks = true, + AttachCompletionCallbacks = true, + AttachRequestId = true, + }; + } + if (nexusStartContext.InboundLinks.Count > 0) + { + options.Links = nexusStartContext.InboundLinks.Select(link => + { + try + { + return new Link { WorkflowEvent = link.ToWorkflowEvent() }; + } + catch (ArgumentException e) + { + temporalContext.Logger.LogWarning(e, "Invalid Nexus link: {Url}", link.Uri); + return null; + } + }).OfType().ToList(); + } + if (nexusStartContext.CallbackUrl is { } callbackUrl) + { + var callback = new Callback() { Nexus = new() { Url = callbackUrl } }; + var callbackHeadersHasToken = false; + if (nexusStartContext.CallbackHeaders is { } callbackHeaders) + { + foreach (var kv in callbackHeaders) + { + callback.Nexus.Header.Add(kv.Key, kv.Value); + if (string.Equals( + kv.Key, NexusOperationTokenHeader, StringComparison.OrdinalIgnoreCase)) + { + callbackHeadersHasToken = true; + } + } + } + // Set operation token if not already present (header is case-insensitive) + if (!callbackHeadersHasToken) + { + callback.Nexus.Header[NexusOperationTokenHeader] = token; + } + if (options.Links is { } links) + { + callback.Links.AddRange(links); + } + options.CompletionCallbacks = new[] { callback }; + } + options.RequestId = nexusStartContext.RequestId; + + // Do the start call + var wfHandle = await client.StartWorkflowAsync( + workflow, args, options).ConfigureAwait(false); + + // Add the outbound link + nexusStartContext.OutboundLinks.Add(new Link.Types.WorkflowEvent + { + Namespace = namespace_, + WorkflowId = workflowId, + RunId = wfHandle.FirstExecutionRunId ?? + throw new InvalidOperationException("Handle unexpectedly missing run ID"), + EventRef = new() { EventId = 1, EventType = EventType.WorkflowExecutionStarted }, + }.ToNexusLink()); + + return token; + } + } +} diff --git a/src/Temporalio/Nexus/TemporalNexusClient.cs b/src/Temporalio/Nexus/TemporalNexusClient.cs new file mode 100644 index 00000000..293db33b --- /dev/null +++ b/src/Temporalio/Nexus/TemporalNexusClient.cs @@ -0,0 +1,103 @@ +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Threading.Tasks; +using NexusRpc.Handlers; +using Temporalio.Client; + +namespace Temporalio.Nexus +{ + /// + /// Nexus-aware client wrapping the Temporal client. Provides methods for starting workflows + /// from within Nexus operation handlers, handling all Nexus plumbing (links, callbacks, token + /// generation) internally. + /// + /// + /// WARNING: Nexus support is experimental. + /// This client is created by and passed to the + /// user's start function. It should not be instantiated directly. + /// + public class TemporalNexusClient : ITemporalNexusClient + { + private readonly ITemporalClient temporalClient; + private readonly OperationStartContext nexusStartContext; + private readonly NexusOperationExecutionContext temporalContext; + + /// + /// Initializes a new instance of the class. + /// + /// Nexus start context for callbacks and links. + internal TemporalNexusClient(OperationStartContext nexusStartContext) + { + this.nexusStartContext = nexusStartContext; + temporalContext = NexusOperationExecutionContext.Current; + temporalClient = temporalContext.TemporalClient; + } + + /// + public ITemporalClient TemporalClient => temporalClient; + + /// + /// Start a workflow via a lambda invoking the run method. Always returns an async result + /// with a workflow-run operation token. + /// + /// Workflow class type. + /// Workflow result type. + /// Invocation of workflow run method with a result. + /// Start workflow options. ID and TaskQueue are required. + /// An async operation result containing the workflow-run token. + public Task> StartWorkflowAsync( + Expression>> workflowRunCall, WorkflowOptions options) + { + var (runMethod, args) = Common.ExpressionUtil.ExtractCall(workflowRunCall); + return StartWorkflowAsync( + Workflows.WorkflowDefinition.NameFromRunMethodForCall(runMethod), + args, + options); + } + + /// + /// Start a workflow via a lambda invoking the run method with no return value. Always + /// returns an async result with a workflow-run operation token. + /// + /// Workflow class type. + /// Invocation of workflow run method with no result. + /// Start workflow options. ID and TaskQueue are required. + /// An async operation result containing the workflow-run token. + public async Task> StartWorkflowAsync( + Expression> workflowRunCall, WorkflowOptions options) + { + var (runMethod, args) = Common.ExpressionUtil.ExtractCall(workflowRunCall); + var token = await NexusWorkflowStartHelper.StartWorkflowAndGetTokenAsync( + temporalClient, + nexusStartContext, + temporalContext, + Workflows.WorkflowDefinition.NameFromRunMethodForCall(runMethod), + args, + options).ConfigureAwait(false); + return TemporalOperationResult.Async(token); + } + + /// + /// Start a workflow by name. Always returns an async result with a workflow-run operation + /// token. + /// + /// Workflow result type. + /// Workflow type name. + /// Arguments for the workflow. + /// Start workflow options. ID and TaskQueue are required. + /// An async operation result containing the workflow-run token. + public async Task> StartWorkflowAsync( + string workflow, IReadOnlyCollection args, WorkflowOptions options) + { + var token = await NexusWorkflowStartHelper.StartWorkflowAndGetTokenAsync( + temporalClient, + nexusStartContext, + temporalContext, + workflow, + args, + options).ConfigureAwait(false); + return TemporalOperationResult.Async(token); + } + } +} diff --git a/src/Temporalio/Nexus/TemporalNexusOperationHandler.cs b/src/Temporalio/Nexus/TemporalNexusOperationHandler.cs new file mode 100644 index 00000000..2fd39a5e --- /dev/null +++ b/src/Temporalio/Nexus/TemporalNexusOperationHandler.cs @@ -0,0 +1,148 @@ +#pragma warning disable SA1402 // We allow multiple types of the same name + +using System; +using System.Threading.Tasks; +using NexusRpc.Handlers; + +namespace Temporalio.Nexus +{ + /// + /// Factory for creating generic Nexus operation handlers backed by Temporal. + /// + /// + /// WARNING: Nexus support is experimental. + /// Usage example — starting a workflow from a Nexus operation: + /// + /// [OperationImpl] + /// public IOperationHandler<TransferInput, TransferResult> StartTransfer() => + /// TemporalNexusOperationHandler.FromHandleFactory<TransferInput, TransferResult>( + /// async (context, client, input) => + /// await client.StartWorkflowAsync<TransferWorkflow, TransferResult>( + /// wf => wf.RunAsync(input), + /// new(id: $"transfer-{input.TransferId}", taskQueue: "my-task-queue"))); + /// + /// To perform a synchronous operation (e.g., sending a signal and returning immediately): + /// + /// [OperationImpl] + /// public IOperationHandler<CancelOrderInput, NoValue> CancelOrder() => + /// TemporalNexusOperationHandler.FromHandleFactory<CancelOrderInput, NoValue>( + /// async (context, client, input) => + /// { + /// await client.TemporalClient + /// .GetWorkflowHandle($"order-{input.OrderId}") + /// .SignalAsync("requestCancellation", new[] { input }); + /// return TemporalOperationResult<NoValue>.Sync(default); + /// }); + /// + /// + public static class TemporalNexusOperationHandler + { + /// + /// Create an operation handler from the given start function. + /// + /// Operation input type. + /// Operation result type. + /// Function invoked on every operation start. Receives the Nexus + /// start context, a Temporal Nexus client for starting workflows, and the operation input. + /// Should return a . + /// Operation handler backed by Temporal. + public static TemporalNexusOperationHandler FromHandleFactory( + Func>> startFunc) => + new(startFunc); + + /// + /// Create an operation handler with no input from the given start function. + /// + /// Operation result type. + /// Function invoked on every operation start. Receives the Nexus + /// start context and a Temporal Nexus client for starting workflows. Should return a + /// . + /// Operation handler backed by Temporal. + public static TemporalNexusOperationHandler FromHandleFactory( + Func>> startFunc) => + new((context, client, _) => startFunc(context, client)); + } + + /// + /// Generic Nexus operation handler backed by Temporal. Implements + /// and provides a composable way to map + /// Temporal operations to Nexus operations. + /// + /// Operation input type. + /// Operation result type. + /// + /// WARNING: Nexus support is experimental. + /// This class supports inheritance to customize cancel behavior. Override + /// to change how workflow-run cancellations are handled. + /// The and methods should not be + /// overridden — they contain the core dispatch logic. + /// + public class TemporalNexusOperationHandler : IOperationHandler + { + private readonly Func>> startFunc; + + /// + /// Initializes a new instance of the + /// class. + /// + /// Start function delegate. + public TemporalNexusOperationHandler( + Func>> startFunc) => + this.startFunc = startFunc; + + /// + public async Task> StartAsync( + OperationStartContext context, TInput input) + { + var client = new TemporalNexusClient(context); + var result = await startFunc(context, client, input).ConfigureAwait(false); + if (result.IsSyncResult) + { + return OperationStartResult.SyncResult(result.SyncValue!); + } + return OperationStartResult.AsyncResult(result.AsyncToken!); + } + + /// + public Task CancelAsync(OperationCancelContext context) + { + NexusWorkflowRunHandle.OperationToken token; + try + { + token = NexusWorkflowRunHandle.ParseToken(context.OperationToken); + } + catch (ArgumentException e) + { + throw new HandlerException(HandlerErrorType.BadRequest, e.Message); + } + if (token.Namespace != NexusOperationExecutionContext.Current.Info.Namespace) + { + throw new HandlerException(HandlerErrorType.BadRequest, "Invalid namespace"); + } + return token.Type switch + { + 1 => CancelWorkflowRunAsync(context, token.WorkflowId), + _ => throw new HandlerException( + HandlerErrorType.BadRequest, + $"Unsupported token type: {token.Type}"), + }; + } + + /// + /// Called when a cancel request is received for a workflow-run token (type=1). Override to + /// customize cancel behavior. + /// Default behavior: cancels the underlying workflow. + /// + /// The cancel context. + /// The workflow ID extracted from the operation token. + /// Task for cancel completion. + protected virtual Task CancelWorkflowRunAsync( + OperationCancelContext context, string workflowId) => + NexusOperationExecutionContext.Current.TemporalClient + .GetWorkflowHandle(workflowId).CancelAsync(); + } +} diff --git a/src/Temporalio/Nexus/TemporalOperationResult.cs b/src/Temporalio/Nexus/TemporalOperationResult.cs new file mode 100644 index 00000000..60d9e359 --- /dev/null +++ b/src/Temporalio/Nexus/TemporalOperationResult.cs @@ -0,0 +1,69 @@ +using System; + +namespace Temporalio.Nexus +{ + /// + /// Unified result type for Temporal-backed Nexus operations. Encapsulates either a synchronous + /// result value or an asynchronous operation token. + /// + /// The result type. + /// + /// WARNING: Nexus support is experimental. + /// Use for operations that complete immediately (e.g., signals). + /// Use for operations that return an operation token for async completion + /// (e.g., starting a workflow). + /// + public sealed class TemporalOperationResult + { + private TemporalOperationResult(bool isSyncResult, TResult? syncValue, string? asyncToken) + { + IsSyncResult = isSyncResult; + SyncValue = syncValue; + AsyncToken = asyncToken; + } + + /// + /// Gets a value indicating whether this is a synchronous result. + /// + public bool IsSyncResult { get; } + + /// + /// Gets the synchronous result value. Only meaningful when is + /// true. + /// + internal TResult? SyncValue { get; } + + /// + /// Gets the asynchronous operation token. Only meaningful when + /// is false. + /// + internal string? AsyncToken { get; } + + /// + /// Create a synchronous result with the given value. + /// + /// The result value. + /// A synchronous operation result. +#pragma warning disable CA1000 // Intentional static factory on generic type + public static TemporalOperationResult Sync(TResult? value) => + new(isSyncResult: true, syncValue: value, asyncToken: null); +#pragma warning restore CA1000 + + /// + /// Create an asynchronous result with the given operation token. + /// + /// The operation token. + /// An asynchronous operation result. + /// If the token is null or empty. +#pragma warning disable CA1000, VSTHRD200 // Intentional: static factory on generic type; "Async" refers to the result kind, not the method signature + public static TemporalOperationResult Async(string token) +#pragma warning restore CA1000, VSTHRD200 + { + if (string.IsNullOrEmpty(token)) + { + throw new ArgumentException("Token cannot be null or empty", nameof(token)); + } + return new(isSyncResult: false, syncValue: default, asyncToken: token); + } + } +} diff --git a/src/Temporalio/Nexus/WorkflowRunOperationContext.cs b/src/Temporalio/Nexus/WorkflowRunOperationContext.cs index 0c530a48..ceb7e802 100644 --- a/src/Temporalio/Nexus/WorkflowRunOperationContext.cs +++ b/src/Temporalio/Nexus/WorkflowRunOperationContext.cs @@ -1,12 +1,8 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Linq.Expressions; using System.Threading.Tasks; -using Microsoft.Extensions.Logging; using NexusRpc.Handlers; -using Temporalio.Api.Common.V1; -using Temporalio.Api.Enums.V1; using Temporalio.Client; namespace Temporalio.Nexus @@ -78,13 +74,16 @@ public async Task StartWorkflowAsync( { #pragma warning restore CA1822 var temporalContext = NexusOperationExecutionContext.Current; - var handle = new NexusWorkflowRunHandle( - temporalContext.TemporalClient.Options.Namespace, - // Missing ID will be caught later - options.Id ?? string.Empty, - version: 0); - await StartWorkflowInternalAsync(handle, workflow, args, options).ConfigureAwait(false); - return handle; + var ns = temporalContext.TemporalClient.Options.Namespace; + var wfId = options.Id ?? string.Empty; + await NexusWorkflowStartHelper.StartWorkflowAndGetTokenAsync( + temporalContext.TemporalClient, + (OperationStartContext)temporalContext.HandlerContext, + temporalContext, + workflow, + args, + options).ConfigureAwait(false); + return new NexusWorkflowRunHandle(ns, wfId, version: 0); } /// @@ -101,88 +100,16 @@ public async Task> StartWorkflowAsync( { #pragma warning restore CA1822 var temporalContext = NexusOperationExecutionContext.Current; - var handle = new NexusWorkflowRunHandle( - temporalContext.TemporalClient.Options.Namespace, - // Missing ID will be caught later - options.Id ?? string.Empty, - version: 0); - await StartWorkflowInternalAsync(handle, workflow, args, options).ConfigureAwait(false); - return handle; - } - - private static async Task StartWorkflowInternalAsync( - NexusWorkflowRunHandle handle, - string workflow, - IReadOnlyCollection args, - WorkflowOptions options) - { - var temporalContext = NexusOperationExecutionContext.Current; - var nexusContext = (OperationStartContext)temporalContext.HandlerContext; - - // Shallow clone the options so we can mutate them. We just overwrite any of these - // internal options since they cannot be user set at this time. - options = (WorkflowOptions)options.Clone(); - options.TaskQueue ??= temporalContext.Info.TaskQueue; - if (options.IdConflictPolicy == WorkflowIdConflictPolicy.UseExisting) - { - options.OnConflictOptions = new() - { - AttachLinks = true, - AttachCompletionCallbacks = true, - AttachRequestId = true, - }; - } - if (nexusContext.InboundLinks.Count > 0) - { - options.Links = nexusContext.InboundLinks.Select(link => - { - try - { - return new Link { WorkflowEvent = link.ToWorkflowEvent() }; - } - catch (ArgumentException e) - { - temporalContext.Logger.LogWarning(e, "Invalid Nexus link: {Url}", link.Uri); - return null; - } - }).OfType().ToList(); - } - if (nexusContext.CallbackUrl is { } callbackUrl) - { - var callback = new Callback() { Nexus = new() { Url = callbackUrl } }; - if (nexusContext.CallbackHeaders is { } callbackHeaders) - { - foreach (var kv in callbackHeaders) - { - callback.Nexus.Header.Add(kv.Key, kv.Value); - } - } - // Set operation token - if (nexusContext.CallbackHeaders?.ContainsKey("Nexus-Operation-Token") != true) - { - callback.Nexus.Header["Nexus-Operation-Token"] = handle.ToToken(); - } - if (options.Links is { } links) - { - callback.Links.AddRange(links); - } - options.CompletionCallbacks = new[] { callback }; - } - options.RequestId = nexusContext.RequestId; - - // Do the start call - var wfHandle = await temporalContext.TemporalClient.StartWorkflowAsync( - workflow, args, options).ConfigureAwait(false); - - // Add the outbound link - nexusContext.OutboundLinks.Add(new Link.Types.WorkflowEvent - { - Namespace = handle.Namespace, - WorkflowId = handle.WorkflowId, - RunId = wfHandle.FirstExecutionRunId ?? - throw new InvalidOperationException("Handle unexpectedly missing run ID"), - EventRef = new() { EventId = 1, EventType = EventType.WorkflowExecutionStarted }, - }.ToNexusLink()); + var ns = temporalContext.TemporalClient.Options.Namespace; + var wfId = options.Id ?? string.Empty; + await NexusWorkflowStartHelper.StartWorkflowAndGetTokenAsync( + temporalContext.TemporalClient, + (OperationStartContext)temporalContext.HandlerContext, + temporalContext, + workflow, + args, + options).ConfigureAwait(false); + return new NexusWorkflowRunHandle(ns, wfId, version: 0); } } } \ No newline at end of file diff --git a/tests/Temporalio.Tests/Nexus/NexusWorkflowStartHelperTests.cs b/tests/Temporalio.Tests/Nexus/NexusWorkflowStartHelperTests.cs new file mode 100644 index 00000000..3515edc3 --- /dev/null +++ b/tests/Temporalio.Tests/Nexus/NexusWorkflowStartHelperTests.cs @@ -0,0 +1,123 @@ +namespace Temporalio.Tests.Nexus; + +using System.Text; +using System.Text.Json; +using Temporalio.Nexus; +using Xunit; + +public class NexusWorkflowStartHelperTests +{ + [Fact] + public void ToToken_ProducesValidBase64Url() + { + var token = new NexusWorkflowRunHandle("my-ns", "my-wf", 0).ToToken(); + + Assert.DoesNotContain("+", token); + Assert.DoesNotContain("/", token); + Assert.DoesNotContain("=", token); + } + + [Fact] + public void ToToken_ContainsCorrectFields() + { + var token = new NexusWorkflowRunHandle("my-ns", "my-wf", 0).ToToken(); + var json = Encoding.UTF8.GetString(NexusWorkflowRunHandle.Base64UrlDecode(token)); + using var doc = JsonDocument.Parse(json); + var root = doc.RootElement; + + Assert.Equal(1, root.GetProperty("t").GetInt32()); + Assert.Equal("my-ns", root.GetProperty("ns").GetString()); + Assert.Equal("my-wf", root.GetProperty("wid").GetString()); + } + + [Fact] + public void ParseToken_RoundTrips() + { + var token = new NexusWorkflowRunHandle("my-ns", "my-wf", 0).ToToken(); + var parsed = NexusWorkflowRunHandle.ParseToken(token); + + Assert.Equal("my-ns", parsed.Namespace); + Assert.Equal("my-wf", parsed.WorkflowId); + Assert.Equal(1, parsed.Type); + } + + [Fact] + public void ParseToken_RejectsInvalidBase64() + { + Assert.Throws(() => NexusWorkflowRunHandle.ParseToken("!!!invalid!!!")); + } + + [Fact] + public void ParseToken_RejectsInvalidJson() + { + var token = NexusWorkflowRunHandle.Base64UrlEncode(Encoding.UTF8.GetBytes("not json")); + Assert.Throws(() => NexusWorkflowRunHandle.ParseToken(token)); + } + + [Fact] + public void ParseToken_RejectsUnsupportedVersion() + { + var json = """{"t":1,"ns":"ns","wid":"wid","v":99}"""; + var token = NexusWorkflowRunHandle.Base64UrlEncode(Encoding.UTF8.GetBytes(json)); + Assert.Throws(() => NexusWorkflowRunHandle.ParseToken(token)); + } + + [Fact] + public void ParseToken_AcceptsUnknownTokenType() + { + // ParseToken should not reject unknown token types — that's for the handler to decide + var json = """{"t":99,"ns":"ns","wid":"wid"}"""; + var token = NexusWorkflowRunHandle.Base64UrlEncode(Encoding.UTF8.GetBytes(json)); + var parsed = NexusWorkflowRunHandle.ParseToken(token); + Assert.Equal(99, parsed.Type); + } + + [Fact] + public void ToToken_SpecialCharactersRoundTrip() + { + var token = new NexusWorkflowRunHandle("ns/with+special", "wf?id=1&foo=bar", 0).ToToken(); + var parsed = NexusWorkflowRunHandle.ParseToken(token); + + Assert.Equal("ns/with+special", parsed.Namespace); + Assert.Equal("wf?id=1&foo=bar", parsed.WorkflowId); + } + + [Fact] + public void TemporalOperationResult_Sync_StoresValue() + { + var result = TemporalOperationResult.Sync("hello"); + Assert.True(result.IsSyncResult); + Assert.Equal("hello", result.SyncValue); + Assert.Null(result.AsyncToken); + } + + [Fact] + public void TemporalOperationResult_Sync_AllowsDefault() + { + var result = TemporalOperationResult.Sync(null); + Assert.True(result.IsSyncResult); + Assert.Null(result.SyncValue); + Assert.Null(result.AsyncToken); + } + + [Fact] + public void TemporalOperationResult_Async_StoresToken() + { + var result = TemporalOperationResult.Async("some-token"); + Assert.False(result.IsSyncResult); + Assert.Equal("some-token", result.AsyncToken); + Assert.Null(result.SyncValue); + } + + [Fact] + public void TemporalOperationResult_Async_RejectsNull() + { + Assert.Throws(() => TemporalOperationResult.Async(null!)); + } + + [Fact] + public void TemporalOperationResult_Async_RejectsEmpty() + { + Assert.Throws(() => TemporalOperationResult.Async(string.Empty)); + } +} diff --git a/tests/Temporalio.Tests/Worker/NexusWorkerTests.cs b/tests/Temporalio.Tests/Worker/NexusWorkerTests.cs index 1ba4164f..254c7da9 100644 --- a/tests/Temporalio.Tests/Worker/NexusWorkerTests.cs +++ b/tests/Temporalio.Tests/Worker/NexusWorkerTests.cs @@ -1315,6 +1315,330 @@ await Workflow.CreateNexusWorkflowClient(endpoint). exc3.Message); } + [Fact] + public async Task ExecuteNexusOperationAsync_GenericHandler_StartWorkflow_Succeeds() + { + // Build the worker options w/ the nexus service using the new generic handler + var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddNexusService(new HandlerFactoryStringService(() => + TemporalNexusOperationHandler.FromHandleFactory( + async (context, client, input) => + await client.StartWorkflowAsync( + (SimpleWorkflow wf) => wf.RunAsync(input), + new() { Id = $"wf-{Guid.NewGuid()}" })))). + AddWorkflow(); + var endpoint = await CreateNexusEndpointAsync(workerOptions.TaskQueue!); + + // Run the Nexus client code in workflow + await RunInWorkflowAsync(workerOptions, async () => + { + var result = await Workflow.CreateNexusWorkflowClient(endpoint). + ExecuteNexusOperationAsync(svc => svc.DoSomething("some-name")); + Assert.Equal("Hello from workflow, some-name", result); + }); + } + + [Fact] + public async Task ExecuteNexusOperationAsync_GenericHandler_Cancel_Succeeds() + { + // Build the worker options w/ the nexus service using the new generic handler + var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddNexusService(new HandlerFactoryStringService(() => + TemporalNexusOperationHandler.FromHandleFactory( + async (context, client, input) => + await client.StartWorkflowAsync( + (WaitForeverWorkflow wf) => wf.RunAsync(input), + new() { Id = $"wf-{Guid.NewGuid()}" })))). + AddWorkflow(); + var endpoint = await CreateNexusEndpointAsync(workerOptions.TaskQueue!); + + // Cancel the whole workflow and confirm it has expected exceptions + var wfExc = await Assert.ThrowsAsync(() => RunInWorkflowAsync( + workerOptions, + async () => + { + var result = await Workflow.CreateNexusWorkflowClient(endpoint). + ExecuteNexusOperationAsync(svc => svc.DoSomething("some-name")); + Assert.Equal("Hello from workflow, some-name", result); + }, + beforeGetResultFunc: async handle => + { + // Wait for Nexus operation to get started + await AssertMore.HasEventEventuallyAsync( + handle, evt => evt.NexusOperationStartedEventAttributes != null); + // Now cancel entire workflow + await handle.CancelAsync(); + })); + Assert.IsType(wfExc.InnerException); + } + + [Fact] + public async Task ExecuteNexusOperationAsync_GenericHandler_SyncResult_Succeeds() + { + // Build the worker options w/ a handler that returns a sync result + var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddNexusService(new HandlerFactoryStringService(() => + TemporalNexusOperationHandler.FromHandleFactory( + (context, client, input) => + Task.FromResult(TemporalOperationResult.Sync($"Hello, {input}"))))); + var endpoint = await CreateNexusEndpointAsync(workerOptions.TaskQueue!); + + await RunInWorkflowAsync(workerOptions, async () => + { + var result = await Workflow.CreateNexusWorkflowClient(endpoint). + ExecuteNexusOperationAsync(svc => svc.DoSomething("world")); + Assert.Equal("Hello, world", result); + }); + } + + [Fact] + public async Task ExecuteNexusOperationAsync_GenericHandler_LinksAndContext_Populated() + { + // Capture the context and client passed to the generic handler so we can assert plumbing + OperationStartContext? capturedContext = null; + ITemporalNexusClient? capturedClient = null; + var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddNexusService(new HandlerFactoryStringService(() => + TemporalNexusOperationHandler.FromHandleFactory( + async (context, client, input) => + { + capturedContext = context; + capturedClient = client; + return await client.StartWorkflowAsync( + (SimpleWorkflow wf) => wf.RunAsync(input), + new() { Id = $"wf-{Guid.NewGuid()}" }); + }))). + AddWorkflow(); + var endpoint = await CreateNexusEndpointAsync(workerOptions.TaskQueue!); + + var handle = await RunInWorkflowAsync(workerOptions, async () => + { + var result = await Workflow.CreateNexusWorkflowClient(endpoint). + ExecuteNexusOperationAsync(svc => svc.DoSomething("some-name")); + Assert.Equal("Hello from workflow, some-name", result); + }); + + // Context fields populated + Assert.NotNull(capturedContext); + Assert.Equal("StringService", capturedContext!.Service); + Assert.Equal("DoSomething", capturedContext.Operation); + Assert.True(Guid.TryParse(capturedContext.RequestId, out _)); + Assert.False(string.IsNullOrEmpty(capturedContext.CallbackUrl)); + // Inbound link points to the caller workflow's scheduled event + var wfEvent = Assert.Single(capturedContext.InboundLinks).ToWorkflowEvent(); + Assert.Equal(handle.Id, wfEvent.WorkflowId); + Assert.Equal(Api.Enums.V1.EventType.NexusOperationScheduled, wfEvent.EventRef.EventType); + + // Nexus client exposes the temporal client + Assert.NotNull(capturedClient); + Assert.Equal(Env.Client.Options.Namespace, capturedClient!.TemporalClient.Options.Namespace); + + // Outbound link on the start event points to the workflow that the handler started + var startEvent = Assert.Single( + (await handle.FetchHistoryAsync()).Events, + evt => evt.NexusOperationStartedEventAttributes != null); + var link = Assert.Single(startEvent.Links); + Assert.Equal(1, link.WorkflowEvent.EventRef.EventId); + Assert.Equal( + Api.Enums.V1.EventType.WorkflowExecutionStarted, + link.WorkflowEvent.EventRef.EventType); + } + + [Fact] + public async Task ExecuteNexusOperationAsync_GenericHandler_CancelOperation_CancelsUnderlying() + { + // The default CancelWorkflowRunAsync should cancel the underlying workflow when the + // operation is canceled (as opposed to canceling the caller workflow itself). + var workflowId = $"wf-{Guid.NewGuid()}"; + var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddNexusService(new HandlerFactoryStringService(() => + TemporalNexusOperationHandler.FromHandleFactory( + async (context, client, input) => + await client.StartWorkflowAsync( + (WaitForeverWorkflow wf) => wf.RunAsync(input), + new() { Id = workflowId })))). + AddWorkflow(); + var endpoint = await CreateNexusEndpointAsync(workerOptions.TaskQueue!); + + var wfExc = await Assert.ThrowsAsync(() => + RunInWorkflowAsync(workerOptions, async () => + { + using var cancelSource = new CancellationTokenSource(); + var handle = await Workflow.CreateNexusWorkflowClient(endpoint). + StartNexusOperationAsync( + svc => svc.DoSomething("some-name"), + new() { CancellationToken = cancelSource.Token }); +#pragma warning disable CA1849, VSTHRD103 // https://github.com/temporalio/sdk-dotnet/issues/327 + cancelSource.Cancel(); +#pragma warning restore CA1849, VSTHRD103 + await handle.GetResultAsync(); + })); + var nexusExc = Assert.IsType(wfExc.InnerException); + Assert.IsType(nexusExc.InnerException); + + // Underlying workflow was canceled too + Assert.IsType( + (await Assert.ThrowsAsync(() => + Client.GetWorkflowHandle(workflowId).GetResultAsync())).InnerException); + } + + [Fact] + public async Task ExecuteNexusOperationAsync_GenericHandler_CancelOverride_Invoked() + { + // Subclass with a CancelWorkflowRunAsync override; verify the override is used and + // receives the right workflow ID. + var workflowId = $"wf-{Guid.NewGuid()}"; + CancelOverrideHandler? capturedHandler = null; + var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddNexusService(new HandlerFactoryStringService(() => + { + capturedHandler ??= new CancelOverrideHandler( + async (context, client, input) => + await client.StartWorkflowAsync( + (WaitForeverWorkflow wf) => wf.RunAsync(input), + new() { Id = workflowId })); + return capturedHandler; + })). + AddWorkflow(); + var endpoint = await CreateNexusEndpointAsync(workerOptions.TaskQueue!); + + await Assert.ThrowsAsync(() => + RunInWorkflowAsync(workerOptions, async () => + { + using var cancelSource = new CancellationTokenSource(); + var handle = await Workflow.CreateNexusWorkflowClient(endpoint). + StartNexusOperationAsync( + svc => svc.DoSomething("some-name"), + new() { CancellationToken = cancelSource.Token }); +#pragma warning disable CA1849, VSTHRD103 + cancelSource.Cancel(); +#pragma warning restore CA1849, VSTHRD103 + await handle.GetResultAsync(); + })); + + Assert.NotNull(capturedHandler); + Assert.True(capturedHandler!.CancelCallCount > 0); + Assert.Equal(workflowId, capturedHandler.CapturedWorkflowId); + } + + [Fact] + public async Task ExecuteNexusOperationAsync_GenericHandler_StartWorkflowByName_Succeeds() + { + // Use the by-name overload of TemporalNexusClient.StartWorkflowAsync + var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddNexusService(new HandlerFactoryStringService(() => + TemporalNexusOperationHandler.FromHandleFactory( + async (context, client, input) => + await client.StartWorkflowAsync( + "SimpleWorkflow", + new object?[] { input }, + new() { Id = $"wf-{Guid.NewGuid()}" })))). + AddWorkflow(); + var endpoint = await CreateNexusEndpointAsync(workerOptions.TaskQueue!); + + await RunInWorkflowAsync(workerOptions, async () => + { + var result = await Workflow.CreateNexusWorkflowClient(endpoint). + ExecuteNexusOperationAsync(svc => svc.DoSomething("by-name")); + Assert.Equal("Hello from workflow, by-name", result); + }); + } + + [Fact] + public async Task ExecuteNexusOperationAsync_GenericHandler_ConflictPolicy_UseExisting() + { + // Two operations with the same workflow ID + UseExisting policy attach to the same + // workflow; both observe the first input. Exercises the OnConflictOptions plumbing in + // NexusWorkflowStartHelper. + var workflowId = $"wf-{Guid.NewGuid()}"; + var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddNexusService(new HandlerFactoryStringService(() => + TemporalNexusOperationHandler.FromHandleFactory( + async (context, client, input) => + await client.StartWorkflowAsync( + (WaitForSignalWorkflow wf) => wf.RunAsync(input), + new() + { + Id = workflowId, + IdConflictPolicy = WorkflowIdConflictPolicy.UseExisting, + })))). + AddWorkflow(); + var endpoint = await CreateNexusEndpointAsync(workerOptions.TaskQueue!); + + var results = new List(); + await RunInWorkflowAsync(workerOptions, async () => + { + var client = Workflow.CreateNexusWorkflowClient(endpoint); + var handle1 = await client.StartNexusOperationAsync(svc => svc.DoSomething("name1")); + var handle2 = await client.StartNexusOperationAsync(svc => svc.DoSomething("name2")); + await Workflow.GetExternalWorkflowHandle(workflowId). + SignalAsync(wf => wf.SignalAsync()); + results.Add(await handle1.GetResultAsync()); + results.Add(await handle2.GetResultAsync()); + }); + Assert.Equal(new List { "Hello, name1!", "Hello, name1!" }, results); + } + + [Fact] + public async Task ExecuteNexusOperationAsync_GenericHandler_NoInputOverload_Succeeds() + { + // Exercise the no-input FromHandleFactory overload + var workerOptions = new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddNexusService(new HandlerFactoryNoInputService(() => + TemporalNexusOperationHandler.FromHandleFactory( + (context, client) => + Task.FromResult(TemporalOperationResult.Sync("hello-no-input"))))); + var endpoint = await CreateNexusEndpointAsync(workerOptions.TaskQueue!); + + await RunInWorkflowAsync(workerOptions, async () => + { + var result = await Workflow.CreateNexusWorkflowClient(endpoint). + ExecuteNexusOperationAsync(svc => svc.DoIt()); + Assert.Equal("hello-no-input", result); + }); + } + + [NexusService] + public interface INoInputService + { + [NexusOperation] + string DoIt(); + } + + [NexusServiceHandler(typeof(INoInputService))] + public class HandlerFactoryNoInputService + { + private readonly Func> handlerFactory; + + public HandlerFactoryNoInputService(Func> handlerFactory) => + this.handlerFactory = handlerFactory; + + [NexusOperationHandler] + public IOperationHandler DoIt() => handlerFactory(); + } + + private class CancelOverrideHandler : TemporalNexusOperationHandler + { + public CancelOverrideHandler( + Func>> startFunc) + : base(startFunc) + { + } + + public int CancelCallCount { get; private set; } + + public string? CapturedWorkflowId { get; private set; } + + protected override Task CancelWorkflowRunAsync( + OperationCancelContext context, string workflowId) + { + CancelCallCount++; + CapturedWorkflowId = workflowId; + return base.CancelWorkflowRunAsync(context, workflowId); + } + } + private async Task CreateNexusEndpointAsync(string taskQueue) { var name = $"nexus-endpoint-{taskQueue}";