I am using Akka Streams to process data from a Kafka input topic and, after processing, send it to an output Kafka topic. My main requirement is to keep this whole process real-time.

When I increase the load on the Kafka input topic to, say, 600 tps for 20 minutes, the processing speed of my code is quite effective. But the moment I stop generating load on the input topic, the processing speed of my code drops, which hurts my main requirement of real-time processing.

So I want to know what I might be missing that affects my code after the load stops. It would be great if someone could suggest some points that need to be taken care of when working with Akka and Kafka together. Also, is there anything that affects Kafka or its offsets when the Akka stream backpressures the Kafka input topic?
How do you track the processing speed? When do you consider the end-to-end processing to be done?
One thing that comes to mind is the committer settings. With the default settings, the Committer collects offsets until it has offsets for 1000 messages, or alternatively waits for 10 seconds, before they are sent off for committing. If the number of messages becomes low, you might see those 10 seconds pass before an offset is committed.
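If committing turns out to matter for your setup, that batching can be tuned via `CommitterSettings`. A minimal sketch, assuming an implicit `ActorSystem` named `system` is in scope (the concrete values are illustrative, not a recommendation):

```scala
import akka.kafka.CommitterSettings
import scala.concurrent.duration._

// The defaults batch up to 1000 offsets or wait up to 10 seconds;
// lowering both makes commits (and observed progress) more immediate.
val committerSettings =
  CommitterSettings(system)
    .withMaxBatch(100)
    .withMaxInterval(1.second)
```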
Cheers,
Enno.
Hi @ennru
To observe end-to-end processing, I am looking at the Kafka output topic. Per my main requirement, when we stop the load, the message count in the output topic should also stop increasing (ideally, as I want real time), but it doesn't. The processing speed can be observed the same way: the rate at which the message count in the output topic grows drops once we stop the load.
```scala
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Source

// Plain consumer source: emits the record values from the input topic.
val consumerSource: Source[String, Consumer.Control] =
  Consumer
    .plainSource(consumerSettings, Subscriptions.topics(inputTopic))
    .map(record => record.value())
```
I am using the plain source for consuming data from Kafka, and the documentation mentioned that it would take care of the offset as well.
Is this a correct way to consume data from Kafka, or do some parameters need to be configured if I want real-time streaming at a load like 600 tps?
Also, when Akka Streams backpressures, at what specific point are the messages queued up? Is it in Kafka itself? If yes, does it affect the offset or some other Kafka parameter?
If not, at what point are the messages queued up when the consumer sends a backpressure signal?
If I understand this correctly, you are measuring the time from producing data on your input topic until the Alpakka Kafka flow has produced the corresponding data on its output topic.
I would expect some delay between when you stop producing data and when your flow stops producing, as there is already data in flight that will continue to be processed. These buffers reside in the topic, in the Kafka client library, and in the Akka Stream. You may tweak some of them, e.g. max.partition.fetch.bytes, but the standard settings mostly work well.
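For example, the fetch size can be set on the `ConsumerSettings`. A sketch assuming an implicit `ActorSystem` named `system` is in scope; the bootstrap servers and group id are placeholders:

```scala
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

// Smaller fetch batches mean less data buffered client-side,
// at the cost of more round trips to the broker.
val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092") // placeholder
    .withGroupId("example-group")           // placeholder
    .withProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "65536")
```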
The Consumer.plainSource does not do anything about offsets or committing: “They do not have support for committing offsets to Kafka. When using these Sources, either store an offset externally, or use auto-commit (note that auto-commit is disabled by default).”
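If you do want Kafka-side offset tracking, the usual pattern is a committable source combined with a committer sink. A minimal sketch, assuming `consumerSettings`, `committerSettings`, `inputTopic`, an implicit materializer/actor system, and a hypothetical `process` function are already defined:

```scala
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.{Committer, Consumer}

// Offsets are committed only after a message has been processed,
// batched according to the committer settings discussed above.
val control: Consumer.Control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(inputTopic))
    .map { msg =>
      process(msg.record.value()) // hypothetical processing step
      msg.committableOffset
    }
    .to(Committer.sink(committerSettings))
    .run()
```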
The backpressure in your stream would come only from the Kafka producer itself; it backpressures when producing to Kafka starts blocking as its buffers fill up.
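For completeness, the producing side of such a flow typically looks like the sketch below, assuming `producerSettings` and `outputTopic` are defined and reusing the `consumerSource` from above:

```scala
import akka.kafka.scaladsl.Producer
import org.apache.kafka.clients.producer.ProducerRecord

// When the producer's internal buffers are full, this sink blocks,
// which backpressures the whole stream up to the consumer.
consumerSource
  .map(value => new ProducerRecord[String, String](outputTopic, value))
  .runWith(Producer.plainSink(producerSettings))
```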
Cheers,
Enno.