Greetings, I need to serve gRPC on the same port as Akka HTTP. The approach I have taken so far is to instantiate the generated gRPC handler via partial(), then use it in a route as follows:
val echoService = EchoServiceHandler.partial(new EchoServiceImpl(mat))
...
extractRequest { r =>
if (echoService.isDefinedAt(r)) complete(echoService(r)) else reject
} ~
This fails with the exception shown below. If I instead wire the route without the domain test (the isDefinedAt check), i.e. as just complete(echoService(r)), the request completes successfully.
I am not sure whether I am misusing the API or whether the generated gRPC handler itself is at fault. Thoughts?
java.lang.IllegalStateException: Substream Source(substream-out-1) cannot be materialized more than once
at akka.stream.impl.fusing.SubSource$$anon$13.createMaterializedTwiceException(StreamOfStreams.scala:826)
at akka.stream.impl.fusing.SubSource$$anon$13.<init>(StreamOfStreams.scala:797)
at akka.stream.impl.fusing.SubSource.createLogic(StreamOfStreams.scala:793)
at akka.stream.stage.GraphStage.createLogicAndMaterializedValue(GraphStage.scala:106)
at akka.stream.stage.GraphStageWithMaterializedValue.createLogicAndMaterializedValue(GraphStage.scala:50)
at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:701)
at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:500)
at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:450)
at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:442)
at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:703)
at akka.stream.scaladsl.Source.runWith(Source.scala:118)
at akka.grpc.scaladsl.GrpcMarshalling$.unmarshal(GrpcMarshalling.scala:53)
at com.example.protobuf.EchoServiceHandler$.$anonfun$partial$1(EchoServiceHandler.scala:91)
at akka.grpc.scaladsl.GrpcMarshalling$.$anonfun$negotiated$2(GrpcMarshalling.scala:45)
at scala.util.Success.map(Try.scala:262)
at akka.grpc.scaladsl.GrpcMarshalling$.$anonfun$negotiated$1(GrpcMarshalling.scala:45)
at scala.Option.map(Option.scala:242)
at akka.grpc.scaladsl.GrpcMarshalling$.negotiated(GrpcMarshalling.scala:43)
at com.example.protobuf.EchoServiceHandler$.handle$1(EchoServiceHandler.scala:86)
at com.example.protobuf.EchoServiceHandler$.$anonfun$partial$5(EchoServiceHandler.scala:102)
at scala.PartialFunction$Unlifted.applyOrElse(PartialFunction.scala:319)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:35)
at com.example.service.MainService.$anonfun$routes$2(Main.scala:120)
at akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
at akka.http.scaladsl.server.directives.BasicDirectives.$anonfun$textract$2(BasicDirectives.scala:161)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation.$anonfun$$tilde$1(RouteConcatenation.scala:44)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation.$anonfun$$tilde$1(RouteConcatenation.scala:44)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation.$anonfun$$tilde$1(RouteConcatenation.scala:44)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation.$anonfun$$tilde$2(RouteConcatenation.scala:47)
at akka.http.scaladsl.util.FastFuture$.strictTransform$1(FastFuture.scala:41)
at akka.http.scaladsl.util.FastFuture$.transformWith$extension(FastFuture.scala:45)
at akka.http.scaladsl.util.FastFuture$.flatMap$extension(FastFuture.scala:26)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation.$anonfun$$tilde$1(RouteConcatenation.scala:44)
at akka.http.scaladsl.server.directives.BasicDirectives.$anonfun$mapRouteResultWith$2(BasicDirectives.scala:74)
at akka.http.scaladsl.server.directives.BasicDirectives.$anonfun$textract$2(BasicDirectives.scala:161)
at akka.http.scaladsl.server.directives.ExecutionDirectives.$anonfun$handleExceptions$2(ExecutionDirectives.scala:32)
at akka.http.scaladsl.server.directives.BasicDirectives.$anonfun$textract$2(BasicDirectives.scala:161)
at akka.http.scaladsl.server.Route$.$anonfun$asyncHandler$1(Route.scala:86)
at akka.http.impl.engine.http2.Http2Blueprint$.$anonfun$handleWithStreamIdHeader$1(Http2Blueprint.scala:123)
at akka.stream.impl.fusing.MapAsyncUnordered$$anon$31.onPush(Ops.scala:1401)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute(ActorGraphInterpreter.scala:56)
at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute$(ActorGraphInterpreter.scala:52)
at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary$OnNext.execute(ActorGraphInterpreter.scala:95)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:784)
at akka.actor.Actor.aroundReceive(Actor.scala:535)
at akka.actor.Actor.aroundReceive$(Actor.scala:533)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:575)
at akka.actor.ActorCell.invoke(ActorCell.scala:545)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)