Hi,
I am reading data from a Kafka topic. After each record is read, it is processed and the result is pushed to another Kafka topic.
Basically, if the data is of type A, it is pushed to topic A. For now I am handling only one type of data, so the scenario is: each time the application reads a record, it checks its type. If it is type A, the record is pushed to topic A; otherwise it is dropped.
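import akka.NotUsed
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import scala.concurrent.Future

// Read from the input topic and extract the record value.
val consumerSource: Source[String, Consumer.Control] = Consumer
  .plainSource(consumerSettings, Subscriptions.topics(s"$inputTopic"))
  .buffer(10000, OverflowStrategy.backpressure)
  .mapAsync(parallelism)(record => Future {
    record.value()
  })

// Keeps only type A elements (implementation omitted here).
val filterFlow: Flow[String, String, NotUsed] = ???

// Wrap each value in a ProducerRecord for the output topic.
val kafkaStoreFlow: Flow[String, ProducerRecord[String, String], NotUsed] = Flow[String]
  .map(value => new ProducerRecord[String, String](s"$outputTopic", value))

val kafkaSink = Producer.plainSink(producerSettings)

consumerSource.via(filterFlow).via(kafkaStoreFlow).runWith(kafkaSink)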
When I run a load test at 600 TPS for 30 minutes, after some time the application starts processing the data with a growing delay. When the load is stopped, it keeps processing messages, and it takes about another 30 minutes to work through all of them. My understanding is that Akka is applying backpressure and the messages are being queued up somewhere, so once the load is stopped the application is draining that queue. Is this correct?
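To make my mental model concrete, here is a toy simulation in plain Scala. It does not use Akka or Kafka at all, and every name in it (BackpressureModel, simulate, Stats, the tick-based timing) is made up for illustration. It assumes that the source only fetches from the topic while the bounded buffer has room, which is exactly the part I would like confirmed.

```scala
import scala.collection.mutable

// Toy model only: a "topic" is just a growing end offset, and the stream
// is a bounded queue that is refilled only while it has free space.
object BackpressureModel {
  final case class Stats(maxBuffered: Int, lagWhenLoadStopped: Int, ticksToDrain: Int)

  def simulate(loadTicks: Int, producedPerTick: Int, processedPerTick: Int, bufferSize: Int): Stats = {
    val buffer = mutable.Queue.empty[Int] // stands in for .buffer(n, OverflowStrategy.backpressure)
    var logEnd = 0                        // next offset to be written to the topic
    var position = 0                      // next offset the consumer will fetch
    var maxBuffered = 0
    var lagWhenLoadStopped = 0
    var ticksToDrain = 0
    var tick = 0

    while (tick < loadTicks || position < logEnd || buffer.nonEmpty) {
      // The load generator writes to the topic only while the test is running.
      if (tick < loadTicks) logEnd += producedPerTick

      // Assumed backpressure behaviour: fetch only while the buffer has room.
      while (buffer.size < bufferSize && position < logEnd) {
        buffer.enqueue(position)
        position += 1
      }
      maxBuffered = math.max(maxBuffered, buffer.size)

      // The slow downstream stage handles a fixed number of messages per tick.
      var handled = 0
      while (handled < processedPerTick && buffer.nonEmpty) {
        buffer.dequeue()
        handled += 1
      }

      tick += 1
      if (tick == loadTicks) lagWhenLoadStopped = logEnd - position // still sitting in the topic
      if (tick > loadTicks) ticksToDrain += 1
    }
    Stats(maxBuffered, lagWhenLoadStopped, ticksToDrain)
  }

  def main(args: Array[String]): Unit =
    println(simulate(loadTicks = 10, producedPerTick = 2, processedPerTick = 1, bufferSize = 5))
}
```

With these made-up numbers the call returns Stats(5, 6, 10): the in-stream buffer never holds more than 5 messages, 6 messages are still unread in the topic when the load stops, and draining takes another 10 ticks, the same as the load duration. If this model matches what Akka does, the backlog in my test would be mostly in Kafka (as consumer lag) rather than inside the application.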
When Akka backpressures, is the data queued up in Kafka itself or somewhere else? In my setup, what is the flow of data when backpressure kicks in? Can anyone help me out?