Persistent Actor Discovery

Hello,

I am using Akka Persistence on its own, without clustering or sharding. I have a simple HTTP route with a POST and a GET. In the POST I create the persistent actor instance. I need to access the same actor in the GET to retrieve the state. I also want it to be accessible after a server restart and recovery. What is the best way to look up the persistent actor? I tried sending a receptionist registration from the persistent actor after successful creation and recovery. However, when I try to find the actor I get an empty listing. Is there another API to access an instance of a persistent actor?

The code for the persistent actor is shown below.

object PersistentCompanyActor {

  sealed trait Event extends CborSerializable
  case class CreatedEvent(company: Company) extends Event
  case class UpdatedEvent(company: Company) extends Event

  sealed trait Command extends CborSerializable
  case class CreateCommand(company: Company, replyTo: ActorRef[Result]) extends Command
  case class UpdateCommand(company: Company, replyTo: ActorRef[Result]) extends Command
  case class GetCommand(replyTo: ActorRef[Company]) extends Command

  case class Result(success: Boolean, error: Option[String]) {
    def this() = this(true, None)
    def this(error: String) = this(false, Some(error))
  }

  def apply(companyId: String): Behavior[Command] = Behaviors.setup[PersistentCompanyActor.Command] { ctx =>
    EventSourcedBehavior[Command, Event, Option[Company]](
      persistenceId = PersistenceId.ofUniqueId(companyId),
      emptyState = None,
      commandHandler = commandHandler(ctx, _, _),
      eventHandler = eventHandler
    ).receiveSignal {
      case (state, RecoveryCompleted) => notifyRegistry(ctx, companyId)
    }
  }

  def commandHandler(ctx: ActorContext[_], oldState: Option[Company], cmd: Command): Effect[Event, Option[Company]] = cmd match {
    case CreateCommand(newState, replyTo) =>
      // A create command should persist CreatedEvent, not UpdatedEvent
      persist(CreatedEvent(newState)).
        thenRun((x: Option[Company]) => notifyRegistry(ctx, newState.companyId.get)).
        thenReply(replyTo)(state => new Result())
    case UpdateCommand(newState, replyTo) =>
      persist(UpdatedEvent(newState)).
        thenReply(replyTo)(state => new Result())
    case GetCommand(replyTo) => Effect.none.thenReply(replyTo)(state => state.get)
  }

  def eventHandler(oldState: Option[Company], evt: Event): Option[Company] = {
    println("Event handler being run " + evt)
    evt match {
      case CreatedEvent(newState) => Some(newState)
      case UpdatedEvent(newState) => Some(newState)
    }
  }

  private def notifyRegistry(ctx: ActorContext[_], companyId: String) = {
    ctx.system.receptionist ! Receptionist.Register(ServiceKey("Company-" + companyId), ctx.self)
    ctx.log.info("Subscription message sent " + companyId)
  }

}

And this is the code where I create and try to find the actor. I am hoping there is a better way to do this, as the code below looks a bit convoluted for looking up an actor.

sealed trait CompanyRegistryActorCommand
case class CreateRegistry(company: Company, replyTo: ActorRef[Result]) extends CompanyRegistryActorCommand
case class UpdateRegistry(companyId: String, company: Company, replyTo: ActorRef[Result]) extends CompanyRegistryActorCommand
case class UpdateRegistry2(company: Company, companyActor: ActorRef[PersistentCompanyActor.Command], replyTo: ActorRef[Result]) extends CompanyRegistryActorCommand
case class GetRegistry(companyId: String, replyTo: ActorRef[Company]) extends CompanyRegistryActorCommand
case class GetRegistry2(companyActor: ActorRef[PersistentCompanyActor.Command], replyTo: ActorRef[Company]) extends CompanyRegistryActorCommand
case class WrappeduserCreatedResponse(ucr: UserCreationResponse) extends CompanyRegistryActorCommand
case class CompanyActorRegistration(id: String, actor: ActorRef[PersistentCompanyActor.Command]) extends CompanyRegistryActorCommand

object CompanyRegistryActor {

  var companyActors = scala.collection.mutable.Map.empty[String, ActorRef[PersistentCompanyActor.Command]]
  val Key: ServiceKey[CompanyRegistryActorCommand] = ServiceKey("CompanyRegistry")

  def apply(fbs: FirebaseService): Behavior[CompanyRegistryActorCommand] = Behaviors.setup[CompanyRegistryActorCommand] { ctx =>

    val fbsResponseHandler = ctx.messageAdapter[UserCreationResponse](response => WrappeduserCreatedResponse(response))
    ctx.system.receptionist ! Receptionist.Register(Key, ctx.self)

    Behaviors.receiveMessage {
      case CreateRegistry(company, replyTo) =>
        ctx.log.info("Create message received by company registry")
        val companyId = Some(UUID.randomUUID().toString)
        val firebaseActor = ctx.spawn(FirebaseActor(fbs), "FirebaseActor")
        firebaseActor ! CreateUserCommand(company.copy(companyId = companyId), replyTo, fbsResponseHandler)
        Behaviors.same
      case UpdateRegistry(companyId, company, replyTo) =>
        ctx.log.info("Update message received by company registry")
        implicit val timeout: Timeout = 1.second
        val key: ServiceKey[PersistentCompanyActor.Command] = ServiceKey("Company-" + companyId)
        ctx.ask(ctx.system.receptionist, Find(key)){
          case Success(listing: Listing) =>
            ctx.log.info("Company actor looked up " + companyId)
            val companyActor = listing.serviceInstances[PersistentCompanyActor.Command](key).head
            UpdateRegistry2(company, companyActor, replyTo)
        }
        Behaviors.same
      case UpdateRegistry2(company, companyActor, replyTo) =>
        companyActor ! UpdateCommand(company, replyTo)
        Behaviors.same
      case GetRegistry(companyId, replyTo) =>
        ctx.log.info("Get message received by company registry")
        implicit val timeout: Timeout = 1.second
        val key: ServiceKey[PersistentCompanyActor.Command] = ServiceKey("Company-" + companyId)
        ctx.ask(ctx.system.receptionist, Find(key)){
          case Success(listing: Listing) =>
            ctx.log.info("Company actor looked up " + companyId)
            val companyActor = listing.serviceInstances[PersistentCompanyActor.Command](key).head
            GetRegistry2(companyActor, replyTo)
        }
        Behaviors.same
      case GetRegistry2(companyActor, replyTo) =>
        ctx.log.info("Get message received by company registry")
        companyActor ! GetCommand(replyTo)
        Behaviors.same
      case WrappeduserCreatedResponse(response) =>
        ctx.log.info("Firebase response received by registry")
        if (response.success) {
          val companyActor = ctx.spawn(PersistentCompanyActor(response.company.companyId.get), "CompanyActor-" + response.company.companyId.get)
          companyActors += (response.company.companyId.get -> companyActor)
          companyActor ! CreateCommand(response.company, response.originator)
        } else {
          response.originator ! Result(response.success, response.error)
        }
        Behaviors.same
      case CompanyActorRegistration(id, actor) =>
        companyActors += (id -> actor)
        Behaviors.same
    }
  }

}

Many thanks

I have managed to get the discovery working under normal flow by type tagging the service key. However, when I restart the server, the recovery code on the persistent behavior doesn't seem to run. This means the code I have for registering persistent actor instances with the receptionist doesn't run either.

    ).receiveSignal {
      case (state, RecoveryCompleted) => notifyRegistry(ctx, companyId)
    }

I can see the entries in the journal with the persistence ids. However, when I restart the server, they are never loaded, nor can I see the recovery callback being executed. Is there anything I need to do explicitly for the instances to be loaded? Any pointers would be highly appreciated. Kind regards.

  def apply(companyId: String): Behavior[Command] = Behaviors.setup[PersistentCompanyActor.Command] { ctx =>
    EventSourcedBehavior[Command, Event, Option[Company]](
      persistenceId = PersistenceId.ofUniqueId(companyId),
      emptyState = None,
      commandHandler = commandHandler(ctx, _, _),
      eventHandler = eventHandler
    ).receiveSignal {
      case (state, RecoveryCompleted) => notifyRegistry(ctx, companyId)
    }
  }

A persistent actor is in no way automatically started when the system starts up. It is perfectly fine to have a huge number of persistent actors with stored events but only a few, or none, of them running at a given time.

To get your actors started on a cold start of the system, you will have to store the set of started ids in some persistent way, load it on system start and then start each of those persistent actors.

You can achieve this using the Remember Entities feature of Akka Cluster Sharding. The alternative is to implement something like that feature yourself - an actor that keeps track of what persistence ids it has started, loads that set of ids and starts them on a cold start.

Such an actor can keep track of exactly which ids are started and be responsible for starting, stopping and looking up the ActorRef for a given id. Note that this is much like what Cluster Sharding gives you out of the box, except that you will have to implement and test it yourself, so make sure you are confident that it is the right decision.
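A minimal sketch of the kind of tracking actor described above, assuming the set of ids has already been loaded from your own store before the actor is created. The names `EntityManager`, `Deliver`, `knownIds` and `entity` are hypothetical and not part of any Akka API. It also assumes each id is a valid actor name, which holds for the UUID strings used in this thread.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical names throughout: EntityManager, Deliver and the
// `entity` factory are illustrative only.
object EntityManager {

  // Envelope that routes a message to the entity with the given id.
  final case class Deliver[M](id: String, message: M)

  // knownIds: ids loaded from your own persistent store before startup.
  // entity:   builds the behavior for one id, e.g. PersistentCompanyActor(_).
  def apply[M](knownIds: Set[String], entity: String => Behavior[M]): Behavior[Deliver[M]] =
    Behaviors.setup[Deliver[M]] { ctx =>
      // Cold start: spawn exactly one child per stored id. Spawning an
      // event sourced behavior is what triggers its recovery.
      val children: Map[String, ActorRef[M]] =
        knownIds.iterator.map(id => id -> ctx.spawn(entity(id), s"entity-$id")).toMap

      Behaviors.receiveMessage {
        case Deliver(id, message) =>
          children.get(id) match {
            case Some(ref) => ref ! message
            case None      => ctx.log.warn("No entity started for id {}", id)
          }
          Behaviors.same
      }
    }
}
```

With the code from the question, this could be spawned as `EntityManager(ids, PersistentCompanyActor(_))`. Because all children belong to one parent and are spawned once, there is only ever one running instance per id.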

I would not in general recommend involving the receptionist for a per-id lookup of persistent actors. Think of it more as a place to look up services/workers for some kind of workload.


Thank you very much. I do have a record of all the persistence ids. Given the persistence ids, what is the best way to load the persistent actor instances?

Kind regards

I am currently spawning the actor by specifying the persistence Id. Is that the right approach?

Many thanks

Yes, starting a persistent actor with the id is how you do it. Note that it is very important that you do not start multiple instances for the same id, as that will corrupt the event log for that id (this is the central thing that Cluster Sharding solves for a clustered application). If you do something more elaborate than spawning them all as children of a supervising parent, you will have to be careful to guarantee this.
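One way to get the single-instance guarantee when all entities are children of one parent is to derive the child name from the id and look the child up before spawning. The sketch below uses `ActorContext.child` and `ActorRef.unsafeUpcast` from Akka Typed. The names `SingleInstance`, `getOrSpawn` and `entity` are hypothetical, and it must only be called from inside the parent actor while it is processing a message.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.ActorContext

// Hypothetical helper, not part of Akka.
object SingleInstance {

  // Returns the running child for `id`, spawning it only if it is absent.
  // Assumes `id` is a valid actor name (true for UUID strings).
  def getOrSpawn[M](ctx: ActorContext[_], id: String, entity: String => Behavior[M]): ActorRef[M] = {
    val name = s"entity-$id"
    ctx.child(name) match {
      // ctx.child returns ActorRef[Nothing]; the cast is safe only because
      // this parent spawns every child under this name prefix with type M.
      case Some(ref) => ref.unsafeUpcast[M]
      case None      => ctx.spawn(entity(id), name)
    }
  }
}
```

Child names are unique per parent, so even a buggy second `ctx.spawn` with the same name would fail with an `InvalidActorNameException` rather than silently start a duplicate writer for that persistence id.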


Many thanks for the detailed reply.

The pattern I have is that I spawn a root-level singleton non-persistent actor, which I pass to an HTTP route. When requests come in, the routes send messages to this actor. The actor retrieves the persistence id from some context and spawns the persistent actors. From what you have said, I will need to maintain a cache of persistent actors within the singleton actor, spawn a persistent actor instance only if it doesn't exist in the cache, and add any spawned instance to the cache. Since all the messages to the singleton actor are processed serially, the cache will be thread safe.

However, since creation of the persistent actor will involve loading the state from the database, does that mean warming the cache lazily can become a bottleneck in the singleton actor? I understand sharding will address a lot of these concurrency issues. I just want to start simple and get this running on GCP with K8, before I start looking into clustering and sharding.
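The lazily warmed cache described in this post could look roughly like the sketch below. `LazyEntityCache`, `Deliver` and `entity` are hypothetical names. The mutable map is created inside `Behaviors.setup`, so it belongs to one actor instance and is only touched while that actor processes a message or signal. Stopped children are evicted via `watch` and the `Terminated` signal so a later message spawns a fresh instance.

```scala
import akka.actor.typed.{ActorRef, Behavior, Terminated}
import akka.actor.typed.scaladsl.Behaviors
import scala.collection.mutable

// Hypothetical names: LazyEntityCache, Deliver and `entity` are illustrative.
object LazyEntityCache {

  final case class Deliver[M](id: String, message: M)

  def apply[M](entity: String => Behavior[M]): Behavior[Deliver[M]] =
    Behaviors.setup[Deliver[M]] { ctx =>
      // Private to this actor instance, never shared outside it.
      val cache = mutable.Map.empty[String, ActorRef[M]]

      Behaviors
        .receiveMessage[Deliver[M]] { case Deliver(id, message) =>
          // Spawn on first use only; afterwards reuse the cached ref.
          val ref = cache.getOrElseUpdate(id, {
            val spawned = ctx.spawn(entity(id), s"entity-$id")
            ctx.watch(spawned) // we get Terminated when this child stops
            spawned
          })
          ref ! message
          Behaviors.same
        }
        .receiveSignal { case (_, Terminated(stopped)) =>
          // Evict the stopped child so the id can be started again later.
          cache.collectFirst { case (id, ref) if ref == stopped => id }.foreach(cache.remove)
          Behaviors.same
        }
    }
}
```

On the bottleneck concern: `ctx.spawn` returns immediately and the replay of events happens inside the child, which stashes commands that arrive during recovery. The parent therefore does not wait for the database while warming the cache, although the first request for a cold id still pays the recovery latency.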

Kind regards

That completely depends on your application: how many actors there are, how many events they store, how fast replaying those events is, how many requests for unstarted actors will arrive at the same time, and so on. The only way to know is to benchmark with more or less realistic workloads and compare the results to your requirements for latency and throughput.


Understood. At some point I will port this to use sharding, so that I don't need to worry about some of these concerns within the application. Many thanks for your clear and quick responses. I believe I now understand the core workings.

Kind regards