Hello,
I am using AKKA Persistence ob its own without clustering of sharding. I have a simple HTTP route with a POST and a GET. In the POST I create the persistent actor instance. I need to access the same actor in the GET to retrieve the state. I also want it accessible after a server restart and recovery. What is the best way to lookup the persistent actor? I tried sending a receptionist subscription from the persistence actor after successful creation and recovery. However, if I try to find the actor I am getting an empty listing. Is there another API to access an instance of a persistent actor?
The code for the persistent actor is shown below.
object PersistentCompanyActor {
sealed trait Event extends CborSerializable;
case class CreatedEvent(company: Company) extends Event
case class UpdatedEvent(company: Company) extends Event
sealed trait Command extends CborSerializable;
case class CreateCommand(company: Company, replyTo: ActorRef[Result]) extends Command
case class UpdateCommand(company: Company, replyTo: ActorRef[Result]) extends Command
case class GetCommand(replyTo: ActorRef[Company]) extends Command
case class Result(success: Boolean, error: Option[String]) {
def this() = this(true, None)
def this(error: String) = this(false, Some(error))
}
def apply(companyId: String): Behavior[Command] = Behaviors.setup[PersistentCompanyActor.Command] { ctx =>
EventSourcedBehavior[Command, Event, Option[Company]](
persistenceId = PersistenceId.ofUniqueId(companyId),
emptyState = None,
commandHandler = commandHandler(ctx, _, _),
eventHandler = eventHandler
).receiveSignal {
case (state, RecoveryCompleted) => notifyRegistry(ctx, companyId)
}
}
def commandHandler(ctx: ActorContext[_], oldState: Option[Company], cmd: Command): Effect[Event, Option[Company]] = cmd match {
case CreateCommand(newState, replyTo) =>
persist(UpdatedEvent(newState)).
thenRun((x: Option[Company]) => notifyRegistry(ctx, newState.companyId.get)).
thenReply(replyTo)(state => new Result())
case UpdateCommand(newState, replyTo) =>
persist(UpdatedEvent(newState)).
thenReply(replyTo)(state => new Result())
case GetCommand(replyTo) => Effect.none.thenReply(replyTo)(state => state.get)
}
def eventHandler(oldState: Option[Company], evt: Event): Option[Company] = {
println("Event handler being run " + evt)
evt match {
case CreatedEvent(newState) => Some(newState)
case UpdatedEvent(newState) => Some(newState)
}
}
private def notifyRegistry(ctx: ActorContext[_], companyId: String) = {
ctx.system.receptionist ! Receptionist.Register(ServiceKey("Company-" + companyId), ctx.self)
ctx.log.info("Subscription message sent " + companyId)
}
}
And this is the code where I create and try to find the actor. I am hoping there is a better way to to do this, as the code below looks a bit convoluted for looking up an actor.
sealed trait CompanyRegistryActorCommand
case class CreateRegistry(company: Company, replyTo: ActorRef[Result]) extends CompanyRegistryActorCommand
case class UpdateRegistry(companyId: String, company: Company, replyTo: ActorRef[Result]) extends CompanyRegistryActorCommand
case class UpdateRegistry2(company: Company, companyActor: ActorRef[PersistentCompanyActor.Command], replyTo: ActorRef[Result]) extends CompanyRegistryActorCommand
case class GetRegistry(companyId: String, replyTo: ActorRef[Company]) extends CompanyRegistryActorCommand
case class GetRegistry2(companyActor: ActorRef[PersistentCompanyActor.Command], replyTo: ActorRef[Company]) extends CompanyRegistryActorCommand
case class WrappeduserCreatedResponse(ucr: UserCreationResponse) extends CompanyRegistryActorCommand
case class CompanyActorRegistration(id: String, actor: ActorRef[PersistentCompanyActor.Command]) extends CompanyRegistryActorCommand
object CompanyRegistryActor {
var companyActors = scala.collection.mutable.Map.empty[String, ActorRef[PersistentCompanyActor.Command]]
val Key: ServiceKey[CompanyRegistryActorCommand] = ServiceKey("CompanyRegistry")
def apply(fbs: FirebaseService): Behavior[CompanyRegistryActorCommand] = Behaviors.setup[CompanyRegistryActorCommand] { ctx =>
val fbsResponseHandler = ctx.messageAdapter[UserCreationResponse](response => WrappeduserCreatedResponse(response))
ctx.system.receptionist ! Receptionist.Register(Key, ctx.self)
Behaviors.receiveMessage {
case CreateRegistry(company, replyTo) =>
ctx.log.info("Create message received by company registry")
val companyId = Some(UUID.randomUUID().toString)
val firebaseActor = ctx.spawn(FirebaseActor(fbs), "FirebaseActor")
firebaseActor ! CreateUserCommand(company.copy(companyId = companyId), replyTo, fbsResponseHandler)
Behaviors.same
case UpdateRegistry(companyId, company, replyTo) =>
ctx.log.info("Update message received by company registry")
implicit val timeout: Timeout = 1.second
val key: ServiceKey[PersistentCompanyActor.Command] = ServiceKey("Company-" + companyId)
ctx.ask(ctx.system.receptionist, Find(key)){
case Success(listing: Listing) =>
ctx.log.info("Company actor looked up " + companyId)
val companyActor = listing.serviceInstances[PersistentCompanyActor.Command](key).head
UpdateRegistry2(company, companyActor, replyTo)
}
Behaviors.same
case UpdateRegistry2(company, compnayActor, replyTo) =>
compnayActor ! UpdateCommand(company, replyTo)
Behaviors.same
case GetRegistry(companyId, replyTo) =>
ctx.log.info("Get message received by company registry")
implicit val timeout: Timeout = 1.second
val key: ServiceKey[PersistentCompanyActor.Command] = ServiceKey("Company-" + companyId)
ctx.ask(ctx.system.receptionist, Find(key)){
case Success(listing: Listing) =>
ctx.log.info("Company actor looked up " + companyId)
val companyActor = listing.serviceInstances[PersistentCompanyActor.Command](key).head
GetRegistry2(companyActor, replyTo)
}
Behaviors.same
case GetRegistry2(compnayActor, replyTo) =>
ctx.log.info("Get message received by company registry")
compnayActor ! GetCommand(replyTo)
Behaviors.same
case WrappeduserCreatedResponse(response) =>
ctx.log.info("Firebase response received by registry")
if (response.success) {
val companyActor = ctx.spawn(PersistentCompanyActor(response.company.companyId.get), "CompanyActor-" + response.company.companyId.get)
companyActors += (response.company.companyId.get -> companyActor)
companyActor ! CreateCommand(response.company, response.originator)
} else {
response.originator ! Result(response.success, response.error)
}
Behaviors.same
case CompanyActorRegistration(id, actor) =>
companyActors += (id -> actor)
Behaviors.same
}
}
}
Many thanks