From 748cce38aee97506f816a89d6a8bb783b1f88be1 Mon Sep 17 00:00:00 2001
From: Javier Aliaga
Date: Tue, 10 Mar 2026 13:15:49 +0100
Subject: [PATCH] chore: Add test to verify workflow reconnection logic

Signed-off-by: Javier Aliaga
---
 .../DurableTaskGrpcWorkerReconnectTest.java   | 205 ++++++++++++++++++
 1 file changed, 205 insertions(+)
 create mode 100644 durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerReconnectTest.java

diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerReconnectTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerReconnectTest.java
new file mode 100644
index 000000000..4042f9ad4
--- /dev/null
+++ b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerReconnectTest.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2026 The Dapr Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.dapr.durabletask;
+
+import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
+import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests that DurableTaskGrpcWorker auto-heals (reconnects) when the gRPC
+ * connection to the sidecar drops with UNAVAILABLE or CANCELLED status.
+ *
+ * @see Issue #1652
+ */
+class DurableTaskGrpcWorkerReconnectTest {
+
+  private DurableTaskGrpcWorker worker;
+  private Server server;
+  private ManagedChannel channel;
+
+  @AfterEach
+  void tearDown() throws Exception {
+    if (worker != null) {
+      worker.close();
+    }
+    if (channel != null) {
+      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
+    }
+    if (server != null) {
+      server.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
+    }
+  }
+
+  @Test
+  void workerReconnectsAfterUnavailableError() throws Exception {
+    int requiredCalls = 3;
+    CountDownLatch latch = new CountDownLatch(requiredCalls);
+    AtomicInteger callCount = new AtomicInteger(0);
+
+    String serverName = InProcessServerBuilder.generateName();
+    server = InProcessServerBuilder.forName(serverName)
+        .directExecutor()
+        .addService(new TaskHubSidecarServiceGrpc.TaskHubSidecarServiceImplBase() {
+          @Override
+          public void getWorkItems(
+              OrchestratorService.GetWorkItemsRequest request,
+              StreamObserver<OrchestratorService.WorkItem> responseObserver) {
+            callCount.incrementAndGet();
+            latch.countDown();
+            // Simulate sidecar being unavailable
+            responseObserver.onError(Status.UNAVAILABLE
+                .withDescription("Sidecar is unavailable")
+                .asRuntimeException());
+          }
+        })
+        .build()
+        .start();
+
+    channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
+
+    worker = new DurableTaskGrpcWorkerBuilder()
+        .grpcChannel(channel)
+        .build();
+    worker.start();
+
+    // The worker should retry multiple times after UNAVAILABLE errors.
+    // With a 5-second retry delay, we wait long enough for at least 3 attempts.
+    boolean reached = latch.await(30, TimeUnit.SECONDS);
+    assertTrue(reached,
+        "Expected at least " + requiredCalls + " getWorkItems calls (reconnect attempts), but got " + callCount.get());
+  }
+
+  @Test
+  void workerReconnectsAfterCancelledError() throws Exception {
+    int requiredCalls = 2;
+    CountDownLatch latch = new CountDownLatch(requiredCalls);
+    AtomicInteger callCount = new AtomicInteger(0);
+
+    String serverName = InProcessServerBuilder.generateName();
+    server = InProcessServerBuilder.forName(serverName)
+        .directExecutor()
+        .addService(new TaskHubSidecarServiceGrpc.TaskHubSidecarServiceImplBase() {
+          @Override
+          public void getWorkItems(
+              OrchestratorService.GetWorkItemsRequest request,
+              StreamObserver<OrchestratorService.WorkItem> responseObserver) {
+            callCount.incrementAndGet();
+            latch.countDown();
+            // Simulate connection cancelled (e.g., sidecar restart)
+            responseObserver.onError(Status.CANCELLED
+                .withDescription("Connection cancelled")
+                .asRuntimeException());
+          }
+        })
+        .build()
+        .start();
+
+    channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
+
+    worker = new DurableTaskGrpcWorkerBuilder()
+        .grpcChannel(channel)
+        .build();
+    worker.start();
+
+    boolean reached = latch.await(30, TimeUnit.SECONDS);
+    assertTrue(reached,
+        "Expected at least " + requiredCalls + " getWorkItems calls after CANCELLED, but got " + callCount.get());
+  }
+
+  @Test
+  void workerReconnectsAfterStreamEndsNormally() throws Exception {
+    int requiredCalls = 2;
+    CountDownLatch latch = new CountDownLatch(requiredCalls);
+    AtomicInteger callCount = new AtomicInteger(0);
+
+    String serverName = InProcessServerBuilder.generateName();
+    server = InProcessServerBuilder.forName(serverName)
+        .directExecutor()
+        .addService(new TaskHubSidecarServiceGrpc.TaskHubSidecarServiceImplBase() {
+          @Override
+          public void getWorkItems(
+              OrchestratorService.GetWorkItemsRequest request,
+              StreamObserver<OrchestratorService.WorkItem> responseObserver) {
+            callCount.incrementAndGet();
+            latch.countDown();
+            // Simulate stream ending normally (server completes without sending items)
+            responseObserver.onCompleted();
+          }
+        })
+        .build()
+        .start();
+
+    channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
+
+    worker = new DurableTaskGrpcWorkerBuilder()
+        .grpcChannel(channel)
+        .build();
+    worker.start();
+
+    // When the stream ends normally, the outer while(true) loop should
+    // re-establish the stream immediately (no 5s delay for normal completion).
+    boolean reached = latch.await(10, TimeUnit.SECONDS);
+    assertTrue(reached,
+        "Expected at least " + requiredCalls + " getWorkItems calls after normal stream end, but got " + callCount.get());
+  }
+
+  @Test
+  void workerStopsCleanlyOnClose() throws Exception {
+    CountDownLatch firstCallLatch = new CountDownLatch(1);
+
+    String serverName = InProcessServerBuilder.generateName();
+    server = InProcessServerBuilder.forName(serverName)
+        .directExecutor()
+        .addService(new TaskHubSidecarServiceGrpc.TaskHubSidecarServiceImplBase() {
+          @Override
+          public void getWorkItems(
+              OrchestratorService.GetWorkItemsRequest request,
+              StreamObserver<OrchestratorService.WorkItem> responseObserver) {
+            firstCallLatch.countDown();
+            // Keep stream open (simulate connected state)
+            // The worker should be interrupted by close()
+          }
+        })
+        .build()
+        .start();
+
+    channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
+
+    worker = new DurableTaskGrpcWorkerBuilder()
+        .grpcChannel(channel)
+        .build();
+    worker.start();
+
+    // Wait for the worker to connect
+    assertTrue(firstCallLatch.await(10, TimeUnit.SECONDS), "Worker should have connected");
+
+    // Close should stop the worker cleanly without hanging
+    worker.close();
+    worker = null; // prevent double-close in tearDown
+  }
+}