I am trying to build a graph modelling a case statement, with the added capability to combine the output of the merge with the input of the partition.
The graph basically looks like this: a Broadcast duplicates each incoming element; one copy goes through a Partition into one of several Flows and back out through a Merge, and a Zip then pairs the merged output with the original element before a final map combines the pair.
It works as I expect it to, as long as Flow1, Flow2, … do not create and merge substreams; in that case the composed flow deadlocks.
The graph is created with:
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Partition, Zip}

def when1[A, B, C](partition: A => Int, actions: List[Flow[A, B, NotUsed]], map: (A, B) => C): Flow[A, C, NotUsed] = {
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val b = builder.add(Broadcast[A](2))
    val p = builder.add(Partition[A](actions.size, partition))
    val m = builder.add(Merge[B](actions.size))
    actions.foreach(a => p ~> builder.add(a) ~> m)

    val agg = builder.add(Flow[(A, B)].map(x => map(x._1, x._2)))
    val z = builder.add(Zip[A, B]())

    b ~> z.in0   // one copy of the element goes straight to the zip
    b ~> p       // the other copy goes through the selected branch
    m ~> z.in1   // the branch output is paired with the original element
    z.out ~> agg

    FlowShape(b.in, agg.out)
  })
}
I want to treat the flows used to parametrize the branches as black boxes. What am I overlooking?
The problem is probably in the “what will backpressure, and when” part.
Zip pulls all of its inputs when it is pulled.
Broadcast (BC) pulls upstream only when all of its legs have pulled.
I think Merge pulls all of its inputs when it is pulled.
Partition pulls upstream if any of its outputs has pulled.
So in theory: the zip is pulled, so BC leg2 and the merge are pulled; the merge pulls all of the Flows; the Flows pull the partition; the partition pulls BC leg1; BC pulls upstream. This is OK.
BC gets an element, duplicates it, and hands it to both the zip and the partition. The partition gives it to one of the flows, which then asks BC for a new element. If the flow drops the element, it's a deadlock, because the broadcast can't produce a new element while the zip is not pulling leg2. If the flow produces exactly one element, the merge merges it, the zip zips, and everything is fine again. If it produces more than one element, the zip zips the first pair and is pulled again, so it pulls BC leg2 and the merge; the merge passes down the second element from the flow, but BC will not produce a new element on leg2 because leg1 has not been pulled yet.
With buffers and .async this will only deadlock later, but it's a ticking time bomb either way.
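Here is a minimal, stand-alone sketch of that mechanism (not your graph, just a Broadcast/Zip wrapped around a single black-box flow; the names and the ActorSystem are mine):

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}

implicit val system: ActorSystem = ActorSystem("bc-zip-demo")

// Broadcast each element, send one copy through `inner`, zip the result back
// with the original element.
def around[A, B](inner: Flow[A, B, NotUsed]): Flow[A, (A, B), NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val bc = b.add(Broadcast[A](2))
    val z  = b.add(Zip[A, B]())
    bc ~> z.in0
    bc ~> b.add(inner) ~> z.in1
    FlowShape(bc.in, z.out)
  })

// Strictly one-in/one-out inner flow: completes with all five pairs.
val ok = Source(1 to 5).via(around(Flow[Int].map(_ * 10))).runWith(Sink.seq)

// Inner flow that drops the 3: the Zip holds the 3 on its first input and keeps
// waiting for the second, so it never pulls the Broadcast again, and the Broadcast
// never emits the 4 - this Future never completes.
val stalled = Source(1 to 5).via(around(Flow[Int].filter(_ != 3))).runWith(Sink.seq)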
The “no-brainer” fix for this is to refactor your code to a plain partition-flows-merge and to build the zip functionality into the flow logic itself (actions: List[Flow[A, (A, B), NotUsed]]), as in the sketch below.
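Something along these lines (a hypothetical when2, reusing the imports from your when1; the flows now carry the original element themselves, so no Broadcast or Zip is needed):

def when2[A, B, C](partition: A => Int,
                   actions: List[Flow[A, (A, B), NotUsed]],
                   map: (A, B) => C): Flow[A, C, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val p   = builder.add(Partition[A](actions.size, partition))
    val m   = builder.add(Merge[(A, B)](actions.size))
    val agg = builder.add(Flow[(A, B)].map(x => map(x._1, x._2)))
    actions.foreach(a => p ~> builder.add(a) ~> m)
    m ~> agg
    FlowShape(p.in, agg.out)
  })

Each branch is then free to drop, aggregate or duplicate elements, because nothing outside it assumes exactly one output per input.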
BTW, there is FlowWithContext (or something with similar functionality). I have never used it, but it is worth reading about; I think you can't drop or duplicate elements inside it, though.
Thank you @tg44 - again (you helped me out before). That makes sense, and I was thinking through what backpressures/emits, but I am not clear about what “if the flow drops an element” means. In my test case the oddWithSubstreams flow https://github.com/asmyth20/dl-7264/blob/master/src/test/scala/com/workday/maya/ControlTest.scala#L25 (when treated as a black box) does not do that - it emits exactly one element for each element received. Internally, however, that flow merges in additional elements which it then folds over to arrive at exactly the one element it emits again. Is the reduction of two elements to one by the fold considered a drop from the outside (by the BC)?
With a black-box flow the three interesting cases are:
dropping elements: Flow[String].filter(_.size < 4)
keeping elements: Flow[String]
creating elements: Flow[String].mapConcat(x => x :: x :: Nil)
I tried to cover these, having only glanced at your tests (and not noticing the fold); see the sketch below.
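Roughly like this (a hypothetical run helper against your when1, routing everything to a single branch; it assumes an implicit ActorSystem and the usual akka.stream.scaladsl imports are in scope):

val dropping = Flow[String].filter(_.size < 4)
val keeping  = Flow[String]
val creating = Flow[String].mapConcat(x => x :: x :: Nil)

// Push a few strings through when1 with a single branch.
def run(f: Flow[String, String, NotUsed]) =
  Source(List("a", "bb", "cccc", "dd", "ee"))
    .via(when1[String, String, String](_ => 0, List(f), (a, b) => s"$a/$b"))
    .runWith(Sink.seq)

// run(keeping) completes with five pairs; run(dropping) stalls once "cccc" is
// filtered out, and run(creating) stalls after a couple of pairs - the outer
// Broadcast/Zip only works while the branch is strictly one-in/one-out.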
I think in your example the mergeSubstreams and the fold are the problems. You don't use groupBy or anything similar, so you have no substreams that you would need to merge. Also, the fold will only emit once its upstream is closed (if I remember correctly). That means the graph stages before the flow would have to be closed/completed/run out of elements, which will not happen because the BC is not pulled on both legs, so it can't pull its upstream to find out whether we ran out of elements or not… (You basically “drop” elements here, because you accidentally aggregate them.)
Hmm, I am using splitWhen to create a substream, and I believe fold on a substream emits when the substream has no more elements - not when the entire upstream has no more elements. That’s what I have seen when running oddWithSubstream on its own.
Hmmm, I should rest more :D You are right, I missed the splitWhen part completely, and the fold and the mergeSubstreams work as you described! However, I found the problem!
def printer[A](s: String): Flow[A, A, NotUsed] = {
  Flow[A].map { x =>
    println(s"$s - $x")
    x
  }
}
val r2 = Source(0 until 10).via(printer("a")).via(oddWithSubStreams).via(printer("b")).runWith(Sink.seq)
If you run this, you will get the problem too I think :D
splitWhen closes the currently open substream when it needs to start a new one. Sooo:
splitWhen gets an element
it starts a substream
it hands the element down to the fold
the fold pulls up
splitWhen pulls up
(here we have a deadlock, because we “aggregated”, or rather delayed, an element)
if splitWhen got a new element, it would run the predicate, find out that it needs to start a new substream, and so close the previous one
the previous fold would get the close and pass down its element
a new fold would be instantiated, get the new element, and pull up
mergeSubstreams would merge the previous fold's element and pass it down to the outer Merge, which would pass it down to the Zip
So with the splitWhen you need at least two elements to produce one element in your construction. But you can't get two elements from the BC, because it is blocked by the Zip, which is blocked by not getting the first element out of the Merge…
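You can see that one-element delay on its own, without the surrounding graph (a simplified stand-in for oddWithSubStreams, reusing the printer above; assumes an implicit ActorSystem and the usual akka.stream.scaladsl imports):

val oddLike: Flow[Int, Int, NotUsed] =
  Flow[Int]
    .splitWhen(_ => true)  // every element opens a new substream...
    .fold(0)(_ + _)        // ...and the fold only emits when that substream closes
    .mergeSubstreams

Source(0 until 3)
  .via(printer("in"))
  .via(oddLike)
  .via(printer("out"))
  .runWith(Sink.seq)

// "out - 0" only shows up after "in - 1" has been printed (the exact interleaving
// may vary): the result for element n is released when splitWhen sees element n + 1
// and closes the previous substream, and the last one only on stream completion.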
Thank you, this all makes sense. What worries me now is that in order to safely use a given graph, I may need to know implementation details of the components used to build it, which defeats the purpose of modularity and composition.
Fortunately, in my case I could modify the flows so that I no longer need the outer Broadcast and Zip, while still being able to use flows that create and merge substreams (although in that case the order is not preserved).