Source.queue bug?

My code is below:

import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, QueueCompletionResult, QueueOfferResult}
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem()
import system.dispatcher

val (chromeQueue, chromeSources) =
  Source
    .queue[Int](1, OverflowStrategy.dropNew)
    .preMaterialize()

val (orderQueue, orderSources) =
  Source
    .queue[String](1, OverflowStrategy.dropNew)
    .preMaterialize()

orderSources
  .zip(chromeSources)
  .to(Sink.ignore)
  .run()

Source(Seq("a", "b", "c", "d"))
  .mapAsync(1) { i =>
    orderQueue.offer(i).map {
      case result: QueueCompletionResult => println("completion")
      case QueueOfferResult.Enqueued     => println("enqueued")
      case QueueOfferResult.Dropped      => println("dropped")
    }
  }
  .runWith(Sink.ignore)

I expected this code to print:

enqueued
dropped
dropped
dropped

But the actual output is:

enqueued
enqueued
enqueued
enqueued

Akka version: 2.6.15

So you expected this to log `dropped`, because the `zip` backpressures while it hasn't yet received any elements from `chromeQueue`.

One cause for this is the internal buffers Akka Streams can insert between stages; you can read more about those in the "Buffers and working with rate" section of the Akka documentation. When you add the following configuration (just for illustration, I wouldn't recommend doing this in production):

akka.stream.materializer.initial-input-buffer-size = 1
akka.stream.materializer.max-input-buffer-size = 1

… and add some more elements to the Seq, you get output like:

enqueued
enqueued
enqueued
enqueued
dropped
dropped
dropped
dropped
dropped
dropped

There still seems to be some buffering going on, though I'm not entirely sure where it comes from; that would need some further investigation. In any case: backpressure in Akka Streams is mainly intended to avoid overwhelming a slow downstream, and the implementation may still add buffers for throughput. Many stages won't do 'per-element' flow control, but batch elements up instead.
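As an alternative to changing the global configuration, the input buffer size can also be restricted for a single stream via attributes, so the rest of the application keeps the default buffering. A minimal sketch, assuming the same `orderSources`/`chromeSources` values from the original snippet:

```scala
import akka.stream.Attributes
import akka.stream.scaladsl.Sink

// Attach an inputBuffer attribute to just this graph instead of
// setting akka.stream.materializer.*-input-buffer-size globally.
// initial = 1, max = 1 shrinks the inter-stage buffer to one element.
orderSources
  .zip(chromeSources)
  .to(Sink.ignore)
  .addAttributes(Attributes.inputBuffer(initial = 1, max = 1))
  .run()
```

This only affects buffering inside this materialized stream; the `Source.queue` itself still has its own buffer of size 1 as given in its first parameter.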