Hi,
I am trying to implement Akka Cluster Sharding with Kafka in Java, so I started from the weather station example: https://developer.lightbend.com/start/?group=akka&project=akka-samples-cluster-sharding-java
To add the Kafka-aware sharding I followed the steps described here: https://doc.akka.io/docs/alpakka-kafka/current/cluster-sharding.html
I basically modified the sample with the Alpakka Kafka code. The code compiles and runs, and I can see the topic being created in Kafka once the application starts up, but no messages are consumed!
Versions used:
akka version = "2.6.8"
akka-stream-kafka-cluster-sharding_2.13:jar:2.0.4:compile
akka-http_2.13:jar:10.1.11
kafka:kafka-clients:jar:2.4.1:compile
logback-classic:jar:1.2.3:compile
SYSTEM:
macOS Catalina
Version 10.15.6
openjdk version "11.0.7" 2020-04-14 LTS
OpenJDK Runtime Environment Corretto-11.0.7.10.1 (build 11.0.7+10-LTS)
OpenJDK 64-Bit Server VM Corretto-11.0.7.10.1 (build 11.0.7+10-LTS, mixed mode)
POM.XML
<modelVersion>4.0.0</modelVersion>
<artifactId>akka-sample-sharding-killrweather-java</artifactId>
<groupId>com.lightbend.akka.samples</groupId>
<name>Killrweather</name>
<version>1.0-SNAPSHOT</version>
<licenses>
  <license>
    <name>Public Domain (CC0)</name>
    <url>http://creativecommons.org/publicdomain/zero/1.0/</url>
  </license>
</licenses>
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <akka.version>2.6.8</akka.version>
  <akka-http.version>10.1.11</akka-http.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor-typed_2.13</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-typed_2.13</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-sharding-typed_2.13</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-kafka-cluster-sharding_2.13</artifactId>
    <version>2.0.4</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-serialization-jackson_2.13</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http_2.13</artifactId>
    <version>${akka-http.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http-jackson_2.13</artifactId>
    <version>${akka-http.version}</version>
  </dependency>
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
  </dependency>
</dependencies>
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.8.1</version>
      <configuration>
        <source>11</source>
        <target>11</target>
        <compilerArgs>
          <arg>-Xlint:unchecked</arg>
          <arg>-Xlint:deprecation</arg>
          <arg>-parameters</arg>
        </compilerArgs>
      </configuration>
    </plugin>
    <plugin>
      <version>1.6.0</version>
      <artifactId>exec-maven-plugin</artifactId>
      <groupId>org.codehaus.mojo</groupId>
      <configuration>
        <mainClass>sample.killrweather.KillrWeather</mainClass>
      </configuration>
    </plugin>
  </plugins>
</build>
Here are my code snippets:
public static final EntityTypeKey<WeatherStation.Command> TypeKey =
    EntityTypeKey.create(WeatherStation.Command.class, "WeatherStation");

public static final String REGISTER_TRADE_TOPIC = "register-trade-topic";

public static void initSharding(ActorSystem<?> system) {
  // WeatherStation sharding from the original sample, commented out:
  // ClusterSharding.get(system).init(Entity.of(TypeKey, entityContext ->
  //   WeatherStation.create(entityContext.getEntityId())
  // ));

  String groupId = "register-trade-topic-group-id";
  EntityTypeKey<User.Command> typeKey = EntityTypeKey.create(User.Command.class, groupId);

  // Asynchronously build a message extractor that looks up the topic's partition count in Kafka
  CompletionStage<KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor<User.Command>> messageExtractor =
      KafkaClusterSharding.get(system)
          .messageExtractorNoEnvelope(
              REGISTER_TRADE_TOPIC,
              Duration.ofSeconds(10),
              (User.Command msg) -> msg.getId(),
              ConsumerSettings.create(
                      Adapter.toClassic(system), new StringDeserializer(), new StringDeserializer())
                  .withBootstrapServers("localhost:9092")
                  .withGroupId(typeKey.name()));

  // Once the extractor is available, start sharding with the external allocation strategy
  messageExtractor.thenAccept(
      extractor ->
          ClusterSharding.get(system)
              .init(
                  Entity.of(typeKey, ctx -> userBehaviour(ctx.getEntityId()))
                      .withAllocationStrategy(
                          new ExternalShardAllocationStrategy(
                              system, typeKey.name(), Timeout.create(Duration.ofSeconds(5))))
                      .withMessageExtractor(extractor)));

  akka.actor.typed.ActorRef<ConsumerRebalanceEvent> rebalanceListener =
      KafkaClusterSharding.get(system).rebalanceListener(typeKey);

  ConsumerSettings<String, byte[]> consumerSettings =
      ConsumerSettings.create(
              Adapter.toClassic(system), new StringDeserializer(), new ByteArrayDeserializer())
          .withBootstrapServers("localhost:9092")
          .withGroupId(typeKey.name()); // use the same group id as we used in the `EntityTypeKey` for `User`

  // pass the rebalance listener to the topic subscription
  AutoSubscription subscription =
      Subscriptions.topics(REGISTER_TRADE_TOPIC)
          .withRebalanceListener(Adapter.toClassic(rebalanceListener));

  // Creates the consumer Source; the returned Source is not run or connected to a sink in this method
  Consumer.plainSource(consumerSettings, subscription);
}

private static Behavior<User.Command> userBehaviour(final String wsid) {
  return Behaviors.setup(context -> new User(context, wsid));
}
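For context on the `messageExtractorNoEnvelope` call above: as far as I understand the Alpakka Kafka documentation, the extractor chooses the shard for an entity by hashing the entity id with the same murmur2 function as Kafka's default partitioner and taking the result modulo the topic's partition count, so a shard id corresponds to a partition number. Below is a minimal standalone sketch of that mapping under this assumption. The class `ShardIdSketch` and its method names are hypothetical (they are not part of Alpakka Kafka), and the hash is re-implemented here for illustration only.

```java
import java.nio.charset.StandardCharsets;

public class ShardIdSketch {

    // Illustrative re-implementation of the 32-bit murmur2 hash (assumed to match Kafka's default partitioner)
    public static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        int length4 = length / 4;

        // Mix the input four bytes at a time (little-endian)
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the remaining 1 to 3 bytes, if any
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
                // fall through
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
                // fall through
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
                break;
            default:
                break;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Hypothetical helper: maps an entity id to a shard id in the range [0, partitions)
    public static String shardId(String entityId, int partitions) {
        int hash = murmur2(entityId.getBytes(StandardCharsets.UTF_8));
        int partition = (hash & 0x7fffffff) % partitions; // clear the sign bit before the modulo
        return Integer.toString(partition);
    }

    public static void main(String[] args) {
        for (String id : new String[] {"user-1", "user-2", "user-3"}) {
            System.out.println(id + " -> shard " + shardId(id, 1) + " with 1 partition, shard "
                    + shardId(id, 4) + " with 4 partitions");
        }
    }
}
```

With a topic that has a single partition, every entity id maps to shard `0`, so all entities would be allocated to whichever node is assigned that partition.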
USER:
```java
package sample.killrweather;

import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Receive;

public class User extends AbstractBehavior<User.Command> {

  @Override
  public Receive<Command> createReceive() {
    // Debug output only; no message handlers are defined yet, so this returns null
    System.out.println("*************************");
    System.out.println("****** PASSED HERE *******");
    System.out.println("*************************");
    return null;
  }

  // Messages must expose the entity id used by the Kafka message extractor
  public interface Command {
    String getId();
  }

  public final String id;

  public User(ActorContext<Command> context, String entityId) {
    super(context);
    this.id = entityId;
    System.out.println("*************************");
    System.out.println("****** PASSED HERE 2 *******");
    System.out.println("*************************");
  }

  public String getId() {
    System.out.println("*************************");
    System.out.println("****** PASSED HERE 3*******");
    System.out.println("*************************");
    return this.id;
  }
}
```
**Here are the full startup logs:**
marciomarinho@ip-10-0-5-24 akka-samples-cluster-sharding-java % mvn -pl killrweather exec:java
[INFO] Scanning for projects...
[INFO]
[INFO] --< com.lightbend.akka.samples:akka-sample-sharding-killrweather-java >--
[INFO] Building Killrweather 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
…
…
[akka://KillrWeather@127.0.0.1:2554] - Started up successfully
[2020-08-24 13:49:47,491] [INFO] [akka://KillrWeather@127.0.0.1:2554] [akka.cluster.sbr.SplitBrainResolver] [KillrWeather-akka.actor.default-dispatcher-3] [akka://KillrWeather/system/cluster/core/daemon/downingProvider] - SBR started. Config: stableAfter: 20000 ms, strategy: KeepMajority, selfUniqueAddress: UniqueAddress(akka://KillrWeather@127.0.0.1:2554,3421553894839139366), selfDc: default
[2020-08-24 13:49:47,516] [INFO] [] [org.apache.kafka.clients.consumer.ConsumerConfig] [KillrWeather-akka.kafka.default-dispatcher-18] [] - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = register-trade-topic-group-id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
[2020-08-24 13:49:47,523] [INFO] [] [org.apache.kafka.clients.consumer.ConsumerConfig] [KillrWeather-akka.kafka.default-dispatcher-18] [] - ConsumerConfig values:
(same ConsumerConfig values as in the first dump above)
[2020-08-24 13:49:47,537] [INFO] [] [akka.event.slf4j.Slf4jLogger] [KillrWeather-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[2020-08-24 13:49:47,555] [INFO] [akka://KillrWeather@127.0.0.1:55840] [akka.remote.artery.tcp.ArteryTcpTransport] [KillrWeather-akka.actor.default-dispatcher-3] [ArteryTcpTransport(akka://KillrWeather)] - Remoting started with transport [Artery tcp]; listening on address [akka://KillrWeather@127.0.0.1:55840] with UID [-4516988406641602295]
[2020-08-24 13:49:47,556] [INFO] [akka://KillrWeather@127.0.0.1:55840] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-3] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:55840] - Starting up, Akka version [2.6.8] ...
[2020-08-24 13:49:47,561] [WARN] [akka://KillrWeather@127.0.0.1:55840] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-3] [Cluster(akka://KillrWeather)] - Could not register Cluster JMX MBean with name=akka:type=Cluster as it is already registered. If you are running multiple clusters in the same JVM, set 'akka.cluster.jmx.multi-mbeans-in-same-jvm = on' in config
[2020-08-24 13:49:47,561] [INFO] [akka://KillrWeather@127.0.0.1:55840] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-3] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:55840] - Started up successfully
[2020-08-24 13:49:47,566] [INFO] [akka://KillrWeather@127.0.0.1:55840] [akka.cluster.sbr.SplitBrainResolver] [KillrWeather-akka.actor.default-dispatcher-3] [akka://KillrWeather/system/cluster/core/daemon/downingProvider] - SBR started. Config: stableAfter: 20000 ms, strategy: KeepMajority, selfUniqueAddress: UniqueAddress(akka://KillrWeather@127.0.0.1:55840,-4516988406641602295), selfDc: default
[2020-08-24 13:49:47,598] [INFO] [] [org.apache.kafka.clients.consumer.ConsumerConfig] [KillrWeather-akka.kafka.default-dispatcher-19] [] - ConsumerConfig values:
(same ConsumerConfig values as in the first dump above)
[2020-08-24 13:49:47,599] [INFO] [akka://KillrWeather@127.0.0.1:2554] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-13] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2554] - Received InitJoin message from [Actor[akka://KillrWeather@127.0.0.1:2553/system/cluster/core/daemon/firstSeedNodeProcess-1#-1459961682]], but this node is not initialized yet
[2020-08-24 13:49:47,599] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-22] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2553] - Received InitJoin message from [Actor[akka://KillrWeather@127.0.0.1:2554/system/cluster/core/daemon/joinSeedNodeProcess-1#100786241]], but this node is not initialized yet
[2020-08-24 13:49:47,605] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-22] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2553] - Received InitJoinNack message from [Actor[akka://KillrWeather@127.0.0.1:2554/system/cluster/core/daemon#-627533646]] to [akka://KillrWeather@127.0.0.1:2553]
[2020-08-24 13:49:47,623] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-22] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2553] - Node [akka://KillrWeather@127.0.0.1:2553] is JOINING itself (with roles [dc-default]) and forming new cluster
[2020-08-24 13:49:47,626] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-22] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2553] - is the new leader among reachable nodes (more leaders may exist)
[2020-08-24 13:49:47,627] [INFO] [akka://KillrWeather@127.0.0.1:2554] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-13] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2554] - Received InitJoin message from [Actor[akka://KillrWeather@127.0.0.1:55840/system/cluster/core/daemon/joinSeedNodeProcess-1#1385125671]], but this node is not initialized yet
[2020-08-24 13:49:47,629] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [KillrWeather-akka.kafka.default-dispatcher-19] [] - Kafka version: 2.4.1
[2020-08-24 13:49:47,629] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [KillrWeather-akka.kafka.default-dispatcher-19] [] - Kafka commitId: c57222ae8cd7866b
[2020-08-24 13:49:47,629] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [KillrWeather-akka.kafka.default-dispatcher-19] [] - Kafka startTimeMs: 1598240987627
[2020-08-24 13:49:47,632] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [KillrWeather-akka.kafka.default-dispatcher-18] [] - Kafka version: 2.4.1
[2020-08-24 13:49:47,632] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [KillrWeather-akka.kafka.default-dispatcher-18] [] - Kafka commitId: c57222ae8cd7866b
[2020-08-24 13:49:47,632] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [KillrWeather-akka.kafka.default-dispatcher-18] [] - Kafka startTimeMs: 1598240987628
[2020-08-24 13:49:47,633] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [KillrWeather-akka.kafka.default-dispatcher-18] [] - Kafka version: 2.4.1
[2020-08-24 13:49:47,633] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [KillrWeather-akka.kafka.default-dispatcher-18] [] - Kafka commitId: c57222ae8cd7866b
[2020-08-24 13:49:47,633] [INFO] [] [org.apache.kafka.common.utils.AppInfoParser] [KillrWeather-akka.kafka.default-dispatcher-18] [] - Kafka startTimeMs: 1598240987626
[2020-08-24 13:49:47,637] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-22] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2553] - Leader is moving node [akka://KillrWeather@127.0.0.1:2553] to [Up]
[2020-08-24 13:49:47,643] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-22] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2553] - Received InitJoin message from [Actor[akka://KillrWeather@127.0.0.1:55840/system/cluster/core/daemon/joinSeedNodeProcess-1#1385125671]] to [akka://KillrWeather@127.0.0.1:2553]
[2020-08-24 13:49:47,643] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-22] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2553] - Sending InitJoinAck message from node [akka://KillrWeather@127.0.0.1:2553] to [Actor[akka://KillrWeather@127.0.0.1:55840/system/cluster/core/daemon/joinSeedNodeProcess-1#1385125671]] (version [2.6.8])
[2020-08-24 13:49:47,647] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.sbr.SplitBrainResolver] [KillrWeather-akka.actor.default-dispatcher-22] [akka://KillrWeather/system/cluster/core/daemon/downingProvider] - This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[2020-08-24 13:49:47,696] [INFO] [akka://KillrWeather@127.0.0.1:55840] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-3] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:55840] - Received InitJoinAck message from [Actor[akka://KillrWeather@127.0.0.1:2553/system/cluster/core/daemon#-1624818115]] to [akka://KillrWeather@127.0.0.1:55840]
[2020-08-24 13:49:47,709] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-3] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2553] - Node [akka://KillrWeather@127.0.0.1:55840] is JOINING, roles [dc-default]
[2020-08-24 13:49:47,742] [INFO] [akka://KillrWeather@127.0.0.1:55840] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-3] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:55840] - Welcome from [akka://KillrWeather@127.0.0.1:2553]
[2020-08-24 13:49:47,875] [INFO] [] [org.apache.kafka.clients.Metadata] [KillrWeather-akka.kafka.default-dispatcher-19] [] - [Consumer clientId=consumer-register-trade-topic-group-id-3, groupId=register-trade-topic-group-id] Cluster ID: DLdQJZL5TcKILpkouN-B6Q
[2020-08-24 13:49:47,875] [INFO] [] [org.apache.kafka.clients.Metadata] [KillrWeather-akka.kafka.default-dispatcher-18] [] - [Consumer clientId=consumer-register-trade-topic-group-id-1, groupId=register-trade-topic-group-id] Cluster ID: DLdQJZL5TcKILpkouN-B6Q
[2020-08-24 13:49:47,875] [INFO] [] [org.apache.kafka.clients.Metadata] [KillrWeather-akka.kafka.default-dispatcher-18] [] - [Consumer clientId=consumer-register-trade-topic-group-id-2, groupId=register-trade-topic-group-id] Cluster ID: DLdQJZL5TcKILpkouN-B6Q
[2020-08-24 13:49:47,994] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.actor.ActorSystemImpl] [KillrWeather-akka.actor.default-dispatcher-3] [akka.actor.ActorSystemImpl(KillrWeather)] - Retrieved 1 partitions for topic 'register-trade-topic'
[2020-08-24 13:49:47,994] [INFO] [akka://KillrWeather@127.0.0.1:2554] [akka.actor.ActorSystemImpl] [KillrWeather-akka.actor.default-dispatcher-3] [akka.actor.ActorSystemImpl(KillrWeather)] - Retrieved 1 partitions for topic 'register-trade-topic'
[2020-08-24 13:49:47,994] [INFO] [akka://KillrWeather@127.0.0.1:55840] [akka.actor.ActorSystemImpl] [KillrWeather-akka.actor.default-dispatcher-3] [akka.actor.ActorSystemImpl(KillrWeather)] - Retrieved 1 partitions for topic 'register-trade-topic'
[2020-08-24 13:49:48,019] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.sharding.typed.scaladsl.ClusterSharding] [KillrWeather-akka.actor.default-dispatcher-22] [ClusterSharding(akka://KillrWeather)] - Starting Shard Region [register-trade-topic-group-id]...
[2020-08-24 13:49:48,019] [INFO] [akka://KillrWeather@127.0.0.1:55840] [akka.cluster.sharding.typed.scaladsl.ClusterSharding] [KillrWeather-akka.actor.default-dispatcher-15] [ClusterSharding(akka://KillrWeather)] - Starting Shard Region [register-trade-topic-group-id]...
[2020-08-24 13:49:48,019] [INFO] [akka://KillrWeather@127.0.0.1:2554] [akka.cluster.sharding.typed.scaladsl.ClusterSharding] [KillrWeather-akka.actor.default-dispatcher-13] [ClusterSharding(akka://KillrWeather)] - Starting Shard Region [register-trade-topic-group-id]...
[2020-08-24 13:49:48,036] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-22] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2553] - Leader is moving node [akka://KillrWeather@127.0.0.1:55840] to [Up]
[2020-08-24 13:49:48,049] [INFO] [akka://KillrWeather@127.0.0.1:55840] [akka.cluster.sharding.ShardRegion] [KillrWeather-akka.actor.default-dispatcher-15] [akka://KillrWeather@127.0.0.1:55840/system/sharding/register-trade-topic-group-id] - register-trade-topic-group-id: Idle entities will be passivated after [2.000 min]
[2020-08-24 13:49:48,049] [INFO] [akka://KillrWeather@127.0.0.1:2554] [akka.cluster.sharding.ShardRegion] [KillrWeather-akka.actor.default-dispatcher-13] [akka://KillrWeather@127.0.0.1:2554/system/sharding/register-trade-topic-group-id] - register-trade-topic-group-id: Idle entities will be passivated after [2.000 min]
[2020-08-24 13:49:48,049] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.sharding.ShardRegion] [KillrWeather-akka.actor.default-dispatcher-22] [akka://KillrWeather@127.0.0.1:2553/system/sharding/register-trade-topic-group-id] - register-trade-topic-group-id: Idle entities will be passivated after [2.000 min]
[2020-08-24 13:49:48,065] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.singleton.ClusterSingletonManager] [KillrWeather-akka.actor.default-dispatcher-3] [akka://KillrWeather@127.0.0.1:2553/system/sharding/register-trade-topic-group-idCoordinator] - Singleton manager starting singleton actor [akka://KillrWeather/system/sharding/register-trade-topic-group-idCoordinator/singleton]
[2020-08-24 13:49:48,066] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.singleton.ClusterSingletonManager] [KillrWeather-akka.actor.default-dispatcher-22] [akka://KillrWeather@127.0.0.1:2553/system/sharding/register-trade-topic-group-idCoordinator] - ClusterSingletonManager state change [Start -> Oldest]
[2020-08-24 13:49:48,090] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.sharding.DDataShardCoordinator] [KillrWeather-akka.actor.default-dispatcher-22] [akka://KillrWeather@127.0.0.1:2553/system/sharding/register-trade-topic-group-idCoordinator/singleton/coordinator] - ShardCoordinator was moved to the active state State(Map())
[2020-08-24 13:49:48,588] [INFO] [akka://KillrWeather@127.0.0.1:55840] [akka.cluster.singleton.ClusterSingletonManager] [KillrWeather-akka.actor.default-dispatcher-15] [akka://KillrWeather@127.0.0.1:55840/system/sharding/register-trade-topic-group-idCoordinator] - ClusterSingletonManager state change [Start -> Younger]
[2020-08-24 13:49:52,633] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-3] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2553] - Received InitJoin message from [Actor[akka://KillrWeather@127.0.0.1:2554/system/cluster/core/daemon/joinSeedNodeProcess-1#100786241]] to [akka://KillrWeather@127.0.0.1:2553]
[2020-08-24 13:49:52,633] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-3] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2553] - Sending InitJoinAck message from node [akka://KillrWeather@127.0.0.1:2553] to [Actor[akka://KillrWeather@127.0.0.1:2554/system/cluster/core/daemon/joinSeedNodeProcess-1#100786241]] (version [2.6.8])
[2020-08-24 13:49:52,659] [INFO] [akka://KillrWeather@127.0.0.1:2554] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-3] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2554] - Received InitJoinAck message from [Actor[akka://KillrWeather@127.0.0.1:2553/system/cluster/core/daemon#-1624818115]] to [akka://KillrWeather@127.0.0.1:2554]
[2020-08-24 13:49:52,660] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-3] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2553] - Node [akka://KillrWeather@127.0.0.1:2554] is JOINING, roles [dc-default]
[2020-08-24 13:49:52,662] [INFO] [akka://KillrWeather@127.0.0.1:2554] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-3] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2554] - Welcome from [akka://KillrWeather@127.0.0.1:2553]
[2020-08-24 13:49:53,124] [INFO] [akka://KillrWeather@127.0.0.1:2553] [akka.cluster.Cluster] [KillrWeather-akka.actor.default-dispatcher-3] [Cluster(akka://KillrWeather)] - Cluster Node [akka://KillrWeather@127.0.0.1:2553] - Leader is moving node [akka://KillrWeather@127.0.0.1:2554] to [Up]
[2020-08-24 13:49:53,130] [INFO] [akka://KillrWeather@127.0.0.1:2554] [akka.cluster.singleton.ClusterSingletonManager] [KillrWeather-akka.actor.default-dispatcher-33] [akka://KillrWeather@127.0.0.1:2554/system/sharding/register-trade-topic-group-idCoordinator] - ClusterSingletonManager state change [Start -> Younger]