Hi There,
We built a nice-looking Akka Streams GUI where people can point and click together any kind of complex stream. I have this represented in our own graph data structure and now want to convert it into an Akka Streams graph. My challenge is that when building the stream graph using RunnableGraph.fromGraph(GraphDSL.create(... I only get NotUsed as the materialized value; however, I need a CompletionStage or, if I have multiple sinks, a list of completion stages.
I cannot pre-create my sinks and pass them in, as I would not know which sink wires up with which other element. I tried name or attribute, but can't access those, as only SinkShapes are available in the builder function.
I’m sure I’m missing something.
Any pointers on how I could get the completion stages?
Thanks.
If you want a completion stage, you'd typically run the stream with a sink such as Sink.seq, whose materialized value is a CompletionStage of the collected elements. Make sure you put a limit on the number of elements you collect, since Sink.seq keeps all of them in memory.
final int MAX_ALLOWED_SIZE = 100;
// OK. The CompletionStage will fail with a `StreamLimitReachedException`
// if the number of incoming elements is larger than max
final CompletionStage<List<String>> limited =
    mySource.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq(), mat);
// OK. Collect up until max-th elements only, then cancel upstream
final CompletionStage<List<String>> taken =
    mySource.take(MAX_ALLOWED_SIZE).runWith(Sink.seq(), mat);
If you want something more dynamic, you would typically use BroadcastHub / MergeHub to dynamically work with streams, but I’m not sure this is what you’re asking.
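For the multiple-sinks case from the question, one option that may fit is the GraphDSL.create overload that takes a java.util.List of graphs: the builder function then receives the matching list of shapes in the same order, and the graph materializes a list with one value per sink. Below is a minimal sketch, assuming Akka 2.6 on the classpath (so an ActorSystem can be passed to run) and assuming all sinks share the same materialized type. The class name MultiSinkGraph, the build method and the broadcast wiring are made up for illustration; in a GUI-generated graph, index i of the sink list would correspond to index i of the shape list, which is how you could map each shape back to your own graph node.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ClosedShape;
import akka.stream.Outlet;
import akka.stream.SinkShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class MultiSinkGraph {

  // Hypothetical helper: broadcasts one source to `sinkCount` Sink.seq sinks
  // and materializes one CompletionStage per sink, in list order.
  static RunnableGraph<List<CompletionStage<List<Integer>>>> build(
      Source<Integer, NotUsed> in, int sinkCount) {
    // Sinks are created up front; their wiring is decided inside the builder.
    final List<Sink<Integer, CompletionStage<List<Integer>>>> sinks = new ArrayList<>();
    for (int i = 0; i < sinkCount; i++) {
      sinks.add(Sink.<Integer>seq());
    }

    return RunnableGraph.fromGraph(
        GraphDSL.create(
            sinks,
            (GraphDSL.Builder<List<CompletionStage<List<Integer>>>> builder,
                List<SinkShape<Integer>> outs) -> {
              // outs.get(i) is the imported shape of sinks.get(i)
              final UniformFanOutShape<Integer, Integer> bcast =
                  builder.add(Broadcast.<Integer>create(outs.size()));
              final Outlet<Integer> source = builder.add(in).out();
              builder.from(source).viaFanOut(bcast);
              for (SinkShape<Integer> out : outs) {
                builder.from(bcast).to(out);
              }
              return ClosedShape.getInstance();
            }));
  }

  public static void main(String[] args) throws Exception {
    final ActorSystem system = ActorSystem.create("demo");
    try {
      final List<CompletionStage<List<Integer>>> stages =
          build(Source.from(Arrays.asList(1, 2, 3)), 2).run(system);
      // Each stage completes when its sink has seen the end of the stream.
      for (CompletionStage<List<Integer>> stage : stages) {
        System.out.println(stage.toCompletableFuture().get(5, TimeUnit.SECONDS));
      }
    } finally {
      system.terminate();
    }
  }
}
```

If the sinks materialize different types, this overload will not fit directly; one workaround is to convert each sink to a common type first (for example with mapMaterializedValue) before putting it in the list.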