When writing a websocket server with akka-actor, I found that there is a need (and possibly a pattern) to create a Future when handling a message, and mutate actor’s internal state after the Future completes.
My current (hopefully valid) solution is to send the actor itself another message ( ScheduledComputation
), and mutate own state in a future run of receive ( handleScheduledComputation
). The detail can be found in attached code.
In this way I can write conceptually consecutive computations together (like in handleUserMessage
), but as I’m new to akka and actors, I feel insecure to use it before hearing from others (the thing I wanted to do may be an anti-pattern after all).
Any comments are welcome.
// ActorFutureRunnableSupport.scala
import akka.actor.Actor
import scala.concurrent.{Future, Promise}
import scala.util.Try
private case class ScheduledComputation[A](createdIn: Actor, computation: () => Try[A]) {
private val executed = Promise[A]
private lazy val result = computation()
/**
* MUST be executed within another receive() run of the same actor
* MUST execute only once
*
* @param runIn
*/
def run(runIn: Actor): Unit = {
if (createdIn ne runIn) {
throw new IllegalThreadStateException(
s"Continuation is created in $createdIn but executed in $runIn")
}
executed.complete(result) // would throw if already complete-d
}
val future: Future[A] = executed.future
}
trait ActorFutureRunnableSupport extends Actor {
protected def handleScheduledComputation: Receive = {
case c: ScheduledComputation[Any] =>
c.run(this)
}
protected def runInNextTick[A](computation: () => Try[A]): Future[A] = {
val cont: ScheduledComputation[A] = ScheduledComputation(this, computation)
self ! cont
cont.future // completes after cont.run called by handleContinuation
}
}
// ExampleActor.scala
import java.util.concurrent.TimeUnit
import akka.actor.{Actor, ActorRef}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import io.jokester.util.akka.actor.ActorFutureRunnableSupport
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success
case class UserMessage()
case class ServerMessage()
case class ExampleActorState()
case class HandlerActorResponse()
abstract class ExampleActor extends Actor with ActorFutureRunnableSupport {
var state: ExampleActorState = ExampleActorState()
def receive: Receive = handleScheduledComputation.orElse(handleUserMessage)
def handleUserMessage: Receive = {
case req: UserMessage =>
implicit val (timeout, ec) = defaultAsyncTimeout()
val res: Future[ServerMessage] = createHandlerActorIfNotExist() // create a (maybe remote) handler actor if not exist
.flatMap(handleActor => handleActor ? req) // ask (maybe new created) handlerActor
.mapTo[HandlerActorResponse]
.flatMap(handled => {
runInNextTick(() => {
state = ??? // mutate own state after ask completes, in another run of receive()
Success(ServerMessage())
})
})
pipe(res) to sender
}
abstract def createHandlerActorIfNotExist(): Future[ActorRef]
def defaultAsyncTimeout(): (Timeout, ExecutionContext) = {
val timeout: Timeout = Timeout(5, TimeUnit.SECONDS)
val ec = context.system.dispatcher
(timeout, ec)
}
}