Hi,
I’m working on dynamically assembling Akka streams from a beautiful GUI. I’m trying to create a runnable graph with a completion stage and a kill switch. I got the CompletionStage working by pre-creating the all sinks that the user might add to the graph via GUI and later finding the freshly created SinkShapes that are actually passed into the builder (a dirty hack but seems to work).
for (int i = 0; i < sinkNodes.size(); i++) {
String sinkNodeName = sinkNodes.get(i);
Sink sink = ...
sinks.add(sink);
map.put(sinkNodeName, i);
}
RunnableGraph.fromGraph(GraphDSL.create(sinks,
(GraphDSL.Builder<List<Pair<UniqueKillSwitch, CompletionStage<Done>>>> builder, List<SinkShape<Tuple>> outs) -> {...
intr index = map.get(sinkNodeName);
// assuming order of sinks and SinkShapes remains identical
SinkShape<Tuple> dst = outs.get(index);
builder.from(oldOut).to(dst);
However, I have challenges with my KillSwitch that I add to my Source:
Source<Tuple, CompletionStage<Done>> source = ...
// Add Kill Switch
Graph<FlowShape<Tuple, Tuple>, UniqueKillSwitch> single = KillSwitches.single();
Source<Tuple, UniqueKillSwitch> sourceWithKillSwitch =
sourceWithCheckpoint.viaMat(single, Keep.right());
SourceShape<Tuple> shape = builder.add(sourceWithKillSwitch);
builder.from(shape).toInlet(operator)
When running my stream I basically only get a CompletableFuture but not the expected Pair<UniqueKillSwitch, CompletionStage<Done>>.
I assume that somewhere in my code the toMat(sink, Keep.both ()) is missing but I’m unclear where to add this using the GraphDSL. I did extensive googling but could not find anything. Any pointers are highly apprechiated.
Thanks.