diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index 1e08d0804..73a09dd24 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -13,6 +13,7 @@ package io.dapr.durabletask; +import com.google.protobuf.Empty; import io.dapr.durabletask.implementation.protobuf.OrchestratorService; import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; import io.dapr.durabletask.orchestration.TaskOrchestrationFactories; @@ -35,6 +36,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -48,6 +50,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private static final int DEFAULT_PORT = 4001; private static final Logger logger = Logger.getLogger(DurableTaskGrpcWorker.class.getPackage().getName()); private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration.ofDays(3); + private static final long KEEPALIVE_INTERVAL_SECONDS = 30; private final TaskOrchestrationFactories orchestrationFactories; @@ -63,6 +66,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient; private final boolean isExecutorServiceManaged; private volatile boolean isNormalShutdown = false; + private ScheduledExecutorService keepaliveScheduler; private Thread workerThread; DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { @@ -136,6 +140,7 @@ public void close() { this.workerThread.interrupt(); } this.isNormalShutdown = true; + this.stopKeepaliveLoop(); this.shutDownWorkerPool(); this.closeSideCarChannel(); } @@ -175,33 +180,39 @@ public void startAndBlock() { OrchestratorService.GetWorkItemsRequest getWorkItemsRequest = OrchestratorService.GetWorkItemsRequest .newBuilder().build(); Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); - while (workItemStream.hasNext()) { - OrchestratorService.WorkItem workItem = workItemStream.next(); - OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase(); - - if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) { - OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest(); - logger.log(Level.FINEST, - String.format("Processing orchestrator request for instance: {0}", - orchestratorRequest.getInstanceId())); - - this.workerPool.submit(new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer)); - } else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) { - OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest(); - - logger.log(Level.INFO, - String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s", - activityRequest.getName(), - activityRequest.getOrchestrationInstance().getInstanceId(), - Context.current())); - - this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer)); - - } else { - logger.log(Level.WARNING, - "Received and dropped an unknown '{0}' work-item from the sidecar.", - requestType); + startKeepaliveLoop(); + try { + while (workItemStream.hasNext()) { + OrchestratorService.WorkItem workItem = workItemStream.next(); + OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase(); + + if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) { + OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest(); + logger.log(Level.FINEST, + String.format("Processing orchestrator request for instance: {0}", + orchestratorRequest.getInstanceId())); + + this.workerPool.submit( + new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer)); + } else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) { + OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest(); + + logger.log(Level.INFO, + String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s", + activityRequest.getName(), + activityRequest.getOrchestrationInstance().getInstanceId(), + Context.current())); + + this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer)); + + } else { + logger.log(Level.WARNING, + "Received and dropped an unknown '{0}' work-item from the sidecar.", + requestType); + } } + } finally { + stopKeepaliveLoop(); } } catch (StatusRuntimeException e) { if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { @@ -262,6 +273,46 @@ private void shutDownWorkerPool() { } } + /** + * Starts a background keepalive loop to keep the gRPC connection alive. + * This is an application-level keepalive to prevent AWS ALBs from + * killing idle HTTP/2 connections. + */ + private synchronized void startKeepaliveLoop() { + stopKeepaliveLoop(); + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "durabletask-keepalive"); + t.setDaemon(true); + return t; + }); + try { + scheduler.scheduleWithFixedDelay(() -> { + try { + this.sidecarClient + .withDeadlineAfter(5, TimeUnit.SECONDS) + .hello(Empty.getDefaultInstance()); + } catch (StatusRuntimeException e) { + logger.log(Level.FINE, "keepalive failed", e); + } + }, KEEPALIVE_INTERVAL_SECONDS, KEEPALIVE_INTERVAL_SECONDS, TimeUnit.SECONDS); + } catch (RuntimeException e) { + scheduler.shutdownNow(); + throw e; + } + this.keepaliveScheduler = scheduler; + } + + /** + * Stops the background keepalive loop if one is running. + */ + private synchronized void stopKeepaliveLoop() { + ScheduledExecutorService scheduler = this.keepaliveScheduler; + this.keepaliveScheduler = null; + if (scheduler != null) { + scheduler.shutdownNow(); + } + } + private String getSidecarAddress() { return this.sidecarClient.getChannel().authority(); }