Full name of the stream on overflow log messages of QueueSource

I have a sizeable application which uses in a few places Source.queue(BUFFER_SIZE, OverflowStrategy.dropTail.withLogLevel(Logging.WarningLevel)).
When, under load, some elements get dropped, we see warning messages in the logs saying Dropping the tail element because buffer is full and overflowStrategy is: [DropTail] in stream [queueSource].
Since we use a few queues, it is not very obvious which queue did drop the element. I see from the code that it uses the name attribute, however it does not seem to pick up the name attribute I set:

val queue = Source
        .queue[String](8, OverflowStrategy.dropTail.withLogLevel(Logging.WarningLevel))
        .named("name-after-queue")
        .throttle(1, 3.seconds)
        .toMat(Sink.foreach[String](m => println(s"${Instant.now()} - $m")).named("name-on-sink"))(Keep.left)
        .named("name-on-graph")
        .run()

      val eventualDone = Source(1 to 100)
        .throttle(10, 1.seconds)
        .mapAsync(1)(m => queue.offer(m.toString))
        .runWith(Sink.ignore)

This still gives warnings as

[WARN] [05/04/2022 18:00:06.151] [test-akka.actor.default-dispatcher-4] [QueueSource(akka://test)] Dropping the tail element because buffer is full and overflowStrategy is: [DropTail] in stream [queueSource]
[WARN] [05/04/2022 18:00:06.251] [test-akka.actor.default-dispatcher-5] [QueueSource(akka://test)] Dropping the tail element because buffer is full and overflowStrategy is: [DropTail] in stream [queueSource]
[WARN] [05/04/2022 18:00:06.349] [test-akka.actor.default-dispatcher-4] [QueueSource(akka://test)] Dropping the tail element because buffer is full and overflowStrategy is: [DropTail] in stream [queueSource]

Thanks for any help!

1 Like