Thanks Johan. I am trying to simulate the thread starvation scenario that you mentioned so I can gain a better understanding. I have a tutorial example here in which I override the default-dispatcher and set parallelism-max = 1. I then expect to see only one of these messages at a time:
I’m a backend with path: Actor[akka://ClusterSystem/user/backend#1996649166] and I received add operation.
but I see three of these messages concurrently. (There are three simulated backend services.)
I thought that parallelism-max would limit the parallelism to 1. Could you advise how I can modify the example to see the thread starvation you were mentioning? Am I specifying the configuration correctly?
Thanks,
Grace
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
    default-dispatcher {
      # Number of messages an actor processes before its thread is handed to the
      # next actor. This is a dispatcher-level setting: it must sit directly under
      # the dispatcher, not inside fork-join-executor, or it is not picked up.
      throughput = 1
      fork-join-executor {
        # Pin the fork-join pool to a single thread.
        # NOTE: a dispatcher is created per ActorSystem, so this bounds the
        # parallelism inside ONE ActorSystem only. When several ActorSystems are
        # started in the same JVM, each gets its own single-threaded pool and
        # their actors still run concurrently with each other.
        parallelism-min = 1
        parallelism-max = 1
      }
    }
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      # 0 lets the OS pick a free port; individual nodes override this.
      port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
    auto-down-unreachable-after = 10s
  }
}
# Minimum number of cluster members required before joining members are moved to Up.
akka.cluster.min-nr-of-members = 3
# Per-role minimums: at least one frontend and two backend members are required.
akka.cluster.role {
frontend.min-nr-of-members = 1
backend.min-nr-of-members = 2
}
# Deployment settings for the router created by the frontend actor under the
# name "backendRouter".
akka.actor.deployment {
  /frontend/backendRouter {
    # Router type provided by metrics extension.
    router = adaptive-group
    # Router parameter specific for metrics extension.
    # metrics-selector = heap
    # metrics-selector = load
    # metrics-selector = cpu
    metrics-selector = mix
    #
    nr-of-instances = 100
    # Actor path looked up on every eligible node; must match the name the
    # backend actor is created with.
    routees.paths = ["/user/backend"]
    cluster {
      enabled = on
      # Only route to members carrying the "backend" role.
      use-role = backend
      allow-local-routees = off
    }
  }
}
=====================
package com.packt.akka.loadBalancing
import com.packt.akka.commons.Add
/**
 * Entry point of the load-balancing demo.
 *
 * Starts three backend nodes and one frontend node inside this JVM, waits a
 * fixed amount of time, then sends a single `Add(2, 4)` to the frontend actor.
 */
object LoadBalancingApp extends App {
  // Start three backend nodes; every call creates a separate ActorSystem.
  Backend.initiate(2551)
  Backend.initiate(2552)
  Backend.initiate(2561)

  // Start the frontend node.
  Frontend.initiate()

  // Crude fixed wait to give the cluster time to form before sending the first message.
  Thread.sleep(10000)

  // The frontend actor is only created from the member-up callback, so the
  // reference may still be unset (null) here if the cluster was slow to form.
  // Guard against that instead of failing with a NullPointerException.
  Option(Frontend.getFrontend) match {
    case Some(frontend) => frontend ! Add(2, 4)
    case None =>
      println("Frontend actor is not available yet; skipping the initial Add(2, 4).")
  }
}
===================================
package com.packt.akka.loadBalancing
import akka.cluster._
import com.packt.akka.commons._
import com.typesafe.config.ConfigFactory
import akka.cluster.ClusterEvent.MemberUp
import akka.actor.{ Actor, ActorRef, ActorSystem, Props, RootActorPath }
/**
 * Backend worker actor.
 *
 * On every `Add` message it prints a "received" line, blocks its thread for
 * 60 seconds, then prints a "done" line. The operands are not used and no
 * reply is sent.
 */
class Backend extends Actor {
  def receive = {
    case Add(_, _) =>
      println(s"I’m a backend with path: ${self} and I received add operation.")
      // Deliberately block the dispatcher thread to simulate a long-running,
      // blocking call (the point of the thread-starvation experiment).
      Thread.sleep(60000)
      println(s"I’m a backend with path: ${self} and I am done with add operation.")
  }
}
/** Factory for backend cluster nodes. */
object Backend {

  /**
   * Starts a backend node listening on the given port.
   *
   * Every call creates a new `ActorSystem` named "ClusterSystem" with the
   * `backend` role and a single `Backend` actor named "backend". Because each
   * call creates its own ActorSystem, dispatcher settings loaded from the
   * configuration apply to each node separately.
   *
   * @param port TCP port for `akka.remote.netty.tcp.port`
   */
  def initiate(port: Int): Unit = {
    // Precedence: explicit port > backend role > "loadbalancer" config resource.
    val config = ConfigFactory
      .parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]"))
      .withFallback(ConfigFactory.load("loadbalancer"))

    val system = ActorSystem("ClusterSystem", config)
    // The actor name must match the routee path the router is configured with.
    system.actorOf(Props[Backend], name = "backend")
  }
}
=================================
package com.packt.akka.loadBalancing
import com.packt.akka.commons._
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
import akka.cluster.Cluster
import akka.routing.FromConfig
import akka.actor.ReceiveTimeout
import scala.util.Random
/**
 * Frontend actor.
 *
 * Creates the router "backendRouter" from deployment configuration, sends
 * itself an `Add` every 3 seconds, and forwards every `Add` it receives to
 * the router.
 */
class Frontend extends Actor {
  // Execution context required by the scheduler.
  import context.dispatcher

  val backend = context.actorOf(FromConfig.props(), name = "backendRouter")

  // Periodic self-message. The Add instance is built once, here, so the same
  // (randomly chosen) operands are re-sent on every tick.
  private val tick = context.system.scheduler.schedule(3.seconds, 3.seconds, self,
    Add(Random.nextInt(100), Random.nextInt(100)))

  /** Stops the periodic self-message so it does not keep firing after the actor is gone. */
  override def postStop(): Unit = {
    tick.cancel()
    super.postStop()
  }

  def receive = {
    case addOp: Add =>
      println("Frontend: I’ll forward add operation to backend node to handle it.")
      // forward keeps the original sender.
      backend forward addOp
  }
}
/** Factory and accessor for the frontend cluster node. */
object Frontend {
  // Assigned from the cluster member-up callback and read from other threads
  // through getFrontend, hence @volatile. Stays null until the callback has run.
  @volatile private var _frontend: ActorRef = _

  val upToN = 200

  /**
   * Starts the frontend node: creates an `ActorSystem` named "ClusterSystem"
   * with the `frontend` role and registers a callback that creates the
   * `Frontend` actor (named "frontend") once this member is up.
   */
  def initiate(): Unit = {
    // Precedence: frontend role > "loadbalancer" config resource.
    val config = ConfigFactory
      .parseString("akka.cluster.roles = [frontend]")
      .withFallback(ConfigFactory.load("loadbalancer"))

    val system = ActorSystem("ClusterSystem", config)
    system.log.info("Frontend will start when 2 backend members in the cluster.")

    //#registerOnUp
    Cluster(system) registerOnMemberUp {
      _frontend = system.actorOf(Props[Frontend], name = "frontend")
    }
    //#registerOnUp
  }

  /**
   * Returns the frontend actor reference.
   *
   * @return the actor created by the member-up callback, or `null` if that
   *         callback has not run yet
   */
  def getFrontend: ActorRef = _frontend
}
===================