I have a sizeable application which uses in a few places Source.queue(BUFFER_SIZE, OverflowStrategy.dropTail.withLogLevel(Logging.WarningLevel))
.
When, under load, some elements get dropped, we see warning messages in the logs saying Dropping the tail element because buffer is full and overflowStrategy is: [DropTail] in stream [queueSource]
.
Since we use a few queues, it is not very obvious which queue did drop the element. I see from the code that it uses the name
attribute, however it does not seem to pick up the name attribute I set:
val queue = Source
.queue[String](8, OverflowStrategy.dropTail.withLogLevel(Logging.WarningLevel))
.named("name-after-queue")
.throttle(1, 3.seconds)
.toMat(Sink.foreach[String](m => println(s"${Instant.now()} - $m")).named("name-on-sink"))(Keep.left)
.named("name-on-graph")
.run()
val eventualDone = Source(1 to 100)
.throttle(10, 1.seconds)
.mapAsync(1)(m => queue.offer(m.toString))
.runWith(Sink.ignore)
This still gives warnings as
[WARN] [05/04/2022 18:00:06.151] [test-akka.actor.default-dispatcher-4] [QueueSource(akka://test)] Dropping the tail element because buffer is full and overflowStrategy is: [DropTail] in stream [queueSource]
[WARN] [05/04/2022 18:00:06.251] [test-akka.actor.default-dispatcher-5] [QueueSource(akka://test)] Dropping the tail element because buffer is full and overflowStrategy is: [DropTail] in stream [queueSource]
[WARN] [05/04/2022 18:00:06.349] [test-akka.actor.default-dispatcher-4] [QueueSource(akka://test)] Dropping the tail element because buffer is full and overflowStrategy is: [DropTail] in stream [queueSource]
Thanks for any help!