Hi,
I am trying to port an existing Java “workflow system” over to Akka Streams. Since the processing stages in the existing system are heavily stateful, I created my own GraphStages, each with its own GraphStageLogic. That works perfectly. Mapping the existing workflow layout over to Akka Streams with the GraphDSL.Builder was also straightforward. I did it in the following way (this code is copied directly from the Akka documentation; it does not use my custom GraphStages):
    RunnableGraph.fromGraph(
        GraphDSL.create(builder -> {
            final SourceShape<Integer> A = builder.add(Source.single(0));
            final UniformFanOutShape<Integer, Integer> B = builder.add(Broadcast.create(2));
            final UniformFanInShape<Integer, Integer> C = builder.add(Merge.create(2));
            final FlowShape<Integer, Integer> D =
                builder.add(Flow.of(Integer.class).map(i -> i + 1));
            final UniformFanOutShape<Integer, Integer> E = builder.add(Balance.create(2));
            final UniformFanInShape<Integer, Integer> F = builder.add(Merge.create(2));
            final SinkShape<Integer> G = builder.add(Sink.foreach(System.out::println));

            builder.from(F.out()).toInlet(C.in(0));
            builder.from(A).toInlet(B.in());
            builder.from(B.out(0)).toInlet(C.in(1));
            builder.from(C.out()).toInlet(F.in(0));
            builder.from(B.out(1)).via(D).toInlet(E.in());
            builder.from(E.out(0)).toInlet(F.in(1));
            builder.from(E.out(1)).to(G);
            return ClosedShape.getInstance();
        }));
All of that is already running with very good performance.
What I am struggling with now is that each of my stages produces a materialized value. I would like to combine all of these materialized values using my own combination method, so that the resulting graph produces the combined value as its materialized value.
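To make the goal concrete, here is a minimal sketch of what I mean by a "combination method". It assumes, purely for illustration, that each stage materializes a CompletionStage<Long> (for example, a count of processed elements). The class name MatValues and the method combine are made up for this example; they are plain JDK code and not part of Akka.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical helper (not an Akka API): merges two per-stage results into one.
public class MatValues {

    // Assumption: every stage materializes a CompletionStage<Long>.
    // The combined stage completes with the sum once both inputs have completed.
    public static CompletionStage<Long> combine(CompletionStage<Long> left,
                                                CompletionStage<Long> right) {
        return left.thenCombine(right, Long::sum);
    }

    public static void main(String[] args) {
        // Stand-ins for the values two stages would materialize.
        CompletionStage<Long> a = CompletableFuture.completedFuture(3L);
        CompletionStage<Long> b = CompletableFuture.completedFuture(4L);

        long total = combine(a, b).toCompletableFuture().join();
        System.out.println(total); // prints 7
    }
}
```

A function of this shape is what I would like to apply across all the stages added through the GraphDSL.Builder, so that running the graph hands back the single combined value.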
Can anybody point me in the right direction on how to do this? Or am I thinking about this in entirely the wrong way, and should I do it totally differently?
Thanks in advance,
Lay