I just tried to add a delay to my graph with builder.add(Delay(offset, strategy)). And there is no Delay stage in akka.stream.scaladsl bcs its tagged as internal.
If this is done for a reason; doesn’t it worth to mention in the streams-overview docs, or the stream-graphs docs? (So you can only use it with builder.add(Flow[A].delay(offset,strategy)).)
BTW where is the line where you can use a stage in a dsl with add or not? As I’m trying to figure out maybe the multi input or multi output stages whose can be used (the others need the Source, Sink, or Flow starting), but I never see that written before.
This code is just a spike right now, I’m just playing with stuffs and concepts.
private lazy val graph = {
val source = Source.queue[Task](10000, akka.stream.OverflowStrategy.backpressure)
RunnableGraph.fromGraph(GraphDSL.create(source) { implicit builder: GraphDSL.Builder[SourceQueueWithComplete[Task]] => source =>
import GraphDSL.Implicits._
val mergePrioritized = builder.add(MergePrioritized[Task](Seq(99,1)))
val delay1 = builder.add(Flow[Task].delay(5.seconds))
val delay2 = builder.add(Flow[Task].delay(5.seconds))
val groupedDelay = builder.add(Flow[Task].buffer(1000, OverflowStrategy.fail).groupedWithin(1000, 5.minutes).mapConcat(identity))
val check = builder.add(Flow[Task].map(_.copy(stats = TaskStats.empty)).via(checkerFlow))
val log = builder.add(Flow[Task].map{t => logger.info(t.toString); t})
val dropNonPeriodically = builder.add(Flow[Task].filter(_.isPeriodically))
source ~> delay1 ~> mergePrioritized.in(0); mergePrioritized.out ~> check ~> log ~> dropNonPeriodically
delay2 ~> mergePrioritized.in(1)
delay2 <~ groupedDelay <~ dropNonPeriodically
ClosedShape
})
}
I think if you have a graph similar to this, the reader get a better picture if the things named as is (like log or dropNonPeriodically) instead of a long doMagic flow. And because most of the time I get my flows as a param, it was really strange to “import” with builder.add(Flow[Task].delay(5.seconds)) instead of just builder.add(Delay[Task](5.seconds)).