Hi,
I am running a POC with persistent actors and clustering, and I am very new to Akka. Is it a fair assumption that, when I have multiple nodes in a cluster, any state change to an actor on one node is reflected on the other nodes? I use Cassandra for the journal, and the persistent state is accessed through HTTP routes. I hit both nodes alternately using curl commands. What I notice is that each node only retains the state changes made on that node and doesn't reflect any changes made on the other node. Also, when I restart both nodes, both of them only seem to have the state that was changed on the last node.
Persistent Actor
package com.example
import akka.actor.typed.ActorRef
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
/**
 * Event-sourced shopping cart.
 *
 * Commands are handled by [[ShoppingCartActor.commandHandler]], which persists
 * [[ShoppingCartActor.CartEvent]]s; the in-memory [[ShoppingCartActor.CartState]]
 * is rebuilt from those events by [[ShoppingCartActor.eventHandler]].
 */
object ShoppingCartActor {

  /** Protocol accepted by the cart. Sealed so the handler match is checked for exhaustiveness. */
  sealed trait CartCommand extends CborSerializable

  /** Sets `quantity` for item `id` (replacing any previous quantity) and replies with the new state. */
  final case class AddItem(id: String, quantity: Int, replyTo: ActorRef[CartState]) extends CartCommand

  /** Removes item `id` from the cart and replies with the new state. */
  final case class RemoveItem(id: String, replyTo: ActorRef[CartState]) extends CartCommand

  /** Replies with the current state without persisting anything. */
  final case class ViewCart(replyTo: ActorRef[CartState]) extends CartCommand

  /** Events written to the journal. */
  sealed trait CartEvent extends CborSerializable
  final case class ItemAdded(id: String, quantity: Int) extends CartEvent
  final case class ItemRemoved(id: String) extends CartEvent

  /**
   * Cart contents: item id -> quantity.
   *
   * Marked `CborSerializable` because it is sent as a reply through an `ActorRef`,
   * which in a cluster may point at another node.
   */
  final case class CartState(items: Map[String, Int] = Map.empty) extends CborSerializable

  /**
   * Creates the behavior for one cart.
   *
   * @param cartId entity id used to build the `PersistenceId`; defaults to the
   *               previously hard-coded `"CartId"`, so `ShoppingCartActor()` behaves as before.
   */
  def apply(cartId: String = "CartId"): EventSourcedBehavior[CartCommand, CartEvent, CartState] =
    EventSourcedBehavior[CartCommand, CartEvent, CartState](
      persistenceId = PersistenceId("ShoppingCart", cartId),
      emptyState = CartState(),
      commandHandler = commandHandler,
      eventHandler = eventHandler
    )

  /** Decides which event (if any) to persist for `command` and replies to the sender. */
  def commandHandler(state: CartState, command: CartCommand): Effect[CartEvent, CartState] =
    command match {
      case ViewCart(replyTo) =>
        // Read-only: answer from the current state, nothing is persisted.
        replyTo ! state
        Effect.none

      case AddItem(id, quantity, replyTo) =>
        // The thenRun callback receives the state after the event has been applied.
        Effect.persist(ItemAdded(id, quantity)).thenRun(updated => replyTo ! updated)

      case RemoveItem(id, replyTo) =>
        Effect.persist(ItemRemoved(id)).thenRun(updated => replyTo ! updated)
    }

  /** Applies a persisted `event` to `state`; also used when replaying the journal on recovery. */
  def eventHandler(state: CartState, event: CartEvent): CartState =
    event match {
      case ItemAdded(id, quantity) => state.copy(items = state.items + (id -> quantity))
      case ItemRemoved(id)         => state.copy(items = state.items - id)
    }
}
HTTP Route
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.util.Timeout
import com.example.ShoppingCartActor._
import spray.json.DefaultJsonProtocol._
import spray.json._
import scala.concurrent.Future
import scala.concurrent.duration._
/**
 * HTTP facade for the shopping cart actor.
 *
 * Exposes `GET /cart`, `POST /cart?id=..&quantity=..` and `DELETE /cart?id=..`;
 * every request is turned into an ask against `shoppingCartActor` and the
 * resulting [[CartState]] is returned as pretty-printed JSON text.
 *
 * @param shoppingCartActor target of all cart commands
 * @param system            actor system providing the scheduler and execution context for the asks
 */
class ShopingCartRoute(shoppingCartActor: ActorRef[ShoppingCartActor.CartCommand])(implicit val system: ActorSystem[_]) {

  /** Maximum time to wait for the actor's reply before the ask fails. */
  implicit val timeout: Timeout = 5.seconds

  // Explicit types on implicits avoid resolution surprises that depend on declaration order.
  implicit val ec: scala.concurrent.ExecutionContextExecutor = system.executionContext
  implicit val cartFormat: RootJsonFormat[CartState] = jsonFormat1(CartState)

  /** Asks the actor to set `quantity` for item `id`; completes with the updated cart. */
  def addItem(id: String, quantity: Int): Future[CartState] =
    shoppingCartActor ? (AddItem(id, quantity, _))

  /** Asks the actor to remove item `id`; completes with the updated cart. */
  def removeItem(id: String): Future[CartState] =
    shoppingCartActor ? (RemoveItem(id, _))

  /** Asks the actor for the current cart. */
  def viewCart(): Future[CartState] =
    shoppingCartActor ? (ViewCart(_))

  /** Renders a cart reply as pretty-printed JSON text. */
  private def asJsonText(reply: Future[CartState]): Future[String] =
    reply.map(_.toJson.prettyPrint)

  val routes: Route =
    pathPrefix("cart") {
      pathEnd {
        concat(
          get {
            complete(asJsonText(viewCart()))
          },
          post {
            // String parameter names replace the deprecated symbol literals ('id).
            parameters("id", "quantity".as[Int]) { (id, quantity) =>
              complete(asJsonText(addItem(id, quantity)))
            }
          },
          delete {
            parameter("id") { id =>
              complete(asJsonText(removeItem(id)))
            }
          }
        )
      }
    }
}
Bootstrap Code
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Route
import com.typesafe.config.ConfigFactory
import scala.util.{Failure, Success}
/**
 * Application entry point.
 *
 * Usage: `Bootstrap <artery-port>`. The port overrides
 * `akka.remote.artery.canonical.port` so several nodes can run on one machine.
 */
object Bootstrap extends App {

  /** Guardian behavior: starts the cluster listener, the cart actor and the HTTP server. */
  val rootBehavior = Behaviors.setup[Nothing] { context =>
    context.spawn(ClusterListener(), "ClusterListener")

    // NOTE(review): this spawns one cart actor per node, and each instance uses the same
    // PersistenceId. Akka Persistence expects a single active writer per persistence id;
    // independent instances do not see each other's events until they are restarted and
    // replay the journal. Consider Cluster Singleton or Cluster Sharding so that only one
    // instance exists in the cluster.
    val shoppingCartActor = context.spawn(ShoppingCartActor(), "shopping-cart-actor")

    startHttpServer(new ShopingCartRoute(shoppingCartActor)(context.system).routes, context.system)
    Behaviors.empty
  }

  /** Binds `routes` on localhost (port 0 = any free port) and terminates the system if binding fails. */
  private def startHttpServer(routes: Route, system: ActorSystem[_]): Unit = {
    implicit val classicSystem: akka.actor.ActorSystem = system.toClassic
    import system.executionContext

    val futureBinding = Http().bindAndHandle(routes, interface = "localhost", 0)
    futureBinding.onComplete {
      case Success(binding) =>
        val address = binding.localAddress
        // Logged at error level, presumably so it stays visible with akka.loglevel = ERROR.
        system.log.error("Server online at http://{}:{}/", address.getHostString, address.getPort)
      case Failure(ex) =>
        system.log.error("Failed to bind HTTP endpoint, terminating system", ex)
        system.terminate()
    }
  }

  /** Artery port taken from the first command-line argument; exits with usage help if absent or not a number. */
  val port: Int = args.headOption.flatMap(_.toIntOption).getOrElse {
    Console.err.println("Usage: Bootstrap <artery-port>  (e.g. 25251)")
    sys.exit(1)
  }

  val config = ConfigFactory
    .parseString(s"akka.remote.artery.canonical.port=$port")
    .withFallback(ConfigFactory.load())

  val system = ActorSystem[Nothing](rootBehavior, "shopping-cart", config)
}
Configuration
akka {
  loglevel = ERROR

  actor {
    provider = cluster

    # Messages, events and state marked with CborSerializable are serialized with Jackson CBOR.
    serialization-bindings {
      "com.example.CborSerializable" = jackson-cbor
    }
  }

  persistence {
    journal.plugin = "akka.persistence.cassandra.journal"

    # Local snapshot store writes to this node's file system.
    snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    snapshot-store.local.dir = "target/snapshot"

    cassandra {
      journal {
        class = "akka.persistence.cassandra.journal.CassandraJournal"
        keyspace-autocreate = true
        tables-autocreate = true
      }
    }
  }

  remote {
    artery {
      canonical.hostname = "127.0.0.1"
      # Overridden at startup by the port passed on the command line.
      canonical.port = 0
    }
  }

  cluster {
    # The system name in each address must match the ActorSystem name ("shopping-cart");
    # with a different name the nodes cannot join each other.
    seed-nodes = [
      "akka://shopping-cart@127.0.0.1:25251",
      "akka://shopping-cart@127.0.0.1:25252"]
  }
}
When I look at the journal messages table, I can see three records, even though I had only sent two POST requests.
Thank you