diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index 9aad2d10..7e0c3ef5 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -156,8 +156,8 @@ jobs: - name: Archive test report uses: actions/upload-artifact@v4 with: - name: Integration test report - path: client/build/reports/tests/endToEndTest + name: E2E test report + path: endtoendtests/build/reports/tests/endToEndTest functions-sample-tests: @@ -195,5 +195,5 @@ jobs: - name: Archive test report uses: actions/upload-artifact@v4 with: - name: Integration test report - path: client/build/reports/tests/endToEndTest \ No newline at end of file + name: Functions Sample test report + path: samples-azure-functions/build/reports/tests/sampleTest \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e17f35f..9751d3ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## Unreleased +* Adding rewind client API ([#253](https://github.com/microsoft/durabletask-java/pull/253)). Note: orchestration processing for rewind is supported with Azure Functions but not with the standalone `GrpcDurableTaskWorker`. 
+ ## v1.7.0 * Add descriptive error when orchestration type is not registered ([#261](https://github.com/microsoft/durabletask-java/pull/261)) * Update all dependencies to latest versions ([#260](https://github.com/microsoft/durabletask-java/pull/260)) diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java index f78c00f9..dace60b7 100644 --- a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java +++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/HttpManagementPayload.java @@ -13,6 +13,7 @@ public class HttpManagementPayload { private final String id; private final String purgeHistoryDeleteUri; private final String restartPostUri; + private final String rewindPostUri; private final String sendEventPostUri; private final String statusQueryGetUri; private final String terminatePostUri; @@ -33,6 +34,7 @@ public HttpManagementPayload( this.id = instanceId; this.purgeHistoryDeleteUri = instanceStatusURL + "?" + requiredQueryStringParameters; this.restartPostUri = instanceStatusURL + "/restart?" + requiredQueryStringParameters; + this.rewindPostUri = instanceStatusURL + "/rewind?reason={text}&" + requiredQueryStringParameters; this.sendEventPostUri = instanceStatusURL + "/raiseEvent/{eventName}?" + requiredQueryStringParameters; this.statusQueryGetUri = instanceStatusURL + "?" + requiredQueryStringParameters; this.terminatePostUri = instanceStatusURL + "/terminate?reason={text}&" + requiredQueryStringParameters; @@ -94,4 +96,13 @@ public String getRestartPostUri() { return restartPostUri; } + /** + * Gets the HTTP POST instance rewind endpoint. + * + * @return The HTTP URL for posting instance rewind commands. 
+ */ + public String getRewindPostUri() { + return rewindPostUri; + } + } diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java index 4590277f..1e1b3cb0 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java @@ -292,6 +292,41 @@ public abstract OrchestrationMetadata waitForInstanceCompletion( */ public abstract String restartInstance(String instanceId, boolean restartWithNewInstanceId); + /** + * Rewinds a failed orchestration instance to the last known good state and replays from there. + *

+ * This method can only be used on orchestration instances that are in a Failed state. + * When rewound, the orchestration instance will restart from the point of failure as if the failure + * never occurred. It rewinds the orchestration by replaying any + * Failed Activities and Failed suborchestrations that themselves have Failed Activities. + *

+ * Note: Rewind requires a backend that supports it. When using Azure Functions with the + * Durable Task extension, rewind is fully supported. The standalone {@code GrpcDurableTaskWorker} + * does not currently support orchestration processing for rewind. + * + * @param instanceId the ID of the orchestration instance to rewind + */ + public void rewindInstance(String instanceId) { + this.rewindInstance(instanceId, null); + } + + /** + * Rewinds a failed orchestration instance to the last known good state and replays from there. + *

+ * This method can only be used on orchestration instances that are in a Failed state. + * When rewound, the orchestration instance will restart from the point of failure as if the failure + * never occurred. It rewinds the orchestration by replaying any + * Failed Activities and Failed suborchestrations that themselves have Failed Activities. + *

+ * Note: Rewind requires a backend that supports it. When using Azure Functions with the + * Durable Task extension, rewind is fully supported. The standalone {@code GrpcDurableTaskWorker} + * does not currently support orchestration processing for rewind. + * + * @param instanceId the ID of the orchestration instance to rewind + * @param reason the reason for rewinding the orchestration instance + */ + public abstract void rewindInstance(String instanceId, @Nullable String reason); + /** * Suspends a running orchestration instance. * @param instanceId the ID of the orchestration instance to suspend diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 2ebb6ec8..f87388d9 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -363,6 +363,17 @@ public void resumeInstance(String instanceId, @Nullable String reason) { this.sidecarClient.resumeInstance(resumeRequestBuilder.build()); } + @Override + public void rewindInstance(String instanceId, @Nullable String reason) { + Helpers.throwIfArgumentNull(instanceId, "instanceId"); + RewindInstanceRequest.Builder rewindRequestBuilder = RewindInstanceRequest.newBuilder(); + rewindRequestBuilder.setInstanceId(instanceId); + if (reason != null) { + rewindRequestBuilder.setReason(StringValue.of(reason)); + } + this.sidecarClient.rewindInstance(rewindRequestBuilder.build()); + } + @Override public String restartInstance(String instanceId, boolean restartWithNewInstanceId) { OrchestrationMetadata metadata = this.getInstanceMetadata(instanceId, true); diff --git a/endtoendtests/src/main/java/com/functions/RewindTest.java b/endtoendtests/src/main/java/com/functions/RewindTest.java new file mode 100644 index 00000000..71844027 --- /dev/null +++ b/endtoendtests/src/main/java/com/functions/RewindTest.java @@ 
-0,0 +1,89 @@ +package com.functions; + +import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpMethod; +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.TaskOrchestrationContext; +import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger; +import com.microsoft.durabletask.azurefunctions.DurableClientContext; +import com.microsoft.durabletask.azurefunctions.DurableClientInput; +import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Sample functions to test the rewind functionality. + * Rewind allows a failed orchestration to be replayed from its last known good state. + */ +public class RewindTest { + + // Flag to control whether the activity should fail (first call fails, subsequent calls succeed) + private static final AtomicBoolean shouldFail = new AtomicBoolean(true); + + /** + * HTTP trigger to start the rewindable orchestration. 
+ */ + @FunctionName("StartRewindableOrchestration") + public HttpResponseMessage startRewindableOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Starting rewindable orchestration."); + + // Reset the failure flag so the first activity call will fail + shouldFail.set(true); + + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("RewindableOrchestration"); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + /** + * Orchestration that calls an activity which will fail on the first attempt. + * After rewinding, the orchestration will replay and the activity will succeed. + */ + @FunctionName("RewindableOrchestration") + public String rewindableOrchestration( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + // Call the activity that may fail + String result = ctx.callActivity("FailOnceActivity", "RewindTest", String.class).await(); + return result; + } + + /** + * Activity that fails on the first call but succeeds on subsequent calls. + * This simulates a transient failure that can be recovered by rewinding. 
+ */ + @FunctionName("FailOnceActivity") + public String failOnceActivity( + @DurableActivityTrigger(name = "input") String input, + final ExecutionContext context) { + if (shouldFail.compareAndSet(true, false)) { + context.getLogger().warning("FailOnceActivity: Simulating failure for input: " + input); + throw new RuntimeException("Simulated transient failure - rewind to retry"); + } + context.getLogger().info("FailOnceActivity: Success for input: " + input); + return input + "-rewound-success"; + } + + /** + * HTTP trigger to reset the failure flag (useful for testing). + */ + @FunctionName("ResetRewindFailureFlag") + public HttpResponseMessage resetRewindFailureFlag( + @HttpTrigger(name = "req", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request, + final ExecutionContext context) { + shouldFail.set(true); + context.getLogger().info("Reset failure flag to true."); + return request.createResponseBuilder(com.microsoft.azure.functions.HttpStatus.OK) + .body("Failure flag reset to true") + .build(); + } +} diff --git a/endtoendtests/src/test/java/com/functions/EndToEndTests.java b/endtoendtests/src/test/java/com/functions/EndToEndTests.java index c2d0be02..53a0e454 100644 --- a/endtoendtests/src/test/java/com/functions/EndToEndTests.java +++ b/endtoendtests/src/test/java/com/functions/EndToEndTests.java @@ -4,6 +4,7 @@ import io.restassured.http.ContentType; import io.restassured.path.json.JsonPath; import io.restassured.response.Response; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -26,6 +27,23 @@ @Tag("e2e") public class EndToEndTests { + @BeforeAll + public static void setup() { + RestAssured.baseURI = "http://localhost"; + // Use port 8080 for Docker, 7071 for local func start + String port = System.getenv("FUNCTIONS_PORT"); + if (port != null) { + try { + RestAssured.port = Integer.parseInt(port); + } catch 
(NumberFormatException e) { + throw new IllegalArgumentException( + "FUNCTIONS_PORT environment variable must be a valid integer, but was: '" + port + "'", e); + } + } else { + RestAssured.port = 8080; + } + } + @Order(1) @Test public void setupHost() { @@ -216,6 +234,40 @@ public void suspendResume() throws InterruptedException { assertTrue(completed); } + @Test + public void rewindFailedOrchestration() throws InterruptedException { + // Reset the failure flag before starting + post("/api/ResetRewindFailureFlag"); + + // Start the orchestration - it will fail on the first activity call + String startOrchestrationPath = "/api/StartRewindableOrchestration"; + Response response = post(startOrchestrationPath); + JsonPath jsonPath = response.jsonPath(); + String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); + + // Wait for the orchestration to fail + boolean failed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10)); + assertTrue(failed, "Orchestration should have failed"); + + // Get the rewind URI and rewind the orchestration + String rewindPostUri = jsonPath.get("rewindPostUri"); + rewindPostUri = rewindPostUri.replace("{text}", "Testing rewind functionality"); + Response rewindResponse = post(rewindPostUri); + assertEquals(202, rewindResponse.getStatusCode(), "Rewind should return 202 Accepted"); + + // Wait for the orchestration to complete after rewind; tolerate Pending as well as Running while it restarts + Set<String> continueStates = new HashSet<>(); + continueStates.add("Pending"); + continueStates.add("Running"); + boolean completed = pollingCheck(statusQueryGetUri, "Completed", continueStates, Duration.ofSeconds(15)); + assertTrue(completed, "Orchestration should complete after rewind"); + + // Verify the output contains the expected result + Response statusResponse = get(statusQueryGetUri); + String output = statusResponse.jsonPath().get("output"); + assertTrue(output.contains("rewound-success"), "Output should indicate successful rewind: " + output); + } + @Test public void externalEventDeserializeFail() throws 
InterruptedException { String startOrchestrationPath = "api/ExternalEventHttp"; diff --git a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH index 3d3f9e98..fdb90d6a 100644 --- a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH +++ b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH @@ -1 +1 @@ -fbe5bb20835678099fc51a44993ed9b045dee5a6 \ No newline at end of file +026329c53fe6363985655857b9ca848ec7238bd2 \ No newline at end of file diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto index 88928c3b..8ef46a4a 100644 --- a/internal/durabletask-protobuf/protos/orchestrator_service.proto +++ b/internal/durabletask-protobuf/protos/orchestrator_service.proto @@ -41,6 +41,7 @@ message TaskFailureDetails { google.protobuf.StringValue stackTrace = 3; TaskFailureDetails innerFailure = 4; bool isNonRetriable = 5; + map properties = 6; } enum OrchestrationStatus { @@ -95,6 +96,7 @@ message TaskScheduledEvent { google.protobuf.StringValue version = 2; google.protobuf.StringValue input = 3; TraceContext parentTraceContext = 4; + map tags = 5; } message TaskCompletedEvent { @@ -113,6 +115,7 @@ message SubOrchestrationInstanceCreatedEvent { google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; TraceContext parentTraceContext = 5; + map tags = 6; } message SubOrchestrationInstanceCompletedEvent { @@ -192,7 +195,7 @@ message EntityOperationCalledEvent { } message EntityLockRequestedEvent { - string criticalSectionId = 1; + string criticalSectionId = 1; repeated string lockSet = 2; int32 position = 3; google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories @@ -217,7 +220,19 @@ message EntityUnlockSentEvent { message EntityLockGrantedEvent { string criticalSectionId = 1; } - + +message ExecutionRewoundEvent { + google.protobuf.StringValue reason = 1; + 
google.protobuf.StringValue parentExecutionId = 2; // used only for rewinding suborchestrations, null otherwise + google.protobuf.StringValue instanceId = 3; // used only for rewinding suborchestrations, null otherwise + TraceContext parentTraceContext = 4; // used only for rewinding suborchestrations, null otherwise + google.protobuf.StringValue name = 5; // used by DTS backend only + google.protobuf.StringValue version = 6; // used by DTS backend only + google.protobuf.StringValue input = 7; // used by DTS backend only + ParentInstanceInfo parentInstance = 8; // used by DTS backend only + map tags = 9; // used by DTS backend only +} + message HistoryEvent { int32 eventId = 1; google.protobuf.Timestamp timestamp = 2; @@ -244,11 +259,12 @@ message HistoryEvent { ExecutionResumedEvent executionResumed = 22; EntityOperationSignaledEvent entityOperationSignaled = 23; EntityOperationCalledEvent entityOperationCalled = 24; - EntityOperationCompletedEvent entityOperationCompleted = 25; - EntityOperationFailedEvent entityOperationFailed = 26; + EntityOperationCompletedEvent entityOperationCompleted = 25; + EntityOperationFailedEvent entityOperationFailed = 26; EntityLockRequestedEvent entityLockRequested = 27; EntityLockGrantedEvent entityLockGranted = 28; EntityUnlockSentEvent entityUnlockSent = 29; + ExecutionRewoundEvent executionRewound = 30; } } @@ -256,6 +272,8 @@ message ScheduleTaskAction { string name = 1; google.protobuf.StringValue version = 2; google.protobuf.StringValue input = 3; + map tags = 4; + TraceContext parentTraceContext = 5; } message CreateSubOrchestrationAction { @@ -263,6 +281,8 @@ message CreateSubOrchestrationAction { string name = 2; google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; + TraceContext parentTraceContext = 5; + map tags = 6; } message CreateTimerAction { @@ -282,6 +302,7 @@ message CompleteOrchestrationAction { google.protobuf.StringValue newVersion = 4; repeated HistoryEvent carryoverEvents = 5; 
TaskFailureDetails failureDetails = 6; + map tags = 7; } message TerminateOrchestrationAction { @@ -312,6 +333,11 @@ message OrchestratorAction { } } +message OrchestrationTraceContext { + google.protobuf.StringValue spanID = 1; + google.protobuf.Timestamp spanStartTime = 2; +} + message OrchestratorRequest { string instanceId = 1; google.protobuf.StringValue executionId = 2; @@ -320,6 +346,8 @@ message OrchestratorRequest { OrchestratorEntityParameters entityParameters = 5; bool requiresHistoryStreaming = 6; map properties = 7; + + OrchestrationTraceContext orchestrationTraceContext = 8; } message OrchestratorResponse { @@ -331,6 +359,17 @@ message OrchestratorResponse { // The number of work item events that were processed by the orchestrator. // This field is optional. If not set, the service should assume that the orchestrator processed all events. google.protobuf.Int32Value numEventsProcessed = 5; + OrchestrationTraceContext orchestrationTraceContext = 6; + + // Whether or not a history is required to complete the original OrchestratorRequest and none was provided. + bool requiresHistory = 7; + + // True if this is a partial (chunked) completion. The backend must keep the work item open until the final chunk (isPartial=false). + bool isPartial = 8; + + // Zero-based position of the current chunk within a chunked completion sequence. + // This field is omitted for non-chunked completions. 
+ google.protobuf.Int32Value chunkIndex = 9; } message CreateInstanceRequest { @@ -343,6 +382,7 @@ message CreateInstanceRequest { google.protobuf.StringValue executionId = 7; map tags = 8; TraceContext parentTraceContext = 9; + google.protobuf.Timestamp requestTime = 10; } message OrchestrationIdReusePolicy { @@ -449,12 +489,28 @@ message QueryInstancesResponse { google.protobuf.StringValue continuationToken = 2; } +message ListInstanceIdsRequest { + repeated OrchestrationStatus runtimeStatus = 1; + google.protobuf.Timestamp completedTimeFrom = 2; + google.protobuf.Timestamp completedTimeTo = 3; + int32 pageSize = 4; + google.protobuf.StringValue lastInstanceKey = 5; +} + +message ListInstanceIdsResponse { + repeated string instanceIds = 1; + google.protobuf.StringValue lastInstanceKey = 2; +} + message PurgeInstancesRequest { oneof request { string instanceId = 1; PurgeInstanceFilter purgeInstanceFilter = 2; + InstanceBatch instanceBatch = 4; } bool recursive = 3; + // used in the case when an instanceId is specified to determine if the purge request is for an orchestration (as opposed to an entity) + bool isOrchestration = 5; } message PurgeInstanceFilter { @@ -468,6 +524,15 @@ message PurgeInstancesResponse { google.protobuf.BoolValue isComplete = 2; } +message RestartInstanceRequest { + string instanceId = 1; + bool restartWithNewInstanceId = 2; +} + +message RestartInstanceResponse { + string instanceId = 1; +} + message CreateTaskHubRequest { bool recreateIfExists = 1; } @@ -490,10 +555,12 @@ message SignalEntityRequest { google.protobuf.StringValue input = 3; string requestId = 4; google.protobuf.Timestamp scheduledTime = 5; + TraceContext parentTraceContext = 6; + google.protobuf.Timestamp requestTime = 7; } message SignalEntityResponse { - // no payload + // no payload } message GetEntityRequest { @@ -553,6 +620,7 @@ message EntityBatchRequest { string instanceId = 1; google.protobuf.StringValue entityState = 2; repeated OperationRequest operations = 3; + 
map properties = 4; } message EntityBatchResult { @@ -562,6 +630,8 @@ message EntityBatchResult { TaskFailureDetails failureDetails = 4; string completionToken = 5; repeated OperationInfo operationInfos = 6; // used only with DTS + // Whether or not an entity state is required to complete the original EntityBatchRequest and none was provided. + bool requiresState = 7; } message EntityRequest { @@ -575,6 +645,7 @@ message OperationRequest { string operation = 1; string requestId = 2; google.protobuf.StringValue input = 3; + TraceContext traceContext = 4; } message OperationResult { @@ -591,10 +662,14 @@ message OperationInfo { message OperationResultSuccess { google.protobuf.StringValue result = 1; + google.protobuf.Timestamp startTimeUtc = 2; + google.protobuf.Timestamp endTimeUtc = 3; } message OperationResultFailure { TaskFailureDetails failureDetails = 1; + google.protobuf.Timestamp startTimeUtc = 2; + google.protobuf.Timestamp endTimeUtc = 3; } message OperationAction { @@ -610,6 +685,8 @@ message SendSignalAction { string name = 2; google.protobuf.StringValue input = 3; google.protobuf.Timestamp scheduledTime = 4; + google.protobuf.Timestamp requestTime = 5; + TraceContext parentTraceContext = 6; } message StartNewOrchestrationAction { @@ -618,6 +695,8 @@ message StartNewOrchestrationAction { google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; google.protobuf.Timestamp scheduledTime = 5; + google.protobuf.Timestamp requestTime = 6; + TraceContext parentTraceContext = 7; } message AbandonActivityTaskRequest { @@ -644,6 +723,17 @@ message AbandonEntityTaskResponse { // Empty. 
} +message SkipGracefulOrchestrationTerminationsRequest { + InstanceBatch instanceBatch = 1; + google.protobuf.StringValue reason = 2; +} + +message SkipGracefulOrchestrationTerminationsResponse { + // Those instances which could not be terminated because they had locked entities at the time of this termination call, + // are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged) + repeated string unterminatedInstanceIds = 1; +} + service TaskHubSidecarService { // Sends a hello request to the sidecar service. rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty); @@ -657,18 +747,21 @@ service TaskHubSidecarService { // Rewinds an orchestration instance to last known good state and replays from there. rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse); + // Restarts an orchestration instance. + rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse); + // Waits for an orchestration instance to reach a running or completion state. rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse); - + // Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.). rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse); // Raises an event to a running orchestration instance. rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse); - + // Terminates a running orchestration instance. rpc TerminateInstance(TerminateRequest) returns (TerminateResponse); - + // Suspends a running orchestration instance. 
rpc SuspendInstance(SuspendRequest) returns (SuspendResponse); @@ -678,6 +771,9 @@ service TaskHubSidecarService { // rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse); rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse); + + rpc ListInstanceIds(ListInstanceIdsRequest) returns (ListInstanceIdsResponse); + rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse); rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem); @@ -714,6 +810,10 @@ service TaskHubSidecarService { // Abandon an entity work item rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse); + + // "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated". + // Note that a maximum of 500 orchestrations can be terminated at a time using this method. + rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse); } message GetWorkItemsRequest { @@ -732,6 +832,16 @@ enum WorkerCapability { // When set, the service may return work items without any history events as an optimization. // It is strongly recommended that all SDKs support this capability. WORKER_CAPABILITY_HISTORY_STREAMING = 1; + + // Indicates that the worker supports scheduled tasks. + // The service may send schedule-triggered orchestration work items, + // and the worker must handle them, including the scheduledTime field. + WORKER_CAPABILITY_SCHEDULED_TASKS = 2; + + // Signals that the worker can handle large payloads stored externally (e.g., Blob Storage). + // Work items may contain URI references instead of inline data, and the worker must fetch them. + // This avoids message size limits and reduces network overhead. 
+ WORKER_CAPABILITY_LARGE_PAYLOADS = 3; } message WorkItem { @@ -750,7 +860,7 @@ message CompleteTaskResponse { } message HealthPing { - // No payload + // No payload } message StreamInstanceHistoryRequest { @@ -764,3 +874,8 @@ message StreamInstanceHistoryRequest { message HistoryChunk { repeated HistoryEvent events = 1; } + +message InstanceBatch { + // A maximum of 500 instance IDs can be provided in this list. + repeated string instanceIds = 1; +} diff --git a/samples-azure-functions/src/main/java/com/functions/RewindTest.java b/samples-azure-functions/src/main/java/com/functions/RewindTest.java new file mode 100644 index 00000000..8c998fa4 --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/RewindTest.java @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.functions; + +import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpMethod; +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.HttpStatus; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.TaskOrchestrationContext; +import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger; +import com.microsoft.durabletask.azurefunctions.DurableClientContext; +import com.microsoft.durabletask.azurefunctions.DurableClientInput; +import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Sample functions to test the rewind functionality. + * Rewind allows a failed orchestration to be replayed from its last known good state. 
+ */ +public class RewindTest { + + // Flag to control whether the activity should fail (first call fails, subsequent calls succeed) + private static final AtomicBoolean shouldFail = new AtomicBoolean(true); + + /** + * HTTP trigger to start the rewindable orchestration. + */ + @FunctionName("StartRewindableOrchestration") + public HttpResponseMessage startRewindableOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Starting rewindable orchestration."); + + DurableTaskClient client = durableContext.getClient(); + String instanceId = client.scheduleNewOrchestrationInstance("RewindableOrchestration"); + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + /** + * Orchestration that calls an activity which will fail on the first attempt. + * After rewinding, the orchestration will replay and the activity will succeed. + */ + @FunctionName("RewindableOrchestration") + public String rewindableOrchestration( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + // Call the activity that may fail + String result = ctx.callActivity("FailOnceActivity", "RewindTest", String.class).await(); + return result; + } + + /** + * Activity that fails on the first call but succeeds on subsequent calls. + * This simulates a transient failure that can be recovered by rewinding. 
+ */ + @FunctionName("FailOnceActivity") + public String failOnceActivity( + @DurableActivityTrigger(name = "input") String input, + final ExecutionContext context) { + if (shouldFail.compareAndSet(true, false)) { + context.getLogger().warning("FailOnceActivity: Simulating failure for input: " + input); + throw new RuntimeException("Simulated transient failure - rewind to retry"); + } + context.getLogger().info("FailOnceActivity: Success for input: " + input); + return input + "-rewound-success"; + } + + /** + * HTTP trigger to reset the failure flag (useful for testing). + */ + @FunctionName("ResetRewindFailureFlag") + public HttpResponseMessage resetRewindFailureFlag( + @HttpTrigger(name = "req", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request, + final ExecutionContext context) { + shouldFail.set(true); + context.getLogger().info("Reset failure flag to true."); + return request.createResponseBuilder(HttpStatus.OK) + .body("Failure flag reset to true") + .build(); + } +} diff --git a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java index a756b5db..371fc9f9 100644 --- a/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java +++ b/samples-azure-functions/src/test/java/com/functions/EndToEndTests.java @@ -229,6 +229,40 @@ public void orchestrationPOJO() throws InterruptedException { assertEquals("\"TESTNAME\"", outputName); } + @Test + public void rewindFailedOrchestration() throws InterruptedException { + // Reset the failure flag before starting + post("/api/ResetRewindFailureFlag"); + + // Start the orchestration - it will fail on the first activity call + String startOrchestrationPath = "/api/StartRewindableOrchestration"; + Response response = post(startOrchestrationPath); + JsonPath jsonPath = response.jsonPath(); + String statusQueryGetUri = jsonPath.get("statusQueryGetUri"); + + // Wait for the 
orchestration to fail + boolean failed = pollingCheck(statusQueryGetUri, "Failed", null, Duration.ofSeconds(10)); + assertTrue(failed, "Orchestration should have failed"); + + // Get the rewind URI and rewind the orchestration + String rewindPostUri = jsonPath.get("rewindPostUri"); + rewindPostUri = rewindPostUri.replace("{text}", "Testing rewind functionality"); + Response rewindResponse = post(rewindPostUri); + assertEquals(202, rewindResponse.getStatusCode(), "Rewind should return 202 Accepted"); + + // Wait for the orchestration to complete after rewind + Set continueStates = new HashSet<>(); + continueStates.add("Pending"); + continueStates.add("Running"); + boolean completed = pollingCheck(statusQueryGetUri, "Completed", continueStates, Duration.ofSeconds(15)); + assertTrue(completed, "Orchestration should complete after rewind"); + + // Verify the output contains the expected result + Response statusResponse = get(statusQueryGetUri); + String output = statusResponse.jsonPath().get("output"); + assertTrue(output.contains("rewound-success"), "Output should indicate successful rewind: " + output); + } + private boolean pollingCheck(String statusQueryGetUri, String expectedState, Set continueStates,