diff --git a/client/build.gradle b/client/build.gradle index 9ba98dce..9ed1c737 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -57,7 +57,8 @@ compileTestJava { sourceCompatibility = JavaVersion.VERSION_11 targetCompatibility = JavaVersion.VERSION_11 options.fork = true - options.forkOptions.executable = "${PATH_TO_TEST_JAVA_RUNTIME}/bin/javac" + def javacName = org.gradle.internal.os.OperatingSystem.current().isWindows() ? 'javac.exe' : 'javac' + options.forkOptions.executable = "${PATH_TO_TEST_JAVA_RUNTIME}/bin/${javacName}" } task downloadProtoFiles { @@ -90,7 +91,9 @@ protobuf { } generateProtoTasks { all()*.plugins { grpc {} } - all()*.dependsOn downloadProtoFiles + if (project.gradle.startParameter.taskNames.any { it.contains('downloadProtoFiles') }) { + all()*.dependsOn downloadProtoFiles + } } } @@ -107,7 +110,8 @@ sourceSets { } tasks.withType(Test) { - executable = new File("${PATH_TO_TEST_JAVA_RUNTIME}", 'bin/java') + def javaName = org.gradle.internal.os.OperatingSystem.current().isWindows() ? 
'java.exe' : 'java' + executable = new File("${PATH_TO_TEST_JAVA_RUNTIME}", "bin/${javaName}") } test { diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 552cf579..7415c695 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -34,6 +34,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final DataConverter dataConverter; private final Duration maximumTimerInterval; private final DurableTaskGrpcWorkerVersioningOptions versioningOptions; + private final ExceptionPropertiesProvider exceptionPropertiesProvider; private final TaskHubSidecarServiceBlockingStub sidecarClient; @@ -65,6 +66,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; this.versioningOptions = builder.versioningOptions; + this.exceptionPropertiesProvider = builder.exceptionPropertiesProvider; } /** @@ -118,7 +120,8 @@ public void startAndBlock() { this.dataConverter, this.maximumTimerInterval, logger, - this.versioningOptions); + this.versioningOptions, + this.exceptionPropertiesProvider); TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor( this.activityFactories, this.dataConverter, @@ -228,11 +231,9 @@ public void startAndBlock() { activityRequest.getInput().getValue(), activityRequest.getTaskId()); } catch (Throwable e) { - failureDetails = TaskFailureDetails.newBuilder() - .setErrorType(e.getClass().getName()) - .setErrorMessage(e.getMessage()) - .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) - .build(); + Exception ex = e instanceof Exception ? 
(Exception) e : new RuntimeException(e); + failureDetails = FailureDetails.fromException( + ex, this.exceptionPropertiesProvider).toProto(); } ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder() diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java index ec39fee2..cba3a7e5 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -18,6 +18,7 @@ public final class DurableTaskGrpcWorkerBuilder { DataConverter dataConverter; Duration maximumTimerInterval; DurableTaskGrpcWorkerVersioningOptions versioningOptions; + ExceptionPropertiesProvider exceptionPropertiesProvider; /** * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. @@ -125,6 +126,21 @@ public DurableTaskGrpcWorkerBuilder useVersioning(DurableTaskGrpcWorkerVersionin return this; } + /** + * Sets the {@link ExceptionPropertiesProvider} to use for extracting custom properties from exceptions. + *

+ * When set, the provider is invoked whenever an activity or orchestration fails with an exception. The returned + * properties are included in the {@link FailureDetails} and can be retrieved via + * {@link FailureDetails#getProperties()}. + * + * @param provider the exception properties provider + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder exceptionPropertiesProvider(ExceptionPropertiesProvider provider) { + this.exceptionPropertiesProvider = provider; + return this; + } + /** * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. * @return a new {@link DurableTaskGrpcWorker} object diff --git a/client/src/main/java/com/microsoft/durabletask/ExceptionPropertiesProvider.java b/client/src/main/java/com/microsoft/durabletask/ExceptionPropertiesProvider.java new file mode 100644 index 00000000..96573be9 --- /dev/null +++ b/client/src/main/java/com/microsoft/durabletask/ExceptionPropertiesProvider.java @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +import javax.annotation.Nullable; +import java.util.Map; + +/** + * Provider interface for extracting custom properties from exceptions. + *

+ * Implementations of this interface can be registered with a {@link DurableTaskGrpcWorkerBuilder} to include
+ * custom exception properties in {@link FailureDetails} when activities or orchestrations fail.
+ * These properties are then available via {@link FailureDetails#getProperties()}.
+ * <p>
+ * Example usage:
+ * <pre>{@code
+ * DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder()
+ *     .exceptionPropertiesProvider(exception -> {
+ *         if (exception instanceof MyCustomException) {
+ *             MyCustomException custom = (MyCustomException) exception;
+ *             Map<String, Object> props = new HashMap<>();
+ *             props.put("errorCode", custom.getErrorCode());
+ *             props.put("retryable", custom.isRetryable());
+ *             return props;
+ *         }
+ *         return null;
+ *     })
+ *     .addOrchestration(...)
+ *     .build();
+ * }</pre>
+ */ +@FunctionalInterface +public interface ExceptionPropertiesProvider { + + /** + * Extracts custom properties from the given exception. + *

+ * Return {@code null} or an empty map if no custom properties should be included for this exception. + * + * @param exception the exception to extract properties from + * @return a map of property names to values, or {@code null} + */ + @Nullable + Map getExceptionProperties(Exception exception); +} diff --git a/client/src/main/java/com/microsoft/durabletask/FailureDetails.java b/client/src/main/java/com/microsoft/durabletask/FailureDetails.java index ad3f1ba1..42ac9713 100644 --- a/client/src/main/java/com/microsoft/durabletask/FailureDetails.java +++ b/client/src/main/java/com/microsoft/durabletask/FailureDetails.java @@ -2,11 +2,16 @@ // Licensed under the MIT License. package com.microsoft.durabletask; +import com.google.protobuf.NullValue; import com.google.protobuf.StringValue; +import com.google.protobuf.Value; import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.TaskFailureDetails; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; /** * Class that represents the details of a task failure. @@ -20,29 +25,76 @@ public final class FailureDetails { private final String errorMessage; private final String stackTrace; private final boolean isNonRetriable; + private final FailureDetails innerFailure; + private final Map properties; FailureDetails( String errorType, @Nullable String errorMessage, @Nullable String errorDetails, boolean isNonRetriable) { + this(errorType, errorMessage, errorDetails, isNonRetriable, null, null); + } + + FailureDetails( + String errorType, + @Nullable String errorMessage, + @Nullable String errorDetails, + boolean isNonRetriable, + @Nullable FailureDetails innerFailure, + @Nullable Map properties) { this.errorType = errorType; this.stackTrace = errorDetails; // Error message can be null for things like NullPointerException but the gRPC contract doesn't allow null this.errorMessage = errorMessage != null ? 
errorMessage : ""; this.isNonRetriable = isNonRetriable; + this.innerFailure = innerFailure; + this.properties = properties != null ? Collections.unmodifiableMap(new HashMap<>(properties)) : null; } FailureDetails(Exception exception) { - this(exception.getClass().getName(), exception.getMessage(), getFullStackTrace(exception), false); + this(exception.getClass().getName(), + exception.getMessage(), + getFullStackTrace(exception), + false, + exception.getCause() != null ? fromExceptionRecursive(exception.getCause(), null) : null, + null); + } + + /** + * Creates a {@code FailureDetails} from an exception, optionally using the provided + * {@link ExceptionPropertiesProvider} to extract custom properties. + * + * @param exception the exception that caused the failure + * @param provider the provider for extracting custom properties, or {@code null} + * @return a new {@code FailureDetails} instance + */ + static FailureDetails fromException(Exception exception, @Nullable ExceptionPropertiesProvider provider) { + Map properties = null; + if (provider != null) { + try { + properties = provider.getExceptionProperties(exception); + } catch (Exception ignored) { + // Don't let provider errors mask the original failure + } + } + return new FailureDetails( + exception.getClass().getName(), + exception.getMessage(), + getFullStackTrace(exception), + false, + exception.getCause() != null ? fromExceptionRecursive(exception.getCause(), provider) : null, + properties); } FailureDetails(TaskFailureDetails proto) { this(proto.getErrorType(), proto.getErrorMessage(), proto.getStackTrace().getValue(), - proto.getIsNonRetriable()); + proto.getIsNonRetriable(), + proto.hasInnerFailure() ? new FailureDetails(proto.getInnerFailure()) : null, + convertProtoProperties(proto.getPropertiesMap())); } /** @@ -86,6 +138,28 @@ public boolean isNonRetriable() { return this.isNonRetriable; } + /** + * Gets the inner failure that caused this failure, or {@code null} if there is no inner cause. 
+ * + * @return the inner {@code FailureDetails} or {@code null} + */ + @Nullable + public FailureDetails getInnerFailure() { + return this.innerFailure; + } + + /** + * Gets additional properties associated with the exception, or {@code null} if no properties are available. + *

+ * The returned map is unmodifiable. + * + * @return an unmodifiable map of property names to values, or {@code null} + */ + @Nullable + public Map getProperties() { + return this.properties; + } + /** * Returns {@code true} if the task failure was provided by the specified exception type, otherwise {@code false}. *

@@ -112,6 +186,11 @@ public boolean isCausedBy(Class exceptionClass) { } } + @Override + public String toString() { + return this.errorType + ": " + this.errorMessage; + } + static String getFullStackTrace(Throwable e) { StackTraceElement[] elements = e.getStackTrace(); @@ -124,10 +203,98 @@ static String getFullStackTrace(Throwable e) { } TaskFailureDetails toProto() { - return TaskFailureDetails.newBuilder() + TaskFailureDetails.Builder builder = TaskFailureDetails.newBuilder() .setErrorType(this.getErrorType()) .setErrorMessage(this.getErrorMessage()) .setStackTrace(StringValue.of(this.getStackTrace() != null ? this.getStackTrace() : "")) - .build(); + .setIsNonRetriable(this.isNonRetriable); + + if (this.innerFailure != null) { + builder.setInnerFailure(this.innerFailure.toProto()); + } + + if (this.properties != null) { + builder.putAllProperties(convertToProtoProperties(this.properties)); + } + + return builder.build(); + } + + @Nullable + private static FailureDetails fromExceptionRecursive( + @Nullable Throwable exception, + @Nullable ExceptionPropertiesProvider provider) { + if (exception == null) { + return null; + } + Map properties = null; + if (provider != null && exception instanceof Exception) { + try { + properties = provider.getExceptionProperties((Exception) exception); + } catch (Exception ignored) { + // Don't let provider errors mask the original failure + } + } + return new FailureDetails( + exception.getClass().getName(), + exception.getMessage(), + getFullStackTrace(exception), + false, + exception.getCause() != null ? 
fromExceptionRecursive(exception.getCause(), provider) : null, + properties); + } + + @Nullable + private static Map convertProtoProperties(Map protoProperties) { + if (protoProperties == null || protoProperties.isEmpty()) { + return null; + } + + Map result = new HashMap<>(); + for (Map.Entry entry : protoProperties.entrySet()) { + result.put(entry.getKey(), convertProtoValue(entry.getValue())); + } + return result; + } + + @Nullable + private static Object convertProtoValue(Value value) { + if (value == null) { + return null; + } + switch (value.getKindCase()) { + case NULL_VALUE: + return null; + case NUMBER_VALUE: + return value.getNumberValue(); + case STRING_VALUE: + return value.getStringValue(); + case BOOL_VALUE: + return value.getBoolValue(); + default: + return value.toString(); + } + } + + private static Map convertToProtoProperties(Map properties) { + Map result = new HashMap<>(); + for (Map.Entry entry : properties.entrySet()) { + result.put(entry.getKey(), convertToProtoValue(entry.getValue())); + } + return result; + } + + private static Value convertToProtoValue(@Nullable Object obj) { + if (obj == null) { + return Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build(); + } else if (obj instanceof Number) { + return Value.newBuilder().setNumberValue(((Number) obj).doubleValue()).build(); + } else if (obj instanceof Boolean) { + return Value.newBuilder().setBoolValue((Boolean) obj).build(); + } else if (obj instanceof String) { + return Value.newBuilder().setStringValue((String) obj).build(); + } else { + return Value.newBuilder().setStringValue(obj.toString()).build(); + } } -} \ No newline at end of file +} diff --git a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java index 9c70db02..9de15c68 100644 --- a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java +++ 
b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java @@ -31,6 +31,7 @@ final class TaskOrchestrationExecutor { private final Logger logger; private final Duration maximumTimerInterval; private final DurableTaskGrpcWorkerVersioningOptions versioningOptions; + private final ExceptionPropertiesProvider exceptionPropertiesProvider; public TaskOrchestrationExecutor( HashMap orchestrationFactories, @@ -38,11 +39,22 @@ public TaskOrchestrationExecutor( Duration maximumTimerInterval, Logger logger, DurableTaskGrpcWorkerVersioningOptions versioningOptions) { + this(orchestrationFactories, dataConverter, maximumTimerInterval, logger, versioningOptions, null); + } + + public TaskOrchestrationExecutor( + HashMap orchestrationFactories, + DataConverter dataConverter, + Duration maximumTimerInterval, + Logger logger, + DurableTaskGrpcWorkerVersioningOptions versioningOptions, + ExceptionPropertiesProvider exceptionPropertiesProvider) { this.orchestrationFactories = orchestrationFactories; this.dataConverter = dataConverter; this.maximumTimerInterval = maximumTimerInterval; this.logger = logger; this.versioningOptions = versioningOptions; + this.exceptionPropertiesProvider = exceptionPropertiesProvider; } public TaskOrchestratorResult execute(List pastEvents, List newEvents) { @@ -68,7 +80,7 @@ public TaskOrchestratorResult execute(List pastEvents, List details.getProperties().put("newKey", "newValue")); + } + + @Test + void constructFromException_capturesInnerCause() { + IOException innerCause = new IOException("disk full"); + RuntimeException outer = new RuntimeException("operation failed", innerCause); + + FailureDetails details = new FailureDetails(outer); + + assertEquals("java.lang.RuntimeException", details.getErrorType()); + assertEquals("operation failed", details.getErrorMessage()); + assertNotNull(details.getStackTrace()); + assertFalse(details.isNonRetriable()); + + assertNotNull(details.getInnerFailure()); + FailureDetails inner = 
details.getInnerFailure(); + assertEquals("java.io.IOException", inner.getErrorType()); + assertEquals("disk full", inner.getErrorMessage()); + assertNull(inner.getInnerFailure()); + } + + @Test + void constructFromException_deeplyNestedCauses() { + NullPointerException root = new NullPointerException("null ref"); + IOException mid = new IOException("io error", root); + RuntimeException top = new RuntimeException("top", mid); + + FailureDetails details = new FailureDetails(top); + + assertNotNull(details.getInnerFailure()); + assertEquals("java.io.IOException", details.getInnerFailure().getErrorType()); + + assertNotNull(details.getInnerFailure().getInnerFailure()); + assertEquals("java.lang.NullPointerException", + details.getInnerFailure().getInnerFailure().getErrorType()); + + assertNull(details.getInnerFailure().getInnerFailure().getInnerFailure()); + } + + @Test + void constructFromException_noCause_innerFailureIsNull() { + IllegalStateException ex = new IllegalStateException("bad state"); + + FailureDetails details = new FailureDetails(ex); + + assertNull(details.getInnerFailure()); + assertNull(details.getProperties()); + } + + @Test + void constructFromException_nullMessage_defaultsToEmpty() { + NullPointerException ex = new NullPointerException(); + + FailureDetails details = new FailureDetails(ex); + + assertEquals("java.lang.NullPointerException", details.getErrorType()); + assertEquals("", details.getErrorMessage()); + } + + @Test + void toProto_roundTrip_basicFields() { + FailureDetails original = new FailureDetails( + "TestError", "test message", "stack trace", true); + + TaskFailureDetails proto = original.toProto(); + FailureDetails roundTripped = new FailureDetails(proto); + + assertEquals(original.getErrorType(), roundTripped.getErrorType()); + assertEquals(original.getErrorMessage(), roundTripped.getErrorMessage()); + assertEquals(original.getStackTrace(), roundTripped.getStackTrace()); + assertEquals(original.isNonRetriable(), 
roundTripped.isNonRetriable()); + } + + @Test + void toProto_roundTrip_withInnerFailure() { + FailureDetails inner = new FailureDetails( + "InnerError", "inner msg", "inner stack", false); + FailureDetails outer = new FailureDetails( + "OuterError", "outer msg", "outer stack", false, inner, null); + + TaskFailureDetails proto = outer.toProto(); + FailureDetails roundTripped = new FailureDetails(proto); + + assertEquals("OuterError", roundTripped.getErrorType()); + assertNotNull(roundTripped.getInnerFailure()); + assertEquals("InnerError", roundTripped.getInnerFailure().getErrorType()); + assertEquals("inner msg", roundTripped.getInnerFailure().getErrorMessage()); + } + + @Test + void toProto_roundTrip_withProperties() { + Map properties = new HashMap<>(); + properties.put("stringProp", "value1"); + properties.put("intProp", 42); + properties.put("longProp", 999999999L); + properties.put("boolProp", true); + properties.put("nullProp", null); + + FailureDetails original = new FailureDetails( + "TestError", "msg", "stack", false, null, properties); + + TaskFailureDetails proto = original.toProto(); + FailureDetails roundTripped = new FailureDetails(proto); + + assertNotNull(roundTripped.getProperties()); + assertEquals(5, roundTripped.getProperties().size()); + assertEquals("value1", roundTripped.getProperties().get("stringProp")); + assertEquals(42.0, roundTripped.getProperties().get("intProp")); + assertEquals(999999999.0, roundTripped.getProperties().get("longProp")); + assertEquals(true, roundTripped.getProperties().get("boolProp")); + assertNull(roundTripped.getProperties().get("nullProp")); + } + + @Test + void toProto_roundTrip_withInnerFailureAndProperties() { + Map innerProps = new HashMap<>(); + innerProps.put("errorCode", "DISK_FULL"); + innerProps.put("retryCount", 3); + + FailureDetails inner = new FailureDetails( + "java.io.IOException", "disk full", "stack", false, null, innerProps); + FailureDetails outer = new FailureDetails( + 
"java.lang.RuntimeException", "operation failed", "outer stack", false, inner, null); + + TaskFailureDetails proto = outer.toProto(); + FailureDetails roundTripped = new FailureDetails(proto); + + assertNull(roundTripped.getProperties()); + assertNotNull(roundTripped.getInnerFailure()); + + FailureDetails roundTrippedInner = roundTripped.getInnerFailure(); + assertNotNull(roundTrippedInner.getProperties()); + assertEquals("DISK_FULL", roundTrippedInner.getProperties().get("errorCode")); + assertEquals(3.0, roundTrippedInner.getProperties().get("retryCount")); + } + + @Test + void toProto_noInnerFailure_protoDoesNotHaveInnerFailure() { + FailureDetails details = new FailureDetails( + "TestError", "msg", "stack", false); + + TaskFailureDetails proto = details.toProto(); + + assertFalse(proto.hasInnerFailure()); + assertTrue(proto.getPropertiesMap().isEmpty()); + } + + @Test + void toProto_setsIsNonRetriable() { + FailureDetails details = new FailureDetails( + "TestError", "msg", "stack", true); + + TaskFailureDetails proto = details.toProto(); + + assertTrue(proto.getIsNonRetriable()); + } + + @Test + void toProto_fromException_roundTripsInnerFailure() { + IOException cause = new IOException("root cause"); + RuntimeException ex = new RuntimeException("outer", cause); + + FailureDetails details = new FailureDetails(ex); + TaskFailureDetails proto = details.toProto(); + FailureDetails roundTripped = new FailureDetails(proto); + + assertEquals("java.lang.RuntimeException", roundTripped.getErrorType()); + assertNotNull(roundTripped.getInnerFailure()); + assertEquals("java.io.IOException", roundTripped.getInnerFailure().getErrorType()); + assertEquals("root cause", roundTripped.getInnerFailure().getErrorMessage()); + } + + @Test + void toString_returnsTypeAndMessage() { + FailureDetails details = new FailureDetails( + "java.lang.RuntimeException", "something went wrong", null, false); + + assertEquals("java.lang.RuntimeException: something went wrong", 
details.toString()); + } + + @Test + void fourArgConstructor_backwardCompatible() { + FailureDetails details = new FailureDetails( + "ErrorType", "message", "stack", true); + + assertEquals("ErrorType", details.getErrorType()); + assertEquals("message", details.getErrorMessage()); + assertEquals("stack", details.getStackTrace()); + assertTrue(details.isNonRetriable()); + assertNull(details.getInnerFailure()); + assertNull(details.getProperties()); + } + + @Test + void constructFromProto_withInnerFailureAndProperties() { + TaskFailureDetails innerProto = TaskFailureDetails.newBuilder() + .setErrorType("java.io.IOException") + .setErrorMessage("disk full") + .setStackTrace(StringValue.of("")) + .putProperties("retryable", Value.newBuilder().setBoolValue(false).build()) + .build(); + + TaskFailureDetails outerProto = TaskFailureDetails.newBuilder() + .setErrorType("java.lang.RuntimeException") + .setErrorMessage("wrapped") + .setStackTrace(StringValue.of("")) + .setInnerFailure(innerProto) + .putProperties("correlationId", Value.newBuilder().setStringValue("abc-123").build()) + .build(); + + FailureDetails details = new FailureDetails(outerProto); + + assertNotNull(details.getProperties()); + assertEquals("abc-123", details.getProperties().get("correlationId")); + + assertNotNull(details.getInnerFailure()); + assertNotNull(details.getInnerFailure().getProperties()); + assertEquals(false, details.getInnerFailure().getProperties().get("retryable")); + } + + @Test + void fromException_withProvider_extractsProperties() { + ExceptionPropertiesProvider provider = exception -> { + if (exception instanceof IllegalArgumentException) { + Map props = new HashMap<>(); + props.put("paramName", exception.getMessage()); + props.put("severity", 3); + props.put("isCritical", true); + return props; + } + return null; + }; + + IllegalArgumentException ex = new IllegalArgumentException("userId"); + + FailureDetails details = FailureDetails.fromException(ex, provider); + + 
assertEquals("java.lang.IllegalArgumentException", details.getErrorType()); + assertEquals("userId", details.getErrorMessage()); + assertNotNull(details.getProperties()); + assertEquals(3, details.getProperties().size()); + assertEquals("userId", details.getProperties().get("paramName")); + assertEquals(3, details.getProperties().get("severity")); + assertEquals(true, details.getProperties().get("isCritical")); + } + + @Test + void fromException_withProvider_propertiesOnInnerCauseToo() { + ExceptionPropertiesProvider provider = exception -> { + Map props = new HashMap<>(); + props.put("exceptionType", exception.getClass().getSimpleName()); + return props; + }; + + IOException inner = new IOException("disk full"); + RuntimeException outer = new RuntimeException("failed", inner); + + FailureDetails details = FailureDetails.fromException(outer, provider); + + assertNotNull(details.getProperties()); + assertEquals("RuntimeException", details.getProperties().get("exceptionType")); + + assertNotNull(details.getInnerFailure()); + assertNotNull(details.getInnerFailure().getProperties()); + assertEquals("IOException", details.getInnerFailure().getProperties().get("exceptionType")); + } + + @Test + void fromException_withProvider_returnsNull_noProperties() { + ExceptionPropertiesProvider provider = exception -> null; + + RuntimeException ex = new RuntimeException("test"); + + FailureDetails details = FailureDetails.fromException(ex, provider); + + assertNull(details.getProperties()); + } + + @Test + void fromException_withNullProvider_noProperties() { + RuntimeException ex = new RuntimeException("test"); + + FailureDetails details = FailureDetails.fromException(ex, null); + + assertEquals("java.lang.RuntimeException", details.getErrorType()); + assertEquals("test", details.getErrorMessage()); + assertNull(details.getProperties()); + } + + @Test + void fromException_providerThrows_gracefullyIgnored() { + ExceptionPropertiesProvider provider = exception -> { + throw new 
RuntimeException("provider error"); + }; + + IllegalStateException ex = new IllegalStateException("original error"); + + FailureDetails details = FailureDetails.fromException(ex, provider); + + assertEquals("java.lang.IllegalStateException", details.getErrorType()); + assertEquals("original error", details.getErrorMessage()); + assertNull(details.getProperties()); + } + + @Test + void fromException_withProvider_roundTripsViaProto() { + ExceptionPropertiesProvider provider = exception -> { + Map props = new HashMap<>(); + props.put("errorCode", "VALIDATION_FAILED"); + props.put("retryCount", 3); + props.put("isCritical", true); + return props; + }; + + IllegalArgumentException ex = new IllegalArgumentException("bad input"); + FailureDetails details = FailureDetails.fromException(ex, provider); + + TaskFailureDetails proto = details.toProto(); + FailureDetails roundTripped = new FailureDetails(proto); + + assertNotNull(roundTripped.getProperties()); + assertEquals("VALIDATION_FAILED", roundTripped.getProperties().get("errorCode")); + assertEquals(3.0, roundTripped.getProperties().get("retryCount")); + assertEquals(true, roundTripped.getProperties().get("isCritical")); + } + + @Test + void constructFromProto_withProperties_containsNullKey() { + // Properties map with a null-valued entry should be preserved + TaskFailureDetails proto = TaskFailureDetails.newBuilder() + .setErrorType("CustomException") + .setErrorMessage("msg") + .setStackTrace(StringValue.of("")) + .putProperties("presentKey", Value.newBuilder().setStringValue("value").build()) + .putProperties("nullKey", Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build()) + .build(); + + FailureDetails details = new FailureDetails(proto); + + assertNotNull(details.getProperties()); + assertEquals(2, details.getProperties().size()); + assertTrue(details.getProperties().containsKey("nullKey")); + assertNull(details.getProperties().get("nullKey")); + assertEquals("value", 
details.getProperties().get("presentKey")); + } +} diff --git a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH index 3d3f9e98..0ef1ed22 100644 --- a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH +++ b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH @@ -1 +1 @@ -fbe5bb20835678099fc51a44993ed9b045dee5a6 \ No newline at end of file +1caadbd7ecfdf5f2309acbeac28a3e36d16aa156 \ No newline at end of file diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto index 88928c3b..0c34d986 100644 --- a/internal/durabletask-protobuf/protos/orchestrator_service.proto +++ b/internal/durabletask-protobuf/protos/orchestrator_service.proto @@ -41,6 +41,7 @@ message TaskFailureDetails { google.protobuf.StringValue stackTrace = 3; TaskFailureDetails innerFailure = 4; bool isNonRetriable = 5; + map properties = 6; } enum OrchestrationStatus { @@ -95,6 +96,7 @@ message TaskScheduledEvent { google.protobuf.StringValue version = 2; google.protobuf.StringValue input = 3; TraceContext parentTraceContext = 4; + map tags = 5; } message TaskCompletedEvent { @@ -113,6 +115,7 @@ message SubOrchestrationInstanceCreatedEvent { google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; TraceContext parentTraceContext = 5; + map tags = 6; } message SubOrchestrationInstanceCompletedEvent { @@ -192,7 +195,7 @@ message EntityOperationCalledEvent { } message EntityLockRequestedEvent { - string criticalSectionId = 1; + string criticalSectionId = 1; repeated string lockSet = 2; int32 position = 3; google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories @@ -217,7 +220,19 @@ message EntityUnlockSentEvent { message EntityLockGrantedEvent { string criticalSectionId = 1; } - + +message ExecutionRewoundEvent { + google.protobuf.StringValue reason = 1; + google.protobuf.StringValue 
parentExecutionId = 2; // used only for rewinding suborchestrations, null otherwise + google.protobuf.StringValue instanceId = 3; // used only for rewinding suborchestrations, null otherwise + TraceContext parentTraceContext = 4; // used only for rewinding suborchestrations, null otherwise + google.protobuf.StringValue name = 5; // used by DTS backend only + google.protobuf.StringValue version = 6; // used by DTS backend only + google.protobuf.StringValue input = 7; // used by DTS backend only + ParentInstanceInfo parentInstance = 8; // used by DTS backend only + map tags = 9; // used by DTS backend only +} + message HistoryEvent { int32 eventId = 1; google.protobuf.Timestamp timestamp = 2; @@ -244,11 +259,12 @@ message HistoryEvent { ExecutionResumedEvent executionResumed = 22; EntityOperationSignaledEvent entityOperationSignaled = 23; EntityOperationCalledEvent entityOperationCalled = 24; - EntityOperationCompletedEvent entityOperationCompleted = 25; - EntityOperationFailedEvent entityOperationFailed = 26; + EntityOperationCompletedEvent entityOperationCompleted = 25; + EntityOperationFailedEvent entityOperationFailed = 26; EntityLockRequestedEvent entityLockRequested = 27; EntityLockGrantedEvent entityLockGranted = 28; EntityUnlockSentEvent entityUnlockSent = 29; + ExecutionRewoundEvent executionRewound = 30; } } @@ -256,6 +272,8 @@ message ScheduleTaskAction { string name = 1; google.protobuf.StringValue version = 2; google.protobuf.StringValue input = 3; + map tags = 4; + TraceContext parentTraceContext = 5; } message CreateSubOrchestrationAction { @@ -263,6 +281,8 @@ message CreateSubOrchestrationAction { string name = 2; google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; + TraceContext parentTraceContext = 5; + map tags = 6; } message CreateTimerAction { @@ -282,6 +302,7 @@ message CompleteOrchestrationAction { google.protobuf.StringValue newVersion = 4; repeated HistoryEvent carryoverEvents = 5; TaskFailureDetails failureDetails 
= 6; + map tags = 7; } message TerminateOrchestrationAction { @@ -312,6 +333,11 @@ message OrchestratorAction { } } +message OrchestrationTraceContext { + google.protobuf.StringValue spanID = 1; + google.protobuf.Timestamp spanStartTime = 2; +} + message OrchestratorRequest { string instanceId = 1; google.protobuf.StringValue executionId = 2; @@ -320,6 +346,8 @@ message OrchestratorRequest { OrchestratorEntityParameters entityParameters = 5; bool requiresHistoryStreaming = 6; map properties = 7; + + OrchestrationTraceContext orchestrationTraceContext = 8; } message OrchestratorResponse { @@ -331,6 +359,17 @@ message OrchestratorResponse { // The number of work item events that were processed by the orchestrator. // This field is optional. If not set, the service should assume that the orchestrator processed all events. google.protobuf.Int32Value numEventsProcessed = 5; + OrchestrationTraceContext orchestrationTraceContext = 6; + + // Whether or not a history is required to complete the original OrchestratorRequest and none was provided. + bool requiresHistory = 7; + + // True if this is a partial (chunked) completion. The backend must keep the work item open until the final chunk (isPartial=false). + bool isPartial = 8; + + // Zero-based position of the current chunk within a chunked completion sequence. + // This field is omitted for non-chunked completions. 
+ google.protobuf.Int32Value chunkIndex = 9; } message CreateInstanceRequest { @@ -343,6 +382,7 @@ message CreateInstanceRequest { google.protobuf.StringValue executionId = 7; map tags = 8; TraceContext parentTraceContext = 9; + google.protobuf.Timestamp requestTime = 10; } message OrchestrationIdReusePolicy { @@ -449,12 +489,28 @@ message QueryInstancesResponse { google.protobuf.StringValue continuationToken = 2; } +message ListInstanceIdsRequest { + repeated OrchestrationStatus runtimeStatus = 1; + google.protobuf.Timestamp completedTimeFrom = 2; + google.protobuf.Timestamp completedTimeTo = 3; + int32 pageSize = 4; + google.protobuf.StringValue lastInstanceKey = 5; +} + +message ListInstanceIdsResponse { + repeated string instanceIds = 1; + google.protobuf.StringValue lastInstanceKey = 2; +} + message PurgeInstancesRequest { oneof request { string instanceId = 1; PurgeInstanceFilter purgeInstanceFilter = 2; + InstanceBatch instanceBatch = 4; } bool recursive = 3; + // used in the case when an instanceId is specified to determine if the purge request is for an orchestration (as opposed to an entity) + bool isOrchestration = 5; } message PurgeInstanceFilter { @@ -468,6 +524,15 @@ message PurgeInstancesResponse { google.protobuf.BoolValue isComplete = 2; } +message RestartInstanceRequest { + string instanceId = 1; + bool restartWithNewInstanceId = 2; +} + +message RestartInstanceResponse { + string instanceId = 1; +} + message CreateTaskHubRequest { bool recreateIfExists = 1; } @@ -490,10 +555,12 @@ message SignalEntityRequest { google.protobuf.StringValue input = 3; string requestId = 4; google.protobuf.Timestamp scheduledTime = 5; + TraceContext parentTraceContext = 6; + google.protobuf.Timestamp requestTime = 7; } message SignalEntityResponse { - // no payload + // no payload } message GetEntityRequest { @@ -553,6 +620,7 @@ message EntityBatchRequest { string instanceId = 1; google.protobuf.StringValue entityState = 2; repeated OperationRequest operations = 3; + 
map properties = 4; } message EntityBatchResult { @@ -562,6 +630,8 @@ message EntityBatchResult { TaskFailureDetails failureDetails = 4; string completionToken = 5; repeated OperationInfo operationInfos = 6; // used only with DTS + // Whether or not an entity state is required to complete the original EntityBatchRequest and none was provided. + bool requiresState = 7; } message EntityRequest { @@ -575,6 +645,7 @@ message OperationRequest { string operation = 1; string requestId = 2; google.protobuf.StringValue input = 3; + TraceContext traceContext = 4; } message OperationResult { @@ -591,10 +662,14 @@ message OperationInfo { message OperationResultSuccess { google.protobuf.StringValue result = 1; + google.protobuf.Timestamp startTimeUtc = 2; + google.protobuf.Timestamp endTimeUtc = 3; } message OperationResultFailure { TaskFailureDetails failureDetails = 1; + google.protobuf.Timestamp startTimeUtc = 2; + google.protobuf.Timestamp endTimeUtc = 3; } message OperationAction { @@ -610,6 +685,8 @@ message SendSignalAction { string name = 2; google.protobuf.StringValue input = 3; google.protobuf.Timestamp scheduledTime = 4; + google.protobuf.Timestamp requestTime = 5; + TraceContext parentTraceContext = 6; } message StartNewOrchestrationAction { @@ -618,6 +695,8 @@ message StartNewOrchestrationAction { google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; google.protobuf.Timestamp scheduledTime = 5; + google.protobuf.Timestamp requestTime = 6; + TraceContext parentTraceContext = 7; } message AbandonActivityTaskRequest { @@ -644,6 +723,17 @@ message AbandonEntityTaskResponse { // Empty. 
} +message SkipGracefulOrchestrationTerminationsRequest { + InstanceBatch instanceBatch = 1; + google.protobuf.StringValue reason = 2; +} + +message SkipGracefulOrchestrationTerminationsResponse { + // Those instances which could not be terminated because they had locked entities at the time of this termination call, + // are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged) + repeated string unterminatedInstanceIds = 1; +} + service TaskHubSidecarService { // Sends a hello request to the sidecar service. rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty); @@ -657,18 +747,21 @@ service TaskHubSidecarService { // Rewinds an orchestration instance to last known good state and replays from there. rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse); + // Restarts an orchestration instance. + rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse); + // Waits for an orchestration instance to reach a running or completion state. rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse); - + // Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.). rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse); // Raises an event to a running orchestration instance. rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse); - + // Terminates a running orchestration instance. rpc TerminateInstance(TerminateRequest) returns (TerminateResponse); - + // Suspends a running orchestration instance. 
rpc SuspendInstance(SuspendRequest) returns (SuspendResponse); @@ -678,6 +771,9 @@ service TaskHubSidecarService { // rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse); rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse); + + rpc ListInstanceIds(ListInstanceIdsRequest) returns (ListInstanceIdsResponse); + rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse); rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem); @@ -714,6 +810,10 @@ service TaskHubSidecarService { // Abandon an entity work item rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse); + + // "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated". + // Note that a maximum of 500 orchestrations can be terminated at a time using this method. + rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse); } message GetWorkItemsRequest { @@ -722,6 +822,7 @@ message GetWorkItemsRequest { int32 maxConcurrentEntityWorkItems = 3; repeated WorkerCapability capabilities = 10; + WorkItemFilters workItemFilters = 11; } enum WorkerCapability { @@ -732,6 +833,36 @@ enum WorkerCapability { // When set, the service may return work items without any history events as an optimization. // It is strongly recommended that all SDKs support this capability. WORKER_CAPABILITY_HISTORY_STREAMING = 1; + + // Indicates that the worker supports scheduled tasks. + // The service may send schedule-triggered orchestration work items, + // and the worker must handle them, including the scheduledTime field. + WORKER_CAPABILITY_SCHEDULED_TASKS = 2; + + // Signals that the worker can handle large payloads stored externally (e.g., Blob Storage). + // Work items may contain URI references instead of inline data, and the worker must fetch them. 
+ // This avoids message size limits and reduces network overhead. + WORKER_CAPABILITY_LARGE_PAYLOADS = 3; +} + +message WorkItemFilters { + repeated OrchestrationFilter orchestrations = 1; + repeated ActivityFilter activities = 2; + repeated EntityFilter entities = 3; +} + +message OrchestrationFilter { + string name = 1; + repeated string versions = 2; +} + +message ActivityFilter { + string name = 1; + repeated string versions = 2; +} + +message EntityFilter { + string name = 1; } message WorkItem { @@ -750,7 +881,7 @@ message CompleteTaskResponse { } message HealthPing { - // No payload + // No payload } message StreamInstanceHistoryRequest { @@ -764,3 +895,8 @@ message StreamInstanceHistoryRequest { message HistoryChunk { repeated HistoryEvent events = 1; } + +message InstanceBatch { + // A maximum of 500 instance IDs can be provided in this list. + repeated string instanceIds = 1; +}