I have quite simple Akka Graph with one source and 2 sinks.
What I expect if sink in one branch fails it should stop executing second branch, but it’s not what I see in reality. In reality second branch is continuing to consume rest of the tuples. Do I miss anything?
Sink<Integer, CompletionStage<Done>> topHeadSink = Sink.foreach(o -> {
System.out.println("Upper sink " + o);
throw new RuntimeException("UPS");
});
Sink<Integer, CompletionStage<Done>> bottomHeadSink = Sink.foreach( o -> System.out.println("Bottom sink " + o));
final RunnableGraph<Pair<CompletionStage<Done>, CompletionStage<Done>>> g =
RunnableGraph.<Pair<CompletionStage<Done>, CompletionStage<Done>>>fromGraph(
GraphDSL.create(
topHeadSink, // import this sink into the graph
bottomHeadSink, // and this as well
Keep.both(),
(b, top, bottom) -> {
final UniformFanOutShape<Integer, Integer> bcast = b.add(Broadcast.create(2));
b.from(b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))))
.viaFanOut(bcast)
.to(top);
b.from(bcast).toInlet(bottom.in());
return ClosedShape.getInstance();
}));
g.run(_runner.actorSystem());
Result of executing this graph is
Upper sink 1
Bottom sink 1
Bottom sink 2
Bottom sink 3
Bottom sink 4
Bottom sink 5
Bottom sink 6
Bottom sink 7
Bottom sink 8
Bottom sink 9
Bottom sink 10