Hi,
We have created an Alpakka stream that consumes messages from a Kafka topic and then processes them. The messages are processed in parallel, using mapAsyncUnordered with a configured parallelism. The Kafka lag for the consumer keeps increasing, yet the application uses only one CPU core. I have switched the consumer dispatcher to akka.actor.default-dispatcher, which uses a fork-join executor, expecting it to use more than one core. The application runs on a machine with 32 cores.
Please find the configured settings below:
akka.kafka.consumer.use-dispatcher = "akka.actor.default-dispatcher"
Consumer stream code:
Consumer.DrainingControl<Done> control =
    Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
        .buffer(500, OverflowStrategy.backpressure())
        // Deserialize the response from JSON to a Java object
        .mapAsyncUnordered(5, /* deserialize the output */)
        .mapAsyncUnordered(5, /* process it and perform some calculations */)
        .mapAsyncUnordered(5, /* do something and return the consumer offset */)
        // Commit the offset
        .toMat(Committer.sink(committerSettings.withMaxBatch(100)), Consumer::createDrainingControl)
        .run(materializer);
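The function bodies are elided above, so for anyone trying to reproduce this, here is a minimal plain-Java sketch (no Akka involved, and not our actual code) of two shapes such a function can take. It assumes the function returns a CompletionStage, as mapAsyncUnordered expects in the Java DSL. The class StageShapes, the calculate helper, and the pool size of 4 are all hypothetical names and values chosen for illustration. The sketch only records which threads do the work in each shape.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class StageShapes {

    // Hypothetical stand-in for the per-message calculation.
    static int calculate(int payload) {
        return payload * 2;
    }

    // Shape A: the work runs on the calling thread and the returned
    // stage is already completed by the time the function returns.
    static Function<Integer, CompletionStage<Integer>> inline(Set<String> threads) {
        return payload -> {
            threads.add(Thread.currentThread().getName());
            return CompletableFuture.completedFuture(calculate(payload));
        };
    }

    // Shape B: the work is handed to an executor, so the function
    // returns immediately and the calculation runs on a pool thread.
    static Function<Integer, CompletionStage<Integer>> offloaded(
            ExecutorService pool, Set<String> threads) {
        return payload -> CompletableFuture.supplyAsync(() -> {
            threads.add(Thread.currentThread().getName());
            return calculate(payload);
        }, pool);
    }

    // Calls the function once per payload from a single thread, then
    // waits for every result. This only imitates a single caller; it
    // is not how Akka Streams is implemented.
    static List<Integer> runAll(Function<Integer, CompletionStage<Integer>> f, int count) {
        List<CompletableFuture<Integer>> futures = IntStream.range(0, count)
                .mapToObj(i -> f.apply(i).toCompletableFuture())
                .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Set<String> inlineThreads = ConcurrentHashMap.newKeySet();
        Set<String> poolThreads = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            runAll(inline(inlineThreads), 100);
            runAll(offloaded(pool, poolThreads), 100);
        } finally {
            pool.shutdown();
        }
        System.out.println("inline shape ran on threads: " + inlineThreads);
        System.out.println("offloaded shape ran on threads: " + poolThreads);
    }
}
```

In shape A the parallelism argument has nothing to overlap, because each call finishes its work before returning. In shape B the number of busy threads is bounded by the pool, not by the caller.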
The stream runs in an Akka cluster, and the load is balanced across the nodes by using the same consumer group id. The application also has a typed actor system that triggers the requests, with a group router that helps share the load across the cluster. Each triggered request is sent to a microservice as a Kafka message, and the response comes back as a Kafka message that the stream processes. These messages do not need to be processed in order, hence the use of mapAsyncUnordered.
I tried increasing the parallelism, even up to 100, but didn't see any change.
Thanks in advance