From b986c19c7ac0ce4e9c4c9079006db60f5de8fdc3 Mon Sep 17 00:00:00 2001 From: adarsh0728 Date: Tue, 5 May 2026 17:00:37 +0530 Subject: [PATCH 1/3] fix: handle cancelled gRPC streams in supervisors Signed-off-by: adarsh0728 --- .../numaflow/mapper/MapSupervisorActor.java | 98 +++++++++++++---- .../io/numaproj/numaflow/mapper/Service.java | 3 +- .../numaflow/mapstreamer/Constants.java | 1 + .../mapstreamer/MapStreamSupervisorActor.java | 100 +++++++++++++++--- .../numaflow/mapstreamer/Service.java | 5 +- .../numaflow/shared/InputStreamError.java | 16 +++ .../numaflow/sourcetransformer/Service.java | 3 +- .../TransformSupervisorActor.java | 99 +++++++++++++---- 8 files changed, 268 insertions(+), 57 deletions(-) create mode 100644 src/main/java/io/numaproj/numaflow/shared/InputStreamError.java diff --git a/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java b/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java index 05b4e312..12100c82 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java @@ -9,17 +9,16 @@ import akka.japi.pf.DeciderBuilder; import akka.japi.pf.ReceiveBuilder; import io.grpc.Status; -import com.google.protobuf.Any; -import com.google.rpc.Code; -import com.google.rpc.DebugInfo; import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.map.v1.MapOuterClass; import io.numaproj.numaflow.shared.ExceptionUtils; +import io.numaproj.numaflow.shared.InputStreamError; import lombok.extern.slf4j.Slf4j; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; /** * MapSupervisorActor actor is responsible for distributing the messages to actors and handling failure. @@ -52,6 +51,8 @@ class MapSupervisorActor extends AbstractActor { private final Mapper mapper; private final StreamObserver responseObserver; private final CompletableFuture shutdownSignal; + private final AtomicBoolean streamClosed = new AtomicBoolean(false); + private boolean inputCompleted; private int activeMapperCount; private Exception userException; @@ -62,6 +63,7 @@ public MapSupervisorActor( this.mapper = mapper; this.responseObserver = responseObserver; this.shutdownSignal = failureFuture; + this.inputCompleted = false; this.userException = null; this.activeMapperCount = 0; } @@ -79,8 +81,7 @@ public void preRestart(Throwable reason, Optional message) { .getSystem() .log() .warning("supervisor pre restart was executed due to: {}", reason.getMessage()); - shutdownSignal.completeExceptionally(reason); - responseObserver.onError(Status.INTERNAL + sendError(Status.INTERNAL .withDescription(reason.getMessage()) .withCause(reason) .asException()); @@ -99,9 +100,10 @@ public Receive createReceive() { .create() .match(MapOuterClass.MapRequest.class, this::processRequest) .match(MapOuterClass.MapResponse.class, this::sendResponse) + .match(InputStreamError.class, this::handleInputStreamError) .match(Exception.class, this::handleFailure) .match(AllDeadLetters.class, this::handleDeadLetters) - .match(String.class, eof -> responseObserver.onCompleted()) + .match(String.class, eof -> handleInputCompleted()) .build(); } @@ -113,25 +115,28 @@ private void handleFailure(Exception e) { // one exception should trigger a container restart // Build gRPC Status com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); - responseObserver.onError(StatusProto.toStatusRuntimeException(status)); + sendError(StatusProto.toStatusRuntimeException(status)); } activeMapperCount--; + finishIfDrained(); } private void sendResponse(MapOuterClass.MapResponse mapResponse) { - responseObserver.onNext(mapResponse); - activeMapperCount--; + try { + if (!streamClosed.get()) { + responseObserver.onNext(mapResponse); + } + } catch (RuntimeException e) { + handleResponseObserverFailure(e); + } finally { + activeMapperCount--; + finishIfDrained(); + } } private void processRequest(MapOuterClass.MapRequest mapRequest) { if (userException != null) { log.info("a previous mapper actor failed, not processing any more requests"); - if (activeMapperCount == 0) { - log.info("there is no more active mapper AKKA actors - stopping the system"); - getContext().getSystem().stop(getSelf()); - log.info("AKKA system stopped"); - shutdownSignal.completeExceptionally(userException); - } return; } @@ -145,26 +150,83 @@ private void processRequest(MapOuterClass.MapRequest mapRequest) { activeMapperCount++; } + private void handleInputStreamError(InputStreamError error) { + log.error("inbound request stream error, stopping mapper supervisor", error.getCause()); + streamClosed.set(true); + getContext().getSystem().stop(getSelf()); + shutdownSignal.completeExceptionally(error.getCause()); + } + // if we see dead letters, we need to stop the execution and exit // to make sure no messages are lost private void handleDeadLetters(AllDeadLetters deadLetter) { log.error("got a dead letter, stopping the execution"); - responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException()); + sendError(Status.INTERNAL.withDescription("dead letters").asException()); getContext().getSystem().stop(getSelf()); shutdownSignal.completeExceptionally(new Throwable("dead letters")); } + private void handleInputCompleted() { + inputCompleted = true; + finishIfDrained(); + } + + // EOF and failures can arrive while child actors are still processing. + // Only finish the stream once all started child actors have replied or failed. + private void finishIfDrained() { + if (activeMapperCount != 0) { + return; + } + if (userException != null) { + getContext().getSystem().stop(getSelf()); + shutdownSignal.completeExceptionally(userException); + return; + } + if (inputCompleted) { + completeResponse(); + } + } + + private void completeResponse() { + if (streamClosed.compareAndSet(false, true)) { + try { + responseObserver.onCompleted(); + } catch (RuntimeException e) { + handleResponseObserverFailure(e); + } finally { + getContext().getSystem().stop(getSelf()); + } + } + } + + private void sendError(Throwable throwable) { + if (streamClosed.compareAndSet(false, true)) { + try { + responseObserver.onError(throwable); + } catch (RuntimeException e) { + handleResponseObserverFailure(e); + } + } + } + + private void handleResponseObserverFailure(RuntimeException e) { + log.warn("response stream is already closed; stopping mapper supervisor", e); + streamClosed.set(true); + getContext().getSystem().stop(getSelf()); + shutdownSignal.completeExceptionally(e); + } + @Override public SupervisorStrategy supervisorStrategy() { // we want to stop all child actors in case of any exception return new AllForOneStrategy( DeciderBuilder .match(Exception.class, e -> { - shutdownSignal.completeExceptionally(e); - responseObserver.onError(Status.INTERNAL + sendError(Status.INTERNAL .withDescription(e.getMessage()) .withCause(e) .asException()); + shutdownSignal.completeExceptionally(e); return SupervisorStrategy.stop(); }) .build() diff --git a/src/main/java/io/numaproj/numaflow/mapper/Service.java b/src/main/java/io/numaproj/numaflow/mapper/Service.java index 8bfb8878..5e6c0604 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Service.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Service.java @@ -7,6 +7,7 @@ import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.map.v1.MapGrpc; import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.InputStreamError; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -62,7 +63,7 @@ public void onNext(MapOuterClass.MapRequest request) { @Override public void onError(Throwable throwable) { - mapSupervisorActor.tell(new Exception(throwable), ActorRef.noSender()); + mapSupervisorActor.tell(new InputStreamError(throwable), ActorRef.noSender()); } @Override diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Constants.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Constants.java index 4d92978f..781ed22e 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Constants.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Constants.java @@ -8,6 +8,7 @@ class Constants { public static final String DEFAULT_HOST = "localhost"; public static final String MAP_MODE_KEY = "MAP_MODE"; public static final String MAP_MODE = "stream-map"; + public static final String EOF = "EOF"; // Private constructor to prevent instantiation private Constants() { diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java b/src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java index 39409ddf..2b3942ac 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java @@ -12,10 +12,12 @@ import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.map.v1.MapOuterClass; import io.numaproj.numaflow.shared.ExceptionUtils; +import io.numaproj.numaflow.shared.InputStreamError; import lombok.extern.slf4j.Slf4j; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; /** * MapStreamSupervisorActor is responsible for managing MapStreamerActor instances and handling failures. @@ -51,6 +53,8 @@ class MapStreamSupervisorActor extends AbstractActor { private final MapStreamer mapStreamer; private final StreamObserver responseObserver; private final CompletableFuture shutdownSignal; + private final AtomicBoolean streamClosed = new AtomicBoolean(false); + private boolean inputCompleted; private int activeMapStreamersCount; private Exception userException; @@ -61,6 +65,7 @@ public MapStreamSupervisorActor( this.mapStreamer = mapStreamer; this.responseObserver = responseObserver; this.shutdownSignal = failureFuture; + this.inputCompleted = false; this.userException = null; this.activeMapStreamersCount = 0; } @@ -80,18 +85,26 @@ public void preRestart(Throwable reason, Optional message) { .getSystem() .log() .warning("supervisor pre restart due to: {}", reason.getMessage()); - shutdownSignal.completeExceptionally(reason); - responseObserver.onError(Status.INTERNAL + sendError(Status.INTERNAL .withDescription(reason.getMessage()) .withCause(reason) .asException()); + getContext().getSystem().stop(getSelf()); + shutdownSignal.completeExceptionally(reason); + } + + private void handleInputStreamError(InputStreamError error) { + log.error("inbound request stream error, stopping map-stream supervisor", error.getCause()); + streamClosed.set(true); + getContext().getSystem().stop(getSelf()); + shutdownSignal.completeExceptionally(error.getCause()); } // if we see dead letters, we need to stop the execution and exit // to make sure no messages are lost private void handleDeadLetters(AllDeadLetters deadLetter) { log.error("got a dead letter, stopping the execution"); - responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException()); + sendError(Status.INTERNAL.withDescription("dead letters").asException()); getContext().getSystem().stop(getSelf()); shutdownSignal.completeExceptionally(new Throwable("dead letters")); } @@ -106,8 +119,10 @@ public Receive createReceive() { return receiveBuilder() .match(MapOuterClass.MapRequest.class, this::processRequest) .match(MapOuterClass.MapResponse.class, this::sendResponse) + .match(InputStreamError.class, this::handleInputStreamError) .match(Exception.class, this::handleFailure) .match(AllDeadLetters.class, this::handleDeadLetters) + .match(String.class, eof -> handleInputCompleted()) .build(); } @@ -116,27 +131,28 @@ private void handleFailure(Exception e) { if (userException == null) { userException = e; com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); - responseObserver.onError(StatusProto.toStatusRuntimeException(status)); + sendError(StatusProto.toStatusRuntimeException(status)); } activeMapStreamersCount--; + finishIfDrained(); } private void sendResponse(MapOuterClass.MapResponse mapResponse) { - responseObserver.onNext(mapResponse); - activeMapStreamersCount--; + try { + if (!streamClosed.get()) { + responseObserver.onNext(mapResponse); + } + } catch (RuntimeException e) { + handleResponseObserverFailure(e); + } finally { + activeMapStreamersCount--; + finishIfDrained(); + } } private void processRequest(MapOuterClass.MapRequest mapRequest) { if (userException != null) { - getContext() - .getSystem() - .log() - .info("Previous mapStreamer actor failed, not processing further requests"); - if (activeMapStreamersCount == 0) { - getContext().getSystem().log().info("No active mapStreamer actors, shutting down"); - getContext().getSystem().terminate(); - shutdownSignal.completeExceptionally(userException); - } + getContext().getSystem().log().info("Previous mapStreamer actor failed, not processing further requests"); return; } @@ -150,14 +166,64 @@ private void processRequest(MapOuterClass.MapRequest mapRequest) { public SupervisorStrategy supervisorStrategy() { return new AllForOneStrategy( DeciderBuilder.match(Exception.class, e -> { - shutdownSignal.completeExceptionally(e); - responseObserver.onError(Status.INTERNAL + sendError(Status.INTERNAL .withDescription(e.getMessage()) .withCause(e) .asException()); + shutdownSignal.completeExceptionally(e); return SupervisorStrategy.stop(); }).build() ); } + + private void handleInputCompleted() { + inputCompleted = true; + finishIfDrained(); + } + + // EOF and failures can arrive while child actors are still processing. + // Only finish the stream once all started child actors have replied or failed. + private void finishIfDrained() { + if (activeMapStreamersCount != 0) { + return; + } + if (userException != null) { + getContext().getSystem().stop(getSelf()); + shutdownSignal.completeExceptionally(userException); + return; + } + if (inputCompleted) { + completeResponse(); + } + } + + private void completeResponse() { + if (streamClosed.compareAndSet(false, true)) { + try { + responseObserver.onCompleted(); + } catch (RuntimeException e) { + handleResponseObserverFailure(e); + } finally { + getContext().getSystem().stop(getSelf()); + } + } + } + + private void sendError(Throwable throwable) { + if (streamClosed.compareAndSet(false, true)) { + try { + responseObserver.onError(throwable); + } catch (RuntimeException e) { + handleResponseObserverFailure(e); + } + } + } + + private void handleResponseObserverFailure(RuntimeException e) { + log.warn("response stream is already closed; stopping map-stream supervisor", e); + streamClosed.set(true); + getContext().getSystem().stop(getSelf()); + shutdownSignal.completeExceptionally(e); + } } diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java index 8d62e735..ab94ece1 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java @@ -7,6 +7,7 @@ import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.map.v1.MapGrpc; import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.InputStreamError; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -61,12 +62,12 @@ public void onNext(MapOuterClass.MapRequest request) { @Override public void onError(Throwable throwable) { - mapStreamSupervisorActor.tell(new Exception(throwable), ActorRef.noSender()); + mapStreamSupervisorActor.tell(new InputStreamError(throwable), ActorRef.noSender()); } @Override public void onCompleted() { - responseObserver.onCompleted(); + mapStreamSupervisorActor.tell(Constants.EOF, ActorRef.noSender()); } }; } diff --git a/src/main/java/io/numaproj/numaflow/shared/InputStreamError.java b/src/main/java/io/numaproj/numaflow/shared/InputStreamError.java new file mode 100644 index 00000000..523e7f00 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/shared/InputStreamError.java @@ -0,0 +1,16 @@ +package io.numaproj.numaflow.shared; + +/** + * Message sent by gRPC services to supervisor actors when the inbound request stream fails. + */ +public class InputStreamError { + private final Throwable cause; + + public InputStreamError(Throwable cause) { + this.cause = cause; + } + + public Throwable getCause() { + return cause; + } +} diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Service.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Service.java index a2c99855..7d430e05 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Service.java @@ -5,6 +5,7 @@ import com.google.protobuf.Empty; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.shared.InputStreamError; import io.numaproj.numaflow.sourcetransformer.v1.SourceTransformGrpc; import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer; import lombok.AllArgsConstructor; @@ -64,7 +65,7 @@ public void onNext(Sourcetransformer.SourceTransformRequest request) { @Override public void onError(Throwable throwable) { - transformSupervisorActor.tell(new Exception(throwable), ActorRef.noSender()); + transformSupervisorActor.tell(new InputStreamError(throwable), ActorRef.noSender()); } @Override diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java index fac41fb0..a692ef8e 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java @@ -8,18 +8,17 @@ import akka.actor.SupervisorStrategy; import akka.japi.pf.DeciderBuilder; import akka.japi.pf.ReceiveBuilder; -import io.grpc.stub.StreamObserver; import io.grpc.Status; -import com.google.protobuf.Any; -import com.google.rpc.Code; -import com.google.rpc.DebugInfo; import io.grpc.protobuf.StatusProto; +import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.shared.ExceptionUtils; +import io.numaproj.numaflow.shared.InputStreamError; import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer; import lombok.extern.slf4j.Slf4j; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; /** * TransformSupervisorActor actor is responsible for distributing the messages to actors and handling failure. @@ -52,6 +51,8 @@ class TransformSupervisorActor extends AbstractActor { private final SourceTransformer transformer; private final StreamObserver responseObserver; private final CompletableFuture shutdownSignal; + private final AtomicBoolean streamClosed = new AtomicBoolean(false); + private boolean inputCompleted; private int activeTransformersCount; private Exception userException; @@ -69,6 +70,7 @@ public TransformSupervisorActor( this.transformer = transformer; this.responseObserver = responseObserver; this.shutdownSignal = shutdownSignal; + this.inputCompleted = false; this.userException = null; this.activeTransformersCount = 0; } @@ -105,7 +107,7 @@ public void preRestart(Throwable reason, Optional message) { .getSystem() .log() .warning("supervisor pre restart was executed due to: {}", reason.getMessage()); - responseObserver.onError(Status.INTERNAL + sendError(Status.INTERNAL .withDescription(reason.getMessage()) .withCause(reason) .asException()); @@ -132,9 +134,10 @@ public Receive createReceive() { .create() .match(Sourcetransformer.SourceTransformRequest.class, this::processRequest) .match(Sourcetransformer.SourceTransformResponse.class, this::sendResponse) + .match(InputStreamError.class, this::handleInputStreamError) .match(Exception.class, this::handleFailure) .match(AllDeadLetters.class, this::handleDeadLetters) - .match(String.class, eof -> responseObserver.onCompleted()) + .match(String.class, eof -> handleInputCompleted()) .build(); } @@ -149,12 +152,12 @@ private void handleFailure(Exception e) { userException = e; // only send the very first exception to the client // one exception should trigger a container restart - // Build gRPC Status com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e); - responseObserver.onError(StatusProto.toStatusRuntimeException(status)); + sendError(StatusProto.toStatusRuntimeException(status)); } activeTransformersCount--; + finishIfDrained(); } /** @@ -163,8 +166,16 @@ private void handleFailure(Exception e) { * @param transformResponse The SourceTransformResponse to be sent. */ private void sendResponse(Sourcetransformer.SourceTransformResponse transformResponse) { - responseObserver.onNext(transformResponse); - activeTransformersCount--; + try { + if (!streamClosed.get()) { + responseObserver.onNext(transformResponse); + } + } catch (RuntimeException e) { + handleResponseObserverFailure(e); + } finally { + activeTransformersCount--; + finishIfDrained(); + } } /** @@ -175,12 +186,6 @@ private void sendResponse(Sourcetransformer.SourceTransformResponse transformRes private void processRequest(Sourcetransformer.SourceTransformRequest transformRequest) { if (userException != null) { log.info("a previous transformer actor failed, not processing any more requests"); - if (activeTransformersCount == 0) { - log.info("there is no more active transformer AKKA actors - stopping the system"); - getContext().getSystem().stop(getSelf()); - log.info("AKKA system stopped"); - shutdownSignal.completeExceptionally(userException); - } return; } // Create a TransformerActor for each incoming request. @@ -193,6 +198,13 @@ private void processRequest(Sourcetransformer.SourceTransformRequest transformRe activeTransformersCount++; } + private void handleInputStreamError(InputStreamError error) { + log.error("inbound request stream error, stopping source-transform supervisor", error.getCause()); + streamClosed.set(true); + getContext().getSystem().stop(getSelf()); + shutdownSignal.completeExceptionally(error.getCause()); + } + /** * Handles any dead letters that occur during the processing of the SourceTransformRequest. * @@ -200,11 +212,61 @@ private void processRequest(Sourcetransformer.SourceTransformRequest transformRe */ private void handleDeadLetters(AllDeadLetters deadLetter) { log.debug("got a dead letter, stopping the execution"); - responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException()); + sendError(Status.INTERNAL.withDescription("dead letters").asException()); getContext().getSystem().stop(getSelf()); shutdownSignal.completeExceptionally(new Throwable("dead letters")); } + private void handleInputCompleted() { + inputCompleted = true; + finishIfDrained(); + } + + // EOF and failures can arrive while child actors are still processing. + // Only finish the stream once all started child actors have replied or failed. + private void finishIfDrained() { + if (activeTransformersCount != 0) { + return; + } + if (userException != null) { + getContext().getSystem().stop(getSelf()); + shutdownSignal.completeExceptionally(userException); + return; + } + if (inputCompleted) { + completeResponse(); + } + } + + private void completeResponse() { + if (streamClosed.compareAndSet(false, true)) { + try { + responseObserver.onCompleted(); + } catch (RuntimeException e) { + handleResponseObserverFailure(e); + } finally { + getContext().getSystem().stop(getSelf()); + } + } + } + + private void sendError(Throwable throwable) { + if (streamClosed.compareAndSet(false, true)) { + try { + responseObserver.onError(throwable); + } catch (RuntimeException e) { + handleResponseObserverFailure(e); + } + } + } + + private void handleResponseObserverFailure(RuntimeException e) { + log.warn("response stream is already closed; stopping source-transform supervisor", e); + streamClosed.set(true); + getContext().getSystem().stop(getSelf()); + shutdownSignal.completeExceptionally(e); + } + /** * Defines the supervisor strategy for the actor. * @@ -216,10 +278,11 @@ public SupervisorStrategy supervisorStrategy() { return new AllForOneStrategy( DeciderBuilder .match(Exception.class, e -> { - responseObserver.onError(Status.INTERNAL + sendError(Status.INTERNAL .withDescription(e.getMessage()) .withCause(e) .asException()); + shutdownSignal.completeExceptionally(e); return SupervisorStrategy.stop(); }) .build()); From 648f4e5d664d424938dd1e1c8095eb49370e7e54 Mon Sep 17 00:00:00 2001 From: adarsh0728 Date: Wed, 6 May 2026 11:47:34 +0530 Subject: [PATCH 2/3] test: avoid JVM exit in server failure tests Signed-off-by: adarsh0728 --- src/main/java/io/numaproj/numaflow/mapper/Server.java | 7 ++++++- .../io/numaproj/numaflow/sourcetransformer/Server.java | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/numaproj/numaflow/mapper/Server.java b/src/main/java/io/numaproj/numaflow/mapper/Server.java index 7e22414f..041a032b 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Server.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Server.java @@ -23,6 +23,7 @@ public class Server { private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private final GrpcServerWrapper server; + private final boolean exitOnFailure; /** * constructor to create gRPC server. @@ -43,6 +44,7 @@ public Server(Mapper mapper, GRPCConfig grpcConfig) { this.shutdownSignal = new CompletableFuture<>(); this.grpcConfig = grpcConfig; this.server = new GrpcServerWrapper(this.grpcConfig, new Service(mapper, this.shutdownSignal)); + this.exitOnFailure = true; } @VisibleForTesting @@ -53,6 +55,7 @@ protected Server(GRPCConfig grpcConfig, Mapper service, ServerInterceptor interc interceptor, serverName, new Service(service, this.shutdownSignal)); + this.exitOnFailure = false; } /** @@ -102,7 +105,9 @@ public void start() throws Exception { this.stop(); // FIXME - this is a workaround to immediately terminate the JVM process // The correct way to do this is to stop all the actors and wait for them to terminate - System.exit(0); + if (exitOnFailure) { + System.exit(0); + } } catch (InterruptedException ex) { Thread.interrupted(); ex.printStackTrace(System.err); diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java index 055fa41e..01a8dfac 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java @@ -22,6 +22,7 @@ public class Server { private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private final GrpcServerWrapper server; + private final boolean exitOnFailure; /** * constructor to create gRPC server. @@ -42,6 +43,7 @@ public Server(SourceTransformer sourceTransformer, GRPCConfig grpcConfig) { this.shutdownSignal = new CompletableFuture<>(); this.grpcConfig = grpcConfig; this.server = new GrpcServerWrapper(this.grpcConfig, new Service(sourceTransformer, this.shutdownSignal)); + this.exitOnFailure = true; } @VisibleForTesting @@ -52,6 +54,7 @@ protected Server(GRPCConfig grpcConfig, SourceTransformer service, ServerInterce interceptor, serverName, new Service(service, this.shutdownSignal)); + this.exitOnFailure = false; } /** @@ -98,7 +101,9 @@ public void start() throws Exception { this.stop(); // FIXME - this is a workaround to immediately terminate the JVM process // The correct way to do this is to stop all the actors and wait for them to terminate - System.exit(0); + if (exitOnFailure) { + System.exit(0); + } } catch (InterruptedException ex) { Thread.interrupted(); ex.printStackTrace(System.err); From ab346adb678423422a81899c7fe9968663dae0ea Mon Sep 17 00:00:00 2001 From: adarsh0728 Date: Thu, 7 May 2026 15:44:52 +0530 Subject: [PATCH 3/3] add tests Signed-off-by: adarsh0728 --- .../mapstreamer/MapStreamSupervisorActor.java | 3 + .../mapper/MapSupervisorActorTest.java | 209 +++++++++++++++++ .../MapStreamSupervisorActorTest.java | 207 +++++++++++++++++ .../TransformSupervisorActorTest.java | 210 ++++++++++++++++++ 4 files changed, 629 insertions(+) create mode 100644 src/test/java/io/numaproj/numaflow/mapper/MapSupervisorActorTest.java create mode 100644 src/test/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActorTest.java create mode 100644 src/test/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActorTest.java diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java b/src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java index 2b3942ac..d856f7e7 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java @@ -145,6 +145,9 @@ private void sendResponse(MapOuterClass.MapResponse mapResponse) { } catch (RuntimeException e) { handleResponseObserverFailure(e); } finally { + if (!mapResponse.hasStatus() || !mapResponse.getStatus().getEot()) { + return; + } activeMapStreamersCount--; finishIfDrained(); } diff --git a/src/test/java/io/numaproj/numaflow/mapper/MapSupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/mapper/MapSupervisorActorTest.java new file mode 100644 index 00000000..97b74a0c --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/mapper/MapSupervisorActorTest.java @@ -0,0 +1,209 @@ +package io.numaproj.numaflow.mapper; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.InputStreamError; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class MapSupervisorActorTest { + + @Test + public void given_inputStreamError_when_supervisorHandlesIt_then_shutdownCompletesWithoutResponseWrite() throws Exception { + ActorSystem actorSystem = ActorSystem.create("mapper-input-error-test"); + CompletableFuture shutdownSignal = new CompletableFuture<>(); + RecordingObserver responseObserver = new RecordingObserver(); + RuntimeException streamError = new RuntimeException("client cancelled"); + + try { + ActorRef supervisor = actorSystem.actorOf( + MapSupervisorActor.props(new BlockingMapper(new CountDownLatch(0), new CountDownLatch(0)), + responseObserver, + shutdownSignal)); + + supervisor.tell(new InputStreamError(streamError), ActorRef.noSender()); + + ExecutionException exception = assertCompletesExceptionally(shutdownSignal); + assertSame(streamError, exception.getCause()); + assertTrue(responseObserver.events.isEmpty()); + } finally { + actorSystem.terminate(); + } + } + + @Test + public void given_eofArrivesBeforeChildResponse_when_childDrains_then_responseIsSentBeforeCompleted() throws Exception { + ActorSystem actorSystem = ActorSystem.create("mapper-drain-test"); + CompletableFuture shutdownSignal = new CompletableFuture<>(); + CountDownLatch mapperStarted = new CountDownLatch(1); + CountDownLatch releaseMapper = new CountDownLatch(1); + RecordingObserver responseObserver = new RecordingObserver(); + + try { + ActorRef supervisor = actorSystem.actorOf( + MapSupervisorActor.props(new BlockingMapper(mapperStarted, releaseMapper), + responseObserver, + shutdownSignal)); + + supervisor.tell(mapRequest(), ActorRef.noSender()); + assertTrue(mapperStarted.await(2, TimeUnit.SECONDS)); + supervisor.tell(Constants.EOF, ActorRef.noSender()); + + Thread.sleep(100); + assertFalse(responseObserver.completed.isDone()); + + releaseMapper.countDown(); + + assertTrue(responseObserver.completed.get(2, TimeUnit.SECONDS)); + assertEquals(List.of("next", "completed"), responseObserver.events); + } finally { + actorSystem.terminate(); + } + } + + @Test + public void given_childFailure_when_childDrains_then_shutdownCompletesExceptionally() throws Exception { + ActorSystem actorSystem = ActorSystem.create("mapper-failure-test"); + CompletableFuture shutdownSignal = new CompletableFuture<>(); + RecordingObserver responseObserver = new RecordingObserver(); + + try { + ActorRef supervisor = actorSystem.actorOf( + MapSupervisorActor.props(new FailingMapper(), responseObserver, shutdownSignal)); + + supervisor.tell(mapRequest(), ActorRef.noSender()); + + assertCompletesExceptionally(shutdownSignal); + assertTrue(responseObserver.error.get(2, TimeUnit.SECONDS) instanceof RuntimeException); + } finally { + actorSystem.terminate(); + } + } + + @Test + public void given_responseObserverThrowsOnNext_when_supervisorHandlesResponse_then_shutdownCompletesExceptionally() + throws Exception { + ActorSystem actorSystem = ActorSystem.create("mapper-closed-response-test"); + CompletableFuture shutdownSignal = new CompletableFuture<>(); + + try { + ActorRef supervisor = actorSystem.actorOf( + MapSupervisorActor.props(new BlockingMapper(new CountDownLatch(0), new CountDownLatch(0)), + new FailingOnNextObserver(), + shutdownSignal)); + + supervisor.tell(mapRequest(), ActorRef.noSender()); + + ExecutionException exception = assertCompletesExceptionally(shutdownSignal); + assertTrue(exception.getCause() instanceof IllegalStateException); + assertEquals("call already closed", exception.getCause().getMessage()); + } finally { + actorSystem.terminate(); + } + } + + private static MapOuterClass.MapRequest mapRequest() { + return MapOuterClass.MapRequest.newBuilder() + .setId("id") + .setRequest(MapOuterClass.MapRequest.Request.newBuilder() + .setValue(ByteString.copyFromUtf8("input")) + .addKeys("key") + .build()) + .build(); + } + + private static ExecutionException assertCompletesExceptionally(CompletableFuture future) + throws InterruptedException { + try { + future.get(2, TimeUnit.SECONDS); + } catch (ExecutionException e) { + return e; + } catch (java.util.concurrent.TimeoutException e) { + throw new AssertionError("expected future to complete exceptionally", e); + } + throw new AssertionError("expected future to complete exceptionally"); + } + + private static class BlockingMapper extends Mapper { + private final CountDownLatch started; + private final CountDownLatch release; + + private BlockingMapper(CountDownLatch started, CountDownLatch release) { + this.started = started; + this.release = release; + } + + @Override + public MessageList processMessage(String[] keys, Datum datum) { + started.countDown(); + try { + release.await(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + return MessageList.newBuilder() + .addMessage(new Message("output".getBytes())) + .build(); + } + } + + private static class FailingMapper extends Mapper { + @Override + public MessageList processMessage(String[] keys, Datum datum) { + throw new RuntimeException("user failure"); + } + } + + private static class RecordingObserver implements StreamObserver { + private final List events = new CopyOnWriteArrayList<>(); + private final CompletableFuture completed = new CompletableFuture<>(); + private final CompletableFuture error = new CompletableFuture<>(); + + @Override + public void onNext(MapOuterClass.MapResponse mapResponse) { + events.add("next"); + } + + @Override + public void onError(Throwable throwable) { + events.add("error"); + error.complete(throwable); + } + + @Override + public void onCompleted() { + events.add("completed"); + completed.complete(true); + } + } + + private static class FailingOnNextObserver implements StreamObserver { + @Override + public void onNext(MapOuterClass.MapResponse mapResponse) { + throw new IllegalStateException("call already closed"); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + } +} diff --git a/src/test/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActorTest.java new file mode 100644 index 00000000..44dfe24d --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActorTest.java @@ -0,0 +1,207 @@ +package io.numaproj.numaflow.mapstreamer; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.InputStreamError; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class MapStreamSupervisorActorTest { + + @Test + public void given_inputStreamError_when_supervisorHandlesIt_then_shutdownCompletesWithoutResponseWrite() throws Exception { + ActorSystem actorSystem = ActorSystem.create("mapstream-input-error-test"); + CompletableFuture shutdownSignal = new CompletableFuture<>(); + RecordingObserver responseObserver = new RecordingObserver(); + RuntimeException streamError = new RuntimeException("client cancelled"); + + try { + ActorRef supervisor = actorSystem.actorOf( + MapStreamSupervisorActor.props(new BlockingMapStreamer(new CountDownLatch(0), new CountDownLatch(0)), + responseObserver, + shutdownSignal)); + + supervisor.tell(new InputStreamError(streamError), ActorRef.noSender()); + + ExecutionException exception = assertCompletesExceptionally(shutdownSignal); + assertSame(streamError, exception.getCause()); + assertTrue(responseObserver.events.isEmpty()); + } finally { + actorSystem.terminate(); + } + } + + @Test + public void given_eofArrivesBeforeChildResponses_when_childDrains_then_responsesAreSentBeforeCompleted() throws Exception { + ActorSystem actorSystem = ActorSystem.create("mapstream-drain-test"); + CompletableFuture shutdownSignal = new CompletableFuture<>(); + CountDownLatch streamerStarted = new CountDownLatch(1); + CountDownLatch releaseStreamer = new CountDownLatch(1); + RecordingObserver responseObserver = new RecordingObserver(); + + try { + ActorRef supervisor = actorSystem.actorOf( + MapStreamSupervisorActor.props(new BlockingMapStreamer(streamerStarted, releaseStreamer), + responseObserver, + shutdownSignal)); + + supervisor.tell(mapRequest(), ActorRef.noSender()); + assertTrue(streamerStarted.await(2, TimeUnit.SECONDS)); + supervisor.tell(Constants.EOF, ActorRef.noSender()); + + Thread.sleep(100); + assertFalse(responseObserver.completed.isDone()); + + releaseStreamer.countDown(); + + assertTrue(responseObserver.completed.get(2, TimeUnit.SECONDS)); + assertEquals(List.of("next", "next", "completed"), responseObserver.events); + } finally { + actorSystem.terminate(); + } + } + + @Test + public void given_childFailure_when_childDrains_then_shutdownCompletesExceptionally() throws Exception { + ActorSystem actorSystem = ActorSystem.create("mapstream-failure-test"); + CompletableFuture shutdownSignal = new CompletableFuture<>(); + RecordingObserver responseObserver = new RecordingObserver(); + + try { + ActorRef supervisor = actorSystem.actorOf( + MapStreamSupervisorActor.props(new FailingMapStreamer(), responseObserver, shutdownSignal)); + + supervisor.tell(mapRequest(), ActorRef.noSender()); + + assertCompletesExceptionally(shutdownSignal); + assertTrue(responseObserver.error.get(2, TimeUnit.SECONDS) instanceof RuntimeException); + } finally { + actorSystem.terminate(); + } + } + + @Test + public void given_responseObserverThrowsOnNext_when_supervisorHandlesResponse_then_shutdownCompletesExceptionally() + throws Exception { + ActorSystem actorSystem = ActorSystem.create("mapstream-closed-response-test"); + CompletableFuture shutdownSignal = new CompletableFuture<>(); + + try { + ActorRef supervisor = actorSystem.actorOf( + MapStreamSupervisorActor.props(new BlockingMapStreamer(new CountDownLatch(0), new CountDownLatch(0)), + new FailingOnNextObserver(), + shutdownSignal)); + + supervisor.tell(mapRequest(), ActorRef.noSender()); + + ExecutionException exception = assertCompletesExceptionally(shutdownSignal); + assertTrue(exception.getCause() instanceof IllegalStateException); + assertEquals("call already closed", exception.getCause().getMessage()); + } finally { + actorSystem.terminate(); + } + } + + private static MapOuterClass.MapRequest mapRequest() { + return MapOuterClass.MapRequest.newBuilder() + .setId("id") + .setRequest(MapOuterClass.MapRequest.Request.newBuilder() + .setValue(ByteString.copyFromUtf8("input")) + .addKeys("key") + .build()) + .build(); + } + + private static ExecutionException assertCompletesExceptionally(CompletableFuture future) + throws InterruptedException { + try { + future.get(2, TimeUnit.SECONDS); + } catch (ExecutionException e) { + return e; + } catch (java.util.concurrent.TimeoutException e) { + throw new AssertionError("expected future to complete exceptionally", e); + } + throw new AssertionError("expected future to complete exceptionally"); + } + + private static class BlockingMapStreamer extends MapStreamer { + private final CountDownLatch started; + private final CountDownLatch release; + + private BlockingMapStreamer(CountDownLatch started, CountDownLatch release) { + this.started = started; + this.release = release; + } + + @Override + public void processMessage(String[] keys, Datum datum, OutputObserver outputObserver) { + started.countDown(); + try { + release.await(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + outputObserver.send(new Message("output".getBytes())); + } + } + + private static class FailingMapStreamer extends MapStreamer { + @Override + public void processMessage(String[] keys, Datum datum, OutputObserver outputObserver) { + throw new RuntimeException("user failure"); + } + } + + private static class RecordingObserver implements StreamObserver { + private final List events = new CopyOnWriteArrayList<>(); + private final CompletableFuture completed = new CompletableFuture<>(); + private final CompletableFuture error = new CompletableFuture<>(); + + @Override + public void onNext(MapOuterClass.MapResponse mapResponse) { + events.add("next"); + } + + @Override + public void onError(Throwable throwable) { + events.add("error"); + error.complete(throwable); + } + + @Override + public void onCompleted() { + events.add("completed"); + completed.complete(true); + } + } + + private static class FailingOnNextObserver implements StreamObserver { + @Override + public void onNext(MapOuterClass.MapResponse mapResponse) { + throw new IllegalStateException("call already closed"); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + } +} diff --git a/src/test/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActorTest.java new file mode 100644 index 00000000..7b3d6092 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActorTest.java @@ -0,0 +1,210 @@ +package io.numaproj.numaflow.sourcetransformer; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.shared.InputStreamError; +import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer; +import org.junit.Test; + +import java.time.Instant; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class TransformSupervisorActorTest { + + @Test + public void given_inputStreamError_when_supervisorHandlesIt_then_shutdownCompletesWithoutResponseWrite() throws Exception { + ActorSystem actorSystem = ActorSystem.create("transform-input-error-test"); + CompletableFuture shutdownSignal = new CompletableFuture<>(); + RecordingObserver responseObserver = new RecordingObserver(); + RuntimeException streamError = new RuntimeException("client cancelled"); + + try { + ActorRef supervisor = actorSystem.actorOf( + TransformSupervisorActor.props(new BlockingTransformer(new CountDownLatch(0), new CountDownLatch(0)), + responseObserver, + shutdownSignal)); + + supervisor.tell(new InputStreamError(streamError), ActorRef.noSender()); + + ExecutionException exception = assertCompletesExceptionally(shutdownSignal); + assertSame(streamError, exception.getCause()); + assertTrue(responseObserver.events.isEmpty()); + } finally { + actorSystem.terminate(); + } + } + + @Test + public void given_eofArrivesBeforeChildResponse_when_childDrains_then_responseIsSentBeforeCompleted() throws Exception { + ActorSystem actorSystem = ActorSystem.create("transform-drain-test"); + CompletableFuture shutdownSignal = new CompletableFuture<>(); + CountDownLatch transformerStarted = new CountDownLatch(1); + CountDownLatch releaseTransformer = new CountDownLatch(1); + RecordingObserver responseObserver = new RecordingObserver(); + + try { + ActorRef supervisor = actorSystem.actorOf( + TransformSupervisorActor.props(new BlockingTransformer(transformerStarted, releaseTransformer), + responseObserver, + shutdownSignal)); + + supervisor.tell(transformRequest(), ActorRef.noSender()); + assertTrue(transformerStarted.await(2, TimeUnit.SECONDS)); + supervisor.tell(Constants.EOF, ActorRef.noSender()); + + Thread.sleep(100); + assertFalse(responseObserver.completed.isDone()); + + releaseTransformer.countDown(); + + assertTrue(responseObserver.completed.get(2, TimeUnit.SECONDS)); + assertEquals(List.of("next", "completed"), responseObserver.events); + } finally { + actorSystem.terminate(); + } + } + + @Test + public void given_childFailure_when_childDrains_then_shutdownCompletesExceptionally() throws Exception { + ActorSystem actorSystem = ActorSystem.create("transform-failure-test"); + CompletableFuture shutdownSignal = new CompletableFuture<>(); + RecordingObserver responseObserver = new RecordingObserver(); + + try { + ActorRef supervisor = actorSystem.actorOf( + TransformSupervisorActor.props(new FailingTransformer(), responseObserver, shutdownSignal)); + + supervisor.tell(transformRequest(), ActorRef.noSender()); + + assertCompletesExceptionally(shutdownSignal); + assertTrue(responseObserver.error.get(2, TimeUnit.SECONDS) instanceof RuntimeException); + } finally { + actorSystem.terminate(); + } + } + + @Test + public void given_responseObserverThrowsOnNext_when_supervisorHandlesResponse_then_shutdownCompletesExceptionally() + throws Exception { + ActorSystem actorSystem = ActorSystem.create("transform-closed-response-test"); + CompletableFuture shutdownSignal = new CompletableFuture<>(); + + try { + ActorRef supervisor = actorSystem.actorOf( + TransformSupervisorActor.props(new BlockingTransformer(new CountDownLatch(0), new CountDownLatch(0)), + new FailingOnNextObserver(), + shutdownSignal)); + + supervisor.tell(transformRequest(), ActorRef.noSender()); + + ExecutionException exception = assertCompletesExceptionally(shutdownSignal); + assertTrue(exception.getCause() instanceof IllegalStateException); + assertEquals("call already closed", exception.getCause().getMessage()); + } finally { + actorSystem.terminate(); + } + } + + private static Sourcetransformer.SourceTransformRequest transformRequest() { + return Sourcetransformer.SourceTransformRequest.newBuilder() + .setRequest(Sourcetransformer.SourceTransformRequest.Request.newBuilder() + .setValue(ByteString.copyFromUtf8("input")) + .addKeys("key") + .setId("id") + .build()) + .build(); + } + + private static ExecutionException assertCompletesExceptionally(CompletableFuture future) + throws InterruptedException { + try { + future.get(2, TimeUnit.SECONDS); + } catch (ExecutionException e) { + return e; + } catch (java.util.concurrent.TimeoutException e) { + throw new AssertionError("expected future to complete exceptionally", e); + } + throw new AssertionError("expected future to complete exceptionally"); + } + + private static class BlockingTransformer extends SourceTransformer { + private final CountDownLatch started; + private final CountDownLatch release; + + private BlockingTransformer(CountDownLatch started, CountDownLatch release) { + this.started = started; + this.release = release; + } + + @Override + public MessageList processMessage(String[] keys, Datum datum) { + started.countDown(); + try { + release.await(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + return MessageList.newBuilder() + .addMessage(new Message("output".getBytes(), Instant.EPOCH)) + .build(); + } + } + + private static class FailingTransformer extends SourceTransformer { + @Override + public MessageList processMessage(String[] keys, Datum datum) { + throw new RuntimeException("user failure"); + } + } + + private static class RecordingObserver implements StreamObserver { + private final List events = new CopyOnWriteArrayList<>(); + private final CompletableFuture completed = new CompletableFuture<>(); + private final CompletableFuture error = new CompletableFuture<>(); + + @Override + public void onNext(Sourcetransformer.SourceTransformResponse sourceTransformResponse) { + events.add("next"); + } + + @Override + public void onError(Throwable throwable) { + events.add("error"); + error.complete(throwable); + } + + @Override + public void onCompleted() { + events.add("completed"); + completed.complete(true); + } + } + + private static class FailingOnNextObserver implements StreamObserver { + @Override + public void onNext(Sourcetransformer.SourceTransformResponse sourceTransformResponse) { + throw new IllegalStateException("call already closed"); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + } +}