Is there a way to intercept the completion of a route that completes with a source, in particular when using SSE?
Taking as an example a route like this one https://doc.akka.io/docs/akka-http/10.0.11/scala/http/sse-support.html, I would like to perform some operations when a client disconnects, i.e. when nobody is consuming messages from the source any more.
I have tried the directives mapRouteResult and mapRouteResultFuture, but they don't work in this particular case: they intercept the start of the streaming rather than the end. Any ideas?
The easiest thing that comes to mind would be to use something like
mapResponseEntity(_.transformDataBytes(Flow[ByteString].watchTermination() { (_, completionFuture) =>
  completionFuture // do something with the future here, e.g. register an onComplete callback
})) {
  // inner route
}
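For context, here is a minimal sketch of how that directive could wrap an SSE route similar to the one in the linked documentation. It assumes Akka HTTP 10.0.x/10.1.x with ActorMaterializer; the object name SseTerminationExample, the helper onTermination, the path "events", and the host and port are made up for illustration and are not from the original posts.

```scala
import java.time.LocalTime
import java.time.format.DateTimeFormatter.ISO_LOCAL_TIME

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical example object; names are illustrative only.
object SseTerminationExample {
  implicit val system: ActorSystem = ActorSystem("sse-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher // execution context for the onComplete callback

  // Pass-through flow: bytes are forwarded unchanged, and the callback runs
  // once the response entity stream terminates (completion, failure, or
  // cancellation from downstream).
  def onTermination: Flow[ByteString, ByteString, Any] =
    Flow[ByteString].watchTermination() { (_, completionFuture) =>
      completionFuture.onComplete {
        case Success(_)  => println("DONE")
        case Failure(ex) => println(s"FAILED: ${ex.getMessage}")
      }
    }

  val route: Route =
    path("events") {
      get {
        // Attach the termination hook to whatever entity the inner route produces.
        mapResponseEntity(_.transformDataBytes(onTermination)) {
          complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map(_ => ServerSentEvent(ISO_LOCAL_TIME.format(LocalTime.now())))
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }
        }
      }
    }

  def main(args: Array[String]): Unit =
    Http().bindAndHandle(route, "localhost", 8080)
}
```

Each request materializes the flow again, so every client connection gets its own completionFuture.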
Can you explain to me how this code works? What does .transformDataBytes actually do? I see a delay of a few seconds between the client disconnecting and the completion of ‘completionFuture’. Is that due to transformDataBytes, or is it simply an asynchronous delay?
transformDataBytes lets you change the (response) entity stream, i.e. you can attach further operations to the stream. watchTermination is a special kind of operation that passes elements through unchanged but also provides a future, available once the stream has been materialized, that lets you observe when the stream has terminated.
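To see the pass-through behaviour in isolation, here is a small sketch that uses only Akka Streams, with no HTTP involved. The object and method names (WatchTerminationDemo, runToCompletion, runWithEarlyCancel) are invented for this illustration, and ActorMaterializer is assumed as in the Akka versions contemporary with the linked docs. The second run uses a sink that stops after one element, which is roughly the situation when a consumer goes away mid-stream.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object; names are illustrative only.
object WatchTerminationDemo {
  implicit val system: ActorSystem = ActorSystem("watch-termination-demo")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Keep.right keeps the Future[Done] supplied by watchTermination
  // as the materialized value of the flow.
  val watched: Flow[Int, Int, Future[Done]] =
    Flow[Int].watchTermination()(Keep.right)

  // Upstream completes normally; elements pass through unchanged.
  def runToCompletion(): (Future[Done], Future[Seq[Int]]) =
    Source(1 to 3)
      .viaMat(watched)(Keep.right)
      .toMat(Sink.seq)(Keep.both)
      .run()

  // Sink.head cancels upstream after the first element, so the
  // unbounded source never completes on its own.
  def runWithEarlyCancel(): (Future[Done], Future[Int]) =
    Source.fromIterator(() => Iterator.from(1))
      .viaMat(watched)(Keep.right)
      .toMat(Sink.head)(Keep.both)
      .run()

  def main(args: Array[String]): Unit = {
    val (done1, elems) = runToCompletion()
    println(Await.result(elems, 3.seconds))
    println(Await.result(done1, 3.seconds))

    val (done2, first) = runWithEarlyCancel()
    println(Await.result(first, 3.seconds))
    println(Await.result(done2, 3.seconds))

    system.terminate()
  }
}
```

In both runs the future from watchTermination completes, which is why it can be used to notice a downstream cancellation as well as a normal end of stream.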
How do you measure that time, and how do you observe the disconnection of the client? There's no good reason for that delay.
One reason could be that the SSE stream is never actively completed and the connection hangs until the idle-timeout kicks in?
So, basically, it should print ‘DONE’ when the client disconnects. It does, but 4-5 seconds after the connection is closed (I close it manually).
What do you mean by ‘actively completed’? A user can disconnect at any moment, and there is no guarantee that all the elements of the stream will be consumed. Should I expect this delay on all SSE requests?