Is it possible at all in Akka Streams to cancel the futures produced in mapAsync on stream failure? It seems to be desirable, because some computations are cancellable, and only this graph stage knows the current unresolved futures. Keeping track on the side is quite inconvenient.
Here’s the use case:
executing many requests through Cassandra driver,
with a common deadline.
Statement.executeAsync produces a ResultSetFuture, which has method cancel() — although it doesn’t cancel the request in Cassandra, it cancels its retry if it fails and it eagerly releases resources allocated in the driver to track this request.
I see that Alpakka Cassandra connector sidesteps this issue and never calls cancel(). Is it possible at all with Akka Streams?
Yes, I know they don’t support cancelling. I was wondering if I could do something like register some kind of “on failure” handler so that if an element is being processed then this handler is called.
As for Alpakka Cassandra: suppose I’m using CassandraFlow and combine it with a timeout operator. I’d expect that when the stream is failed due to timeout, CassandraFlow will take care to call cancel().
ResultSetFuture future = session.executeAsync(...some query...);
try {
ResultSet result = future.get(1, TimeUnit.SECONDS);
... process result ...
} catch (TimeoutException e) {
future.cancel(true); // Ensure any resource used by this query driver
// side is released immediately
... handle timeout ...
}
I’m afraid this kind of thing — cancellation on timeout — isn’t possible to express with Akka Streams at all
This specific case about the Alpakka Cassandra source not calling cancel can easily be solved in the CassandraSourceStage.
The Alpakka team at Lightbend plans to work on Cassandra the coming weeks as we want to consolidate with the tools built for Akka Persistence Cassandra. So you can expect major changes/improvements to Alpakka Cassandra.
Thanks. However I see it as a problem that you can’t do things like cancel() when you don’t own the stage implementation — am I correct that this is a limitation of Akka Streams architecture?
I might get you wrong, but the design of Futures in Scala and their use in use in Akka Streams does deliberately not have the possibility to cancel futures.
This future in the Cassandra API has a different design.
The Cassandra API usage in this stage might need to be improved, but I don’t think that reaching into the stage’s inner workings should be something a library user should be able to do.