I am using akka-stream-alpakka-cassandra to read from one table, apply some transformations, and save the results in another table. I modeled the reading stage with a CassandraSource and the saving stage with CassandraFlow.createWithPassThrough.
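A minimal sketch of such a recipe, with made-up keyspace, table, and column names and a made-up Person type, might look roughly like this (Alpakka Cassandra 3.x; CassandraFlow.create is used here only as a stand-in for whichever flow factory you pick):

    import akka.actor.ActorSystem
    import akka.stream.alpakka.cassandra.{ CassandraSessionSettings, CassandraWriteSettings }
    import akka.stream.alpakka.cassandra.scaladsl.{ CassandraFlow, CassandraSession, CassandraSessionRegistry, CassandraSource }
    import akka.stream.scaladsl.Sink

    object CopyPeople extends App {
      implicit val system: ActorSystem = ActorSystem("example")
      implicit val session: CassandraSession =
        CassandraSessionRegistry.get(system).sessionFor(CassandraSessionSettings("alpakka.cassandra"))

      final case class Person(id: String, name: String)

      // Read rows from the source table, transform them into domain objects and
      // insert each one into the target table; each element is passed through.
      val done =
        CassandraSource("SELECT id, name FROM source_ks.people")
          .map(row => Person(row.getString("id"), row.getString("name")))
          .via(
            CassandraFlow.create[Person](
              CassandraWriteSettings.defaults,
              "INSERT INTO target_ks.people(id, name) VALUES (?, ?)",
              (person, prepared) => prepared.bind(person.id, person.name)
            )
          )
          .runWith(Sink.ignore)
    }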
Implementing the happy path is straightforward; it is the unhappy path that poses some difficulties for me.
For example, it can happen that something goes wrong on the Cassandra side during the save and the stream ends up failing (I have not defined any SupervisionStrategy yet). What I would like is full control over the error that occurred for each emitted element, so the implementation of createWithPassThrough is not particularly useful here: it is based on mapAsync, and if the returned Future fails, the stream fails and stops. Putting a recover at the end of it does not solve the problem either, because the stream then completes successfully and stops consuming from upstream.
Wouldn't it give the developer more control to guarantee that the Future in mapAsync always succeeds and to emit an Either[Throwable, T] instead, or, even better, a pair (T, Either[Throwable, Done]) (we don't want to expose the ResultSet)? That way one would know the input the statement was bound with, together with the outcome of the (write) query.
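For illustration, here is a minimal sketch of the shape I have in mind, built directly on CassandraSession rather than on the Alpakka flow factories (the helper name and parameters are made up):

    import akka.{ Done, NotUsed }
    import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
    import akka.stream.scaladsl.Flow
    import com.datastax.oss.driver.api.core.cql.{ BoundStatement, PreparedStatement }
    import scala.concurrent.ExecutionContext
    import scala.util.control.NonFatal

    def writeWithOutcome[T](
        cql: String,
        bind: (T, PreparedStatement) => BoundStatement,
        parallelism: Int = 1
    )(implicit session: CassandraSession,
      ec: ExecutionContext): Flow[T, (T, Either[Throwable, Done]), NotUsed] =
      Flow
        .lazyFutureFlow { () =>
          // Prepare the statement once, then run one write per element and turn a
          // failed write into a Left instead of failing the whole stream.
          session.prepare(cql).map { prepared =>
            Flow[T].mapAsync(parallelism) { element =>
              session
                .executeWrite(bind(element, prepared))
                .map(done => element -> (Right(done): Either[Throwable, Done]))
                .recover { case NonFatal(t) => element -> Left(t) }
            }
          }
        }
        .mapMaterializedValue(_ => NotUsed)

Downstream one could then, for instance, divert the Left cases to a dead-letter sink while the stream keeps consuming from upstream.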
If there are other strategies to handle this, can someone give some suggestions?
For the record: in our scenario this happens when trying to write to Cassandra while the keyspace or the table does not exist. As of today we are using CassandraFlow.createBatch from Alpakka 3.0.4 with Scala 2.13.8.
A workaround is to check whether the keyspace and table already exist before even creating a stream recipe. To do the check, one can execute a statement directly on the CassandraSession, as in
cs.select(s"SELECT ${column_name} from ${keyspace_name}.${table_name} limit 1").run()
Basically, you can assume that the keyspace and the table both exist if the Future does not fail, regardless of whether you then get 0 or 1 rows back. This workaround is a bit tedious, but it works in my scenario.
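Wrapped as a small helper, the check might look like this (a sketch; the helper name is made up, and an implicit CassandraSession and Materializer are assumed to be in scope):

    import akka.Done
    import akka.stream.Materializer
    import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
    import scala.concurrent.Future

    // Fails the returned Future if the keyspace or table is missing; succeeds
    // whether the probe query returns 0 or 1 rows.
    def assertKeyspaceAndTableExist(keyspace: String, table: String, anyColumn: String)(
        implicit cs: CassandraSession,
        mat: Materializer): Future[Done] =
      cs.select(s"SELECT $anyColumn FROM $keyspace.$table LIMIT 1").run()

One can then flatMap the construction of the actual stream on that Future, so the pipeline only starts once the check has passed.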
It is relatively easy to recreate such a situation:
1. Implement a simple stream recipe for reading from or writing to Cassandra.
2. Run the stream and ensure one element is processed while you have not yet created the relevant Cassandra keyspace.
3. Alternatively to 2., run the stream and ensure one element is processed while you have not yet created the relevant Cassandra table.
You should notice that the stage reading from or writing to Cassandra basically stops working, but without debugging you will not see any exception about it. The whole thing just halts rather silently (even if you have implemented, and set via attributes, a supervision strategy that logs any exception that reaches it).
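For reference, such an attribute-based supervision strategy would typically be attached roughly as in the following sketch (names are made up; in the scenario above the decider is never invoked because no exception reaches it):

    import akka.{ Done, NotUsed }
    import akka.actor.ActorSystem
    import akka.stream.{ ActorAttributes, Supervision }
    import akka.stream.scaladsl.{ Flow, Sink, Source }
    import scala.concurrent.Future

    // `source` and `writeFlow` stand for the Cassandra reading and writing stages.
    def runWithLoggingSupervision[A, B](source: Source[A, NotUsed], writeFlow: Flow[A, B, NotUsed])(
        implicit system: ActorSystem): Future[Done] = {
      val loggingDecider: Supervision.Decider = { t =>
        system.log.error(t, "Cassandra stage failed")
        Supervision.Stop
      }
      source
        .via(writeFlow)
        .withAttributes(ActorAttributes.supervisionStrategy(loggingDecider))
        .runWith(Sink.ignore)
    }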