I have a subscriber service which listens on a Kafka topic for events.
It takes those events and transforms them into commands for my entity:
    val parallelism = 16
    kafkaProxy.dataTopic.subscribe.atLeastOnce(
      Flow[Data]
        .flatMapConcat(data => Source(groupBySession(data)))
        .mapAsyncUnordered(parallelism)(sd =>
          entityRefFor(sd.entityId).ask(RecordSessionData(sd.sessionId, sd)))
    )
groupBySession is basically:
    list.groupBy(_.session).map(/* wrap to domain type */).toList
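To make the shape of that step concrete, here is a minimal self-contained sketch of the grouping. The types are simplified stand-ins rather than my real domain classes: `Record`, the `records` field, and using the session as the `entityId` are assumptions made only for illustration.

```scala
object GroupBySessionSketch {
  // Hypothetical stand-ins for the real message and domain types.
  final case class Record(session: String, payload: String)
  final case class Data(records: List[Record])
  final case class SessionData(entityId: String, sessionId: String, records: List[Record])

  // Groups the records of one Kafka message by session and wraps each
  // group in a domain type. Assumes the session doubles as the entity id.
  def groupBySession(data: Data): List[SessionData] =
    data.records
      .groupBy(_.session)
      .map { case (session, records) => SessionData(session, session, records) }
      .toList
}
```

So a single Kafka message can fan out into several `SessionData` elements, each of which becomes one command in the `mapAsyncUnordered` stage.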
I also have a read side, which reacts to the events my entity emits when it handles these commands.
When I publish data for 20 sessions to Kafka, only a portion of the sessions is actually processed and recorded, and how many depends on the parallelism level:
With parallelism 1, 3 sessions.
With 4, 6 sessions.
With 16, 18 sessions.
With 32, all 20 sessions.
I am running it in dev mode (via sbt lagom:runAll).
I also have another Lagom service subscribed to the same Kafka topic, but with a different group id.
service1/src/main/application.conf:
    akka.kafka.consumer {
      kafka-clients {
        group.id = "service-1-group"
      }
    }
service2/src/main/application.conf:
    akka.kafka.consumer {
      kafka-clients {
        group.id = "service-2-group"
      }
    }
Am I doing something wrong here? Could someone shed some light?