-
Notifications
You must be signed in to change notification settings - Fork 214
Stand alone operations for Nexus #2872
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
59fd36f
e306ae7
5368059
4fcec53
d7fa4bf
5e6908b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,151 @@ | ||
| package io.temporal.client; | ||
|
|
||
| import io.temporal.common.Experimental; | ||
| import io.temporal.serviceclient.WorkflowServiceStubs; | ||
| import java.lang.reflect.Type; | ||
| import java.util.stream.Stream; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| /** | ||
| * Client for managing standalone Nexus operation executions. Obtain an instance via {@link | ||
| * #newInstance(WorkflowServiceStubs)} or {@link #newInstance(WorkflowServiceStubs, | ||
| * NexusClientOptions)}. Do not create this object per request; share it for the lifetime of the | ||
| * process. | ||
| * | ||
| * <p>Standalone Nexus operations run independently of any workflow — they are scheduled, monitored, | ||
| * and managed directly through this client (and the service-bound clients it produces) rather than | ||
| * from within a workflow execution. | ||
| * | ||
| * <p>To start operations, build a service-bound client and call {@code start}/{@code execute}: | ||
| * | ||
| * <pre>{@code | ||
| * NexusClient client = NexusClient.newInstance(stubs, options); | ||
| * | ||
| * // Typed: bind to an @ServiceInterface and invoke a method reference. | ||
| * NexusServiceClient<MyService> svc = | ||
| * NexusServiceClient.newInstance(MyService.class, "my-endpoint", stubs, options); | ||
| * String result = svc.execute(MyService::greet, "world"); | ||
| * | ||
| * // Untyped: dispatch by operation name string. | ||
| * UntypedNexusServiceClient untyped = | ||
| * client.newUntypedNexusServiceClient("my-endpoint", "MyService"); | ||
| * UntypedNexusOperationHandle handle = untyped.start("greet", null, "world"); | ||
| * }</pre> | ||
| * | ||
| * <p>To act on an existing operation (describe, cancel, terminate, get result), obtain a handle via | ||
| * {@link #getHandle}: | ||
| * | ||
| * <pre>{@code | ||
| * NexusOperationHandle<String> handle = client.getHandle(operationId, runId, String.class); | ||
| * String result = handle.getResult(); | ||
| * handle.cancel("user requested"); | ||
| * }</pre> | ||
| * | ||
| * <p>For visibility queries across all operations in the namespace, see {@link | ||
| * #listNexusOperationExecutions} and {@link #countNexusOperationExecutions}. | ||
| * | ||
| * @see NexusServiceClient | ||
| * @see UntypedNexusServiceClient | ||
| * @see NexusOperationHandle | ||
| */ | ||
| @Experimental | ||
| public interface NexusClient { | ||
|
|
||
| /** | ||
| * Creates a client with default {@link NexusClientOptions}. | ||
| * | ||
| * @param service gRPC stubs connected to a Temporal Service endpoint | ||
| */ | ||
| static NexusClient newInstance(WorkflowServiceStubs service) { | ||
| return NexusClientImpl.newInstance(service, NexusClientOptions.getDefaultInstance()); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a client with the supplied options. | ||
| * | ||
| * @param service gRPC stubs connected to a Temporal Service endpoint | ||
| * @param options namespace, data converter, interceptors, and defaults applied to operations | ||
| * started through this client | ||
| */ | ||
| static NexusClient newInstance(WorkflowServiceStubs service, NexusClientOptions options) { | ||
| return NexusClientImpl.newInstance(service, options); | ||
| } | ||
|
|
||
| /** Returns the underlying gRPC stubs this client routes RPCs through. */ | ||
| WorkflowServiceStubs getWorkflowServiceStubs(); | ||
|
|
||
| /** | ||
| * Returns an untyped handle to an existing operation execution, targeting the latest run. To bind | ||
| * a result type, wrap the handle with {@link NexusOperationHandle#fromUntyped}. | ||
| * | ||
| * @param operationId the user-assigned operation ID | ||
| * @return an untyped handle | ||
| */ | ||
| UntypedNexusOperationHandle getHandle(String operationId); | ||
|
|
||
| /** | ||
| * Returns an untyped handle to an existing operation execution, optionally pinned to a specific | ||
| * run. | ||
| * | ||
| * @param operationId the user-assigned operation ID | ||
| * @param runId the server-assigned run ID, or {@code null} to target the latest run | ||
| * @return an untyped handle | ||
| */ | ||
| UntypedNexusOperationHandle getHandle(String operationId, @Nullable String runId); | ||
|
|
||
| /** | ||
| * Returns a typed handle to an existing operation execution, bound to {@code resultClass}. | ||
| * | ||
| * @param operationId the user-assigned operation ID | ||
| * @param runId the server-assigned run ID, or {@code null} to target the latest run | ||
| * @param resultClass expected result type | ||
| * @param <R> result type | ||
| */ | ||
| <R> NexusOperationHandle<R> getHandle( | ||
| String operationId, @Nullable String runId, Class<R> resultClass); | ||
|
|
||
| /** | ||
| * Returns a typed handle to an existing operation execution, bound to {@code resultClass}/{@code | ||
| * resultType}. Use the {@code resultType} variant when the result is a generic type whose | ||
| * parameters cannot be captured by {@link Class} alone (e.g. {@code List<String>}). | ||
| * | ||
| * @param operationId the user-assigned operation ID | ||
| * @param runId the server-assigned run ID, or {@code null} to target the latest run | ||
| * @param resultClass expected result class | ||
| * @param resultType generic type for deserialization; may be {@code null} | ||
| * @param <R> result type | ||
| */ | ||
| <R> NexusOperationHandle<R> getHandle( | ||
| String operationId, @Nullable String runId, Class<R> resultClass, @Nullable Type resultType); | ||
|
|
||
| /** | ||
| * Builds an untyped service-bound client targeting the given endpoint and service. Use this to | ||
| * dispatch operations by name string when no service interface is available. | ||
| * | ||
| * @param endpoint Nexus endpoint name registered on the Temporal Service | ||
| * @param serviceName Nexus service name on that endpoint | ||
| */ | ||
| UntypedNexusServiceClient newUntypedNexusServiceClient(String endpoint, String serviceName); | ||
|
|
||
| /** | ||
| * Returns a stream of standalone Nexus operation executions matching the given visibility query. | ||
| * The stream paginates lazily over server-side results — pages are fetched on demand as the | ||
| * stream is consumed. | ||
| * | ||
| * @param query Temporal visibility query string, or {@code null} to return all executions in the | ||
| * client namespace | ||
| * @return a lazy stream of matching executions | ||
| */ | ||
| Stream<NexusOperationExecutionMetadata> listNexusOperationExecutions(@Nullable String query); | ||
|
|
||
| /** | ||
| * Returns the count of standalone Nexus operation executions matching the given visibility query, | ||
| * optionally with aggregation groups. | ||
| * | ||
| * @param query Temporal visibility query string, or {@code null} to count all executions in the | ||
| * client namespace | ||
| * @return execution count, optionally with aggregation groups when the query uses {@code GROUP | ||
| * BY} | ||
| */ | ||
| NexusOperationExecutionCount countNexusOperationExecutions(@Nullable String query); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,216 @@ | ||
| package io.temporal.client; | ||
|
|
||
| import static io.temporal.internal.WorkflowThreadMarker.enforceNonWorkflowThread; | ||
|
|
||
| import com.google.protobuf.ByteString; | ||
| import com.uber.m3.tally.Scope; | ||
| import io.temporal.common.Experimental; | ||
| import io.temporal.common.interceptors.NexusClientCallsInterceptor; | ||
| import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsInput; | ||
| import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsOutput; | ||
| import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsInput; | ||
| import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsOutput; | ||
| import io.temporal.common.interceptors.NexusClientInterceptor; | ||
| import io.temporal.internal.WorkflowThreadMarker; | ||
| import io.temporal.internal.client.NamespaceInjectWorkflowServiceStubs; | ||
| import io.temporal.internal.client.RootNexusClientInvoker; | ||
| import io.temporal.internal.client.external.GenericWorkflowClient; | ||
| import io.temporal.internal.client.external.GenericWorkflowClientImpl; | ||
| import io.temporal.serviceclient.MetricsTag; | ||
| import io.temporal.serviceclient.WorkflowServiceStubs; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.NoSuchElementException; | ||
| import java.util.Spliterator; | ||
| import java.util.Spliterators; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
| import java.util.stream.StreamSupport; | ||
| import javax.annotation.Nullable; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| @Experimental | ||
| public class NexusClientImpl implements NexusClient { | ||
|
|
||
| private static final Logger log = LoggerFactory.getLogger(NexusClientImpl.class); | ||
|
|
||
| private final WorkflowServiceStubs workflowServiceStubs; | ||
| private final NexusClientOptions options; | ||
| private final GenericWorkflowClient genericClient; | ||
| private final Scope metricsScope; | ||
| private final NexusClientCallsInterceptor nexusClientCallsInvoker; | ||
| private final List<NexusClientInterceptor> interceptors; | ||
|
|
||
| public static NexusClient newInstance(WorkflowServiceStubs service, NexusClientOptions options) { | ||
| enforceNonWorkflowThread(); | ||
| return WorkflowThreadMarker.protectFromWorkflowThread( | ||
| new NexusClientImpl(service, options), NexusClient.class); | ||
| } | ||
|
|
||
| NexusClientImpl(WorkflowServiceStubs workflowServiceStubs, NexusClientOptions options) { | ||
| workflowServiceStubs = | ||
| new NamespaceInjectWorkflowServiceStubs(workflowServiceStubs, options.getNamespace()); | ||
| this.workflowServiceStubs = workflowServiceStubs; | ||
| this.options = options; | ||
| this.metricsScope = | ||
| workflowServiceStubs | ||
| .getOptions() | ||
| .getMetricsScope() | ||
| .tagged(MetricsTag.defaultTags(options.getNamespace())); | ||
| this.genericClient = new GenericWorkflowClientImpl(workflowServiceStubs, metricsScope); | ||
| this.interceptors = options.getInterceptors(); | ||
| this.nexusClientCallsInvoker = initializeClientInvoker(); | ||
| if (log.isDebugEnabled()) { | ||
| log.debug( | ||
| "NexusClient initialized: namespace={}, interceptors={}", | ||
| options.getNamespace(), | ||
| interceptors.size()); | ||
| } | ||
| } | ||
|
|
||
| private NexusClientCallsInterceptor initializeClientInvoker() { | ||
| NexusClientCallsInterceptor invoker = new RootNexusClientInvoker(genericClient, options); | ||
| for (NexusClientInterceptor clientInterceptor : interceptors) { | ||
| NexusClientCallsInterceptor wrapped = clientInterceptor.nexusClientCallsInterceptor(invoker); | ||
| if (wrapped == null) { | ||
| throw new IllegalStateException( | ||
| "NexusClientInterceptor " | ||
| + clientInterceptor.getClass().getName() | ||
| + " returned null from nexusClientCallsInterceptor; expected a non-null" | ||
| + " NexusClientCallsInterceptor wrapping the supplied next link"); | ||
| } | ||
| invoker = wrapped; | ||
| } | ||
| return invoker; | ||
| } | ||
|
|
||
| @Override | ||
| public WorkflowServiceStubs getWorkflowServiceStubs() { | ||
| return workflowServiceStubs; | ||
| } | ||
|
|
||
| @Override | ||
| public UntypedNexusOperationHandle getHandle(String operationId) { | ||
| return getHandle(operationId, null); | ||
| } | ||
|
|
||
| @Override | ||
| public UntypedNexusOperationHandle getHandle(String operationId, @Nullable String runId) { | ||
| return new NexusOperationHandleImpl<>( | ||
| nexusClientCallsInvoker, operationId, runId, options.getDataConverter()); | ||
| } | ||
|
|
||
| @Override | ||
| public <R> NexusOperationHandle<R> getHandle( | ||
| String operationId, @Nullable String runId, Class<R> resultClass) { | ||
| return getHandle(operationId, runId, resultClass, null); | ||
| } | ||
|
|
||
| @Override | ||
| public <R> NexusOperationHandle<R> getHandle( | ||
| String operationId, | ||
| @Nullable String runId, | ||
| Class<R> resultClass, | ||
| @Nullable java.lang.reflect.Type resultType) { | ||
| return new NexusOperationHandleImpl<>( | ||
| nexusClientCallsInvoker, | ||
| operationId, | ||
| runId, | ||
| options.getDataConverter(), | ||
| resultClass, | ||
| resultType); | ||
| } | ||
|
|
||
| @Override | ||
| public UntypedNexusServiceClient newUntypedNexusServiceClient( | ||
| String endpoint, String serviceName) { | ||
| return new UntypedNexusServiceClientImpl( | ||
| nexusClientCallsInvoker, endpoint, serviceName, options); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the head of the interceptor chain. Package-private so service-client builders can route | ||
| * start RPCs through the chain without exposing it on the public {@link NexusClient} interface. | ||
| */ | ||
| NexusClientCallsInterceptor getNexusClientCallsInvoker() { | ||
| return nexusClientCallsInvoker; | ||
| } | ||
|
|
||
| private static final int DEFAULT_LIST_PAGE_SIZE = 1000; | ||
|
|
||
| @Override | ||
| public Stream<NexusOperationExecutionMetadata> listNexusOperationExecutions( | ||
| @Nullable String query) { | ||
| Iterator<NexusOperationExecutionMetadata> iter = | ||
| new ListPageIterator(nexusClientCallsInvoker, query, DEFAULT_LIST_PAGE_SIZE); | ||
| return StreamSupport.stream( | ||
| Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED | Spliterator.NONNULL), | ||
| false); | ||
| } | ||
|
|
||
| @Override | ||
| public NexusOperationExecutionCount countNexusOperationExecutions(@Nullable String query) { | ||
| CountNexusOperationExecutionsOutput out = | ||
| nexusClientCallsInvoker.countNexusOperationExecutions( | ||
| new CountNexusOperationExecutionsInput(query)); | ||
| List<NexusOperationExecutionCount.AggregationGroup> publicGroups = | ||
| out.getGroups().stream() | ||
| .map( | ||
| g -> | ||
| new NexusOperationExecutionCount.AggregationGroup( | ||
| g.getCount(), g.getGroupValues())) | ||
| .collect(Collectors.toList()); | ||
| return new NexusOperationExecutionCount(out.getCount(), publicGroups); | ||
| } | ||
|
|
||
| /** Lazily fetches pages from the interceptor and flattens them into a single iteration. */ | ||
| private static final class ListPageIterator implements Iterator<NexusOperationExecutionMetadata> { | ||
| private final NexusClientCallsInterceptor invoker; | ||
| private final @Nullable String query; | ||
| private final int pageSize; | ||
| private Iterator<NexusOperationExecutionMetadata> current = | ||
| java.util.Collections.emptyIterator(); | ||
| private @Nullable ByteString nextPageToken = null; | ||
| private boolean exhausted = false; | ||
|
|
||
| ListPageIterator(NexusClientCallsInterceptor invoker, @Nullable String query, int pageSize) { | ||
| this.invoker = invoker; | ||
| this.query = query; | ||
| this.pageSize = pageSize; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean hasNext() { | ||
| while (!current.hasNext() && !exhausted) { | ||
| fetchNextPage(); | ||
| } | ||
| return current.hasNext(); | ||
| } | ||
|
|
||
| @Override | ||
| public NexusOperationExecutionMetadata next() { | ||
| if (!hasNext()) { | ||
| throw new NoSuchElementException(); | ||
| } | ||
| return current.next(); | ||
| } | ||
|
|
||
| private void fetchNextPage() { | ||
| ListNexusOperationExecutionsOutput page = | ||
| invoker.listNexusOperationExecutions( | ||
| new ListNexusOperationExecutionsInput(query, pageSize, nextPageToken)); | ||
| current = | ||
| page.getOperations().stream() | ||
| .map(NexusOperationExecutionMetadata::fromListInfo) | ||
| .iterator(); | ||
| ByteString token = page.getNextPageToken(); | ||
| if (token == null || token.isEmpty()) { | ||
| exhausted = true; | ||
| nextPageToken = null; | ||
| } else { | ||
| nextPageToken = token; | ||
| } | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,26 @@ | ||||||
| package io.temporal.client; | ||||||
|
|
||||||
| import io.temporal.api.workflowservice.v1.DescribeNexusOperationExecutionResponse; | ||||||
| import io.temporal.common.Experimental; | ||||||
|
|
||||||
| /** Snapshot of a standalone Nexus operation execution returned by describe/poll calls. */ | ||||||
| @Experimental | ||||||
| public final class NexusClientOperationExecutionDescription { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I think this can just be? |
||||||
|
|
||||||
| private final DescribeNexusOperationExecutionResponse response; | ||||||
|
|
||||||
| public NexusClientOperationExecutionDescription( | ||||||
| DescribeNexusOperationExecutionResponse response) { | ||||||
| this.response = response; | ||||||
| } | ||||||
|
|
||||||
| /** Run ID of the operation described. */ | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would expect a lot more fields here like https://github.com/temporalio/sdk-java/blob/master/temporal-sdk/src/main/java/io/temporal/client/ActivityExecutionDescription.java#L28 |
||||||
| public String getRunId() { | ||||||
| return response.getRunId(); | ||||||
| } | ||||||
|
|
||||||
| /** Underlying proto response. Exposed while the Nexus SDK surface is still experimental. */ | ||||||
| public DescribeNexusOperationExecutionResponse getRawResponse() { | ||||||
| return response; | ||||||
| } | ||||||
| } | ||||||
Uh oh!
There was an error while loading. Please reload this page.