diff --git a/client/build.gradle b/client/build.gradle
index 9ba98dce..9ed1c737 100644
--- a/client/build.gradle
+++ b/client/build.gradle
@@ -57,7 +57,8 @@ compileTestJava {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
options.fork = true
- options.forkOptions.executable = "${PATH_TO_TEST_JAVA_RUNTIME}/bin/javac"
+ def javacName = org.gradle.internal.os.OperatingSystem.current().isWindows() ? 'javac.exe' : 'javac'
+ options.forkOptions.executable = "${PATH_TO_TEST_JAVA_RUNTIME}/bin/${javacName}"
}
task downloadProtoFiles {
@@ -90,7 +91,9 @@ protobuf {
}
generateProtoTasks {
all()*.plugins { grpc {} }
- all()*.dependsOn downloadProtoFiles
+ if (project.gradle.startParameter.taskNames.any { it.contains('downloadProtoFiles') }) {
+ all()*.dependsOn downloadProtoFiles
+ }
}
}
@@ -107,7 +110,8 @@ sourceSets {
}
tasks.withType(Test) {
- executable = new File("${PATH_TO_TEST_JAVA_RUNTIME}", 'bin/java')
+ def javaName = org.gradle.internal.os.OperatingSystem.current().isWindows() ? 'java.exe' : 'java'
+ executable = new File("${PATH_TO_TEST_JAVA_RUNTIME}", "bin/${javaName}")
}
test {
diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java
index 552cf579..7415c695 100644
--- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java
+++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java
@@ -34,6 +34,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private final DataConverter dataConverter;
private final Duration maximumTimerInterval;
private final DurableTaskGrpcWorkerVersioningOptions versioningOptions;
+ private final ExceptionPropertiesProvider exceptionPropertiesProvider;
private final TaskHubSidecarServiceBlockingStub sidecarClient;
@@ -65,6 +66,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
this.versioningOptions = builder.versioningOptions;
+ this.exceptionPropertiesProvider = builder.exceptionPropertiesProvider;
}
/**
@@ -118,7 +120,8 @@ public void startAndBlock() {
this.dataConverter,
this.maximumTimerInterval,
logger,
- this.versioningOptions);
+ this.versioningOptions,
+ this.exceptionPropertiesProvider);
TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor(
this.activityFactories,
this.dataConverter,
@@ -228,11 +231,9 @@ public void startAndBlock() {
activityRequest.getInput().getValue(),
activityRequest.getTaskId());
} catch (Throwable e) {
- failureDetails = TaskFailureDetails.newBuilder()
- .setErrorType(e.getClass().getName())
- .setErrorMessage(e.getMessage())
- .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
- .build();
+ Exception ex = e instanceof Exception ? (Exception) e : new RuntimeException(e);
+ failureDetails = FailureDetails.fromException(
+ ex, this.exceptionPropertiesProvider).toProto();
}
ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java
index ec39fee2..cba3a7e5 100644
--- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java
+++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java
@@ -18,6 +18,7 @@ public final class DurableTaskGrpcWorkerBuilder {
DataConverter dataConverter;
Duration maximumTimerInterval;
DurableTaskGrpcWorkerVersioningOptions versioningOptions;
+ ExceptionPropertiesProvider exceptionPropertiesProvider;
/**
* Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}.
@@ -125,6 +126,21 @@ public DurableTaskGrpcWorkerBuilder useVersioning(DurableTaskGrpcWorkerVersionin
return this;
}
+ /**
+ * Sets the {@link ExceptionPropertiesProvider} to use for extracting custom properties from exceptions.
+ *
+ * When set, the provider is invoked whenever an activity or orchestration fails with an exception. The returned
+ * properties are included in the {@link FailureDetails} and can be retrieved via
+ * {@link FailureDetails#getProperties()}.
+ *
+ * @param provider the exception properties provider
+ * @return this builder object
+ */
+ public DurableTaskGrpcWorkerBuilder exceptionPropertiesProvider(ExceptionPropertiesProvider provider) {
+ this.exceptionPropertiesProvider = provider;
+ return this;
+ }
+
/**
* Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object.
* @return a new {@link DurableTaskGrpcWorker} object
diff --git a/client/src/main/java/com/microsoft/durabletask/ExceptionPropertiesProvider.java b/client/src/main/java/com/microsoft/durabletask/ExceptionPropertiesProvider.java
new file mode 100644
index 00000000..96573be9
--- /dev/null
+++ b/client/src/main/java/com/microsoft/durabletask/ExceptionPropertiesProvider.java
@@ -0,0 +1,45 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.microsoft.durabletask;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * Provider interface for extracting custom properties from exceptions.
+ *
+ * Implementations of this interface can be registered with a {@link DurableTaskGrpcWorkerBuilder} to include
+ * custom exception properties in {@link FailureDetails} when activities or orchestrations fail.
+ * These properties are then available via {@link FailureDetails#getProperties()}.
+ *
+ * Example usage:
+ *
{@code
+ * DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder()
+ * .exceptionPropertiesProvider(exception -> {
+ * if (exception instanceof MyCustomException) {
+ * MyCustomException custom = (MyCustomException) exception;
+ * Map props = new HashMap<>();
+ * props.put("errorCode", custom.getErrorCode());
+ * props.put("retryable", custom.isRetryable());
+ * return props;
+ * }
+ * return null;
+ * })
+ * .addOrchestration(...)
+ * .build();
+ * }
+ */
+@FunctionalInterface
+public interface ExceptionPropertiesProvider {
+
+ /**
+ * Extracts custom properties from the given exception.
+ *
+ * Return {@code null} or an empty map if no custom properties should be included for this exception.
+ *
+ * @param exception the exception to extract properties from
+ * @return a map of property names to values, or {@code null}
+ */
+ @Nullable
+ Map getExceptionProperties(Exception exception);
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/FailureDetails.java b/client/src/main/java/com/microsoft/durabletask/FailureDetails.java
index ad3f1ba1..42ac9713 100644
--- a/client/src/main/java/com/microsoft/durabletask/FailureDetails.java
+++ b/client/src/main/java/com/microsoft/durabletask/FailureDetails.java
@@ -2,11 +2,16 @@
// Licensed under the MIT License.
package com.microsoft.durabletask;
+import com.google.protobuf.NullValue;
import com.google.protobuf.StringValue;
+import com.google.protobuf.Value;
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.TaskFailureDetails;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
/**
* Class that represents the details of a task failure.
@@ -20,29 +25,76 @@ public final class FailureDetails {
private final String errorMessage;
private final String stackTrace;
private final boolean isNonRetriable;
+ private final FailureDetails innerFailure;
+ private final Map properties;
FailureDetails(
String errorType,
@Nullable String errorMessage,
@Nullable String errorDetails,
boolean isNonRetriable) {
+ this(errorType, errorMessage, errorDetails, isNonRetriable, null, null);
+ }
+
+ FailureDetails(
+ String errorType,
+ @Nullable String errorMessage,
+ @Nullable String errorDetails,
+ boolean isNonRetriable,
+ @Nullable FailureDetails innerFailure,
+ @Nullable Map properties) {
this.errorType = errorType;
this.stackTrace = errorDetails;
// Error message can be null for things like NullPointerException but the gRPC contract doesn't allow null
this.errorMessage = errorMessage != null ? errorMessage : "";
this.isNonRetriable = isNonRetriable;
+ this.innerFailure = innerFailure;
+ this.properties = properties != null ? Collections.unmodifiableMap(new HashMap<>(properties)) : null;
}
FailureDetails(Exception exception) {
- this(exception.getClass().getName(), exception.getMessage(), getFullStackTrace(exception), false);
+ this(exception.getClass().getName(),
+ exception.getMessage(),
+ getFullStackTrace(exception),
+ false,
+ exception.getCause() != null ? fromExceptionRecursive(exception.getCause(), null) : null,
+ null);
+ }
+
+ /**
+ * Creates a {@code FailureDetails} from an exception, optionally using the provided
+ * {@link ExceptionPropertiesProvider} to extract custom properties.
+ *
+ * @param exception the exception that caused the failure
+ * @param provider the provider for extracting custom properties, or {@code null}
+ * @return a new {@code FailureDetails} instance
+ */
+ static FailureDetails fromException(Exception exception, @Nullable ExceptionPropertiesProvider provider) {
+ Map properties = null;
+ if (provider != null) {
+ try {
+ properties = provider.getExceptionProperties(exception);
+ } catch (Exception ignored) {
+ // Don't let provider errors mask the original failure
+ }
+ }
+ return new FailureDetails(
+ exception.getClass().getName(),
+ exception.getMessage(),
+ getFullStackTrace(exception),
+ false,
+ exception.getCause() != null ? fromExceptionRecursive(exception.getCause(), provider) : null,
+ properties);
}
FailureDetails(TaskFailureDetails proto) {
this(proto.getErrorType(),
proto.getErrorMessage(),
proto.getStackTrace().getValue(),
- proto.getIsNonRetriable());
+ proto.getIsNonRetriable(),
+ proto.hasInnerFailure() ? new FailureDetails(proto.getInnerFailure()) : null,
+ convertProtoProperties(proto.getPropertiesMap()));
}
/**
@@ -86,6 +138,28 @@ public boolean isNonRetriable() {
return this.isNonRetriable;
}
+ /**
+ * Gets the inner failure that caused this failure, or {@code null} if there is no inner cause.
+ *
+ * @return the inner {@code FailureDetails} or {@code null}
+ */
+ @Nullable
+ public FailureDetails getInnerFailure() {
+ return this.innerFailure;
+ }
+
+ /**
+ * Gets additional properties associated with the exception, or {@code null} if no properties are available.
+ *
+ * The returned map is unmodifiable.
+ *
+ * @return an unmodifiable map of property names to values, or {@code null}
+ */
+ @Nullable
+ public Map getProperties() {
+ return this.properties;
+ }
+
/**
* Returns {@code true} if the task failure was provided by the specified exception type, otherwise {@code false}.
*
@@ -112,6 +186,11 @@ public boolean isCausedBy(Class extends Exception> exceptionClass) {
}
}
+ @Override
+ public String toString() {
+ return this.errorType + ": " + this.errorMessage;
+ }
+
static String getFullStackTrace(Throwable e) {
StackTraceElement[] elements = e.getStackTrace();
@@ -124,10 +203,98 @@ static String getFullStackTrace(Throwable e) {
}
TaskFailureDetails toProto() {
- return TaskFailureDetails.newBuilder()
+ TaskFailureDetails.Builder builder = TaskFailureDetails.newBuilder()
.setErrorType(this.getErrorType())
.setErrorMessage(this.getErrorMessage())
.setStackTrace(StringValue.of(this.getStackTrace() != null ? this.getStackTrace() : ""))
- .build();
+ .setIsNonRetriable(this.isNonRetriable);
+
+ if (this.innerFailure != null) {
+ builder.setInnerFailure(this.innerFailure.toProto());
+ }
+
+ if (this.properties != null) {
+ builder.putAllProperties(convertToProtoProperties(this.properties));
+ }
+
+ return builder.build();
+ }
+
+ @Nullable
+ private static FailureDetails fromExceptionRecursive(
+ @Nullable Throwable exception,
+ @Nullable ExceptionPropertiesProvider provider) {
+ if (exception == null) {
+ return null;
+ }
+ Map properties = null;
+ if (provider != null && exception instanceof Exception) {
+ try {
+ properties = provider.getExceptionProperties((Exception) exception);
+ } catch (Exception ignored) {
+ // Don't let provider errors mask the original failure
+ }
+ }
+ return new FailureDetails(
+ exception.getClass().getName(),
+ exception.getMessage(),
+ getFullStackTrace(exception),
+ false,
+ exception.getCause() != null ? fromExceptionRecursive(exception.getCause(), provider) : null,
+ properties);
+ }
+
+ @Nullable
+ private static Map convertProtoProperties(Map protoProperties) {
+ if (protoProperties == null || protoProperties.isEmpty()) {
+ return null;
+ }
+
+ Map result = new HashMap<>();
+ for (Map.Entry entry : protoProperties.entrySet()) {
+ result.put(entry.getKey(), convertProtoValue(entry.getValue()));
+ }
+ return result;
+ }
+
+ @Nullable
+ private static Object convertProtoValue(Value value) {
+ if (value == null) {
+ return null;
+ }
+ switch (value.getKindCase()) {
+ case NULL_VALUE:
+ return null;
+ case NUMBER_VALUE:
+ return value.getNumberValue();
+ case STRING_VALUE:
+ return value.getStringValue();
+ case BOOL_VALUE:
+ return value.getBoolValue();
+ default:
+ return value.toString();
+ }
+ }
+
+ private static Map convertToProtoProperties(Map properties) {
+ Map result = new HashMap<>();
+ for (Map.Entry entry : properties.entrySet()) {
+ result.put(entry.getKey(), convertToProtoValue(entry.getValue()));
+ }
+ return result;
+ }
+
+ private static Value convertToProtoValue(@Nullable Object obj) {
+ if (obj == null) {
+ return Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build();
+ } else if (obj instanceof Number) {
+ return Value.newBuilder().setNumberValue(((Number) obj).doubleValue()).build();
+ } else if (obj instanceof Boolean) {
+ return Value.newBuilder().setBoolValue((Boolean) obj).build();
+ } else if (obj instanceof String) {
+ return Value.newBuilder().setStringValue((String) obj).build();
+ } else {
+ return Value.newBuilder().setStringValue(obj.toString()).build();
+ }
}
-}
\ No newline at end of file
+}
diff --git a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java
index 9c70db02..9de15c68 100644
--- a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java
+++ b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java
@@ -31,6 +31,7 @@ final class TaskOrchestrationExecutor {
private final Logger logger;
private final Duration maximumTimerInterval;
private final DurableTaskGrpcWorkerVersioningOptions versioningOptions;
+ private final ExceptionPropertiesProvider exceptionPropertiesProvider;
public TaskOrchestrationExecutor(
HashMap orchestrationFactories,
@@ -38,11 +39,22 @@ public TaskOrchestrationExecutor(
Duration maximumTimerInterval,
Logger logger,
DurableTaskGrpcWorkerVersioningOptions versioningOptions) {
+ this(orchestrationFactories, dataConverter, maximumTimerInterval, logger, versioningOptions, null);
+ }
+
+ public TaskOrchestrationExecutor(
+ HashMap orchestrationFactories,
+ DataConverter dataConverter,
+ Duration maximumTimerInterval,
+ Logger logger,
+ DurableTaskGrpcWorkerVersioningOptions versioningOptions,
+ ExceptionPropertiesProvider exceptionPropertiesProvider) {
this.orchestrationFactories = orchestrationFactories;
this.dataConverter = dataConverter;
this.maximumTimerInterval = maximumTimerInterval;
this.logger = logger;
this.versioningOptions = versioningOptions;
+ this.exceptionPropertiesProvider = exceptionPropertiesProvider;
}
public TaskOrchestratorResult execute(List pastEvents, List newEvents) {
@@ -68,7 +80,7 @@ public TaskOrchestratorResult execute(List pastEvents, List details.getProperties().put("newKey", "newValue"));
+ }
+
+ @Test
+ void constructFromException_capturesInnerCause() {
+ IOException innerCause = new IOException("disk full");
+ RuntimeException outer = new RuntimeException("operation failed", innerCause);
+
+ FailureDetails details = new FailureDetails(outer);
+
+ assertEquals("java.lang.RuntimeException", details.getErrorType());
+ assertEquals("operation failed", details.getErrorMessage());
+ assertNotNull(details.getStackTrace());
+ assertFalse(details.isNonRetriable());
+
+ assertNotNull(details.getInnerFailure());
+ FailureDetails inner = details.getInnerFailure();
+ assertEquals("java.io.IOException", inner.getErrorType());
+ assertEquals("disk full", inner.getErrorMessage());
+ assertNull(inner.getInnerFailure());
+ }
+
+ @Test
+ void constructFromException_deeplyNestedCauses() {
+ NullPointerException root = new NullPointerException("null ref");
+ IOException mid = new IOException("io error", root);
+ RuntimeException top = new RuntimeException("top", mid);
+
+ FailureDetails details = new FailureDetails(top);
+
+ assertNotNull(details.getInnerFailure());
+ assertEquals("java.io.IOException", details.getInnerFailure().getErrorType());
+
+ assertNotNull(details.getInnerFailure().getInnerFailure());
+ assertEquals("java.lang.NullPointerException",
+ details.getInnerFailure().getInnerFailure().getErrorType());
+
+ assertNull(details.getInnerFailure().getInnerFailure().getInnerFailure());
+ }
+
+ @Test
+ void constructFromException_noCause_innerFailureIsNull() {
+ IllegalStateException ex = new IllegalStateException("bad state");
+
+ FailureDetails details = new FailureDetails(ex);
+
+ assertNull(details.getInnerFailure());
+ assertNull(details.getProperties());
+ }
+
+ @Test
+ void constructFromException_nullMessage_defaultsToEmpty() {
+ NullPointerException ex = new NullPointerException();
+
+ FailureDetails details = new FailureDetails(ex);
+
+ assertEquals("java.lang.NullPointerException", details.getErrorType());
+ assertEquals("", details.getErrorMessage());
+ }
+
+ @Test
+ void toProto_roundTrip_basicFields() {
+ FailureDetails original = new FailureDetails(
+ "TestError", "test message", "stack trace", true);
+
+ TaskFailureDetails proto = original.toProto();
+ FailureDetails roundTripped = new FailureDetails(proto);
+
+ assertEquals(original.getErrorType(), roundTripped.getErrorType());
+ assertEquals(original.getErrorMessage(), roundTripped.getErrorMessage());
+ assertEquals(original.getStackTrace(), roundTripped.getStackTrace());
+ assertEquals(original.isNonRetriable(), roundTripped.isNonRetriable());
+ }
+
+ @Test
+ void toProto_roundTrip_withInnerFailure() {
+ FailureDetails inner = new FailureDetails(
+ "InnerError", "inner msg", "inner stack", false);
+ FailureDetails outer = new FailureDetails(
+ "OuterError", "outer msg", "outer stack", false, inner, null);
+
+ TaskFailureDetails proto = outer.toProto();
+ FailureDetails roundTripped = new FailureDetails(proto);
+
+ assertEquals("OuterError", roundTripped.getErrorType());
+ assertNotNull(roundTripped.getInnerFailure());
+ assertEquals("InnerError", roundTripped.getInnerFailure().getErrorType());
+ assertEquals("inner msg", roundTripped.getInnerFailure().getErrorMessage());
+ }
+
+ @Test
+ void toProto_roundTrip_withProperties() {
+ Map properties = new HashMap<>();
+ properties.put("stringProp", "value1");
+ properties.put("intProp", 42);
+ properties.put("longProp", 999999999L);
+ properties.put("boolProp", true);
+ properties.put("nullProp", null);
+
+ FailureDetails original = new FailureDetails(
+ "TestError", "msg", "stack", false, null, properties);
+
+ TaskFailureDetails proto = original.toProto();
+ FailureDetails roundTripped = new FailureDetails(proto);
+
+ assertNotNull(roundTripped.getProperties());
+ assertEquals(5, roundTripped.getProperties().size());
+ assertEquals("value1", roundTripped.getProperties().get("stringProp"));
+ assertEquals(42.0, roundTripped.getProperties().get("intProp"));
+ assertEquals(999999999.0, roundTripped.getProperties().get("longProp"));
+ assertEquals(true, roundTripped.getProperties().get("boolProp"));
+ assertNull(roundTripped.getProperties().get("nullProp"));
+ }
+
+ @Test
+ void toProto_roundTrip_withInnerFailureAndProperties() {
+ Map innerProps = new HashMap<>();
+ innerProps.put("errorCode", "DISK_FULL");
+ innerProps.put("retryCount", 3);
+
+ FailureDetails inner = new FailureDetails(
+ "java.io.IOException", "disk full", "stack", false, null, innerProps);
+ FailureDetails outer = new FailureDetails(
+ "java.lang.RuntimeException", "operation failed", "outer stack", false, inner, null);
+
+ TaskFailureDetails proto = outer.toProto();
+ FailureDetails roundTripped = new FailureDetails(proto);
+
+ assertNull(roundTripped.getProperties());
+ assertNotNull(roundTripped.getInnerFailure());
+
+ FailureDetails roundTrippedInner = roundTripped.getInnerFailure();
+ assertNotNull(roundTrippedInner.getProperties());
+ assertEquals("DISK_FULL", roundTrippedInner.getProperties().get("errorCode"));
+ assertEquals(3.0, roundTrippedInner.getProperties().get("retryCount"));
+ }
+
+ @Test
+ void toProto_noInnerFailure_protoDoesNotHaveInnerFailure() {
+ FailureDetails details = new FailureDetails(
+ "TestError", "msg", "stack", false);
+
+ TaskFailureDetails proto = details.toProto();
+
+ assertFalse(proto.hasInnerFailure());
+ assertTrue(proto.getPropertiesMap().isEmpty());
+ }
+
+ @Test
+ void toProto_setsIsNonRetriable() {
+ FailureDetails details = new FailureDetails(
+ "TestError", "msg", "stack", true);
+
+ TaskFailureDetails proto = details.toProto();
+
+ assertTrue(proto.getIsNonRetriable());
+ }
+
+ @Test
+ void toProto_fromException_roundTripsInnerFailure() {
+ IOException cause = new IOException("root cause");
+ RuntimeException ex = new RuntimeException("outer", cause);
+
+ FailureDetails details = new FailureDetails(ex);
+ TaskFailureDetails proto = details.toProto();
+ FailureDetails roundTripped = new FailureDetails(proto);
+
+ assertEquals("java.lang.RuntimeException", roundTripped.getErrorType());
+ assertNotNull(roundTripped.getInnerFailure());
+ assertEquals("java.io.IOException", roundTripped.getInnerFailure().getErrorType());
+ assertEquals("root cause", roundTripped.getInnerFailure().getErrorMessage());
+ }
+
+ @Test
+ void toString_returnsTypeAndMessage() {
+ FailureDetails details = new FailureDetails(
+ "java.lang.RuntimeException", "something went wrong", null, false);
+
+ assertEquals("java.lang.RuntimeException: something went wrong", details.toString());
+ }
+
+ @Test
+ void fourArgConstructor_backwardCompatible() {
+ FailureDetails details = new FailureDetails(
+ "ErrorType", "message", "stack", true);
+
+ assertEquals("ErrorType", details.getErrorType());
+ assertEquals("message", details.getErrorMessage());
+ assertEquals("stack", details.getStackTrace());
+ assertTrue(details.isNonRetriable());
+ assertNull(details.getInnerFailure());
+ assertNull(details.getProperties());
+ }
+
+ @Test
+ void constructFromProto_withInnerFailureAndProperties() {
+ TaskFailureDetails innerProto = TaskFailureDetails.newBuilder()
+ .setErrorType("java.io.IOException")
+ .setErrorMessage("disk full")
+ .setStackTrace(StringValue.of(""))
+ .putProperties("retryable", Value.newBuilder().setBoolValue(false).build())
+ .build();
+
+ TaskFailureDetails outerProto = TaskFailureDetails.newBuilder()
+ .setErrorType("java.lang.RuntimeException")
+ .setErrorMessage("wrapped")
+ .setStackTrace(StringValue.of(""))
+ .setInnerFailure(innerProto)
+ .putProperties("correlationId", Value.newBuilder().setStringValue("abc-123").build())
+ .build();
+
+ FailureDetails details = new FailureDetails(outerProto);
+
+ assertNotNull(details.getProperties());
+ assertEquals("abc-123", details.getProperties().get("correlationId"));
+
+ assertNotNull(details.getInnerFailure());
+ assertNotNull(details.getInnerFailure().getProperties());
+ assertEquals(false, details.getInnerFailure().getProperties().get("retryable"));
+ }
+
+ @Test
+ void fromException_withProvider_extractsProperties() {
+ ExceptionPropertiesProvider provider = exception -> {
+ if (exception instanceof IllegalArgumentException) {
+ Map props = new HashMap<>();
+ props.put("paramName", exception.getMessage());
+ props.put("severity", 3);
+ props.put("isCritical", true);
+ return props;
+ }
+ return null;
+ };
+
+ IllegalArgumentException ex = new IllegalArgumentException("userId");
+
+ FailureDetails details = FailureDetails.fromException(ex, provider);
+
+ assertEquals("java.lang.IllegalArgumentException", details.getErrorType());
+ assertEquals("userId", details.getErrorMessage());
+ assertNotNull(details.getProperties());
+ assertEquals(3, details.getProperties().size());
+ assertEquals("userId", details.getProperties().get("paramName"));
+ assertEquals(3, details.getProperties().get("severity"));
+ assertEquals(true, details.getProperties().get("isCritical"));
+ }
+
+ @Test
+ void fromException_withProvider_propertiesOnInnerCauseToo() {
+ ExceptionPropertiesProvider provider = exception -> {
+ Map props = new HashMap<>();
+ props.put("exceptionType", exception.getClass().getSimpleName());
+ return props;
+ };
+
+ IOException inner = new IOException("disk full");
+ RuntimeException outer = new RuntimeException("failed", inner);
+
+ FailureDetails details = FailureDetails.fromException(outer, provider);
+
+ assertNotNull(details.getProperties());
+ assertEquals("RuntimeException", details.getProperties().get("exceptionType"));
+
+ assertNotNull(details.getInnerFailure());
+ assertNotNull(details.getInnerFailure().getProperties());
+ assertEquals("IOException", details.getInnerFailure().getProperties().get("exceptionType"));
+ }
+
+ @Test
+ void fromException_withProvider_returnsNull_noProperties() {
+ ExceptionPropertiesProvider provider = exception -> null;
+
+ RuntimeException ex = new RuntimeException("test");
+
+ FailureDetails details = FailureDetails.fromException(ex, provider);
+
+ assertNull(details.getProperties());
+ }
+
+ @Test
+ void fromException_withNullProvider_noProperties() {
+ RuntimeException ex = new RuntimeException("test");
+
+ FailureDetails details = FailureDetails.fromException(ex, null);
+
+ assertEquals("java.lang.RuntimeException", details.getErrorType());
+ assertEquals("test", details.getErrorMessage());
+ assertNull(details.getProperties());
+ }
+
+ @Test
+ void fromException_providerThrows_gracefullyIgnored() {
+ ExceptionPropertiesProvider provider = exception -> {
+ throw new RuntimeException("provider error");
+ };
+
+ IllegalStateException ex = new IllegalStateException("original error");
+
+ FailureDetails details = FailureDetails.fromException(ex, provider);
+
+ assertEquals("java.lang.IllegalStateException", details.getErrorType());
+ assertEquals("original error", details.getErrorMessage());
+ assertNull(details.getProperties());
+ }
+
+ @Test
+ void fromException_withProvider_roundTripsViaProto() {
+ ExceptionPropertiesProvider provider = exception -> {
+ Map props = new HashMap<>();
+ props.put("errorCode", "VALIDATION_FAILED");
+ props.put("retryCount", 3);
+ props.put("isCritical", true);
+ return props;
+ };
+
+ IllegalArgumentException ex = new IllegalArgumentException("bad input");
+ FailureDetails details = FailureDetails.fromException(ex, provider);
+
+ TaskFailureDetails proto = details.toProto();
+ FailureDetails roundTripped = new FailureDetails(proto);
+
+ assertNotNull(roundTripped.getProperties());
+ assertEquals("VALIDATION_FAILED", roundTripped.getProperties().get("errorCode"));
+ assertEquals(3.0, roundTripped.getProperties().get("retryCount"));
+ assertEquals(true, roundTripped.getProperties().get("isCritical"));
+ }
+
+ @Test
+ void constructFromProto_withProperties_containsNullKey() {
+ // Properties map with a null-valued entry should be preserved
+ TaskFailureDetails proto = TaskFailureDetails.newBuilder()
+ .setErrorType("CustomException")
+ .setErrorMessage("msg")
+ .setStackTrace(StringValue.of(""))
+ .putProperties("presentKey", Value.newBuilder().setStringValue("value").build())
+ .putProperties("nullKey", Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build())
+ .build();
+
+ FailureDetails details = new FailureDetails(proto);
+
+ assertNotNull(details.getProperties());
+ assertEquals(2, details.getProperties().size());
+ assertTrue(details.getProperties().containsKey("nullKey"));
+ assertNull(details.getProperties().get("nullKey"));
+ assertEquals("value", details.getProperties().get("presentKey"));
+ }
+}
diff --git a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
index 3d3f9e98..0ef1ed22 100644
--- a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
+++ b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
@@ -1 +1 @@
-fbe5bb20835678099fc51a44993ed9b045dee5a6
\ No newline at end of file
+1caadbd7ecfdf5f2309acbeac28a3e36d16aa156
\ No newline at end of file
diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto
index 88928c3b..0c34d986 100644
--- a/internal/durabletask-protobuf/protos/orchestrator_service.proto
+++ b/internal/durabletask-protobuf/protos/orchestrator_service.proto
@@ -41,6 +41,7 @@ message TaskFailureDetails {
google.protobuf.StringValue stackTrace = 3;
TaskFailureDetails innerFailure = 4;
bool isNonRetriable = 5;
+ map properties = 6;
}
enum OrchestrationStatus {
@@ -95,6 +96,7 @@ message TaskScheduledEvent {
google.protobuf.StringValue version = 2;
google.protobuf.StringValue input = 3;
TraceContext parentTraceContext = 4;
+ map tags = 5;
}
message TaskCompletedEvent {
@@ -113,6 +115,7 @@ message SubOrchestrationInstanceCreatedEvent {
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
TraceContext parentTraceContext = 5;
+ map tags = 6;
}
message SubOrchestrationInstanceCompletedEvent {
@@ -192,7 +195,7 @@ message EntityOperationCalledEvent {
}
message EntityLockRequestedEvent {
- string criticalSectionId = 1;
+ string criticalSectionId = 1;
repeated string lockSet = 2;
int32 position = 3;
google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories
@@ -217,7 +220,19 @@ message EntityUnlockSentEvent {
message EntityLockGrantedEvent {
string criticalSectionId = 1;
}
-
+
+message ExecutionRewoundEvent {
+ google.protobuf.StringValue reason = 1;
+ google.protobuf.StringValue parentExecutionId = 2; // used only for rewinding suborchestrations, null otherwise
+ google.protobuf.StringValue instanceId = 3; // used only for rewinding suborchestrations, null otherwise
+ TraceContext parentTraceContext = 4; // used only for rewinding suborchestrations, null otherwise
+ google.protobuf.StringValue name = 5; // used by DTS backend only
+ google.protobuf.StringValue version = 6; // used by DTS backend only
+ google.protobuf.StringValue input = 7; // used by DTS backend only
+ ParentInstanceInfo parentInstance = 8; // used by DTS backend only
+ map tags = 9; // used by DTS backend only
+}
+
message HistoryEvent {
int32 eventId = 1;
google.protobuf.Timestamp timestamp = 2;
@@ -244,11 +259,12 @@ message HistoryEvent {
ExecutionResumedEvent executionResumed = 22;
EntityOperationSignaledEvent entityOperationSignaled = 23;
EntityOperationCalledEvent entityOperationCalled = 24;
- EntityOperationCompletedEvent entityOperationCompleted = 25;
- EntityOperationFailedEvent entityOperationFailed = 26;
+ EntityOperationCompletedEvent entityOperationCompleted = 25;
+ EntityOperationFailedEvent entityOperationFailed = 26;
EntityLockRequestedEvent entityLockRequested = 27;
EntityLockGrantedEvent entityLockGranted = 28;
EntityUnlockSentEvent entityUnlockSent = 29;
+ ExecutionRewoundEvent executionRewound = 30;
}
}
@@ -256,6 +272,8 @@ message ScheduleTaskAction {
string name = 1;
google.protobuf.StringValue version = 2;
google.protobuf.StringValue input = 3;
+ map tags = 4;
+ TraceContext parentTraceContext = 5;
}
message CreateSubOrchestrationAction {
@@ -263,6 +281,8 @@ message CreateSubOrchestrationAction {
string name = 2;
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
+ TraceContext parentTraceContext = 5;
+ map tags = 6;
}
message CreateTimerAction {
@@ -282,6 +302,7 @@ message CompleteOrchestrationAction {
google.protobuf.StringValue newVersion = 4;
repeated HistoryEvent carryoverEvents = 5;
TaskFailureDetails failureDetails = 6;
+ map tags = 7;
}
message TerminateOrchestrationAction {
@@ -312,6 +333,11 @@ message OrchestratorAction {
}
}
+message OrchestrationTraceContext {
+ google.protobuf.StringValue spanID = 1;
+ google.protobuf.Timestamp spanStartTime = 2;
+}
+
message OrchestratorRequest {
string instanceId = 1;
google.protobuf.StringValue executionId = 2;
@@ -320,6 +346,8 @@ message OrchestratorRequest {
OrchestratorEntityParameters entityParameters = 5;
bool requiresHistoryStreaming = 6;
map properties = 7;
+
+ OrchestrationTraceContext orchestrationTraceContext = 8;
}
message OrchestratorResponse {
@@ -331,6 +359,17 @@ message OrchestratorResponse {
// The number of work item events that were processed by the orchestrator.
// This field is optional. If not set, the service should assume that the orchestrator processed all events.
google.protobuf.Int32Value numEventsProcessed = 5;
+ OrchestrationTraceContext orchestrationTraceContext = 6;
+
+ // Whether or not a history is required to complete the original OrchestratorRequest and none was provided.
+ bool requiresHistory = 7;
+
+ // True if this is a partial (chunked) completion. The backend must keep the work item open until the final chunk (isPartial=false).
+ bool isPartial = 8;
+
+ // Zero-based position of the current chunk within a chunked completion sequence.
+ // This field is omitted for non-chunked completions.
+ google.protobuf.Int32Value chunkIndex = 9;
}
message CreateInstanceRequest {
@@ -343,6 +382,7 @@ message CreateInstanceRequest {
google.protobuf.StringValue executionId = 7;
map tags = 8;
TraceContext parentTraceContext = 9;
+ google.protobuf.Timestamp requestTime = 10;
}
message OrchestrationIdReusePolicy {
@@ -449,12 +489,28 @@ message QueryInstancesResponse {
google.protobuf.StringValue continuationToken = 2;
}
+message ListInstanceIdsRequest {
+ repeated OrchestrationStatus runtimeStatus = 1;
+ google.protobuf.Timestamp completedTimeFrom = 2;
+ google.protobuf.Timestamp completedTimeTo = 3;
+ int32 pageSize = 4;
+ google.protobuf.StringValue lastInstanceKey = 5;
+}
+
+message ListInstanceIdsResponse {
+ repeated string instanceIds = 1;
+ google.protobuf.StringValue lastInstanceKey = 2;
+}
+
message PurgeInstancesRequest {
oneof request {
string instanceId = 1;
PurgeInstanceFilter purgeInstanceFilter = 2;
+ InstanceBatch instanceBatch = 4;
}
bool recursive = 3;
+ // used in the case when an instanceId is specified to determine if the purge request is for an orchestration (as opposed to an entity)
+ bool isOrchestration = 5;
}
message PurgeInstanceFilter {
@@ -468,6 +524,15 @@ message PurgeInstancesResponse {
google.protobuf.BoolValue isComplete = 2;
}
+message RestartInstanceRequest {
+ string instanceId = 1;
+ bool restartWithNewInstanceId = 2;
+}
+
+message RestartInstanceResponse {
+ string instanceId = 1;
+}
+
message CreateTaskHubRequest {
bool recreateIfExists = 1;
}
@@ -490,10 +555,12 @@ message SignalEntityRequest {
google.protobuf.StringValue input = 3;
string requestId = 4;
google.protobuf.Timestamp scheduledTime = 5;
+ TraceContext parentTraceContext = 6;
+ google.protobuf.Timestamp requestTime = 7;
}
message SignalEntityResponse {
- // no payload
+ // no payload
}
message GetEntityRequest {
@@ -553,6 +620,7 @@ message EntityBatchRequest {
string instanceId = 1;
google.protobuf.StringValue entityState = 2;
repeated OperationRequest operations = 3;
+ map properties = 4;
}
message EntityBatchResult {
@@ -562,6 +630,8 @@ message EntityBatchResult {
TaskFailureDetails failureDetails = 4;
string completionToken = 5;
repeated OperationInfo operationInfos = 6; // used only with DTS
+ // Whether or not an entity state is required to complete the original EntityBatchRequest and none was provided.
+ bool requiresState = 7;
}
message EntityRequest {
@@ -575,6 +645,7 @@ message OperationRequest {
string operation = 1;
string requestId = 2;
google.protobuf.StringValue input = 3;
+ TraceContext traceContext = 4;
}
message OperationResult {
@@ -591,10 +662,14 @@ message OperationInfo {
message OperationResultSuccess {
google.protobuf.StringValue result = 1;
+ google.protobuf.Timestamp startTimeUtc = 2;
+ google.protobuf.Timestamp endTimeUtc = 3;
}
message OperationResultFailure {
TaskFailureDetails failureDetails = 1;
+ google.protobuf.Timestamp startTimeUtc = 2;
+ google.protobuf.Timestamp endTimeUtc = 3;
}
message OperationAction {
@@ -610,6 +685,8 @@ message SendSignalAction {
string name = 2;
google.protobuf.StringValue input = 3;
google.protobuf.Timestamp scheduledTime = 4;
+ google.protobuf.Timestamp requestTime = 5;
+ TraceContext parentTraceContext = 6;
}
message StartNewOrchestrationAction {
@@ -618,6 +695,8 @@ message StartNewOrchestrationAction {
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
google.protobuf.Timestamp scheduledTime = 5;
+ google.protobuf.Timestamp requestTime = 6;
+ TraceContext parentTraceContext = 7;
}
message AbandonActivityTaskRequest {
@@ -644,6 +723,17 @@ message AbandonEntityTaskResponse {
// Empty.
}
+message SkipGracefulOrchestrationTerminationsRequest {
+ InstanceBatch instanceBatch = 1;
+ google.protobuf.StringValue reason = 2;
+}
+
+message SkipGracefulOrchestrationTerminationsResponse {
+ // Those instances which could not be terminated because they had locked entities at the time of this termination call,
+ // are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged)
+ repeated string unterminatedInstanceIds = 1;
+}
+
service TaskHubSidecarService {
// Sends a hello request to the sidecar service.
rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty);
@@ -657,18 +747,21 @@ service TaskHubSidecarService {
// Rewinds an orchestration instance to last known good state and replays from there.
rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse);
+ // Restarts an orchestration instance.
+ rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse);
+
// Waits for an orchestration instance to reach a running or completion state.
rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse);
-
+
// Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.).
rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse);
// Raises an event to a running orchestration instance.
rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse);
-
+
// Terminates a running orchestration instance.
rpc TerminateInstance(TerminateRequest) returns (TerminateResponse);
-
+
// Suspends a running orchestration instance.
rpc SuspendInstance(SuspendRequest) returns (SuspendResponse);
@@ -678,6 +771,9 @@ service TaskHubSidecarService {
// rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse);
rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse);
+
+ rpc ListInstanceIds(ListInstanceIdsRequest) returns (ListInstanceIdsResponse);
+
rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse);
rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem);
@@ -714,6 +810,10 @@ service TaskHubSidecarService {
// Abandon an entity work item
rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse);
+
+ // "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated".
+ // Note that a maximum of 500 orchestrations can be terminated at a time using this method.
+ rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse);
}
message GetWorkItemsRequest {
@@ -722,6 +822,7 @@ message GetWorkItemsRequest {
int32 maxConcurrentEntityWorkItems = 3;
repeated WorkerCapability capabilities = 10;
+ WorkItemFilters workItemFilters = 11;
}
enum WorkerCapability {
@@ -732,6 +833,36 @@ enum WorkerCapability {
// When set, the service may return work items without any history events as an optimization.
// It is strongly recommended that all SDKs support this capability.
WORKER_CAPABILITY_HISTORY_STREAMING = 1;
+
+ // Indicates that the worker supports scheduled tasks.
+ // The service may send schedule-triggered orchestration work items,
+ // and the worker must handle them, including the scheduledTime field.
+ WORKER_CAPABILITY_SCHEDULED_TASKS = 2;
+
+ // Signals that the worker can handle large payloads stored externally (e.g., Blob Storage).
+ // Work items may contain URI references instead of inline data, and the worker must fetch them.
+ // This avoids message size limits and reduces network overhead.
+ WORKER_CAPABILITY_LARGE_PAYLOADS = 3;
+}
+
+message WorkItemFilters {
+ repeated OrchestrationFilter orchestrations = 1;
+ repeated ActivityFilter activities = 2;
+ repeated EntityFilter entities = 3;
+}
+
+message OrchestrationFilter {
+ string name = 1;
+ repeated string versions = 2;
+}
+
+message ActivityFilter {
+ string name = 1;
+ repeated string versions = 2;
+}
+
+message EntityFilter {
+ string name = 1;
}
message WorkItem {
@@ -750,7 +881,7 @@ message CompleteTaskResponse {
}
message HealthPing {
- // No payload
+ // No payload
}
message StreamInstanceHistoryRequest {
@@ -764,3 +895,8 @@ message StreamInstanceHistoryRequest {
message HistoryChunk {
repeated HistoryEvent events = 1;
}
+
+message InstanceBatch {
+ // A maximum of 500 instance IDs can be provided in this list.
+ repeated string instanceIds = 1;
+}