Skip to content

fix: handle cancelled gRPC streams in supervisors#216

Draft
adarsh0728 wants to merge 2 commits intomainfrom
fix-guard
Draft

fix: handle cancelled gRPC streams in supervisors#216
adarsh0728 wants to merge 2 commits intomainfrom
fix-guard

Conversation

@adarsh0728
Copy link
Copy Markdown
Member

@adarsh0728 adarsh0728 commented May 5, 2026

Summary

  • Guard supervisor response streams so cancelled/closed gRPC calls do not throw back into Akka restart handling.
  • Separate inbound request stream errors from child actor failures to avoid corrupting active actor counts.
  • Defer normal stream completion until all in-flight child actors have replied or failed.

Issue observed in Pipeline

udf container was stuck after this error

Caused by: java.lang.IllegalStateException: call already closedat java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)at akka.dispatch.Mailbox.exec(Mailbox.scala:243)at akka.dispatch.Mailbox.run(Mailbox.scala:230)at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:295)at akka.actor.ActorCell.systemInvoke(ActorCell.scala:535)at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:516)at akka.actor.ActorCell.faultRecreate(ActorCell.scala:410)at akka.actor.dungeon.FaultHandling.faultRecreate$(FaultHandling.scala:36)at akka.actor.dungeon.FaultHandling.faultRecreate(FaultHandling.scala:98)at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:35)at akka.actor.dungeon.FaultHandling$$anon$1.applyOrElse(FaultHandling.scala:336)at akka.actor.dungeon.FaultHandling$$anon$1.applyOrElse(FaultHandling.scala:341)at scala.runtime.function.JProcedure1.apply(JProcedure1.java:10)at scala.runtime.function.JProcedure1.apply(JProcedure1.java:15)at akka.actor.dungeon.FaultHandling.$anonfun$1(FaultHandling.scala:96)at akka.actor.PreRestartException$.apply(Actor.scala:214)akka.actor.PreRestartException: akka://mapper/user/$c: exception in preRestart(class io.grpc.StatusRuntimeException, class io.numaproj.numaflow.map.v1.MapOuterClass$MapResponse)[ERROR] [04/27/2026 10:17:25.177] [mapper-akka.actor.default-dispatcher-372] [akka://mapper/user/$c] call already closed[WARN] [04/27/2026 10:17:25.176] [mapper-akka.actor.default-dispatcher-372] [akka.actor.ActorSystemImpl(mapper)] supervisor pre restart was executed due to: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception[INFO] [akkaDeadLetter][04/27/2026 10:17:25.176] [mapper-akka.actor.default-dispatcher-377] [akka://mapper/user/$c/$e] Message [akka.dispatch.sysmsg.Suspend] from Actor[akka://mapper/user/$c/$e#-865804973] to Actor[akka://mapper/user/$c/$e#-865804973] was not delivered. [7] dead letters encountered. If this is not an expected behavior then Actor[akka://mapper/user/$c/$e#-865804973] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)at akka.dispatch.Mailbox.exec(Mailbox.scala:243)at akka.dispatch.Mailbox.run(Mailbox.scala:231)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)at akka.actor.ActorCell.invoke(ActorCell.scala:547)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:219)at akka.actor.Actor.aroundReceive$(Actor.scala:471)at akka.actor.Actor.aroundReceive(Actor.scala:537)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:270)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:269)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)at scala.PartialFunction.applyOrElse$(PartialFunction.scala:213)at scala.PartialFunction.applyOrElse(PartialFunction.scala:214)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)at io.numaproj.numaflow.mapper.MapSupervisorActor.sendResponse(MapSupervisorActor.java:118)at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:366)at io.grpc.Status.asRuntimeException(Status.java:525)io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception[ERROR] [04/27/2026 10:17:25.176] [mapper-akka.actor.internal-dispatcher-380] [akka://mapper/user/$c] CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception... 12 moreat akka.actor.dungeon.FaultHandling.faultRecreate(FaultHandling.scala:94)

Signed-off-by: adarsh0728 <gooneriitk@gmail.com>
@KeranYang KeranYang self-requested a review May 5, 2026 13:13
Signed-off-by: adarsh0728 <gooneriitk@gmail.com>
@codecov
Copy link
Copy Markdown

codecov Bot commented May 6, 2026

Codecov Report

❌ Patch coverage is 46.74556% with 90 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (main@0c3b01c). Learn more about missing BASE report.

Files with missing lines Patch % Lines
...numaflow/mapstreamer/MapStreamSupervisorActor.java 41.50% 28 Missing and 3 partials ⚠️
...o/numaproj/numaflow/mapper/MapSupervisorActor.java 54.00% 20 Missing and 3 partials ⚠️
...ow/sourcetransformer/TransformSupervisorActor.java 54.00% 20 Missing and 3 partials ⚠️
.../io/numaproj/numaflow/shared/InputStreamError.java 0.00% 4 Missing ⚠️
.../main/java/io/numaproj/numaflow/mapper/Server.java 25.00% 2 Missing and 1 partial ⚠️
...io/numaproj/numaflow/sourcetransformer/Server.java 25.00% 2 Missing and 1 partial ⚠️
...main/java/io/numaproj/numaflow/mapper/Service.java 0.00% 1 Missing ⚠️
...java/io/numaproj/numaflow/mapstreamer/Service.java 50.00% 1 Missing ⚠️
...o/numaproj/numaflow/sourcetransformer/Service.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main     #216   +/-   ##
=======================================
  Coverage        ?   60.99%           
  Complexity      ?      586           
=======================================
  Files           ?      156           
  Lines           ?     3705           
  Branches        ?      259           
=======================================
  Hits            ?     2260           
  Misses          ?     1252           
  Partials        ?      193           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant