Hello
I am trying to run the example from reactivedesignpatterns dot com lectures distributed1.
ClusterMain runs successfully.
Then ClusterWorker gets this error:
Bind failed for TCP channel on endpoint [10.18.1.8:25520]
Address already in use: bind
NOTE: This is the third time I am trying to open this issue; Akismet keeps blocking it.
ClusterMain code:
package com.example.cluster
import akka.actor.{Actor, ActorLogging, Props, ReceiveTimeout}
import akka.cluster.{Cluster, ClusterEvent}
import com.example.cluster.ClusterReceptionist._
import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
/**
 * Top-level actor of the first (seed) cluster node.
 *
 * On construction it subscribes to `MemberUp` and `MemberRemoved` cluster events,
 * joins the cluster at its own address, and starts a `ClusterReceptionist` child
 * named "receptionist".
 *
 * Message handling:
 *  - `MemberUp` for a node other than this one: schedules a `Get("google.com")` to the
 *    receptionist after one second and arms a three-second receive timeout.
 *  - `Result` / `Failed`: prints the outcome to standard output.
 *  - `ReceiveTimeout`: asks the cluster to let this node leave.
 *  - `MemberRemoved` (for any member): stops this actor.
 */
class ClusterMain extends Actor with ActorLogging {
  val cluster = Cluster(context.system)
  cluster.subscribe(self, classOf[ClusterEvent.MemberUp])
  cluster.subscribe(self, classOf[ClusterEvent.MemberRemoved])
  // Joining our own address makes this node the first member of the cluster.
  cluster.join(cluster.selfAddress)

  val clusterreceptionist = context.actorOf(Props[ClusterReceptionist], "receptionist")
  // `Terminated` is not handled in `receive`; under Akka's death-watch rules this actor
  // then fails with a DeathPactException if the receptionist stops.
  context.watch(clusterreceptionist)

  /**
   * Sends `Get(url)` to the receptionist once, after the given delay.
   *
   * @param d   delay before the request is sent
   * @param url address to fetch
   */
  def getlater(d: FiniteDuration, url: String): Unit = {
    // The scheduler needs an implicit ExecutionContext; use this actor's dispatcher.
    import context.dispatcher
    context.system.scheduler.scheduleOnce(d, clusterreceptionist, Get(url))
  }

  def receive: Receive = {
    case ClusterEvent.MemberUp(member) =>
      // Only react once another node (a worker) has joined.
      if (member.address != cluster.selfAddress) {
        getlater(1.seconds, "google.com")
        context.setReceiveTimeout(3.seconds)
      }

    case Result(url, set) =>
      println(set.toVector.sorted.mkString(s"Results for ‘$url’:\n", "\n", "\n"))

    case Failed(url, reason) =>
      println(s"Failed to fetch ‘$url’ : $reason\n")

    case ReceiveTimeout =>
      // No message arrived within the receive timeout: leave the cluster.
      cluster.leave(cluster.selfAddress)

    case ClusterEvent.MemberRemoved(m, _) =>
      context.stop(self)
  }
}
/**
 * JVM entry point for the seed node: hands the class name of the `ClusterMain` actor
 * to `akka.Main`.
 *
 * An explicit `main` method replaces `extends App` with a `val main = ...` body, which
 * only bound a `Unit` result to a member that shared the entry point's name.
 *
 * NOTE(review): `akka.Main` is believed to be deprecated as of Akka 2.6 — confirm, and
 * consider creating the ActorSystem here directly.
 */
object ClusterMain {
  def main(args: Array[String]): Unit =
    akka.Main.main(Array(classOf[ClusterMain].getName))
}
ClusterWorker code:
package com.example.cluster
import akka.actor.Actor
import akka.cluster.{Cluster, ClusterEvent}
/**
 * Top-level actor of a worker node.
 *
 * It subscribes to `MemberRemoved` events and joins the cluster through a seed address
 * built from this node's own address with the port replaced by 25520. When the seed
 * node is removed from the cluster the actor stops itself, and on stop it shuts down
 * `WebScrapper`.
 */
class ClusterWorker extends Actor {

  val cluster = Cluster(context.system)
  cluster.subscribe(self, classOf[ClusterEvent.MemberRemoved])

  // Same host and system as this node, but on the fixed port 25520.
  val seedAddress = cluster.selfAddress.copy(port = Some(25520))
  cluster.join(seedAddress)

  override def receive = {
    case ClusterEvent.MemberRemoved(removed, _) =>
      // Removal of any other member is accepted and ignored.
      if (removed.address == seedAddress) {
        context.stop(self)
      }
  }

  override def postStop(): Unit =
    WebScrapper.shutdown()
}
/**
 * JVM entry point for a worker node: hands the class name of the `ClusterWorker` actor
 * to `akka.Main`.
 *
 * An explicit `main` method replaces `extends App` with a `val main = ...` body, which
 * only bound a `Unit` result to a member that shared the entry point's name.
 *
 * NOTE(review): `akka.Main` is believed to be deprecated as of Akka 2.6 — confirm, and
 * consider creating the ActorSystem here directly.
 */
object ClusterWorker {
  def main(args: Array[String]): Unit =
    akka.Main.main(Array(classOf[ClusterWorker].getName))
}
ClusterMain conf:
# Configuration for the seed node (ClusterMain).
akka {
  actor {
    # Plain ASCII quotes: typographic quotes would become part of the class name.
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  loglevel = "INFO"
  cluster {
    min-nr-of-members = 2
  }
}
ClusterWorker conf:
# Configuration for a worker node (ClusterWorker).
akka {
  actor {
    # Plain ASCII quotes: typographic quotes would become part of the class name.
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    # NOTE(review): this looks like a classic-remoting setting; confirm it still has an
    # effect under Artery.
    log-remote-lifecycle-events = on
    # Akka 2.6 uses Artery remoting by default, so `netty.tcp.port` is not read and the
    # node falls back to the default port 25520, which the seed node already holds
    # ("Address already in use: bind"). Port 0 makes Artery pick a free port.
    artery {
      canonical.port = 0
    }
  }
  loglevel = "INFO"
  cluster {
    # NOTE(review): the auto-down settings were reportedly removed in Akka 2.6; confirm
    # and configure a downing provider if automatic downing is still wanted.
    auto-down=on
    auto-down-unreachable-after = 10s
  }
}
build.sbt:
// sbt build definition. String literals use plain ASCII double quotes; the
// typographic quotes in the pasted version are not valid delimiters.
name := "akka-quickstart-scala"
version := "1.0"
scalaVersion := "2.13.1"

// Single Akka version shared by all Akka modules so they stay binary compatible.
lazy val akkaVersion = "2.6.17"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
  "ch.qos.logback" % "logback-classic" % "1.2.3",
  "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test,
  "com.ning" % "async-http-client" % "1.7.19",
  "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
  "org.jsoup" % "jsoup" % "1.11.2",
  "com.typesafe.akka" %% "akka-cluster-typed" % akkaVersion,
  "org.scalatest" %% "scalatest" % "3.1.0" % Test
)