Hello
I am trying to run cluster example here: reactivedesignpatterns dot com
but I receive this error when running ClusterWorker. I am new to Akka. What should I do?
[ERROR] [akka.io.TcpListener] [Main-akka.actor.default-dispatcher-5]
Bind failed for TCP channel on endpoint [10.18.1.8:25520]
java.net.BindException: [10.18.1.8:25520] Address already in use: bind
The ClusterMain code is this:
import akka.actor.{Actor, ActorLogging, Props, ReceiveTimeout}
import akka.cluster.{Cluster, ClusterEvent}
import com.example.cluster.ClusterReceptionist._
import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
//ClusterSeed
class ClusterMain extends Actor with ActorLogging {
val cluster = Cluster(context.system)
cluster.subscribe(self, classOf[ClusterEvent.MemberUp])
//cluster.subscribe(self, classOf[ClusterEvent.MemberRemoved])
cluster.join(cluster.selfAddress)
val clusterreceptionist = context.actorOf(Props[ClusterReceptionist], “receptionist”)
context watch(clusterreceptionist) //added //sign death pact
def getlater(d:FiniteDuration,url: String): Unit = {
import context.dispatcher
context.system.scheduler.scheduleOnce(d,clusterreceptionist,Get(url))
}
def receive: Receive =
{
case ClusterEvent.MemberUp(member)=>
if(member.address!=cluster.selfAddress)
{
getlater(1.seconds,“google.com”)
context.setReceiveTimeout(3.seconds)
}
case Result(url, set) =>
println(set.toVector.sorted.mkString(s"Results for ‘$url’:\n", “\n”, “\n”))
case Failed(url,reason) =>
println(s"Failed to fetch ‘$url’ : $reason\n")
case ReceiveTimeout =>
cluster.leave(cluster.selfAddress)
case ClusterEvent.MemberRemoved(m,_)=>
context.stop(self)
}
}
object ClusterMain extends App {
val main = akka.Main.main(Array(classOf[ClusterMain].getName))
}
the ClusterMain.conf is like this:
akka {
actor {
provider = “akka.cluster.ClusterActorRefProvider”
}
loglevel = “INFO”
cluster {
min-nr-of-members = 2
}
}
ClusterWorker like this:
import akka.actor.Actor
import akka.cluster.{Cluster, ClusterEvent}
//ClusterNode
class ClusterWorker extends Actor {
val cluster = Cluster(context.system)
cluster.subscribe(self, classOf[ClusterEvent.MemberRemoved])
val seedAddress = cluster.selfAddress.copy(port = Some(15520))
cluster.join(seedAddress)
override def receive = {
case ClusterEvent.MemberRemoved(m, _) =>
if (m.address == seedAddress) context.stop(self)
}
override def postStop(): Unit = {
WebScrapper.shutdown()
}
}
object ClusterWorker extends App {
val main = akka.Main.main(Array(classOf[ClusterWorker].getName))
}
ClusterWorker.conf like this:
akka {
actor {
provider = “akka.cluster.ClusterActorRefProvider”
}
remote {
log-remote-lifecycle-events = on
netty.tcp {
port = 0
}
}
loglevel = “INFO”
cluster {
auto-down=on
auto-down-unreachable-after = 10s
}
}