Does Akka support messages that require long processing time?

Hi everyone,

I have a general question. Is Akka designed to process messages that can take either a long or a short time? Is there a limit on how long a message may run before things stop working? In our use case, a message typically runs for a couple of minutes. When the message is received, it invokes a legacy module that only supports single-threaded processing, so each process has only one actor instance in it. There could also be some outlier messages that run for more than a couple of minutes.

I was reading this post: https://stackoverflow.com/questions/20825570/why-do-i-get-handshake-timed-out-how-can-i-configure-timeout-period

It seems his message runs for 3 seconds and he gets a handshake error. Could you explain why, and whether there is a suggested usage pattern for potentially long-running requests?

Thank you,
Grace

Processing is in general expected to be fast. There are basically two cases where it isn’t: when the logic is CPU bound and just takes a lot of time, and when there is blocking IO going on, which essentially holds on to a thread without really doing anything. Both of these can lead to “starving” Akka's default dispatcher, which has a small number of threads, based on the idea that if your app is async it cannot do much more work than you have cores anyway. Starving the dispatcher means that no threads are available to run other actors.

For blocking work, which is what the legacy module sounds like, you should isolate such actors on a separate thread-pool-based dispatcher so that they do not affect the rest of your actors. If the legacy logic can only ever be used by one specific thread, you could also use the pinned dispatcher, which dedicates a thread to the actor.
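A minimal sketch of that isolation, assuming classic Akka actors. `LegacyWorker`, the dispatcher names, and the pool size of 4 are hypothetical and not taken from the question; the settings themselves (`thread-pool-executor`, `fixed-pool-size`, `PinnedDispatcher`) are standard dispatcher configuration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// Hypothetical stand-in for the actor that wraps the legacy module.
class LegacyWorker extends Actor {
  def receive = {
    case job: String =>
      // Blocking call: it occupies a thread of the dedicated dispatcher only.
      Thread.sleep(1000)
      println(s"finished $job on ${Thread.currentThread.getName}")
  }
}

object IsolatedDispatcherExample {
  // Two dedicated dispatchers: a fixed thread pool and a pinned (one thread per actor) one.
  val config = ConfigFactory.parseString(
    """
      |legacy-blocking-dispatcher {
      |  type = Dispatcher
      |  executor = "thread-pool-executor"
      |  thread-pool-executor {
      |    fixed-pool-size = 4
      |  }
      |  throughput = 1
      |}
      |legacy-pinned-dispatcher {
      |  type = PinnedDispatcher
      |  executor = "thread-pool-executor"
      |}
    """.stripMargin).withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("IsolatedDispatcherExample", config)
    // Each actor is assigned its dispatcher by the name used in the config above.
    val pooled = system.actorOf(
      Props[LegacyWorker].withDispatcher("legacy-blocking-dispatcher"), "pooled")
    val pinned = system.actorOf(
      Props[LegacyWorker].withDispatcher("legacy-pinned-dispatcher"), "pinned")
    pooled ! "job-1"
    pinned ! "job-2"
    Thread.sleep(3000) // give both jobs time to finish before shutting down
    system.terminate()
  }
}
```

The thread names printed by the workers should show the dedicated dispatchers rather than the default one, so long-running jobs on them leave the default dispatcher free for other actors.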

For CPU-bound workloads there is one more option: split the work into smaller steps and have the actor send itself its partial progress as a message. This gives the thread back to the dispatcher and allows Akka to run other actors before continuing with the next step of the computation.
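A minimal sketch of the self-message approach, assuming classic Akka actors. The summation job, the `Start` and `Continue` messages, and the chunk size are hypothetical and chosen only to illustrate the pattern.

```scala
import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical messages: Start kicks off the job, Continue carries the partial progress.
final case class Start(upTo: Long)
final case class Continue(next: Long, upTo: Long, acc: Long)

object ChunkedSum {
  // Adds at most chunkSize numbers, then returns either the remaining work or the final sum.
  def step(c: Continue, chunkSize: Long): Either[Continue, Long] = {
    val end = math.min(c.next + chunkSize - 1, c.upTo)
    var sum = c.acc
    var i = c.next
    while (i <= end) { sum += i; i += 1 }
    if (end < c.upTo) Left(Continue(end + 1, c.upTo, sum)) else Right(sum)
  }
}

class ChunkedSummer extends Actor {
  def receive = {
    case Start(upTo) =>
      self ! Continue(1L, upTo, 0L)
    case c: Continue =>
      ChunkedSum.step(c, chunkSize = 1000000L) match {
        // Not done yet: enqueue the rest and hand the thread back to the dispatcher.
        case Left(rest)   => self ! rest
        case Right(total) => println(s"sum of 1..${c.upTo} = $total")
      }
  }
}

object ChunkedSumExample {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ChunkedSumExample")
    system.actorOf(Props[ChunkedSummer], "summer") ! Start(10000000L)
    Thread.sleep(2000) // give the job time to finish before shutting down
    system.terminate()
  }
}
```

Between two `Continue` messages the actor's mailbox run ends, so other actors on the same dispatcher get a chance to be scheduled. A smaller chunk size gives better fairness at the cost of more message overhead.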

Thanks Johan. I am trying to simulate the thread starvation scenario that you mentioned so I can gain a better understanding. I have a tutorial example here in which I attempt to override the default-dispatcher and set parallelism-max = 1. I then expect to see only one of these messages at a time:

I’m a backend with path: Actor[akka://ClusterSystem/user/backend#1996649166] and I received add operation.

but I see three of these messages concurrently (there are three simulated backend services).
I thought that parallelism-max would limit the parallelism to 1. Could you advise how I can modify the example to see the thread starvation you were mentioning? Am I specifying the configuration correctly?

Thanks,
Grace

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
    default-dispatcher {
      fork-join-executor {
        parallelism-min = 1
        parallelism-max = 1
        throughput = 1
      }
    }
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }
}

akka.cluster.min-nr-of-members = 3

akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}

akka.actor.deployment {
  /frontend/backendRouter {
    # Router type provided by metrics extension.
    router = adaptive-group
    # Router parameter specific for metrics extension.
    # metrics-selector = heap
    # metrics-selector = load
    # metrics-selector = cpu
    metrics-selector = mix
    #
    nr-of-instances = 100
    routees.paths = ["/user/backend"]
    cluster {
      enabled = on
      use-role = backend
      allow-local-routees = off
    }
  }
}

=====================

package com.packt.akka.loadBalancing

import com.packt.akka.commons.Add

object LoadBalancingApp extends App {

  // initiate three nodes from backend
  Backend.initiate(2551)

  Backend.initiate(2552)

  Backend.initiate(2561)

  // initiate frontend node
  Frontend.initiate()

  Thread.sleep(10000)

  Frontend.getFrontend ! Add(2, 4)

}

===================================

package com.packt.akka.loadBalancing

import akka.cluster._
import com.packt.akka.commons._
import com.typesafe.config.ConfigFactory
import akka.cluster.ClusterEvent.MemberUp
import akka.actor.{ Actor, ActorRef, ActorSystem, Props, RootActorPath }

class Backend extends Actor {

  def receive = {
    case Add(num1, num2) =>
      println(s"I’m a backend with path: ${self} and I received add operation.")
      // Simulate a long-running, blocking operation.
      Thread.sleep(60000)
      println(s"I’m a backend with path: ${self} and I am done with add operation.")
  }

}

object Backend {
  // Starts a separate ActorSystem for a backend node on the given port.
  def initiate(port: Int): Unit = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
      withFallback(ConfigFactory.load("loadbalancer"))

    val system = ActorSystem("ClusterSystem", config)

    val backend = system.actorOf(Props[Backend], name = "backend")

  }
}

=================================

package com.packt.akka.loadBalancing

import com.packt.akka.commons._
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
import akka.cluster.Cluster
import akka.routing.FromConfig
import akka.actor.ReceiveTimeout
import scala.util.Random

class Frontend extends Actor {
  import context.dispatcher

  val backend = context.actorOf(FromConfig.props(), name = "backendRouter")

  // Send a random Add to this actor every 3 seconds.
  context.system.scheduler.schedule(3.seconds, 3.seconds, self,
    Add(Random.nextInt(100), Random.nextInt(100)))

  def receive = {
    case addOp: Add =>
      println("Frontend: I’ll forward add operation to backend node to handle it.")
      backend forward addOp

  }

}

object Frontend {

  private var _frontend: ActorRef = _

  val upToN = 200

  def initiate() = {
    val config = ConfigFactory.parseString("akka.cluster.roles = [frontend]").
      withFallback(ConfigFactory.load("loadbalancer"))

    val system = ActorSystem("ClusterSystem", config)
    system.log.info("Frontend will start when 2 backend members in the cluster.")
    //#registerOnUp
    Cluster(system) registerOnMemberUp {
      _frontend = system.actorOf(Props[Frontend],
        name = "frontend")
    }
    //#registerOnUp

  }

  def getFrontend = _frontend
}
===================
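
One thing to note about the experiment above: every `ActorSystem(...)` call creates its own set of dispatchers, so the three backend systems started in the same JVM each get their own single-threaded default dispatcher. That would explain three concurrent log lines. Also, `throughput` is a dispatcher-level setting, so it belongs next to `fork-join-executor` rather than inside it. A minimal sketch for making the starvation visible, assuming classic Akka actors without clustering and a hypothetical `Sleeper` actor (not part of the tutorial), runs two blocking actors inside one system:

```scala
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// Hypothetical actor that blocks its dispatcher thread, like Backend does with Thread.sleep.
class Sleeper extends Actor {
  def receive = {
    case msg: String =>
      println(s"${self.path.name} started $msg")
      Thread.sleep(2000) // holds the only default-dispatcher thread
      println(s"${self.path.name} finished $msg")
  }
}

object StarvationDemo {
  // Limit the default dispatcher of this one system to a single thread.
  val config = ConfigFactory.parseString(
    """
      |akka.actor.default-dispatcher {
      |  executor = "fork-join-executor"
      |  fork-join-executor {
      |    parallelism-min = 1
      |    parallelism-max = 1
      |  }
      |  throughput = 1
      |}
    """.stripMargin).withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("StarvationDemo", config)
    // Both actors live in the same system and share its single dispatcher thread.
    val a = system.actorOf(Props[Sleeper], "a")
    val b = system.actorOf(Props[Sleeper], "b")
    a ! "work"
    b ! "work"
    Thread.sleep(5000) // wait for both messages to be processed
    system.terminate()
  }
}
```

With this setup the second "started" line should only appear after the first "finished" line, because the second actor cannot be scheduled while the first one is sleeping on the only thread.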