I’ve been troubleshooting a problem over the past several months (off and on) that has me completely flummoxed. I’ve setup a simple repro case: a gRPC emitter and a gRPC consumer. The emitter simulates firing events and the consumer simulates slow processing of the events. Periodically the HTTP/2 stream is cancelled without any additional context or reasoning why this was so. I am unable to solve this issue because the lack of error messaging and handling prevents me from carefully diagnosing the root cause.
Here is the code for the emitter (fired repeatedly over gRPC):
def publish(event: FooEvent): IO[Unit] = {
IO.fromFuture {
IO {
eventService
.publish()
.invoke(
Source
.single(event)
.log("before-dispatch")
.withAttributes(
Attributes
.logLevels(
onElement = Logging.WarningLevel,
onFinish = Logging.InfoLevel,
onFailure = Logging.DebugLevel
)
)
)
}
} map (_ => ())
Protobuf service definition:
service EventService {
rpc publish (stream FooEvent) returns (Received);
}
The sink:
val sink: Sink[GrpcFooEvent, NotUsed] =
(MergeHub.source[GrpcFooEvent] named "Test Sink" async) to {
Sink.foreach { _ =>
Thread.sleep(5000)
}
} run
override def publish(
in: Source[FooEvent, NotUsed],
metadata: Metadata
): Future[Received] = {
in runWith sink
Future.successful(Received())
}
And to generate traffic I’m running a Monad[IO].whileM_ ...
for about 25 seconds that fires the publish
method.
Here is the error I am seeing periodically:
[ERROR] [04/20/2022 11:00:05.045] [foo-events-akka.actor.default-dispatcher-7] [akka://foo-events/system/Materializers/StreamSupervisor-0/flow-732-0-unnamed] Error in stage [akka.stream.scaladsl.MergeHub$$anon$2$$anon$3]: Upstream producer failed with exception, removing from MergeHub now
akka.stream.scaladsl.MergeHub$ProducerFailed: Upstream producer failed with exception, removing from MergeHub now
at akka.stream.scaladsl.MergeHub$$anon$2$$anon$3.onUpstreamFailure(Hub.scala:352)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:525)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
at akka.actor.ActorCell.receiveMessage$$$capture(ActorCell.scala:580)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:295)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: akka.http.scaladsl.model.http2.PeerClosedStreamException: Stream with ID [731] was closed by peer with code CANCEL(0x08)
What can I do to properly / carefully isolate the root cause of these errors?