I have two different flows that I want to run in parallel. I don’t want the “parent” of these two flows to emit downstream until both “child” flows have completed, no matter how much longer the slower of the two takes. I know I want to use the GraphDSL, but because of this requirement that both complete, I don’t think I want the Balance operator, since it would allow one of the two flows to fall behind.
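For comparison, here is a minimal sketch of what Balance does. It assumes Akka 2.6 or later, where an implicit `ActorSystem` is enough to supply the `Materializer`. `BalanceDemo`, `branchA` and `branchB` are hypothetical stand-ins for the real flows. Balance emits each element to the first output that has demand, so any given element passes through only one of the two branches.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; assumes Akka 2.6+ (implicit ActorSystem provides the Materializer).
object BalanceDemo {
  // Hypothetical stand-ins for the two real flows: each tags the element with its branch name.
  val branchA: Flow[Int, (String, Int), NotUsed] = Flow[Int].map(i => ("A", i))
  val branchB: Flow[Int, (String, Int), NotUsed] = Flow[Int].map(i => ("B", i))

  // Balance sends each element to exactly ONE output (the first one with demand),
  // so no element ever goes through both branches.
  val balanced: Flow[Int, (String, Int), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val balance = b.add(Balance[Int](2))
      val merge   = b.add(Merge[(String, Int)](2))
      balance.out(0) ~> branchA.async ~> merge.in(0)
      balance.out(1) ~> branchB.async ~> merge.in(1)
      FlowShape(balance.in, merge.out)
    })

  // Runs 1..100 through the balanced flow and collects the tagged results.
  def run(): Seq[(String, Int)] = {
    implicit val system: ActorSystem = ActorSystem("balance-demo")
    try Await.result(Source(1 to 100).via(balanced).runWith(Sink.seq), 10.seconds)
    finally system.terminate()
  }
}
```

Because routing depends on demand, how many elements each branch receives can vary from run to run; what the sketch does guarantee is that every element is processed by exactly one branch.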
The only way I could find to do this is with the Partition operator, and only by hacking it. As you can see below, the function I pass as the second argument of Partition always returns 0 for the partition. I then ignore the partition and use my own counter to decide which flow to kick off:
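```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}

import scala.concurrent.ExecutionContext

private def createParallelFlowsToProduceMessagesToPublishTopics()(
    implicit ec: ExecutionContext)
    : Flow[PostProcessorContainer, PostProcessorContainer, NotUsed] = {
  val graph = GraphDSL.create() { implicit builder =>
    import akka.stream.scaladsl.GraphDSL.Implicits._
    val numberOfBranches = 2
    var counter = 0
    // The partitioner always returns 0, so Partition routes every element to out(0).
    val partition = builder.add(Partition[PostProcessorContainer](numberOfBranches, container => 0))
    val merge = builder.add(Merge[PostProcessorContainer](numberOfBranches))
    // The counter only decides the wiring while the graph is built: the first `~>`
    // binds out(0) and the second binds out(1), which never receives an element.
    for (_ <- 1 to numberOfBranches) {
      if (counter % numberOfBranches == 0) {
        partition ~> createFlowToProduceMessageToPublishTopic().async ~> merge
      } else {
        partition ~> createFlowToProduceMessageToUnifiedPublishTopic().async ~> merge
      }
      counter = counter + 1
    }
    FlowShape(partition.in, merge.out)
  }
  Flow.fromGraph(graph)
}
```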
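A minimal sketch of what a constant partitioner does, under the same assumptions (Akka 2.6+, with hypothetical `PartitionDemo`, `branchA` and `branchB` standing in for the real code). Partition emits each element only to the output whose index the partitioner returns, so with `_ => 0` every element goes to `out(0)` and the branch wired to `out(1)` receives nothing.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; assumes Akka 2.6+ (implicit ActorSystem provides the Materializer).
object PartitionDemo {
  // Hypothetical stand-ins for the two real flows: each tags the element with its branch name.
  val branchA: Flow[Int, (String, Int), NotUsed] = Flow[Int].map(i => ("A", i))
  val branchB: Flow[Int, (String, Int), NotUsed] = Flow[Int].map(i => ("B", i))

  // Partition emits each element to the single output index returned by the partitioner;
  // a constant 0 means out(1), and therefore branchB, never sees an element.
  val alwaysZero: Flow[Int, (String, Int), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val partition = b.add(Partition[Int](2, _ => 0))
      val merge     = b.add(Merge[(String, Int)](2))
      partition.out(0) ~> branchA.async ~> merge.in(0)
      partition.out(1) ~> branchB.async ~> merge.in(1)
      FlowShape(partition.in, merge.out)
    })

  // Runs 1..100 through the partitioned flow and collects the tagged results.
  def run(): Seq[(String, Int)] = {
    implicit val system: ActorSystem = ActorSystem("partition-demo")
    try Await.result(Source(1 to 100).via(alwaysZero).runWith(Sink.seq), 10.seconds)
    finally system.terminate()
  }
}
```

Running it yields only `("A", i)` pairs, in input order, since every element takes the same single path through `branchA`.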
Surely there must be a simpler, less hacky way to accomplish this with Partition. Or perhaps my misgivings about Balance are unfounded, or I can take advantage of one of the predefined shapes.
What’s the canonical way to express what I’m trying to accomplish?