Hey guys,
I am trying to integrate Akka Cluster Sharding (Java) with Kafka, so I took the killrweather sample application from the Lightbend website and modified it following the "Akka Cluster Sharding" doc (for some reason I can’t paste more than 2 links, but the link is in the issue below).
My repo with the killrweather sample app: https://github.com/marciomarinho/akka-samples-cluster-sharding-kafka-java
The issue where Sean responded with detailed explanations: https://github.com/akka/alpakka-kafka/issues/1189
Long story short, I realised the messages were not getting consumed, and @seglo helped me test the Alpakka integration. After I applied the modifications Sean suggested, it worked and I was able to see the messages arriving:
public static void initSharding(ActorSystem<?> system) {
  // ClusterSharding.get(system).init(Entity.of(TypeKey, entityContext ->
  //     WeatherStation.create(entityContext.getEntityId())
  // ));

  String groupId = "register-trade-topic-group-id";
  EntityTypeKey<User> typeKey = EntityTypeKey.create(User.class, groupId);

  CompletionStage<KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor<User>> messageExtractor =
      KafkaClusterSharding.get(system)
          .messageExtractorNoEnvelope(
              REGISTER_TRADE_TOPIC,
              Duration.ofSeconds(10),
              (User msg) -> msg.id,
              ConsumerSettings.create(
                      Adapter.toClassic(system), new StringDeserializer(), new StringDeserializer())
                  .withBootstrapServers("localhost:9092")
                  .withGroupId(typeKey.name()));

  messageExtractor.thenAccept(
      extractor ->
          ClusterSharding.get(system)
              .init(
                  Entity.of(typeKey, ctx -> userBehaviour(ctx.getEntityId()))
                      // Entity.of(typeKey, ctx -> WeatherStation.create(ctx.getEntityId()))
                      .withAllocationStrategy(
                          new ExternalShardAllocationStrategy(
                              system, typeKey.name(), Timeout.create(Duration.ofSeconds(5))))
                      .withMessageExtractor(extractor)));

  akka.actor.typed.ActorRef<ConsumerRebalanceEvent> rebalanceListener =
      KafkaClusterSharding.get(system).rebalanceListener(typeKey);

  ConsumerSettings<String, byte[]> consumerSettings =
      ConsumerSettings.create(
              Adapter.toClassic(system), new StringDeserializer(), new ByteArrayDeserializer())
          .withBootstrapServers("localhost:9092")
          // use the same group id as we used in the `EntityTypeKey` for `User`
          .withGroupId(typeKey.name());

  // pass the rebalance listener to the topic subscription
  AutoSubscription subscription =
      Subscriptions.topics(REGISTER_TRADE_TOPIC)
          .withRebalanceListener(Adapter.toClassic(rebalanceListener));

  Consumer.plainSource(consumerSettings, subscription)
      // .via(userBusiness()) // put your business logic, or omit to just try starting the stream
      .map(
          e -> {
            String s = new String(e.value());
            System.out.println(s);
            return s;
          })
      .runWith(Sink.ignore(), system);
}
This produces the following application log:
…
- ClusterSingletonManager state change [Start -> Younger]
{
  "name": "Molecule Man",
  "age": 29,
  "secretIdentity": "Dan Jukes",
  "powers": [
    "Radiation resistance",
    "Turning tiny",
    "Radiation blast"
  ]
}
The messages are arriving, but they are not being dispatched to the consumers the way the “Aligning Kafka Partitions with Akka Cluster Sharding” sample on GitHub shows.
Btw, that Scala sample does not work either; it does not dispatch the messages to the consumers as the docs say it should:
[info] [2020-01-16 09:51:38,672] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 29->45
[info] [2020-01-16 09:51:38,672] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 29 to cluster sharding
[info] [2020-01-16 09:51:38,673] [INFO] [sample.sharding.kafka.UserEvents$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/system/sharding/user-processing/75/29] - user 29 purchase cat t-shirt, quantity 0, price 8874
[info] [2020-01-16 09:51:39,702] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 60->111
[info] [2020-01-16 09:51:39,703] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 60 to cluster sharding
[info] [2020-01-16 09:51:39,706] [INFO] [sample.sharding.kafka.UserEvents$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/system/sharding/user-processing/2/60] - user 60 purchase cat t-shirt, quantity 2, price 9375
[info] [2020-01-16 09:51:40,732] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 75->1
[info] [2020-01-16 09:51:40,732] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 75 to cluster sharding
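For context on the `entityId->partition` lines in the log above: as far as I understand the Alpakka Kafka docs, the message extractor from `KafkaClusterSharding` derives the shard id from the entity id with the same murmur2 hash that Kafka's default partitioner applies to record keys, so an entity's shard number lines up with the partition its messages arrive on. Below is a minimal standalone sketch of that mapping. The class name `PartitionSketch`, the helper `partitionFor`, and the partition count of 128 are assumptions for illustration only, and the murmur2 routine is written from memory of Kafka's `Utils.murmur2`, so please check it against the Kafka source before relying on it.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Akka or Alpakka: mimics Kafka-style key-to-partition hashing.
final class PartitionSketch {

  // 32-bit murmur2, intended to mirror Kafka's Utils.murmur2 (assumption: reproduced from memory).
  static int murmur2(byte[] data) {
    int length = data.length;
    int seed = 0x9747b28c;
    final int m = 0x5bd1e995;
    final int r = 24;
    int h = seed ^ length;
    int length4 = length / 4;
    // Mix four bytes at a time (little-endian).
    for (int i = 0; i < length4; i++) {
      int i4 = i * 4;
      int k = (data[i4] & 0xff)
          + ((data[i4 + 1] & 0xff) << 8)
          + ((data[i4 + 2] & 0xff) << 16)
          + ((data[i4 + 3] & 0xff) << 24);
      k *= m;
      k ^= k >>> r;
      k *= m;
      h *= m;
      h ^= k;
    }
    // Mix the remaining 1 to 3 bytes; the fall-through between cases is intentional.
    switch (length % 4) {
      case 3:
        h ^= (data[(length & ~3) + 2] & 0xff) << 16;
      case 2:
        h ^= (data[(length & ~3) + 1] & 0xff) << 8;
      case 1:
        h ^= data[length & ~3] & 0xff;
        h *= m;
    }
    h ^= h >>> 13;
    h *= m;
    h ^= h >>> 15;
    return h;
  }

  // Clears the sign bit, then takes the remainder by the partition count.
  static int partitionFor(String entityId, int numPartitions) {
    byte[] keyBytes = entityId.getBytes(StandardCharsets.UTF_8);
    return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
  }

  public static void main(String[] args) {
    int numPartitions = 128; // assumption: the real topic may have a different partition count
    for (String id : new String[] {"29", "60", "75"}) {
      System.out.println("entityId->partition " + id + "->" + partitionFor(id, numPartitions));
    }
  }
}
```

If the numbers printed here differ from the ones in the log, the most likely reasons are a different partition count on the topic or a key that is serialized differently from a plain UTF-8 string.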
So, if anyone knows how to get this working, please help!