Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 80 additions & 18 deletions src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,16 @@
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import io.grpc.Status;
import com.google.protobuf.Any;
import com.google.rpc.Code;
import com.google.rpc.DebugInfo;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import io.numaproj.numaflow.shared.ExceptionUtils;
import io.numaproj.numaflow.shared.InputStreamError;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* MapSupervisorActor actor is responsible for distributing the messages to actors and handling failure.
Expand Down Expand Up @@ -52,6 +51,8 @@ class MapSupervisorActor extends AbstractActor {
private final Mapper mapper;
private final StreamObserver<MapOuterClass.MapResponse> responseObserver;
private final CompletableFuture<Void> shutdownSignal;
private final AtomicBoolean streamClosed = new AtomicBoolean(false);
private boolean inputCompleted;
private int activeMapperCount;
private Exception userException;

Expand All @@ -62,6 +63,7 @@ public MapSupervisorActor(
this.mapper = mapper;
this.responseObserver = responseObserver;
this.shutdownSignal = failureFuture;
this.inputCompleted = false;
this.userException = null;
this.activeMapperCount = 0;
}
Expand All @@ -79,8 +81,7 @@ public void preRestart(Throwable reason, Optional<Object> message) {
.getSystem()
.log()
.warning("supervisor pre restart was executed due to: {}", reason.getMessage());
shutdownSignal.completeExceptionally(reason);
responseObserver.onError(Status.INTERNAL
sendError(Status.INTERNAL
.withDescription(reason.getMessage())
.withCause(reason)
.asException());
Expand All @@ -99,9 +100,10 @@ public Receive createReceive() {
.create()
.match(MapOuterClass.MapRequest.class, this::processRequest)
.match(MapOuterClass.MapResponse.class, this::sendResponse)
.match(InputStreamError.class, this::handleInputStreamError)
.match(Exception.class, this::handleFailure)
.match(AllDeadLetters.class, this::handleDeadLetters)
.match(String.class, eof -> responseObserver.onCompleted())
.match(String.class, eof -> handleInputCompleted())
.build();
}

Expand All @@ -113,25 +115,28 @@ private void handleFailure(Exception e) {
// one exception should trigger a container restart
// Build gRPC Status
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
sendError(StatusProto.toStatusRuntimeException(status));
}
activeMapperCount--;
finishIfDrained();
}

private void sendResponse(MapOuterClass.MapResponse mapResponse) {
responseObserver.onNext(mapResponse);
activeMapperCount--;
try {
if (!streamClosed.get()) {
responseObserver.onNext(mapResponse);
}
} catch (RuntimeException e) {
handleResponseObserverFailure(e);
} finally {
activeMapperCount--;
finishIfDrained();
}
}

private void processRequest(MapOuterClass.MapRequest mapRequest) {
if (userException != null) {
log.info("a previous mapper actor failed, not processing any more requests");
if (activeMapperCount == 0) {
log.info("there is no more active mapper AKKA actors - stopping the system");
getContext().getSystem().stop(getSelf());
log.info("AKKA system stopped");
shutdownSignal.completeExceptionally(userException);
}
return;
}

Expand All @@ -145,26 +150,83 @@ private void processRequest(MapOuterClass.MapRequest mapRequest) {
activeMapperCount++;
}

private void handleInputStreamError(InputStreamError error) {
log.error("inbound request stream error, stopping mapper supervisor", error.getCause());
streamClosed.set(true);
getContext().getSystem().stop(getSelf());
shutdownSignal.completeExceptionally(error.getCause());
}

// if we see dead letters, we need to stop the execution and exit
// to make sure no messages are lost
private void handleDeadLetters(AllDeadLetters deadLetter) {
log.error("got a dead letter, stopping the execution");
responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException());
sendError(Status.INTERNAL.withDescription("dead letters").asException());
getContext().getSystem().stop(getSelf());
shutdownSignal.completeExceptionally(new Throwable("dead letters"));
}

private void handleInputCompleted() {
inputCompleted = true;
finishIfDrained();
}

// EOF and failures can arrive while child actors are still processing.
// Only finish the stream once all started child actors have replied or failed.
private void finishIfDrained() {
if (activeMapperCount != 0) {
return;
}
if (userException != null) {
getContext().getSystem().stop(getSelf());
shutdownSignal.completeExceptionally(userException);
return;
}
if (inputCompleted) {
completeResponse();
}
}

private void completeResponse() {
if (streamClosed.compareAndSet(false, true)) {
try {
responseObserver.onCompleted();
} catch (RuntimeException e) {
handleResponseObserverFailure(e);
} finally {
getContext().getSystem().stop(getSelf());
}
}
}

private void sendError(Throwable throwable) {
if (streamClosed.compareAndSet(false, true)) {
try {
responseObserver.onError(throwable);
} catch (RuntimeException e) {
handleResponseObserverFailure(e);
}
}
}

private void handleResponseObserverFailure(RuntimeException e) {
log.warn("response stream is already closed; stopping mapper supervisor", e);
streamClosed.set(true);
getContext().getSystem().stop(getSelf());
shutdownSignal.completeExceptionally(e);
}

@Override
public SupervisorStrategy supervisorStrategy() {
// we want to stop all child actors in case of any exception
return new AllForOneStrategy(
DeciderBuilder
.match(Exception.class, e -> {
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
sendError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
shutdownSignal.completeExceptionally(e);
return SupervisorStrategy.stop();
})
.build()
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/io/numaproj/numaflow/mapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class Server {
private final CompletableFuture<Void> shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private final GrpcServerWrapper server;
private final boolean exitOnFailure;

/**
* constructor to create gRPC server.
Expand All @@ -43,6 +44,7 @@ public Server(Mapper mapper, GRPCConfig grpcConfig) {
this.shutdownSignal = new CompletableFuture<>();
this.grpcConfig = grpcConfig;
this.server = new GrpcServerWrapper(this.grpcConfig, new Service(mapper, this.shutdownSignal));
this.exitOnFailure = true;
}

@VisibleForTesting
Expand All @@ -53,6 +55,7 @@ protected Server(GRPCConfig grpcConfig, Mapper service, ServerInterceptor interc
interceptor,
serverName,
new Service(service, this.shutdownSignal));
this.exitOnFailure = false;
}

/**
Expand Down Expand Up @@ -102,7 +105,9 @@ public void start() throws Exception {
this.stop();
// FIXME - this is a workaround to immediately terminate the JVM process
// The correct way to do this is to stop all the actors and wait for them to terminate
System.exit(0);
if (exitOnFailure) {
System.exit(0);
}
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/mapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapGrpc;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import io.numaproj.numaflow.shared.InputStreamError;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -62,7 +63,7 @@ public void onNext(MapOuterClass.MapRequest request) {

@Override
public void onError(Throwable throwable) {
mapSupervisorActor.tell(new Exception(throwable), ActorRef.noSender());
mapSupervisorActor.tell(new InputStreamError(throwable), ActorRef.noSender());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class Constants {
public static final String DEFAULT_HOST = "localhost";
public static final String MAP_MODE_KEY = "MAP_MODE";
public static final String MAP_MODE = "stream-map";
public static final String EOF = "EOF";

// Private constructor to prevent instantiation
private Constants() {
Expand Down
Loading
Loading