Hi all,
I’m playing around with MergeHub and BroadcastHub in Akka Streams, but I am a bit confused by how the stream is constructed.
Consider the example for BroadcastHub from the documentation:
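import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{BroadcastHub, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.duration._
// An implicit ActorSystem supplies the materializer for run() (assuming Akka 2.6+; the name "example" is arbitrary)
implicit val system: ActorSystem = ActorSystem("example")
val producer: Source[String, Cancellable] = Source.tick(1.second, 1.second, "New message")
val sink: Sink[String, Source[String, NotUsed]] = BroadcastHub.sink(bufferSize = 256)
val runnableGraph: RunnableGraph[Source[String, NotUsed]] = producer.toMat(sink)(Keep.right)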
// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
val fromProducer: Source[String, NotUsed] = runnableGraph.run()
// Print out messages from the producer in two independent consumers
fromProducer.runForeach(msg => println("consumer1: " + msg))
fromProducer.runForeach(msg => println("consumer2: " + msg))
The first line creates a Source for the entire stream, which produces Strings.
Conceptually speaking, attaching a BroadcastHub as a Sink to this stream means you can extend it at runtime. So while the stream is running you can add additional Sinks to it. Makes perfect sense.
However, the documentation in the beginning also states the following:
It is important to remember that even after constructing the RunnableGraph by connecting all the source, sink and different operators, no data will flow through it until it is materialized.
…
After running (materializing) the RunnableGraph[T] we get back the materialized value of type T. Every stream operator can produce a materialized value, and it is the responsibility of the user to combine them to a new type. In the above example, we used toMat to indicate that we want to transform the materialized value of the source and sink, and we used the convenience function Keep.right to say that we are only interested in the materialized value of the sink.
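To make the quoted passage concrete for myself, here is a minimal sketch of how I understand toMat with Keep.left, Keep.right and Keep.both on the same producer and hub. It assumes Akka 2.6+ (where an implicit ActorSystem provides the materializer); the object name, system name and sleep duration are my own choices, not from the documentation.

```scala
import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{BroadcastHub, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.duration._

object KeepSketch extends App {
  // Hypothetical system name, chosen only for this sketch.
  implicit val system: ActorSystem = ActorSystem("keep-sketch")

  val producer: Source[String, Cancellable] =
    Source.tick(1.second, 1.second, "New message")
  val hub: Sink[String, Source[String, NotUsed]] =
    BroadcastHub.sink(bufferSize = 256)

  // Keep.left: the graph materializes to the source's value (the tick Cancellable).
  val keepLeft: RunnableGraph[Cancellable] = producer.toMat(hub)(Keep.left)

  // Keep.right: the graph materializes to the sink's value (the hub's Source).
  val keepRight: RunnableGraph[Source[String, NotUsed]] = producer.toMat(hub)(Keep.right)

  // Keep.both: the graph materializes to a tuple of both values.
  val keepBoth: RunnableGraph[(Cancellable, Source[String, NotUsed])] =
    producer.toMat(hub)(Keep.both)

  // Up to here these are only blueprints; run() materializes the graph once.
  val (ticks, fromProducer) = keepBoth.run()
  fromProducer.runForeach(msg => println("consumer: " + msg))

  // Let a few ticks through, then stop the producer and shut down.
  Thread.sleep(3500)
  ticks.cancel()
  system.terminate()
}
```

With Keep.both I can hold on to the Cancellable to stop the ticks as well as the hub's Source to attach consumers, which is why I picked it for the part that actually runs.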
So if we look back at the example code, we indeed only care about the materialized value of the Sink, which in this case is Source[String, NotUsed].
What I do not understand is how, conceptually speaking, the code shown produces a stream to which an arbitrary number of other streams can be connected (two in the above example).
My guess at the semantics is that each String emitted by producer is cached in the BroadcastHub. Every time you connect to fromProducer, a new source is created which emits all the cached values from before?
But even then, shouldn’t fromProducer emit a new Source every time the producer ticks?
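One way I could check both guesses is a small experiment like the sketch below. It numbers the ticks, attaches one consumer right away and a second one a few seconds later, and prints what each of them received, so it becomes visible whether the late consumer starts from the first tick or from somewhere later. It assumes Akka 2.6+; the object name, the numbering via zipWithIndex, the take counts and the sleep duration are my own additions, and I have deliberately not written down an expected output.

```scala
import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object LateSubscriberSketch extends App {
  // Hypothetical system name, chosen only for this sketch.
  implicit val system: ActorSystem = ActorSystem("late-subscriber-sketch")

  // Number the ticks so each consumer's starting point is visible.
  val producer: Source[String, Cancellable] =
    Source.tick(1.second, 1.second, "tick")
      .zipWithIndex
      .map { case (msg, i) => s"$msg #$i" }

  // run() is called exactly once here, so exactly one Source[String, NotUsed]
  // comes back as the materialized value; the ticks themselves are Strings
  // that flow through the hub as elements.
  val fromProducer: Source[String, NotUsed] =
    producer.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right).run()

  // Early consumer: attached immediately, collects five elements.
  val early = fromProducer.take(5).runWith(Sink.seq)

  // Late consumer: attached after a couple of ticks, collects two elements.
  Thread.sleep(2500)
  val late = fromProducer.take(2).runWith(Sink.seq)

  val earlySeen: Seq[String] = Await.result(early, 10.seconds)
  val lateSeen: Seq[String] = Await.result(late, 10.seconds)
  println("early consumer: " + earlySeen)
  println("late consumer:  " + lateSeen)

  system.terminate()
}
```

Comparing the first element printed for the late consumer with the first element of the early one should show whether earlier values are replayed. The type annotation on fromProducer also records that running the graph yields a single Source of Strings rather than a Source of Sources.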
Any clarification is greatly appreciated,
– Christophe