Passing user data to custom graph stages when materializing a stream

I have configured an Akka Streams workflow using some custom graph stages. The layout of the workflow is fixed for the lifetime of the application, but I have to pass different user data to the GraphStageLogic instances on every stream materialization. How can that be achieved?

Currently I rebuild the entire workflow with the GraphDSL every time. While doing so I pass the user data to each GraphStage, which then hands it to its GraphStageLogic during materialization. Is that the intended way to do this? In theory, connecting the graph stages should only be necessary once for a fixed layout; only the materialization should happen multiple times.

In my opinion the same applies to the standard FileSource. With a fixed layout, the source should receive the file to process on every stream materialization, not when the Source is initially set up.

Any thoughts or comments on this?

Regards,
Lay

In some cases it makes sense to use attributes to provide a value to a stage without creating the whole flow over and over again, see the docs here: https://doc.akka.io/docs/akka/current/stream/stream-composition.html#attributes

Note that if it is about the Source of a flow being the only thing that differs between graphs you can also compose a bigger Flow or Sink beforehand and then combine that with different sources - it’s not limited to composing from the Source side.
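As a rough sketch of that idea, assuming the classic `ActorMaterializer` API: everything downstream of the source is packaged once as a single Sink, and only the source changes per run. The class name, the `results` list and the upper-casing step are made-up placeholders, not anything from your workflow.

```java
import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical example class; names are illustrative only.
public class ReusableSinkExample {

    static List<String> runWithTwoSources() {
        ActorSystem system = ActorSystem.create("example");
        try {
            Materializer mat = ActorMaterializer.create(system);
            List<String> results = new CopyOnWriteArrayList<>();

            // Built once: the processing steps plus the final sink, as one reusable Sink.
            Sink<String, CompletionStage<Done>> collect = Sink.foreach(results::add);
            Sink<String, CompletionStage<Done>> pipeline =
                Flow.of(String.class)
                    .map(s -> s.toUpperCase())
                    .toMat(collect, Keep.right());

            // Per run: only the source differs. Each runWith is a separate materialization.
            Source.from(Arrays.asList("a", "b"))
                .runWith(pipeline, mat).toCompletableFuture().join();
            Source.single("c")
                .runWith(pipeline, mat).toCompletableFuture().join();

            return results;
        } finally {
            system.terminate();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTwoSources());
    }
}
```

The same shape should work with a file-based source in place of `Source.from`, so the file only has to be chosen at the point where the stream is run.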

Hi Johan,

Thanks for your fast response! I had already looked at the Attributes example before. But these attributes are also added when creating the graph, not when materializing it, aren’t they? So as far as I understand, all materializations of the workflow get the same attribute values.

Just to give you an example of what I want to achieve: Let’s assume I am materializing a number of Akka streams based on the same Graph in different actors, so that they are in fact processed in parallel. In the Flow components of the workflow I have to write some information to a logfile, and the logfile should be different for each materialized instance of the workflow. How do I get that logfile into the flows?

Regarding the creation of the stream: with the help of this forum I already managed to create a “huge” FanOut-Stage consisting of all the Flow components of my workflow, without the Source and Sinks. So in the final step I only have to connect the Source and all the Sinks to that FanOut-Stage, as you suggested above. That already optimizes the creation of the stream, but it does not entirely remove the step of connecting the Source and the Sinks before each materialization.

Ok, now I finally understood the Attributes :)

I found the withAttributes() method on RunnableGraph. I hadn’t seen that before in any examples. I am now able to pass the Logger to the materialized workflow:

      // Attach the per-run logger to this materialization only
      MyLoggerAttribute loggerAttribute = new MyLoggerAttribute(_logger);

      List<CompletionStage<WorkflowResult>> futureList = runnableGraph
          .withAttributes(Attributes.apply(loggerAttribute))
          .run(ActorMaterializer.create(getContext()));
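For anyone finding this later, here is a minimal sketch of how the other half might look, i.e. the attribute class and a custom stage reading it from `inheritedAttributes` in `createLogic`. This is not my actual workflow: `LoggingStage`, `AttributeExample`, the `Consumer<String>` standing in for the logger, and the run tags are hypothetical, and it assumes the classic `ActorMaterializer` API as in the snippet above.

```java
import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Materializer;
import akka.stream.Outlet;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical attribute carrying per-materialization user data (a log callback here).
final class MyLoggerAttribute implements Attributes.Attribute {
    final Consumer<String> logger;
    MyLoggerAttribute(Consumer<String> logger) { this.logger = logger; }
}

// Hypothetical pass-through stage that logs each element via the attribute, if present.
final class LoggingStage extends GraphStage<FlowShape<String, String>> {
    private final Inlet<String> in = Inlet.create("LoggingStage.in");
    private final Outlet<String> out = Outlet.create("LoggingStage.out");
    private final FlowShape<String, String> shape = FlowShape.of(in, out);

    @Override
    public FlowShape<String, String> shape() { return shape; }

    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        // Called once per materialization, so each run sees its own attribute value.
        Consumer<String> logger = inheritedAttributes
            .getAttribute(MyLoggerAttribute.class)
            .map(a -> a.logger)
            .orElse(msg -> { });  // no attribute supplied: log nothing
        return new GraphStageLogic(shape) {
            {
                setHandler(in, new AbstractInHandler() {
                    @Override
                    public void onPush() {
                        String elem = grab(in);
                        logger.accept(elem);
                        push(out, elem);
                    }
                });
                setHandler(out, new AbstractOutHandler() {
                    @Override
                    public void onPull() { pull(in); }
                });
            }
        };
    }
}

public class AttributeExample {
    // Runs the same blueprint once with its own logger and returns what was logged.
    static List<String> runOnce(RunnableGraph<CompletionStage<Done>> blueprint,
                                Materializer mat, String tag) {
        List<String> log = new CopyOnWriteArrayList<>();
        Attributes attrs = Attributes.apply(new MyLoggerAttribute(m -> log.add(tag + ":" + m)));
        blueprint.withAttributes(attrs).run(mat).toCompletableFuture().join();
        return log;
    }

    static List<List<String>> runTwice() {
        ActorSystem system = ActorSystem.create("example");
        try {
            Materializer mat = ActorMaterializer.create(system);
            // Built once; reused for every run below.
            RunnableGraph<CompletionStage<Done>> blueprint =
                Source.from(Arrays.asList("a", "b"))
                    .via(new LoggingStage())
                    .toMat(Sink.ignore(), Keep.right());
            return Arrays.asList(runOnce(blueprint, mat, "run1"), runOnce(blueprint, mat, "run2"));
        } finally {
            system.terminate();
        }
    }

    public static void main(String[] args) {
        System.out.println(runTwice());
    }
}
```

The point of the sketch is that the graph is wired only once, while `withAttributes` returns a copy of the RunnableGraph carrying that run's data, so the stage picks up a different logger on each materialization.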

Thanks again for leading me on the right track!