Hi,
We are using akka-persistence with cluster sharding, and we want to store our actor state and the sharding state separately. The reason is that in our PROD environment we will have 2 akka clusters, one running in each of our DCs, configured so that the sharding state is kept per DC and the actor state is stored separately, allowing the akka clusters to be run in a hot/standby configuration.
So each akka node has persistence configured for:
- actor-journal
- actor-snapshot
- sharding-journal
- sharding-snapshot
Configuring akka with the default akka.persistence.cassandra.ConfigSessionProvider results in 4 instances of com.datastax.driver.core.Cluster, one per plugin section. The DataStax docs state that multiple Cluster instances should be avoided, since a single instance can serve multiple sessions.
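To make that concrete, here is a rough sketch of the kind of configuration involved; the plugin ids, keyspace names and exact settings below are illustrative examples rather than our real config:

import com.typesafe.config.ConfigFactory

// Sketch only: four Cassandra-backed plugin sections, one pair for actor
// state and one pair for sharding state. Each section ends up creating its
// own com.datastax.driver.core.Cluster via ConfigSessionProvider.
val config = ConfigFactory.parseString(
  """
    |akka.persistence.journal.plugin        = "cassandra-journal"
    |akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"
    |
    |# sharding state goes through its own plugin sections
    |akka.cluster.sharding.journal-plugin-id  = "sharding-cassandra-journal"
    |akka.cluster.sharding.snapshot-plugin-id = "sharding-cassandra-snapshot-store"
    |
    |cassandra-journal.keyspace        = "actor_state"
    |cassandra-snapshot-store.keyspace = "actor_state"
    |
    |sharding-cassandra-journal = ${cassandra-journal} {
    |  keyspace = "sharding_state"
    |}
    |sharding-cassandra-snapshot-store = ${cassandra-snapshot-store} {
    |  keyspace = "sharding_state"
    |}
  """.stripMargin
).withFallback(ConfigFactory.defaultReference()).resolve()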
We tried to share a single Cluster instance by implementing our own SessionProvider, as below:
import akka.persistence.cassandra.SessionProvider
import com.datastax.driver.core.Session
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture, MoreExecutors}
import scala.concurrent.{ExecutionContext, Future, Promise}

class CassandraSessionProvider extends SessionProvider {

  // Adapt the driver's Guava ListenableFuture to a Scala Future.
  implicit class ListenableFutureToFuture[T](lf: ListenableFuture[T]) {
    def asScala: Future[T] = {
      val p = Promise[T]()
      Futures.addCallback(lf, new FutureCallback[T] {
        def onFailure(t: Throwable): Unit = p failure t
        def onSuccess(result: T): Unit = p success result
      }, MoreExecutors.directExecutor())
      p.future
    }
  }

  // All plugin sections share the single Cluster instance owned by Boot,
  // each getting its own Session from it.
  override def connect()(implicit ec: ExecutionContext): Future[Session] = {
    Boot.getCassandraCluster().connectAsync().asScala
  }
}
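For completeness, this is roughly how we point the plugin sections at the provider (the session-provider setting name is the one from the plugin's reference.conf; the package and the sharding plugin ids are placeholders):

import com.typesafe.config.ConfigFactory

// Point every Cassandra plugin section at the custom provider. The class is
// loaded by reflection, so only the fully qualified name is configured here.
val sessionProviderConfig = ConfigFactory.parseString(
  """
    |cassandra-journal.session-provider                 = "com.example.CassandraSessionProvider"
    |cassandra-snapshot-store.session-provider          = "com.example.CassandraSessionProvider"
    |sharding-cassandra-journal.session-provider        = "com.example.CassandraSessionProvider"
    |sharding-cassandra-snapshot-store.session-provider = "com.example.CassandraSessionProvider"
  """.stripMargin)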
However, when starting our nodes we would see the following in the logs:
2018-04-13 09:07:24,790 [DmEventStateSystem-akka.actor.default-dispatcher-23] ERROR a.c.s.PersistentShardCoordinator - Failed to persist event type [akka.cluster.sharding.ShardCoordinator$Internal$ShardRegionRegistered] with sequence number [12] for persistenceId [/sharding/eventStateFsmAggregateCoordinator].
java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
After changing the provider to build a new Cluster instance for each connect, we no longer see the above problem:
class CassandraSessionProvider extends SessionProvider {

  // Same ListenableFuture -> Future adapter as above.
  implicit class ListenableFutureToFuture[T](lf: ListenableFuture[T]) {
    def asScala: Future[T] = {
      val p = Promise[T]()
      Futures.addCallback(lf, new FutureCallback[T] {
        def onFailure(t: Throwable): Unit = p failure t
        def onSuccess(result: T): Unit = p success result
      }, MoreExecutors.directExecutor())
      p.future
    }
  }

  // Only connect() changes: every plugin section now gets its own Cluster
  // instance, built from a shared Cluster.Builder.
  override def connect()(implicit ec: ExecutionContext): Future[Session] = {
    Boot.getCassandraClusterBuilder().build().connectAsync().asScala
  }
}
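For reference, the Boot helpers used above are along these lines (a sketch only; the contact points, port and driver options are placeholders):

import com.datastax.driver.core.Cluster

object Boot {
  // Shared driver configuration used by both provider variants.
  def getCassandraClusterBuilder(): Cluster.Builder =
    Cluster.builder()
      .addContactPoints("cassandra-1", "cassandra-2")
      .withPort(9042)

  // Single Cluster instance shared by all plugin sections (first variant).
  private lazy val sharedCluster: Cluster = getCassandraClusterBuilder().build()
  def getCassandraCluster(): Cluster = sharedCluster
}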
As I said, the DataStax docs recommend having a single Cluster instance (per JVM/Cassandra cluster) serving multiple Sessions, which is what we did in the first version. But I can see that in some places the plugin issues a close on the Cluster, which takes down anything that still has Sessions hanging off it. I can guess that having each component in the plugin manage its own Cluster instance simplifies things, but it uses unnecessary resources and increases initialisation time.
Are there any plans to rethink the plugin's approach to managing the Cluster instance?
Thanks
Paul