Hello everyone,
I’m attempting to send a message from senderActor within Microservice A using the concept of “Classic Distributed Publish Subscribe in Cluster” I’m trying to publish the message to the content-topic, but I’m not receiving it in the ReceiverActor located in Microservice B. Do you have any insight into what might be causing this issue?
Thank you!
t he Conf-file in A
akka {
actor {
provider = "cluster"
}
remote {
artery {
enabled = on
canonical {
hostname = "localhost"
port = 2551
}
}
}
cluster {
seed-nodes = [
"akka://cluster-system@localhost:2551"
]
}
}
The SenderActor:
public class SenderActor extends AbstractActor {
ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
String.class,
in -> {
String out = in.toUpperCase();
mediator.tell(new DistributedPubSubMediator.Publish("content", out), getSelf());
})
.build();
}
}
the main in Microservice A:
public static void main(String[] args) {
SpringApplication.run(MicroserviceAaApplication.class, args);
Config config = ConfigFactory.load("application.conf");
ActorSystem system = ActorSystem.create("cluster-system", config);
Cluster cluster = Cluster.get(system);
ActorRef publisher = system.actorOf(Props.create(SenderActor.class), "publisher");
publisher.tell("hello from microservice A ", null);
}
the Conf-file in B
akka {
actor {
provider = "cluster"
}
remote {
artery {
enabled = on
canonical {
hostname = "localhost"
port = 2552
}
}
}
cluster {
seed-nodes = [
"akka://cluster-system@localhost:2551"
]
}
}
The ReceiverActor class:
public class ReceiverActor extends AbstractActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public ReceiverActor() {
ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
mediator.tell(new DistributedPubSubMediator.Subscribe("content", getSelf()), getSelf());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, msg -> {
log.info("Received message in receiver-actor-b: {}", msg);
})
.match(DistributedPubSubMediator.SubscribeAck.class,
msg -> log.info("SubscribeAck: {}", msg))
.build();
}
}
the main in Microservice B:
public static void main(String[] args) {
SpringApplication.run(MicroserviceBbApplication.class, args);
Config config = ConfigFactory.load("application.conf");
ActorSystem system = ActorSystem.create("cluster-system", config);
Cluster cluster = Cluster.get(system);
ActorRef subscriber = system.actorOf(Props.create(ReceiverActor.class), "subscriber");
}
itry to get the message "hello from microservice A ", but i get just the flowings logs in B:
2023-08-20T15:53:14.067+02:00 INFO 7788 --- [ main] c.e.M.MicroserviceBbApplication : Started MicroserviceBbApplication in 0.909 seconds (process running for 1.723)
2023-08-20T15:53:14.552+02:00 INFO 7788 --- [lt-dispatcher-5] akka.event.slf4j.Slf4jLogger : Slf4jLogger started
2023-08-20T15:53:14.900+02:00 INFO 7788 --- [lt-dispatcher-5] akka.remote.artery.ArteryTransport : Remoting started with transport [Artery tcp]; listening on address [akka://cluster-system@localhost:2552] with UID [6730186213375081245]
2023-08-20T15:53:14.925+02:00 INFO 7788 --- [lt-dispatcher-5] akka.cluster.Cluster : Cluster Node [akka://cluster-system@localhost:2552] - Starting up, Akka version [2.8.3] ...
2023-08-20T15:53:14.980+02:00 INFO 7788 --- [lt-dispatcher-5] akka.cluster.Cluster : Cluster Node [akka://cluster-system@localhost:2552] - Registered cluster JMX MBean [akka:type=Cluster]
2023-08-20T15:53:14.981+02:00 INFO 7788 --- [lt-dispatcher-5] akka.cluster.Cluster : Cluster Node [akka://cluster-system@localhost:2552] - Started up successfully
2023-08-20T15:53:15.006+02:00 INFO 7788 --- [t-dispatcher-13] akka.cluster.Cluster : Cluster Node [akka://cluster-system@localhost:2552] - No downing-provider-class configured, manual cluster downing required, see https://doc.akka.io/docs/akka/current/typed/cluster.html#downing
2023-08-20T15:53:15.624+02:00 INFO 7788 --- [t-dispatcher-13] akka.cluster.Cluster : Cluster Node [akka://cluster-system@localhost:2552] - Received InitJoinAck message from [Actor[akka://cluster-system@localhost:2551/system/cluster/core/daemon#1745370250]] to [akka://cluster-system@localhost:2552]
2023-08-20T15:53:15.675+02:00 INFO 7788 --- [t-dispatcher-13] akka.cluster.Cluster : Cluster Node [akka://cluster-system@localhost:2552] - Welcome from [akka://cluster-system@localhost:2551]
2023-08-20T15:53:16.077+02:00 INFO 7788 --- [lt-dispatcher-5] c.example.MicroserviceBB.ReceiverActor : SubscribeAck: SubscribeAck(Subscribe(content,None,Actor[akka://cluster-system/user/subscriber1#-339648796]))
2023-08-20T16:24:19.549+02:00 INFO 7788 --- [t-dispatcher-40] akka.actor.CoordinatedShutdown : Running CoordinatedShutdown with reason [JvmExitReason]
Process finished with exit code 130