MergeHub and BroadcastHub

Hi all,

I’m playing around with MergeHub and BroadcastHub in Akka Streams, but I am a bit confused by how the stream is constructed.

Consider the example for BroadcastHub from the documentation:

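import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{BroadcastHub, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.duration._

// Assumes Akka 2.6+, where an implicit ActorSystem also provides the Materializer
implicit val system: ActorSystem = ActorSystem()

val producer: Source[String, Cancellable] = Source.tick(1.second, 1.second, "New message")

val sink: Sink[String, Source[String, NotUsed]] = BroadcastHub.sink(bufferSize = 256)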

val runnableGraph: RunnableGraph[Source[String, NotUsed]] = producer.toMat(sink)(Keep.right)

// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
val fromProducer: Source[String, NotUsed] = runnableGraph.run()

// Print out messages from the producer in two independent consumers
fromProducer.runForeach(msg => println("consumer1: " + msg))
fromProducer.runForeach(msg => println("consumer2: " + msg))

The producer definition creates the Source that feeds the entire stream: it emits the String "New message" once per second.

Conceptually speaking, attaching a BroadcastHub as a Sink to this stream means you can extend it at runtime: while the stream is running, you can attach more Sinks to it. Makes perfect sense.

However, the documentation also states the following at the beginning:

It is important to remember that even after constructing the RunnableGraph by connecting all the source, sink and different operators, no data will flow through it until it is materialized.

After running (materializing) the RunnableGraph[T] we get back the materialized value of type T. Every stream operator can produce a materialized value, and it is the responsibility of the user to combine them to a new type. In the above example, we used toMat to indicate that we want to transform the materialized value of the source and sink, and we used the convenience function Keep.right to say that we are only interested in the materialized value of the sink.

So, looking back at the example code, we indeed only care about the materialized value of the Sink, which in this case is Source[String, NotUsed].
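
To make the materialized-value part concrete, here is a minimal sketch, assuming Akka 2.6 (where an implicit ActorSystem also acts as the Materializer). It uses Keep.both instead of Keep.right, so run() hands back the Cancellable of Source.tick together with the hub's Source. The names KeepBothSketch and runHub are made up for illustration. Note that run() is called once, so exactly one Source comes back; each runForeach on that Source materializes a new, independent consumer of the hub.

```scala
import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{BroadcastHub, Keep, RunnableGraph, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical names; assumes Akka 2.6+ (an implicit ActorSystem provides the Materializer)
object KeepBothSketch {
  // Keep.both keeps the Cancellable of Source.tick and the hub's Source as a tuple
  def runHub()(implicit system: ActorSystem): (Cancellable, Source[String, NotUsed]) = {
    val producer: Source[String, Cancellable] =
      Source.tick(1.second, 1.second, "New message")
    val graph: RunnableGraph[(Cancellable, Source[String, NotUsed])] =
      producer.toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.both)
    // Materialized once: one Cancellable and one hub Source, not one per tick
    graph.run()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("keep-both-sketch")
    import system.dispatcher

    val (ticker, fromProducer) = runHub()
    // Each run* call on the same Source attaches a new consumer to the hub
    val done1 = fromProducer.runForeach(msg => println("consumer1: " + msg))
    val done2 = fromProducer.runForeach(msg => println("consumer2: " + msg))

    // Cancelling the ticker completes the producer, which in turn completes both consumers
    system.scheduler.scheduleOnce(5.seconds)(ticker.cancel())
    Future.sequence(List(done1, done2)).onComplete(_ => system.terminate())
  }
}
```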

But what I do not understand is this: conceptually speaking, the code shown produces a stream to which an arbitrary number of streams can be connected (two in the above example).

My guess at the semantics is that each String emitted by producer is cached in the BroadcastHub, and every time you connect to fromProducer a new source is created which emits all the previously cached values?

But even then, shouldn’t fromProducer emit a new Source every time the producer ticks?

Any clarification is greatly appreciated,
– Christophe

“My guess at the semantics is that each String emitted by producer is cached in the BroadcastHub, and every time you connect to fromProducer a new source is created which emits all the previously cached values?”

No, that is not quite correct.

The running BroadcastHub contains a single buffer whose size is set by the bufferSize parameter (256 in your case).

When no downstreams are connected, the BroadcastHub demands elements from upstream until the buffer is full; after that it backpressures.
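
A quick way to check this without any consumer is to count how many elements actually leave the upstream. This is a sketch assuming Akka 2.6; NoConsumerSketch and pulledWithoutConsumers are hypothetical names, and the half-second sleep is an arbitrary settle time. With bufferSize = 4 the count should stay far below 100, since nothing ever drains the buffer.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Source}

// Hypothetical names; assumes Akka 2.6+ (an implicit ActorSystem provides the Materializer)
object NoConsumerSketch {
  // Counts how many elements passed the wireTap before the hub stopped pulling
  def pulledWithoutConsumers(bufferSize: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("no-consumer-sketch")
    val pulled = new AtomicInteger(0)
    try {
      // bufferSize must be a power of two and less than 4096 (see the BroadcastHub.sink scaladoc)
      val fromHub = Source(1 to 100)
        .wireTap(_ => pulled.incrementAndGet())
        .runWith(BroadcastHub.sink[Int](bufferSize))
      // fromHub is deliberately never materialized, so the hub has no consumers
      Thread.sleep(500) // arbitrary settle time
      pulled.get()
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(s"Pulled ${pulledWithoutConsumers(4)} of 100 elements")
}
```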

When the first downstream is started, it will see and consume the buffered elements; subsequent downstreams will only see elements from the point where they were started onward.
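
The first-consumer case can be sketched the same way, again assuming Akka 2.6 and using hypothetical names (FirstConsumerSketch, firstThree). The upstream emits 1, 2, 3 and is then kept open with Source.never so that upstream completion stays out of the picture; a consumer attached after those three elements have been buffered should still receive them.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical names; assumes Akka 2.6+ (an implicit ActorSystem provides the Materializer)
object FirstConsumerSketch {
  def firstThree(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("first-consumer-sketch")
    try {
      // Source.never keeps the upstream open after 1, 2, 3
      val fromHub = (Source(1 to 3) ++ Source.never[Int])
        .runWith(BroadcastHub.sink[Int](bufferSize = 8))

      // Let the hub pull 1, 2, 3 into its buffer while no consumer is attached
      Thread.sleep(500)

      // The first consumer reads the elements that were buffered before it arrived
      Await.result(fromHub.take(3).runWith(Sink.seq), 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(firstThree())
}
```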

Once all downstreams have consumed a buffered element, it is removed from the single shared buffer. This means that a slow downstream can be at most bufferSize elements behind the fastest downstream before the buffer fills up. When the buffer is full, the BroadcastHub itself backpressures, slowing down the upstream producer.
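
The shared-buffer rule can be illustrated with a small single-threaded model in plain Scala. This is a toy, not Akka's implementation: ToyBroadcastBuffer, ToyBroadcastDemo and their methods are hypothetical, there is no concurrency, offer returning false stands in for backpressure, and a new consumer simply starts at the oldest element still held in the buffer.

```scala
import scala.collection.mutable

// Toy, single-threaded model of a shared broadcast buffer (hypothetical; not Akka's code)
final class ToyBroadcastBuffer[A](bufferSize: Int) {
  require(bufferSize > 0, "bufferSize must be positive")

  private val buffer = mutable.ArrayBuffer.empty[A]        // elements some consumer still needs
  private var headOffset = 0L                              // stream offset of buffer(0)
  private val nextOffset = mutable.Map.empty[String, Long] // next offset each consumer will read

  // Producer side: returning false models backpressure when the buffer is full
  def offer(elem: A): Boolean =
    if (buffer.size == bufferSize) false
    else { buffer += elem; true }

  // A new consumer starts at the oldest element still held in the buffer
  def addConsumer(id: String): Unit = nextOffset(id) = headOffset

  // Consumer side: next element for a registered consumer id, if one is available
  def poll(id: String): Option[A] = {
    val offset = nextOffset(id)
    val idx = (offset - headOffset).toInt
    if (idx >= buffer.size) None
    else {
      nextOffset(id) = offset + 1
      val elem = buffer(idx)
      dropConsumedByAll()
      Some(elem)
    }
  }

  // Drop elements that every registered consumer has already read
  private def dropConsumedByAll(): Unit = {
    val consumedByAll = (nextOffset.values.min - headOffset).toInt
    if (consumedByAll > 0) {
      buffer.remove(0, consumedByAll)
      headOffset += consumedByAll
    }
  }
}

object ToyBroadcastDemo {
  def main(args: Array[String]): Unit = {
    val hub = new ToyBroadcastBuffer[Int](bufferSize = 2)
    hub.addConsumer("fast")
    hub.addConsumer("slow")
    println(hub.offer(1))     // true
    println(hub.offer(2))     // true
    println(hub.poll("fast")) // Some(1)
    println(hub.poll("fast")) // Some(2)
    println(hub.offer(3))     // false: "slow" is bufferSize elements behind "fast"
    println(hub.poll("slow")) // Some(1), which frees one slot
    println(hub.offer(3))     // true
  }
}
```

In the demo, the producer is refused exactly when the slow consumer trails the fast one by bufferSize elements, which is the bound described above.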

Here’s a small playground sample that will make it easier to observe how it works:

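import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Source}
import scala.concurrent.duration._
import scala.io.StdIn

object BroadcastHubPlayground extends App {
  // Assumes Akka 2.6+, where an implicit ActorSystem also provides the Materializer
  implicit val system: ActorSystem = ActorSystem("playground")

  val toHub = Source(1 to 100)
    .throttle(1, 1.second)
    .wireTap(n => println(s"Producing $n"))

  val broadcastHub = BroadcastHub.sink[Int](bufferSize = 4)
  val fromHub = toHub.runWith(broadcastHub)

  for (i <- 1 to 3) {
    StdIn.readLine("Press enter to start next source")
    println(s"Adding consumer $i")
    // consumers added at different points in time, consuming roughly at the same rate
    fromHub.runForeach(n => println(s"Downstream $i: element $n"))
    // or consuming at different rates:
    // fromHub.throttle(1, i.seconds).runForeach(n => println(s"Downstream $i: element $n"))
  }
}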