I have an Akka stream that appears to get held up at around 64 elements - a nice magic number! I suspect that I have a deadlock somewhere, but it continues to elude me after many hours. I was hoping that someone might be able to offer some advice on how to debug this kind of situation. Interestingly, the problem is evident on a single core machine, but not my multi-core laptop. That could also be down to a difference in the amount of time to process elements etc. I just don’t know.
Not an “expert” on this issue, but I can tell you what I did.
I had a graph where one of the Flows of a Graph was consuming one more element than it was emitting, which resulted in a deadlock. The difficulty was to track down where this was happening.
In my case it was a very complex graph (relatively speaking). So I ended up using this utility function in GraphDSL.create to find the problem
That is, experimentally inserting a ~> buffer[Something] ~> b between stages, one position at a time, until the deadlock went away, which told me where the graph was locking up. You might need a larger buffer size.
You might not be using GraphDSL.create but the same principle can be applied to the normal API using .buffer.
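For the normal API, a minimal sketch of the same idea could look like the following. The probe helper, the stand-in suspect flow and the buffer size of 64 are placeholders of mine, not the utility function mentioned above, and I'm assuming Akka 2.6, where the ActorSystem supplies the materializer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BufferProbe {
  // Hypothetical helper: a pass-through buffer that can be spliced between two stages.
  // OverflowStrategy.backpressure keeps every element; it only adds slack.
  def probe[T](size: Int = 64): Flow[T, T, NotUsed] =
    Flow[T].buffer(size, OverflowStrategy.backpressure)

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("buffer-probe")
    try {
      // Stand-in for the stage suspected of stalling.
      val suspect = Flow[Int].map(_ * 2)
      val result = Source(1 to 100)
        .via(suspect)
        .via(probe[Int]()) // move this line around to narrow down the stall
        .runWith(Sink.seq)
      Await.result(result, 10.seconds)
    } finally system.terminate()
  }
}
```

Inside GraphDSL.create the same helper would be wired as a ~> probe[Something]() ~> b. If the stall moves or disappears when the probe sits at a particular spot, the neighbouring stage is the one to look at.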
Maybe not the cleanest or best solution, but I was able to track the issue down this way.
Also, check out the documentation on MergePreferred in "Working with Graphs" (Akka Documentation), section "Graph cycles, liveness and deadlocks". It describes a solution for a common cause of deadlocks:
If we modify our feedback loop by replacing the Merge junction with a MergePreferred we can avoid the deadlock. …
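For reference, the feedback loop that section describes looks roughly like this. It is adapted from memory of the Akka documentation example, so treat it as a sketch; the source of 1 to 100 is a placeholder. The graph is only built here, not run: as far as I understand the docs, once it runs it no longer deadlocks, but elements arriving on the preferred (feedback) port always win, so the loop keeps cycling and the source stops being consumed.

```scala
import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, MergePreferred, RunnableGraph, Sink, Source}

object FeedbackLoop {
  // Placeholder source; any Source[Int, _] would do.
  val source = Source(1 to 100)

  // Build (but do not run) a cycle that feeds the broadcast output back into the merge.
  val graph: RunnableGraph[NotUsed] =
    RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // One secondary input (the source) plus the preferred input (the feedback arc).
      val merge = b.add(MergePreferred[Int](1))
      val bcast = b.add(Broadcast[Int](2))

      source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
      // The feedback arc goes to the preferred port, so the cycle is always drained first.
      merge.preferred <~ bcast
      ClosedShape
    })
}
```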
You can also request a snapshotString, which contains a representation of the stream's internal wiring along with backpressure information. From that you should be able to see which part of the stream is backpressuring.
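A related way to get at the same information programmatically is sketched below. Note that this swaps in a different API from snapshotString: it uses MaterializerState.streamSnapshots from akka.stream.snapshot, which is marked as "may change", and I'm assuming the Akka 2.6 overload that takes an ActorSystem. The describe helper and the Source.maybe stream are purely illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.snapshot.{MaterializerState, StreamSnapshot}
import scala.concurrent.Await
import scala.concurrent.duration._

object SnapshotDump {
  // Hypothetical helper: one line per connection, showing the stage owning the
  // outlet, the stage owning the inlet, and the connection state
  // (ShouldPull, ShouldPush or Closed).
  def describe(snapshots: Seq[StreamSnapshot]): Seq[String] =
    for {
      stream      <- snapshots
      interpreter <- stream.activeInterpreters
      connection  <- interpreter.connections
    } yield s"${connection.out.label} -> ${connection.in.label}: ${connection.state}"

  def demo(): Seq[StreamSnapshot] = {
    implicit val system: ActorSystem = ActorSystem("snapshot-dump")
    try {
      // A stream that never emits, so it is still running when the snapshot is taken.
      Source.maybe[Int].to(Sink.ignore).run()
      val snapshots = Await.result(MaterializerState.streamSnapshots(system), 5.seconds)
      describe(snapshots).foreach(println)
      snapshots
    } finally system.terminate()
  }
}
```

In a stalled stream, a run of connections that all sit in the same state up to one particular stage is usually a good hint about where demand or elements stop flowing.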
Do you have any cycles in your graph? Those are notorious for causing deadlocks.
If so, selectively adding buffers will only postpone the moment the deadlock occurs.
The difference between multi-core and single-core seems to point to the configuration: how many cores Akka thinks it has, and hence how much true parallelism it gets and how many threads it will use. So I'd check that.
With one core you have no parallelism at all. Can it be that your stream graph requires parallelism, in its IO for instance?
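To approximate the single-core machine on a multi-core one, a sketch like the following pins the default dispatcher to a single thread. The object and system names are made up, and I'm assuming the stream runs on the default dispatcher with a fork-join executor; any other dispatchers in use keep their own thread pools.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object SingleThreadRepro {
  // Force the default dispatcher down to one thread, then fall back to the
  // regular application configuration for everything else.
  val config = ConfigFactory.parseString(
    """
    akka.actor.default-dispatcher {
      executor = "fork-join-executor"
      fork-join-executor {
        parallelism-min = 1
        parallelism-factor = 1.0
        parallelism-max = 1
      }
    }
    """).withFallback(ConfigFactory.load())

  def start(): ActorSystem = {
    // What the JVM reports here is what the dispatcher sizing is based on.
    println(s"JVM reports ${Runtime.getRuntime.availableProcessors} processor(s)")
    ActorSystem("single-thread-repro", config)
  }
}
```

On recent JVMs, starting with -XX:ActiveProcessorCount=1 should additionally make availableProcessors report a single core. Neither setting reproduces how slow the real device is, so a timing-related stall may still not show up.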
Thanks @joost-de-vries. There is a kill switch in the graph so I’ll check that out further.
I removed IO from the graph as I suspected that was a problem area. I don’t think it is. I’ve also updated my configuration on this multi-core machine to bring down the parallelism to 1 but couldn’t reproduce it. It may also bear some relationship to performance - the single core machine is very slow. If only I could reproduce it locally!
controlCenterIoxSss 2019-06-27T07:55:35.526Z [07:55:35.520UTC] 127.0.0.1 com.cisco.streambed.controlcenter ERROR OneForOneStrategy - Index 67108866 out of bounds for length 20
controlCenterIoxSss java.lang.ArrayIndexOutOfBoundsException: Index 67108866 out of bounds for length 20
controlCenterIoxSss at akka.stream.impl.fusing.GraphInterpreter.$anonfun$toSnapshot$5(GraphInterpreter.scala:688)
I don’t think it is about available memory. There appears to be sufficient. AFAIK the inboxes are unbounded by default.
I’ve since been narrowing things down by removing stages. There are two via stages in my flow. If I remove both, then all elements of the source are consumed. If I put one of them back in, then 256 elements are consumed. With both in, 128 elements. So maybe it is something around via?
To follow up, it doesn’t appear to be a deadlock but rather an issue with the source we have: it isn’t pushing when it is supposed to. To debug this (the motivation for this conversation), I inserted a custom stage that logs demand:
.via(new GraphStage[FlowShape[DurableQueue.Received, DurableQueue.Received]] {
  val in = Inlet[DurableQueue.Received]("DQ.in")
  val out = Outlet[DurableQueue.Received]("DQ.out")

  override def shape: FlowShape[DurableQueue.Received, DurableQueue.Received] =
    FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          log.info("pushing")
          push(out, grab(in))
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          log.info("pulling")
          pull(in)
        }
      })
    }
})
I noticed that the stage reported that it was pulling, and was thus ready for more, but the source stopped pushing even though it should have continued. As for the source, it is likely that some weird condition arises given the slowness of the device we operate on.
I remain interested in any other tips on debugging streams, but largely see this issue as resolved. Thanks for the conversation.
By the way, it’s very inconvenient that this flag is hardcoded. I wouldn’t want to have to go through the hassle of building Akka and changing dependencies just for the sake of changing one flag.