Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.dapr.durabletask;

import com.google.protobuf.Empty;
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
import io.dapr.durabletask.orchestration.TaskOrchestrationFactories;
Expand All @@ -35,6 +36,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -48,6 +50,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private static final int DEFAULT_PORT = 4001;
private static final Logger logger = Logger.getLogger(DurableTaskGrpcWorker.class.getPackage().getName());
private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration.ofDays(3);
private static final long KEEPALIVE_INTERVAL_SECONDS = 30;

private final TaskOrchestrationFactories orchestrationFactories;

Expand All @@ -63,6 +66,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient;
private final boolean isExecutorServiceManaged;
private volatile boolean isNormalShutdown = false;
private ScheduledExecutorService keepaliveScheduler;
private Thread workerThread;

DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
Expand Down Expand Up @@ -136,6 +140,7 @@ public void close() {
this.workerThread.interrupt();
}
this.isNormalShutdown = true;
this.stopKeepaliveLoop();
this.shutDownWorkerPool();
this.closeSideCarChannel();
}
Expand Down Expand Up @@ -175,33 +180,39 @@ public void startAndBlock() {
OrchestratorService.GetWorkItemsRequest getWorkItemsRequest = OrchestratorService.GetWorkItemsRequest
.newBuilder().build();
Iterator<OrchestratorService.WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
while (workItemStream.hasNext()) {
OrchestratorService.WorkItem workItem = workItemStream.next();
OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase();

if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) {
OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();
logger.log(Level.FINEST,
String.format("Processing orchestrator request for instance: {0}",
orchestratorRequest.getInstanceId()));

this.workerPool.submit(new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer));
} else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) {
OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest();

logger.log(Level.INFO,
String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s",
activityRequest.getName(),
activityRequest.getOrchestrationInstance().getInstanceId(),
Context.current()));

this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer));

} else {
logger.log(Level.WARNING,
"Received and dropped an unknown '{0}' work-item from the sidecar.",
requestType);
startKeepaliveLoop();
try {
while (workItemStream.hasNext()) {
OrchestratorService.WorkItem workItem = workItemStream.next();
OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase();

if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) {
OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();
logger.log(Level.FINEST,
String.format("Processing orchestrator request for instance: {0}",
orchestratorRequest.getInstanceId()));

this.workerPool.submit(
new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer));
} else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) {
OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest();

logger.log(Level.INFO,
String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s",
activityRequest.getName(),
activityRequest.getOrchestrationInstance().getInstanceId(),
Context.current()));

this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer));

} else {
logger.log(Level.WARNING,
"Received and dropped an unknown '{0}' work-item from the sidecar.",
requestType);
}
}
} finally {
stopKeepaliveLoop();
}
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
Expand Down Expand Up @@ -262,6 +273,46 @@ private void shutDownWorkerPool() {
}
}

/**
* Starts a background keepalive loop to keep the gRPC connection alive.
* This is an application-level keepalive to prevent AWS ALBs from
* killing idle HTTP/2 connections.
*/
private synchronized void startKeepaliveLoop() {
stopKeepaliveLoop();
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "durabletask-keepalive");
t.setDaemon(true);
return t;
});
try {
scheduler.scheduleWithFixedDelay(() -> {
try {
this.sidecarClient
.withDeadlineAfter(5, TimeUnit.SECONDS)
.hello(Empty.getDefaultInstance());
} catch (StatusRuntimeException e) {
logger.log(Level.FINE, "keepalive failed", e);
}
}, KEEPALIVE_INTERVAL_SECONDS, KEEPALIVE_INTERVAL_SECONDS, TimeUnit.SECONDS);
} catch (RuntimeException e) {
scheduler.shutdownNow();
throw e;
}
this.keepaliveScheduler = scheduler;
}

/**
* Stops the background keepalive loop if one is running.
*/
private synchronized void stopKeepaliveLoop() {
ScheduledExecutorService scheduler = this.keepaliveScheduler;
this.keepaliveScheduler = null;
if (scheduler != null) {
scheduler.shutdownNow();
}
}

private String getSidecarAddress() {
return this.sidecarClient.getChannel().authority();
}
Expand Down
Loading