Persistence and Clustering

Hi,

I am running a POC with persistent actors and clustering. I am very new to AKKA. Is it a fair assumption when I have multiple nodes in a cluster, any state changes to an actor on one node is reflected on the other nodes. I use Cassandra for journals, and the persistent state is accessed through HTTP routes. I hit both nodes alternatively using curl commands. What I notice is each node only retains state changes that are specific to that node and don’t reflect any changes on the other node. Also, when I restart both nodes, both of them only seem to have the state that was changed on the last node.

Persistent Actor

package com.example

import akka.actor.typed.ActorRef
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}

object ShoppingCartActor {

  trait CartCommand extends CborSerializable
  case class AddItem(id: String, quantity: Int, replyTo: ActorRef[CartState]) extends CartCommand
  case class RemoveItem(id: String, eplyTo: ActorRef[CartState]) extends CartCommand
  case class ViewCart(replyTo: ActorRef[CartState]) extends CartCommand

  trait CartEvent extends CborSerializable
  case class ItemAdded(id: String, quantity: Int) extends CartEvent
  case class ItemRemoved(id: String) extends CartEvent

  case class CartState(items: Map[String, Int] = Map.empty)

  def apply(): EventSourcedBehavior[CartCommand, CartEvent, CartState]  = {
    EventSourcedBehavior[CartCommand, CartEvent, CartState] (
      PersistenceId("ShoppingCart", "CartId"), CartState(), commandHandler, eventHandler
    )
  }

  def commandHandler(state: CartState, command: CartCommand): Effect[CartEvent, CartState] = command match {
    case ViewCart(replyTo) =>
      replyTo ! state
      Effect.none
    case AddItem(id, quantity, replyTo) =>
      Effect.persist(ItemAdded(id, quantity)).thenRun(replyTo ! _)
    case RemoveItem(id, replyTo) =>
      Effect.persist(ItemRemoved(id)).thenRun(replyTo ! _)
  }

  def eventHandler(state: CartState, event: CartEvent): CartState = event match {
    case ItemAdded(id, quantity) => CartState(state.items + (id -> quantity))
    case ItemRemoved(id) => CartState(state.items.removed(id))
  }
}

HTTP Route

import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.util.Timeout
import com.example.ShoppingCartActor._
import spray.json.DefaultJsonProtocol._
import spray.json._

import scala.concurrent.Future
import scala.concurrent.duration._

class ShopingCartRoute(shoppingCartActor: ActorRef[ShoppingCartActor.CartCommand])(implicit val system: ActorSystem[_]) {

  implicit val timeout: Timeout = 5.seconds

  def addItem(id: String, quantity: Int) : Future[CartState] = shoppingCartActor ? (AddItem(id, quantity, _))
  def removeItem(id: String) : Future[CartState] = shoppingCartActor ? (RemoveItem(id, _))
  def viewCart() : Future[CartState] = shoppingCartActor ? (ViewCart(_))

  implicit val cartFormat = jsonFormat1(CartState)
  implicit val ec = system.executionContext

  val routes: Route = pathPrefix("cart") {
    concat(
      pathEnd {
        concat(
          get {
            complete {
              viewCart().map(_.toJson.prettyPrint)
            }
          } ~
          post {
            parameters(('id, 'quantity.as[Int])) { (id, quantity) =>
              complete {
                addItem(id, quantity).map(_.toJson.prettyPrint)
              }
            }
          } ~
          delete {
            parameters('id) {id =>
              complete {
                removeItem(id).map(_.toJson.prettyPrint)
              }
            }
          }
        )
      }
    )
  }

}

Bootstrap Code

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Route
import com.typesafe.config.ConfigFactory

import scala.util.{Failure, Success}

object Bootstrap extends App {

  val rootBehavior = Behaviors.setup[Nothing] { context =>
    context.spawn(ClusterListener(), "ClusterListener")
    val shoppingCartActor = context.spawn(ShoppingCartActor(), "shopping-cart-actor")
    startHttpServer(new ShopingCartRoute(shoppingCartActor)(context.system).routes, context.system)
    Behaviors.empty
  }

  private def startHttpServer(routes: Route, system: ActorSystem[_]): Unit = {

    implicit val classicSystem: akka.actor.ActorSystem = system.toClassic
    import system.executionContext

    val futureBinding = Http().bindAndHandle(routes, interface = "localhost", 0)
    futureBinding.onComplete {
      case Success(binding) =>
        val address = binding.localAddress
        system.log.error("Server online at http://{}:{}/", address.getHostString, address.getPort)
      case Failure(ex) =>
        system.log.error("Failed to bind HTTP endpoint, terminating system", ex)
        system.terminate()
    }
  }

  val port = args(0).toInt
  val config = ConfigFactory.parseString(s"akka.remote.artery.canonical.port=$port").withFallback(ConfigFactory.load())
  val system = ActorSystem[Nothing](rootBehavior, "shopping-cart", config)


}

Configuration

akka {
  loglevel = ERROR

  actor {
    provider = cluster
    serialization-bindings {
      "com.example.CborSerializable" = jackson-cbor
    }
  }

  persistence {
    journal.plugin = "akka.persistence.cassandra.journal"
    snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    snapshot-store.local.dir = "target/snapshot"
    cassandra {
      journal {
        class = "akka.persistence.cassandra.journal.CassandraJournal"
        keyspace-autocreate = true
        tables-autocreate = true
      }
    }
  }

  remote {
    artery {
      canonical.hostname = "127.0.0.1"
      canonical.port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka://ClusterSystem@127.0.0.1:25251",
      "akka://ClusterSystem@127.0.0.1:25252"]
  }

}

When I look at the journal messages table, I can see three records, even though I had only sent two POST requests.

Thank you

What I have noticed is, when I shutdown the servers and restart them again, I only see on both the data that was persisted by the last node that was shutdown. Even though, the messages table I can still see two records.

You should use EventSourcedBehavior together with Cluster Sharding. There must only be one active instance of the event sourced actor with a given persistenceId in the cluster. That is what Cluster Sharding takes care of. See https://doc.akka.io/docs/akka/current/typed/persistence.html#cluster-sharding-and-eventsourcedbehavior

1 Like