Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions src/Temporalio/Nexus/ITemporalNexusClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Threading.Tasks;
using NexusRpc.Handlers;
using Temporalio.Client;

namespace Temporalio.Nexus
{
/// <summary>
/// Nexus-aware client wrapping the Temporal client. Provides methods for starting workflows
/// from within a Nexus operation handler.
/// </summary>
/// <remarks>
/// <para>WARNING: Nexus support is experimental.</para>
/// <para>Obtained via the <see cref="TemporalNexusOperationHandler.FromHandleFactory{TInput, TResult}"/>
/// start function parameter.</para>
/// <para>Example usage — starting a workflow from an operation handler:</para>
/// <code>
/// await client.StartWorkflowAsync&lt;MyWorkflow, MyResult&gt;(
/// wf => wf.RunAsync(input),
/// new(id: "my-workflow-id", taskQueue: "my-task-queue"));
/// </code>
/// <para>To perform a synchronous operation (e.g., sending a signal), use the underlying
/// <see cref="TemporalClient"/> and return a sync result:</para>
/// <code>
/// await client.TemporalClient
/// .GetWorkflowHandle($"order-{input.OrderId}")
/// .SignalAsync("requestCancellation", new[] { input });
/// return TemporalOperationResult&lt;NoValue&gt;.Sync(default);
/// </code>
/// </remarks>
public interface ITemporalNexusClient
{
/// <summary>
/// Gets the underlying Temporal client for advanced use cases such as sending signals
/// or queries.
/// </summary>
ITemporalClient TemporalClient { get; }

/// <summary>
/// Start a workflow via a lambda invoking the run method. Always returns an async result
/// with a workflow-run operation token.
/// </summary>
/// <typeparam name="TWorkflow">Workflow class type.</typeparam>
/// <typeparam name="TResult">Workflow result type.</typeparam>
/// <param name="workflowRunCall">Invocation of workflow run method with a result.</param>
/// <param name="options">Start workflow options. ID and TaskQueue are required.</param>
/// <returns>An async operation result containing the workflow-run token.</returns>
Task<TemporalOperationResult<TResult>> StartWorkflowAsync<TWorkflow, TResult>(
Expression<Func<TWorkflow, Task<TResult>>> workflowRunCall, WorkflowOptions options);

/// <summary>
/// Start a workflow via a lambda invoking the run method with no return value. Always
/// returns an async result with a workflow-run operation token.
/// </summary>
/// <typeparam name="TWorkflow">Workflow class type.</typeparam>
/// <param name="workflowRunCall">Invocation of workflow run method with no result.</param>
/// <param name="options">Start workflow options. ID and TaskQueue are required.</param>
/// <returns>An async operation result containing the workflow-run token.</returns>
Task<TemporalOperationResult<NoValue>> StartWorkflowAsync<TWorkflow>(
Expression<Func<TWorkflow, Task>> workflowRunCall, WorkflowOptions options);

/// <summary>
/// Start a workflow by name. Always returns an async result with a workflow-run operation
/// token.
/// </summary>
/// <typeparam name="TResult">Workflow result type.</typeparam>
/// <param name="workflow">Workflow type name.</param>
/// <param name="args">Arguments for the workflow.</param>
/// <param name="options">Start workflow options. ID and TaskQueue are required.</param>
/// <returns>An async operation result containing the workflow-run token.</returns>
Task<TemporalOperationResult<TResult>> StartWorkflowAsync<TResult>(
string workflow, IReadOnlyCollection<object?> args, WorkflowOptions options);
}
}
26 changes: 21 additions & 5 deletions src/Temporalio/Nexus/NexusWorkflowRunHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,19 @@ internal static byte[] Base64UrlDecode(string s)
/// <returns>Created handle.</returns>
/// <exception cref="ArgumentException">If the token is invalid.</exception>
internal static NexusWorkflowRunHandle FromToken(string token)
{
var data = ParseToken(token);
return new(data.Namespace, data.WorkflowId, data.Version ?? 0);
}

/// <summary>
/// Parse an operation token to its underlying fields. Validates encoding, JSON shape, and
/// version (but not type — callers decide which token types they support).
/// </summary>
/// <param name="token">Base64url-encoded token string.</param>
/// <returns>Parsed token fields.</returns>
/// <exception cref="ArgumentException">If the token is invalid.</exception>
internal static OperationToken ParseToken(string token)
{
byte[] bytes;
try
Expand All @@ -96,10 +109,10 @@ internal static NexusWorkflowRunHandle FromToken(string token)
{
throw new ArgumentException("Token invalid");
}
Token? tokenObj;
OperationToken? tokenObj;
try
{
tokenObj = JsonSerializer.Deserialize<Token>(bytes, TokenSerializerOptions);
tokenObj = JsonSerializer.Deserialize<OperationToken>(bytes, TokenSerializerOptions);
}
catch (JsonException e)
{
Expand All @@ -113,18 +126,21 @@ internal static NexusWorkflowRunHandle FromToken(string token)
{
throw new ArgumentException($"Unsupported token version: {tokenObj.Version}");
}
return new(tokenObj.Namespace, tokenObj.WorkflowId, tokenObj.Version ?? 0);
return tokenObj;
}

/// <summary>
/// Create a string token based on this handle.
/// </summary>
/// <returns>Operation token.</returns>
internal string ToToken() => Base64UrlEncode(JsonSerializer.SerializeToUtf8Bytes(
new Token(Namespace, WorkflowId, Version == 0 ? null : Version),
new OperationToken(Namespace, WorkflowId, Version == 0 ? null : Version),
TokenSerializerOptions));

private record Token(
/// <summary>
/// Represents the fields of a Nexus operation token.
/// </summary>
internal record OperationToken(
[property: JsonPropertyName("ns")]
string Namespace,
[property: JsonPropertyName("wid")]
Expand Down
121 changes: 121 additions & 0 deletions src/Temporalio/Nexus/NexusWorkflowStartHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NexusRpc.Handlers;
using Temporalio.Api.Common.V1;
using Temporalio.Api.Enums.V1;
using Temporalio.Client;

namespace Temporalio.Nexus
{
/// <summary>
/// Internal helper for starting workflows from Nexus operations and managing operation tokens.
/// Shared by both <see cref="WorkflowRunOperationContext"/> and <see cref="TemporalNexusClient"/>.
/// </summary>
internal static class NexusWorkflowStartHelper
{
private const string NexusOperationTokenHeader = "Nexus-Operation-Token";

/// <summary>
/// Start a workflow and return the operation token. This handles all Nexus plumbing:
/// cloning options, setting task queue, processing links, injecting callbacks, and
/// adding outbound links.
/// </summary>
/// <param name="client">Temporal client.</param>
/// <param name="nexusStartContext">Nexus start context for callbacks and links.</param>
/// <param name="temporalContext">Temporal operation context for info and logging.</param>
/// <param name="workflow">Workflow type name.</param>
/// <param name="args">Workflow arguments.</param>
/// <param name="options">Workflow start options. ID and TaskQueue are required.</param>
/// <returns>Base64url-encoded operation token.</returns>
internal static async Task<string> StartWorkflowAndGetTokenAsync(
ITemporalClient client,
OperationStartContext nexusStartContext,
NexusOperationExecutionContext temporalContext,
string workflow,
IReadOnlyCollection<object?> args,
WorkflowOptions options)
{
var namespace_ = client.Options.Namespace;
var workflowId = options.Id ?? string.Empty;

// Generate the token before starting the workflow (needed for callback header)
var token = new NexusWorkflowRunHandle(namespace_, workflowId, 0).ToToken();

// Shallow clone the options so we can mutate them. We just overwrite any of these
// internal options since they cannot be user set at this time.
options = (WorkflowOptions)options.Clone();
options.TaskQueue ??= temporalContext.Info.TaskQueue;
if (options.IdConflictPolicy == WorkflowIdConflictPolicy.UseExisting)
{
options.OnConflictOptions = new()
{
AttachLinks = true,
AttachCompletionCallbacks = true,
AttachRequestId = true,
};
}
if (nexusStartContext.InboundLinks.Count > 0)
{
options.Links = nexusStartContext.InboundLinks.Select(link =>
{
try
{
return new Link { WorkflowEvent = link.ToWorkflowEvent() };
}
catch (ArgumentException e)
{
temporalContext.Logger.LogWarning(e, "Invalid Nexus link: {Url}", link.Uri);
return null;
}
}).OfType<Link>().ToList();
}
if (nexusStartContext.CallbackUrl is { } callbackUrl)
{
var callback = new Callback() { Nexus = new() { Url = callbackUrl } };
var callbackHeadersHasToken = false;
if (nexusStartContext.CallbackHeaders is { } callbackHeaders)
{
foreach (var kv in callbackHeaders)
{
callback.Nexus.Header.Add(kv.Key, kv.Value);
if (string.Equals(
kv.Key, NexusOperationTokenHeader, StringComparison.OrdinalIgnoreCase))
{
callbackHeadersHasToken = true;
}
}
}
// Set operation token if not already present (header is case-insensitive)
if (!callbackHeadersHasToken)
{
callback.Nexus.Header[NexusOperationTokenHeader] = token;
}
if (options.Links is { } links)
{
callback.Links.AddRange(links);
}
options.CompletionCallbacks = new[] { callback };
}
options.RequestId = nexusStartContext.RequestId;

// Do the start call
var wfHandle = await client.StartWorkflowAsync(
workflow, args, options).ConfigureAwait(false);

// Add the outbound link
nexusStartContext.OutboundLinks.Add(new Link.Types.WorkflowEvent
{
Namespace = namespace_,
WorkflowId = workflowId,
RunId = wfHandle.FirstExecutionRunId ??
throw new InvalidOperationException("Handle unexpectedly missing run ID"),
EventRef = new() { EventId = 1, EventType = EventType.WorkflowExecutionStarted },
}.ToNexusLink());

return token;
}
}
}
103 changes: 103 additions & 0 deletions src/Temporalio/Nexus/TemporalNexusClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Threading.Tasks;
using NexusRpc.Handlers;
using Temporalio.Client;

namespace Temporalio.Nexus
{
/// <summary>
/// Nexus-aware client wrapping the Temporal client. Provides methods for starting workflows
/// from within Nexus operation handlers, handling all Nexus plumbing (links, callbacks, token
/// generation) internally.
/// </summary>
/// <remarks>
/// WARNING: Nexus support is experimental.
/// This client is created by <see cref="TemporalNexusOperationHandler"/> and passed to the
/// user's start function. It should not be instantiated directly.
/// </remarks>
public class TemporalNexusClient : ITemporalNexusClient
{
private readonly ITemporalClient temporalClient;
private readonly OperationStartContext nexusStartContext;
private readonly NexusOperationExecutionContext temporalContext;

/// <summary>
/// Initializes a new instance of the <see cref="TemporalNexusClient"/> class.
/// </summary>
/// <param name="nexusStartContext">Nexus start context for callbacks and links.</param>
internal TemporalNexusClient(OperationStartContext nexusStartContext)
{
this.nexusStartContext = nexusStartContext;
temporalContext = NexusOperationExecutionContext.Current;
temporalClient = temporalContext.TemporalClient;
}

/// <inheritdoc/>
public ITemporalClient TemporalClient => temporalClient;

/// <summary>
/// Start a workflow via a lambda invoking the run method. Always returns an async result
/// with a workflow-run operation token.
/// </summary>
/// <typeparam name="TWorkflow">Workflow class type.</typeparam>
/// <typeparam name="TResult">Workflow result type.</typeparam>
/// <param name="workflowRunCall">Invocation of workflow run method with a result.</param>
/// <param name="options">Start workflow options. ID and TaskQueue are required.</param>
/// <returns>An async operation result containing the workflow-run token.</returns>
public Task<TemporalOperationResult<TResult>> StartWorkflowAsync<TWorkflow, TResult>(
Expression<Func<TWorkflow, Task<TResult>>> workflowRunCall, WorkflowOptions options)
{
var (runMethod, args) = Common.ExpressionUtil.ExtractCall(workflowRunCall);
return StartWorkflowAsync<TResult>(
Workflows.WorkflowDefinition.NameFromRunMethodForCall(runMethod),
args,
options);
}

/// <summary>
/// Start a workflow via a lambda invoking the run method with no return value. Always
/// returns an async result with a workflow-run operation token.
/// </summary>
/// <typeparam name="TWorkflow">Workflow class type.</typeparam>
/// <param name="workflowRunCall">Invocation of workflow run method with no result.</param>
/// <param name="options">Start workflow options. ID and TaskQueue are required.</param>
/// <returns>An async operation result containing the workflow-run token.</returns>
public async Task<TemporalOperationResult<NoValue>> StartWorkflowAsync<TWorkflow>(
Expression<Func<TWorkflow, Task>> workflowRunCall, WorkflowOptions options)
{
var (runMethod, args) = Common.ExpressionUtil.ExtractCall(workflowRunCall);
var token = await NexusWorkflowStartHelper.StartWorkflowAndGetTokenAsync(
temporalClient,
nexusStartContext,
temporalContext,
Workflows.WorkflowDefinition.NameFromRunMethodForCall(runMethod),
args,
options).ConfigureAwait(false);
return TemporalOperationResult<NoValue>.Async(token);
}

/// <summary>
/// Start a workflow by name. Always returns an async result with a workflow-run operation
/// token.
/// </summary>
/// <typeparam name="TResult">Workflow result type.</typeparam>
/// <param name="workflow">Workflow type name.</param>
/// <param name="args">Arguments for the workflow.</param>
/// <param name="options">Start workflow options. ID and TaskQueue are required.</param>
/// <returns>An async operation result containing the workflow-run token.</returns>
public async Task<TemporalOperationResult<TResult>> StartWorkflowAsync<TResult>(
string workflow, IReadOnlyCollection<object?> args, WorkflowOptions options)
{
var token = await NexusWorkflowStartHelper.StartWorkflowAndGetTokenAsync(
temporalClient,
nexusStartContext,
temporalContext,
workflow,
args,
options).ConfigureAwait(false);
return TemporalOperationResult<TResult>.Async(token);
}
}
}
Loading
Loading