How to detect that a consumer has disconnected from a stream

Given the following

    val ((queue, ks), source) =
      Source
        .queue[MyThing](queueSize, OverflowStrategy.dropHead)
        .viaMat(KillSwitches.single)(Keep.both)
        .preMaterialize()

How can I detect when the consumer of the source has disconnected so that I can clean up any resources that were created to serve this stream?

I tried adding

stream.watchTermination() { (_, done) =>
      done.onComplete { _ =>
        // cleanup
      }
      NotUsed
    }

but it is never invoked

Actually - cancel that. When I use watchTermination correctly, it does work. :doh:

1 Like

Hello,

when the consumer disconnects and clean up resources, you should apply watchTermination() to the stream after it has been materialized. Here’s how you can update your code:

val ((queue, ks), source) =
Source
.queue[MyThing](queueSize, OverflowStrategy.dropHead)
.viaMat(KillSwitches.single)(Keep.both)
.watchTermination() { (_, done) =>
done.onComplete {
case _ =>
// Cleanup logic here
}
NotUsed
}
.preMaterialize()

Ensure that watchTermination() is placed after materialization and that the stream terminates correctly (either through KillSwitch or natural completion). E-ZPassRI

Best Regard
Kerolina