Is the DDL script for projection_management wrong?

Environment:

  • Akka 2.7.0
  • Akka Projection: 1.3.1
  • Cassandra Driver: 4.15.0
  • Akka Persistence Cassandra: 1.1.0

Problem

I copied the DDL scripts for the offset_store and projection_management schema, replaced the keyspace with my own, and then created the tables.

CREATE TABLE IF NOT EXISTS eproject_models.offset_store (
  projection_name text,
  partition int,
  projection_key text,
  offset text,
  manifest text,
  last_updated timestamp,
  PRIMARY KEY ((projection_name, partition), projection_key));

CREATE TABLE IF NOT EXISTS eproject_models.projection_management (
  projection_name text,
  partition int,
  projection_key text,
  paused boolean,
  last_updated timestamp,
  PRIMARY KEY ((projection_name, partition), projection_key));

But when I run my app, the log shows: “Undefined column name offset in table eproject_models.projection_management”.

[2023-04-15 12:00:39,065] [WARN] [akka.stream.scaladsl.RestartWithBackoffSource] [akka://eProject@127.0.0.1:2551] [] [eProject-akka.actor.default-dispatcher-12] - Restarting stream due to failure [1]: com.datastax.oss.driver.api.core.servererrors.InvalidQueryException: Undefined column name offset in table eproject_models.projection_management

Thanks.

It looks like the official DDL script for projection_management above needs two additional text fields: offset and manifest.

But then I get another error when the service is bound to HTTP.

Exception in thread "main" java.lang.NoClassDefFoundError: akka/parboiled2/ParserInput
	at akka.http.scaladsl.model.headers.User$minusAgent$.apply(headers.scala:1094)
	at akka.http.impl.settings.ClientConnectionSettingsImpl$.$anonfun$fromSubConfig$1(ClientConnectionSettingsImpl.scala:58)
	at scala.Option.map(Option.scala:242)
	at akka.http.impl.settings.ClientConnectionSettingsImpl$.fromSubConfig(ClientConnectionSettingsImpl.scala:58)
	at akka.http.impl.settings.ConnectionPoolSettingsImpl$.fromSubConfig(ConnectionPoolSettingsImpl.scala:102)
	at akka.http.impl.settings.ConnectionPoolSettingsImpl$.fromSubConfig(ConnectionPoolSettingsImpl.scala:88)
	at akka.http.impl.util.SettingsCompanionImpl.apply(SettingsCompanionImpl.scala:51)
	at akka.http.scaladsl.settings.ConnectionPoolSettings$.apply(ConnectionPoolSettings.scala:97)
	at akka.http.scaladsl.settings.ConnectionPoolSettings$.apply(ConnectionPoolSettings.scala:87)
	at akka.http.scaladsl.settings.SettingsCompanion.apply(SettingsCompanion.scala:19)
	at akka.http.scaladsl.settings.SettingsCompanion.apply$(SettingsCompanion.scala:19)
	at akka.http.scaladsl.settings.ConnectionPoolSettings$.apply(ConnectionPoolSettings.scala:87)
	at akka.http.scaladsl.HttpExt.<init>(Http.scala:84)
	at akka.http.scaladsl.Http$.createExtension(Http.scala:1112)
	at akka.http.scaladsl.Http$.createExtension(Http.scala:846)
	at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:1165)
	at akka.actor.ExtensionId.apply(Extension.scala:78)
	at akka.actor.ExtensionId.apply$(Extension.scala:77)
	at akka.http.scaladsl.Http$.apply(Http.scala:1107)
	at akka.http.scaladsl.Http$.apply(Http.scala:846)
	at akka.actor.ExtensionId.apply(Extension.scala:84)
	at akka.actor.ExtensionId.apply$(Extension.scala:84)
	at akka.http.scaladsl.Http$.apply(Http.scala:1106)
	at org.abbey.Main$.main(Main.scala:53)
	at org.abbey.Main.main(Main.scala)
Caused by: java.lang.ClassNotFoundException: akka.parboiled2.ParserInput
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	... 25 more

Here are Main.scala and persistence.conf.

Main.scala

object Main {
  def main(args: Array[String]): Unit = {
    
    val system = ActorSystem[Nothing](Behaviors.empty, "eProject")
    implicit val timeout: Timeout                  = Timeout.create(system.settings.config.getDuration("eproject.ask_timeout"))
    implicit val context: ExecutionContextExecutor = system.executionContext
    implicit val sys: ActorSystem[_] = system
    
//    AkkaManagement(system).start()
//    ClusterBootstrap(system).start()
    
    Annunciator.init(system)
    
    val session    = CassandraSessionRegistry(system).sessionFor("akka.persistence.cassandra")
    val keyspace   = system.settings.config.getString("akka.projection.cassandra.offset-store.keyspace")
    val repository = new AnnunciatorModelRepositoryImpl(session, keyspace)
    AnnunciatorModel.init(system, repository)
    
    val interface = system.settings.config.getString("eproject.grpc.interface")
    val port      = system.settings.config.getInt("eproject.grpc.port")
    val serviceImpl   = new AnnunciatorServiceImpl(system, repository)
    
    val service: HttpRequest => Future[HttpResponse] =
      ServiceHandler.concatOrNotFound(
        rpc.AnnunciatorServiceHandler.partial(serviceImpl),
        ServerReflection.partial(List(AnnunciatorService))
        )
    
    val bound =
      Http()
        .newServerAt(interface, port)
        .bind(service)
        .map(_.addToCoordinatedShutdown(3.seconds))
    
    bound.onComplete {
      case Success(binding) =>
        val address = binding.localAddress
        system.log.info(
          "Annunciator service at gRPC server {}:{}",
          address.getHostString,
          address.getPort)
      case Failure(ex)      =>
        system.log.error("Failed to bind gRPC endpoint, terminating system", ex)
        system.terminate()
    }
  }
}

Persistence.conf

akka {
  persistence {
    journal.plugin = "akka.persistence.cassandra.journal"
    journal.auto-start-journals = ["akka.persistence.cassandra.journal"]
    snapshot-store.plugin = "akka.persistence.cassandra.snapshot"

    cassandra {
      events-by-tag {
        bucket-size = "Day"
        eventual-consistency-delay = 2s
        flush-interval = 50ms
        pubsub-notification = off
        first-time-bucket = "20230101T00:00"
      }

      query {
        refresh-interval = 2s
      }

      journal {
        keyspace = "eproject_journals"
        keyspace-autocreate = true
        tables-autocreate = true
      }

      snapshot {
        keyspace = "eproject_snapshots"
        keyspace-autocreate = true
        tables-autocreate = true
      }
    }
  }

  projection {
    cassandra {
      session-config-path = "akka.persistence.cassandra"

      session-config {
        session-provider = "akka.stream.alpakka.cassandra.DefaultSessionProvider"

        service-discovery {
          name = ""
          lookup-timeout = 1 s
        }

        session-dispatcher = "akka.actor.default-dispatcher"

        datastax-java-driver-config = "datastax-java-driver"
      }

      offset-store {
        keyspace = "eproject_models"
        table = "offset_store"
        table = "projection_management"
      }

    }
  }
}

// reference: https://doc.akka.io/docs/akka-persistence-cassandra/current/configuration.html#contact-points-configuration
datastax-java-driver {
  advanced.reconnect-on-init = true
  basic.contact-points = ["192.168.1.200:9042"]
  basic.load-balancing-policy.local-datacenter = "datacenter1"
}

The problem with the column is that you have defined projection_management as the table for the offset store.

Remove table = "projection_management".
If you want to change the name of the projection_management table, the config property is:

management-table = "projection_management"
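
For reference, after the fix the offset-store block would look roughly like this (the table and management-table values shown are the defaults, so they could also be left out):

akka.projection.cassandra.offset-store {
  keyspace = "eproject_models"
  table = "offset_store"
  management-table = "projection_management"
}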

Thank you very much!

I fixed the error in persistence.conf and recreated the tables with the script. It finds the tables now.

But I have another question: why wasn't the auto-creation of the keyspace and tables triggered? When is the Cassandra plugin supposed to create keyspaces and tables automatically?


I wrote a small test, and there Cassandra did create the journal and snapshot tables automatically.

implicit val system: ActorSystem[Nothing] = ctx.system

val cluster = Cluster(system)
cluster.manager ! Join(cluster.selfMember.address)
  
Project.init(system)
val project = ClusterSharding(system).entityRefFor(Project.EntityKey, "2023-001")

val jack                = AssignedContact(TransientId, "Jack", "Department 222", Mobile("13900122288"))
val reply: Future[Done] = project.askWithStatus(Project.OpenProject("1st Project", jack, _))
project.askWithStatus(Project.CloseProject)

Thanks.

The keyspace and tables for the Cassandra plugin (journal, snapshot) have their own config properties, keyspace-autocreate and tables-autocreate. See Configuration • Akka Persistence Cassandra.

The tables for Projections with Cassandra are created with CassandraProjection.createTablesIfNotExists. See Offset in Cassandra • Akka Projection.

Auto-creation is only intended to be used in tests.
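
For example, a minimal sketch of creating the projection tables up front (typically from a test or a one-off setup step) could look like this; the object name and the blocking Await are illustrative, and it assumes the akka-projection-cassandra dependency plus the offset-store config shown above:

import akka.Done
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.projection.cassandra.scaladsl.CassandraProjection

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object CreateProjectionTables {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "eProject")

    // Creates the offset and management tables used by the Cassandra offset store
    // (offset_store and projection_management) if they do not already exist.
    val done: Future[Done] = CassandraProjection.createTablesIfNotExists()

    // Blocking is acceptable here because this is a one-off setup/test step.
    Await.result(done, 30.seconds)
    system.terminate()
  }
}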