I need to synchronize a Flow.mapAsyncUnordered so that it does not pull until the future has completed. I have a cyclic graph in which the next element depends on the result of the previous one, so I need to wait until the downstream has finished before pulling again. Flow.mapAsyncUnordered currently always buffers at least one element, so it pulls too early. I have come up with the following:
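import akka.Done
import akka.actor.ActorRef
import akka.pattern.ask
import akka.stream.SinkShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Zip}

// `ask` (?) also needs an implicit akka.util.Timeout in scope.
val ref: ActorRef = <some dummy actor>
Sink.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  val broadcast = b.add(Broadcast[DummyMessage](2))
  val zip = b.add(Zip[DummyMessage, Done])
  // `ask` returns Future[Any]; the actor is assumed to reply with Done.
  val flow = b.add(Flow[DummyMessage].mapAsyncUnordered(1)(m => (ref ? m).mapTo[Done]))
  // Broadcast pulls only when both outputs have demand, and Zip does not
  // pull in0 again until it has emitted a pair, i.e. until the future completed.
  broadcast ~> zip.in0
  broadcast ~> flow ~> zip.in1
  zip.out ~> Sink.ignore
  SinkShape(broadcast.in)
})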
Is there a better way to pull messages one at a time? I had also considered the following:
Flow[DummyMessage].flatMapConcat(m => Source.fromFuture(ref ? m)).to(Sink.ignore)
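To check whether the flatMapConcat variant really handles one message at a time, a minimal sketch like the one below could be used. It is only an illustration under assumptions: `OneAtATimeCheck`, `process`, and the counters are hypothetical names, `process` is a stand-in for `ref ? m` that completes after a short delay, and the setup uses the Akka 2.5-style `ActorMaterializer` and `Source.fromFuture` to match the snippet above (both are deprecated in later versions).

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object OneAtATimeCheck {
  // Runs the stream and returns (elements seen downstream, max futures in flight).
  def run(): (Seq[Int], Int) = {
    implicit val system: ActorSystem = ActorSystem("one-at-a-time")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val lock = new Object
    var inFlight = 0
    var maxInFlight = 0

    // Hypothetical stand-in for `ref ? m`: counts the request when it is
    // issued and completes the future after a short delay.
    def process(m: Int): Future[Int] = {
      lock.synchronized {
        inFlight += 1
        maxInFlight = math.max(maxInFlight, inFlight)
      }
      Future {
        Thread.sleep(10)
        lock.synchronized { inFlight -= 1 }
        m
      }
    }

    try {
      val done = Source(1 to 20)
        .flatMapConcat(m => Source.fromFuture(process(m)))
        .runWith(Sink.seq)
      val seen = Await.result(done, 10.seconds)
      (seen, lock.synchronized(maxInFlight))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (seen, maxInFlight) = run()
    println(s"elements: ${seen.size}, max in flight: $maxInFlight")
  }
}
```

The counter is incremented when `process` is called, which is when the ask would be sent, so a maximum above one would mean the next message was pulled before the previous future had completed.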
Thanks
Jeff