From bfdb4a47147333ea7e7ceeca48c9deb81d4fbd20 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Wed, 11 Mar 2026 17:50:20 +0000 Subject: [PATCH 1/2] feat: add application-level keepalive to prevent ALB idle connection timeouts AWS ALBs do not forward HTTP/2 PING frames, causing idle gRPC connections to be closed. This adds a background loop that periodically calls the existing Hello RPC as application-level traffic to keep the connection alive through L7 load balancers. Signed-off-by: joshvanl --- .../durabletask/DurableTaskGrpcWorker.java | 102 +++++++++++++----- 1 file changed, 76 insertions(+), 26 deletions(-) diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index 1e08d0804..12bb41e56 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -13,6 +13,7 @@ package io.dapr.durabletask; +import com.google.protobuf.Empty; import io.dapr.durabletask.implementation.protobuf.OrchestratorService; import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; import io.dapr.durabletask.orchestration.TaskOrchestrationFactories; @@ -35,6 +36,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -48,6 +50,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private static final int DEFAULT_PORT = 4001; private static final Logger logger = Logger.getLogger(DurableTaskGrpcWorker.class.getPackage().getName()); private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration.ofDays(3); + private static final long KEEPALIVE_INTERVAL_SECONDS = 30; private final TaskOrchestrationFactories orchestrationFactories; @@ -63,6 +66,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient; private final boolean isExecutorServiceManaged; private volatile boolean isNormalShutdown = false; + private ScheduledExecutorService keepaliveScheduler; private Thread workerThread; DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { @@ -136,6 +140,7 @@ public void close() { this.workerThread.interrupt(); } this.isNormalShutdown = true; + this.stopKeepaliveLoop(); this.shutDownWorkerPool(); this.closeSideCarChannel(); } @@ -175,33 +180,38 @@ public void startAndBlock() { OrchestratorService.GetWorkItemsRequest getWorkItemsRequest = OrchestratorService.GetWorkItemsRequest .newBuilder().build(); Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); - while (workItemStream.hasNext()) { - OrchestratorService.WorkItem workItem = workItemStream.next(); - OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase(); - - if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) { - OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest(); - logger.log(Level.FINEST, - String.format("Processing orchestrator request for instance: {0}", - orchestratorRequest.getInstanceId())); - - this.workerPool.submit(new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer)); - } else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) { - OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest(); - - logger.log(Level.INFO, - String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s", - activityRequest.getName(), - activityRequest.getOrchestrationInstance().getInstanceId(), - Context.current())); - - this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer)); - - } else { - logger.log(Level.WARNING, - "Received and dropped an unknown '{0}' work-item from the sidecar.", - requestType); + startKeepaliveLoop(); + try { + while (workItemStream.hasNext()) { + OrchestratorService.WorkItem workItem = workItemStream.next(); + OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase(); + + if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) { + OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest(); + logger.log(Level.FINEST, + String.format("Processing orchestrator request for instance: {0}", + orchestratorRequest.getInstanceId())); + + this.workerPool.submit(new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer)); + } else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) { + OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest(); + + logger.log(Level.INFO, + String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s", + activityRequest.getName(), + activityRequest.getOrchestrationInstance().getInstanceId(), + Context.current())); + + this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer)); + + } else { + logger.log(Level.WARNING, + "Received and dropped an unknown '{0}' work-item from the sidecar.", + requestType); + } } + } finally { + stopKeepaliveLoop(); } } catch (StatusRuntimeException e) { if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { @@ -262,6 +272,46 @@ private void shutDownWorkerPool() { } } + /** + * Starts a background keepalive loop to keep the gRPC connection alive. + * This is an application-level keepalive to prevent AWS ALBs from + * killing idle HTTP/2 connections. + */ + private synchronized void startKeepaliveLoop() { + stopKeepaliveLoop(); + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "durabletask-keepalive"); + t.setDaemon(true); + return t; + }); + try { + scheduler.scheduleWithFixedDelay(() -> { + try { + this.sidecarClient + .withDeadlineAfter(5, TimeUnit.SECONDS) + .hello(Empty.getDefaultInstance()); + } catch (StatusRuntimeException e) { + logger.log(Level.FINE, "keepalive failed", e); + } + }, KEEPALIVE_INTERVAL_SECONDS, KEEPALIVE_INTERVAL_SECONDS, TimeUnit.SECONDS); + } catch (RuntimeException e) { + scheduler.shutdownNow(); + throw e; + } + this.keepaliveScheduler = scheduler; + } + + /** + * Stops the background keepalive loop if one is running. + */ + private synchronized void stopKeepaliveLoop() { + ScheduledExecutorService scheduler = this.keepaliveScheduler; + this.keepaliveScheduler = null; + if (scheduler != null) { + scheduler.shutdownNow(); + } + } + private String getSidecarAddress() { return this.sidecarClient.getChannel().authority(); } From 44b4e4c2d9ad812fcdc3af914cd25605252d2f2b Mon Sep 17 00:00:00 2001 From: joshvanl Date: Wed, 11 Mar 2026 18:40:43 +0000 Subject: [PATCH 2/2] Fix Signed-off-by: joshvanl --- .../main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index 12bb41e56..73a09dd24 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -192,7 +192,8 @@ public void startAndBlock() { String.format("Processing orchestrator request for instance: {0}", orchestratorRequest.getInstanceId())); - this.workerPool.submit(new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer)); + this.workerPool.submit( + new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer)); } else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) { OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest();