I have recently begun learning Akka, and I am trying to convert a consensus protocol that I wrote in Java to Akka typed actors in Scala. Most of what I have seen is incredibly concise and elegant, but I am finding that certain operations that need to reach out to other actors result in implementations that look anything but concise or elegant. That makes me think that I am doing it wrong.
Here is an example scenario that I would like to do in a functional and idiomatic way with typed actors. To keep actors from being coupled, I made the decision to not keep actor refs in the actor. Instead, I am using service keys to get actor refs when I need them. When I do that, now we have one level of futures to map. Next, I take that actor ref and I ask the actor for a piece of information. Now we are at two levels of futures. Since I have that information, now I will need to use the information in a message I am sending to yet another actor. Enter nested future number 3! Now, I can finally send the information since I have that actor’s ref, but wouldn’t you know it – this is future number 4. You may be thinking that there’s no real reason why I would have to wait to get the second actor ref, if I know that I am going to need it, so I could use for comprehension to do that, right? Well, if that’s not even a viable answer, it is because I am also trying to beef up my Scala skills in the meantime.
Either way, how do I handle this as elegantly as possible? I have looked through the online documentation, and I have purchased Akka In Action, but I have not seen this use case.
Here is an example of a method in one of my actors that does some nesting:
private def processFinalizeMessage(iterationNumber: Int,
localPlayerId: String,
leaderId: String,
finalizeReceipts: Seq[String],
finalizeSigned: FinalizeSigned,
receptionist: ActorRef[Receptionist.Command],
timer: Cancellable,
context: ActorContext[IterationCommand])
(implicit timeout: Timeout, ec: ExecutionContext, scheduler: Scheduler): Behavior[IterationCommand] = {
val finalizeMsg = finalizeSigned.finalizeMsg
if (finalizeMsg.iteration != iterationNumber) {
Behaviors.same
} else {
val finalizeReceipt = finalizeMsg.playerId
val playerServiceRefFuture: Future[Receptionist.Listing] =
receptionist ? Receptionist.Find(PlayerActor.playerServiceKey)
Behaviors.receiveMessage {
case _: IterationCommand =>
Behaviors.same
case newBehavior: Behavior[IterationCommand] @unchecked =>
newBehavior
}
playerServiceRefFuture.flatMap { listing =>
listing.serviceInstances(PlayerActor.playerServiceKey).headOption match {
case Some(playerActorRef) =>
val publicKeyFuture = playerActorRef.ask(replyTo => PlayerActor.GetPlayerPublicKey(finalizeReceipt, replyTo))
publicKeyFuture.map {
case Some(publicKey) =>
if (CryptoUtil.verifySignature(MessageUtils.toBytes(finalizeMsg), finalizeSigned.signature, publicKey, MESSAGE_DIGEST_ALGORITHM)) {
context.self ! iterationBehavior(iterationNumber, localPlayerId, leaderId, finalizeReceipts :+ finalizeReceipt, receptionist, timer, context)
} else {
val errorMessage = s"Signature verification failed for finalize receipt from player $finalizeReceipt"
context.log.warn(errorMessage)
context.self ! Behaviors.same
}
case None =>
val errorMessage = s"Could not find player actor for player ID $finalizeReceipt so the finalize receipt could not be verified."
context.log.warn(errorMessage)
context.self ! Behaviors.same
}.recover {
case ex =>
val errorMessage = s"Error occurred while processing finalize message: ${ex.getMessage}"
context.log.error(errorMessage)
context.self ! Behaviors.same
}
case None =>
val errorMessage = s"Could not find player actor reference to verify the finalize message."
context.log.warn(errorMessage)
context.self ! Behaviors.same
}
}
}
}
I know that this isn’t even fully compiling code yet, but I am trying to figure some of this stuff out, so please be kind!