The DDL script for projection_management above in the official project should add two text fields: offset and manifest.
But I get another error when it binds the service to the http.
osecond precision
Exception in thread "main" java.lang.NoClassDefFoundError: akka/parboiled2/ParserInput
at akka.http.scaladsl.model.headers.User$minusAgent$.apply(headers.scala:1094)
at akka.http.impl.settings.ClientConnectionSettingsImpl$.$anonfun$fromSubConfig$1(ClientConnectionSettingsImpl.scala:58)
at scala.Option.map(Option.scala:242)
at akka.http.impl.settings.ClientConnectionSettingsImpl$.fromSubConfig(ClientConnectionSettingsImpl.scala:58)
at akka.http.impl.settings.ConnectionPoolSettingsImpl$.fromSubConfig(ConnectionPoolSettingsImpl.scala:102)
at akka.http.impl.settings.ConnectionPoolSettingsImpl$.fromSubConfig(ConnectionPoolSettingsImpl.scala:88)
at akka.http.impl.util.SettingsCompanionImpl.apply(SettingsCompanionImpl.scala:51)
at akka.http.scaladsl.settings.ConnectionPoolSettings$.apply(ConnectionPoolSettings.scala:97)
at akka.http.scaladsl.settings.ConnectionPoolSettings$.apply(ConnectionPoolSettings.scala:87)
at akka.http.scaladsl.settings.SettingsCompanion.apply(SettingsCompanion.scala:19)
at akka.http.scaladsl.settings.SettingsCompanion.apply$(SettingsCompanion.scala:19)
at akka.http.scaladsl.settings.ConnectionPoolSettings$.apply(ConnectionPoolSettings.scala:87)
at akka.http.scaladsl.HttpExt.<init>(Http.scala:84)
at akka.http.scaladsl.Http$.createExtension(Http.scala:1112)
at akka.http.scaladsl.Http$.createExtension(Http.scala:846)
at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:1165)
at akka.actor.ExtensionId.apply(Extension.scala:78)
at akka.actor.ExtensionId.apply$(Extension.scala:77)
at akka.http.scaladsl.Http$.apply(Http.scala:1107)
at akka.http.scaladsl.Http$.apply(Http.scala:846)
at akka.actor.ExtensionId.apply(Extension.scala:84)
at akka.actor.ExtensionId.apply$(Extension.scala:84)
at akka.http.scaladsl.Http$.apply(Http.scala:1106)
at org.abbey.Main$.main(Main.scala:53)
at org.abbey.Main.main(Main.scala)
Caused by: java.lang.ClassNotFoundException: akka.parboiled2.ParserInput
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 25 more
It’s main.scala and persistence.conf
Main.scala
object Main {
def main(args: Array[String]): Unit = {
val system = ActorSystem[Nothing](Behaviors.empty, "eProject")
implicit val timeout: Timeout = Timeout.create(system.settings.config.getDuration("eproject.ask_timeout"))
implicit val context: ExecutionContextExecutor = system.executionContext
implicit val sys: ActorSystem[_] = system
// AkkaManagement(system).start()
// ClusterBootstrap(system).start()
Annunciator.init(system)
val session = CassandraSessionRegistry(system).sessionFor("akka.persistence.cassandra")
val keyspace = system.settings.config.getString("akka.projection.cassandra.offset-store.keyspace")
val repository = new AnnunciatorModelRepositoryImpl(session, keyspace)
AnnunciatorModel.init(system, repository)
val interface = system.settings.config.getString("eproject.grpc.interface")
val port = system.settings.config.getInt("eproject.grpc.port")
val serviceImpl = new AnnunciatorServiceImpl(system, repository)
val service: HttpRequest => Future[HttpResponse] =
ServiceHandler.concatOrNotFound(
rpc.AnnunciatorServiceHandler.partial(serviceImpl),
ServerReflection.partial(List(AnnunciatorService))
)
val bound =
Http()
.newServerAt(interface, port)
.bind(service)
.map(_.addToCoordinatedShutdown(3.seconds))
bound.onComplete {
case Success(binding) =>
val address = binding.localAddress
system.log.info(
"Annunciator service at gRPC server {}:{}",
address.getHostString,
address.getPort)
case Failure(ex) =>
system.log.error("Failed to bind gRPC endpoint, terminating system", ex)
system.terminate()
}
}
}
I fixed the error in the persistence.conf and recreated the tables with script. it found the tables now.
But I get another question: why was the auto-creation of keyspace and table not triggered? Or when should the Cassandra plugin create keyspaces and tables automatically?
I’ve written a piece of test. Then the Cassandra created the journal and snapshot tables automatically.
implicit val system: ActorSystem[Nothing] = ctx.system
val cluster = Cluster(system)
cluster.manager ! Join(cluster.selfMember.address)
Project.init(system)
val project = ClusterSharding(system).entityRefFor(Project.EntityKey, "2023-001")
val jack = AssignedContact(TransientId, "Jack", "Department 222", Mobile("13900122288"))
val reply: Future[Done] = project.askWithStatus(Project.OpenProject("1st Project", jack, _))
project.askWithStatus(Project.CloseProject)
keyspace and tables for the Cassandra plugin (journal, snapshot) have their own config properties keyspace-autocreate and tables-autocreate. See Configuration • Akka Persistence Cassandra