How to configure Flow on the fly?

Hello everybody!
I have a simple question. Let's say there is a Flow that depends on some configuration parameter:
Source -> Flow(Parameter) -> Sink
The Source emits data fairly fast, about once every second. The configuration parameter is relatively stable, but it may change: maybe once a day a user wants to modify it. So, what is the best way to implement this?
I see two ways:

  1. Rebuild everything every time the user submits a new parameter.
  2. Treat the Parameter as a separate stream with its own source, and emit a new value every time the user updates it.

Or maybe there is something else?
What would be the best way? Any advice?

Thank you,
Andrey

What is this configuration parameter? Is it of the same type as the regular stream elements? If the source is the same, you could apply a kind of MessageRouter pattern, so that messages are passed to different flows depending on a set of conditions.

Maybe you should have a look at https://doc.akka.io/docs/alpakka/current/patterns.html
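
As a rough illustration of routing by condition (not necessarily what the linked patterns page describes), here is a minimal sketch that uses the `divertToMat` operator of Akka Streams. It assumes Akka 2.6 or newer, where the implicit ActorSystem provides the materializer. The object name `RouterSketch` and the even/odd condition are hypothetical and only there to make the example self-contained.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object RouterSketch {
  // Routes each element by a condition: even numbers are diverted to a side sink,
  // everything else continues downstream to the main sink.
  // Returns (diverted elements, remaining elements).
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("router-sketch")
    try {
      val (evens, odds) =
        Source(1 to 6)
          // Keep.right keeps the materialized value of the side sink
          .divertToMat(Sink.seq[Int], (n: Int) => n % 2 == 0)(Keep.right)
          .toMat(Sink.seq[Int])(Keep.both)
          .run()
      (Await.result(evens, 3.seconds), Await.result(odds, 3.seconds))
    } finally system.terminate()
  }
}
```

In a real setup the condition would be derived from the messages themselves, and each branch would be a full flow rather than a plain `Sink.seq`.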

I would go with something like:

def createUserParamedFlow[A, P, O](bufferSize: Int, overflowStrategy: OverflowStrategy, initialParam: P)(fun: (A, P) => O) =
  Flow.fromGraph(GraphDSL.create(Source.queue[P](bufferSize, overflowStrategy)) { implicit builder =>
    queue =>
      import GraphDSL.Implicits._
      val zip = builder.add(Zip[A, P]())
      // Repeat the latest parameter (starting with initialParam) until a new one arrives; based on
      // https://doc.akka.io/docs/akka/current/stream/stream-rate.html#understanding-extrapolate-and-expand
      val extra = builder.add(Flow[P].extrapolate(Iterator.continually(_), Some(initialParam)))
      val map = builder.add(Flow[(A, P)].map(r => fun(r._1, r._2)))

      queue ~> extra ~> zip.in1
      zip.out ~> map
      FlowShape(zip.in0, map.out)
  })

and use it like:

/* imports for all of the code I pasted
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Sink, Source, Zip}

import concurrent.duration._
*/

implicit val as = ActorSystem("myas")
implicit val mat = ActorMaterializer() // only needed on older Akka (before 2.6); newer versions take the materializer from the implicit ActorSystem

val f =
  Source.tick(0.seconds, 500.millis, 10)
    // a 1-element buffer with OverflowStrategy.dropBuffer is probably what you need
    .viaMat(createUserParamedFlow(1, OverflowStrategy.dropBuffer, 0.5)((a,b)=> (a*b).toString))(Keep.both)
    .map{x => println(x); x} //for debug
    .toMat(Sink.seq)(Keep.both)
    .run()

Thread.sleep(5000)
f._1._2.offer(1.0) //this is how you set params
Thread.sleep(2000)
f._1._2.offer(1.5)
Thread.sleep(2000)
f._1._1.cancel() //stop the "test"
Thread.sleep(1000)
println(f._2) // Future holding the whole output; it should be completed by now
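
The nested tuple accessors (`f._1._2`, `f._1._1`, `f._2`) are easy to mix up, so the materialized value can be destructured into named parts instead. The following is only a sketch under a few assumptions: Akka 2.6 or newer (no explicit materializer), a shorter tick interval to keep the run brief, and a hypothetical helper `paramFlow` that is a compact `zipMat`-based variant of the same idea, not the GraphDSL version from this post. The names `NamedHandlesSketch`, `ticks` and `paramQueue` are made up for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueueWithComplete}

import scala.concurrent.Await
import scala.concurrent.duration._

object NamedHandlesSketch {

  // Hypothetical compact variant: extrapolate repeats the latest parameter
  // (starting with initialParam) and zip pairs it with every incoming element.
  def paramFlow[A, P, O](initialParam: P)(fun: (A, P) => O): Flow[A, O, SourceQueueWithComplete[P]] = {
    val params = Source
      .queue[P](1, OverflowStrategy.dropBuffer)
      .extrapolate[P](p => Iterator.continually(p), Some(initialParam))
    Flow[A].zipMat(params)(Keep.right).map { case (a, p) => fun(a, p) }
  }

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("named-handles")
    try {
      // Name each part of the materialized value instead of using _1/_2
      val ((ticks, paramQueue), result) =
        Source.tick(0.seconds, 100.millis, 10)
          .viaMat(paramFlow[Int, Double, String](0.5)((a, p) => (a * p).toString))(Keep.both)
          .toMat(Sink.seq[String])(Keep.both)
          .run()

      Thread.sleep(500)
      paramQueue.offer(1.0) // set a new parameter
      Thread.sleep(500)
      paramQueue.offer(1.5)
      Thread.sleep(500)
      ticks.cancel() // stop the tick source, which completes the stream
      Await.result(result, 3.seconds)
    } finally system.terminate()
  }
}
```

Calling `NamedHandlesSketch.run()` returns the collected outputs. The explicit type arguments on `paramFlow` are there so the lambda's parameter types do not depend on type inference.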

Yes, Flow[P].extrapolate is what I need.
Thank you very much!
Andrey