I have an akka-http server that receives a possibly large stream of ByteStrings and needs to write the data to a number of websockets as well as other HTTP servers.
I can use a broadcast graph for writing to the websockets. However, the only API I can find for streaming to an HTTP server requires a Source for the data. For example:
def makeHttpRequest(uri: String, data: Source[ByteString, Any]): HttpRequest = {
  HttpRequest(HttpMethods.POST, uri,
    entity = HttpEntity(ContentTypes.`application/octet-stream`, data))
}
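To make the constraint concrete, here is a minimal sketch of how such a request might be sent with the request-level client API. The object name `StreamingPostSketch` and the `post` helper are hypothetical; the implicit `Materializer` is only there so the sketch also compiles against older akka-http versions that still require one.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.Future

// Hypothetical wrapper object, used only for this illustration.
object StreamingPostSketch {
  // Same helper as above: the entity is built from a streamed Source.
  def makeHttpRequest(uri: String, data: Source[ByteString, Any]): HttpRequest =
    HttpRequest(HttpMethods.POST, uri,
      entity = HttpEntity(ContentTypes.`application/octet-stream`, data))

  // Sends one streamed POST. The `data` source is materialized by the
  // HTTP client while the request body is being written.
  def post(uri: String, data: Source[ByteString, Any])(
      implicit system: ActorSystem, materializer: Materializer): Future[HttpResponse] =
    Http().singleRequest(makeHttpRequest(uri, data))
}
```

The point of the sketch is that the client, not my own graph, runs the `data` source, which is why I need a Source rather than a Sink for each HTTP target.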
The incoming data Source can only be consumed once, so I can’t use it both in the HTTP request and in other places.
I tried using BroadcastHub to get a reusable source:
val runnableGraph = source.toMat(BroadcastHub.sink(bufferSize = 1))(Keep.right)
val reusableSource = runnableGraph.run()
The problem is that, in some cases, the stream completes before every consumer has had a chance to attach to it.
Here is a small example that shows how that can happen:
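import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import scala.concurrent.Future
import scala.util.{Failure, Success}

implicit val system = ActorSystem("StreamTest")
implicit val materializer = ActorMaterializer()
import system.dispatcher // implicit ExecutionContext for the Futures below

val source = Source(0 to 5)
// Running the graph starts the hub; consumers attach to reusableSource afterwards.
val runnableGraph = source.toMat(BroadcastHub.sink(bufferSize = 1))(Keep.right)
val reusableSource = runnableGraph.run()

// Consumer A: fast, prints each element.
val a = reusableSource
  .mapAsync(1) { i =>
    Future { println(s"A: $i") }
  }
  .runWith(Sink.ignore)

// Consumer B: slow, prints each element twice (B1 immediately, B2 after a delay).
val b = reusableSource
  .alsoTo(Sink.foreach(i => println(s"B1: $i")))
  .mapAsync(1) { i =>
    Future { Thread.sleep(100); println(s"B2: $i") }
  }
  .runWith(Sink.ignore)

// Consumer C: discards everything.
val c = reusableSource.runWith(Sink.ignore)

// Shut down once all three consumers have finished (or one has failed).
Future.sequence(List(a, b, c)).onComplete {
  case Success(_) =>
    println("OK")
    system.terminate()
  case Failure(ex) =>
    println(s"Error: $ex")
    system.terminate()
}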
In the test above, “A” completes the stream before “B” can consume it.
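For comparison, here is a minimal sketch of a fixed fan-out using `Broadcast` in the GraphDSL, where every consumer is wired in before the graph runs, so no consumer can attach late. The object name `FixedBroadcastSketch` and the two `Sink.seq` consumers are assumptions made for the illustration. This does not solve the HTTP case by itself, because the HTTP entity needs a Source rather than a Sink; it only shows why the race does not occur with a static broadcast graph.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, GraphDSL, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only for this illustration.
object FixedBroadcastSketch {
  // Runs Source(0 to 5) through a two-way Broadcast and returns
  // the elements each branch received.
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("FixedBroadcastSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[Int], Sink.seq[Int])(Keep.both) {
        implicit builder => (sinkA, sinkB) =>
          import GraphDSL.Implicits._
          val bcast = builder.add(Broadcast[Int](2))
          // Both branches are connected before anything is emitted.
          Source(0 to 5) ~> bcast.in
          bcast.out(0) ~> sinkA.in
          bcast.out(1) ~> sinkB.in
          ClosedShape
      })

    val (futureA, futureB) = graph.run()
    try (Await.result(futureA, 5.seconds), Await.result(futureB, 5.seconds))
    finally system.terminate()
  }
}
```

Calling `FixedBroadcastSketch.run()` should give both branches the full sequence, since `Broadcast` only emits an element once all of its outputs have demand.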
Another thing I tried is using MergeHub to get a sink that I could use to feed a source from within a broadcast graph:
val (remoteSink, remoteSource) = MergeHub.source[ByteString](1).preMaterialize()
However, that resulted in this error:
[ERROR] [01/16/2019 19:32:44.783] [vbds-system-akka.actor.default-dispatcher-16] [akka://vbds-system/system/StreamSupervisor-0/flow-5-0-ignoreSink] Error in stage [akka.stream.scaladsl.MergeHub$$anon$2@4b6a39de]: Upstream producer failed with exception, removing from MergeHub now
akka.stream.scaladsl.MergeHub$ProducerFailed: Upstream producer failed with exception, removing from MergeHub now
at akka.stream.scaladsl.MergeHub$$anon$2$$anon$3.onUpstreamFailure(Hub.scala:271)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:505)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:378)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:588)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:472)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:563)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:745)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:760)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:670)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
at akka.actor.ActorCell.invoke(ActorCell.scala:557)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.http.scaladsl.model.EntityStreamException: Entity stream truncation
at akka.http.impl.engine.parsing.HttpMessageParser$$anonfun$2.applyOrElse(HttpMessageParser.scala:330)
at akka.http.impl.engine.parsing.HttpMessageParser$$anonfun$2.applyOrElse(HttpMessageParser.scala:328)
at akka.stream.impl.fusing.Collect$$anon$6.$anonfun$wrappedPf$1(Ops.scala:217)
at akka.stream.impl.fusing.SupervisedGraphStageLogic.withSupervision(Ops.scala:178)
at akka.stream.impl.fusing.Collect$$anon$6.onPush(Ops.scala:219)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:482)
... 18 more
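A minimal sketch of the MergeHub pre-materialization pattern used above, reduced to Int elements so it can run without akka-http. The object name `MergeHubSketch` is hypothetical. One assumption worth stating: as far as I understand the MergeHub documentation, the hub's source keeps running when a producer completes, which is why the sketch needs `take(3)` to terminate.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{MergeHub, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only for this illustration.
object MergeHubSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("MergeHubSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // hubSink can be attached to any number of producers;
    // hubSource emits whatever those producers push into the hub.
    val (hubSink, hubSource) = MergeHub.source[Int](1).preMaterialize()

    // Bound the consumer explicitly: the hub source is not expected to
    // complete just because the single producer below completes.
    val received = hubSource.take(3).runWith(Sink.seq)

    // One producer feeding the hub.
    Source(1 to 3).runWith(hubSink)

    try Await.result(received, 5.seconds)
    finally system.terminate()
  }
}
```

If that assumption holds, an HTTP entity built directly from such a hub source would also need some explicit way to signal the end of the request body.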
For now I am doing a separate POST to the other HTTP servers for each ByteString received, but what I really want to do is stream the source in the HTTP request, in the same way as when you upload a file.
Any suggestions on how to do this?
Note that the HTTP servers also form an akka cluster, but I assume using akka messages to send the data would not be recommended (since it is bulk data). StreamRefs are also not recommended for bulk data.