How to get metadata of a Cassandra session in Akka Projection?

I need to register some UDT codecs to the Cassandra session.

But the session built by CassandraSessionRegistry(system).sessionFor(...) hasn’t the member getContext or getMetadata. I can’t get specific instances of UserDefinedType and CodecRegistry of the current session. Then the codec registration can’t go on.

Event executing a bound statement, it should do flatMap like this, because it’s packaged in a Future.

val dml = s"UPDATE $keyspace.$table SET details = :detail WHERE entityId = :entityId;"
  
session.prepare(dml)
       .map {
         _.bind()
         .setString("entityId", entityId)
         .setMap("detail", Map("confirmed" -> detail).asJava, classOf[String], classOf[Detail])
       }
       .flatMap(session.executeWrite(_))

Secondly, the result of session.prepare() is type Future[BoundStatement]. It doesn’t match the param type of BatchStatement.addStatement(). The latter needs BatchableStatement. So, It’s difficult to build a BatchStatement with bound statement built by session.prepare().

val dmlConfirmed = s"UPDATE $keyspace.$table SET details = :confirmed WHERE entityId = :entityId;"
val statement = session.prepare(dmlConfirmed)
                       .map {
                         _.bind()
                          .setString("entityId", entityId)
                          .setMap("confirmed", Map("confirmed" -> current).asJava, classOf[String], classOf[Detail])
                       }


val batch = BatchStatement.builder(DefaultBatchType.LOGGED)
                          .addStatement(statement)
                          .addStatement(...)
                          .build()

session.executeWriteBatch(batch)

The compiler shows Type mismatch. Required: BatchableStatement[_], found: Future[BoundStatement] at the line addStatement().

Then, should it use a session built by CqlSession.builder() directly, or any approach to go with CassandraSession?

Thanks.

I get cql session by Await.result(cassandraSession.underlying(), 2.seconds).

val sessionUnderlying = Await.result(session.underlying(), timeout)

val dmlConfirmed       = s"UPDATE $keyspace.$table SET details = :confirmed WHERE entityId = :entityId;"
val statementConfirmed = sessionUnderlying.prepare(dmlConfirmed)
                                .bind()
                                .setString("entityId", entityId)
                                .setMap("confirmed", Map("confirmed" -> current).asJava, classOf[String], classOf[Detail])

val dmlPending       = s"UPDATE $keyspace.$table SET details = details + :pending WHERE entityId = :entityId;"
val statementPending = sessionUnderlying.prepare(dmlPending)
                              .bind()
                              .setString("entityId", entityId)
                              .setMap("pending", Map("pending" -> pending).asJava, classOf[String], classOf[Detail])

val batch = BatchStatement.builder(DefaultBatchType.LOGGED)
                          .addStatement(statementConfirmed)
                          .addStatement(statementPending)
                          .build()

session.executeWriteBatch(batch)