Hello everybody!
I have a simple question. Let’s say there is a Flow depending on some configuration parameter:
Source -> Flow(Parameter) -> Sink
Source emits data pretty fast - every second. Configuration parameter is relatively stable, but also may get changed. May be once a day user wants to modify it. So, what is the best way to implement this?
I see two ways
Every time user submit new parameter rebuild everything.
Consider Parameter as separate stream with its own source. Every time user updates it emit new value.
Or may there is something else?
What would be the best way? Any advice?
What is this configuration parameter? Is it of the same type than the usual? If the source is the same you could apply a kind of MessageRouter pattern so that messages can be passed to different flows depending on a set of conditions.
def createUserParamedFlow[A, P, O](bufferSize: Int, overflowStrategy: OverflowStrategy, initialParam: P)(fun: (A, P) => O) =
Flow.fromGraph(GraphDSL.create(Source.queue[P](bufferSize, overflowStrategy)) { implicit builder =>
queue =>
import GraphDSL.Implicits._
val zip = builder.add(Zip[A, P]())
//based on https://doc.akka.io/docs/akka/current/stream/stream-rate.html#understanding-extrapolate-and-expand
val extra = builder.add(Flow[P].extrapolate(Iterator.continually(_), Some(initialParam)))
val map = builder.add(Flow[(A, P)].map(r => fun(r._1, r._2)))
queue ~> extra ~> zip.in1
zip.out ~> map
FlowShape(zip.in0, map.out)
})
and use it like:
/* imports for all of the code I pasted
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Sink, Source, Zip}
import concurrent.duration._
*/
implicit val as = ActorSystem("myas")
implicit val mat = ActorMaterializer() //I have old akka in the project I started to draft it, this is probably not needed
val f =
Source.tick(0.seconds, 500.millis, 10)
//1 element buffer with dropbuffer will be what you need I think
.viaMat(createUserParamedFlow(1, OverflowStrategy.dropBuffer, 0.5)((a,b)=> (a*b).toString))(Keep.both)
.map{x => println(x); x} //for debug
.toMat(Sink.seq)(Keep.both)
.run()
Thread.sleep(5000)
f._1._2.offer(1.0) //this is how you set params
Thread.sleep(2000)
f._1._2.offer(1.5)
Thread.sleep(2000)
f._1._1.cancel() //stop the "test"
Thread.sleep(1000)
println(f._2) //whole output