Hi all.
My team and I have been facing an interesting issue, and we would be happy to get your input on it.
We are running Akka Streams over Kafka, using MergePrioritized to merge streams from several topics.
Here is the behaviour we are witnessing:
Consumer A starts up, and starts consuming.
Consumer A finishes processing up to offset 1000 for partition X (committing as it goes).
Consumer B starts up, and a reassignment occurs.
Consumer B is assigned partition X
Consumer B starts processing from offset 920 for partition X
So we end up reprocessing ~50-100 messages per partition, every time a reassignment occurs.
I assume this can’t be completely avoided, but I was wondering if there is something we are doing wrong, or a way in which we can minimize this reprocessing behaviour.
Here is what our code looks like:
private val consumerSettings =
  ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(kafka_bootstrap_servers)
    .withGroupId("job_priority_consumer")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    .withMaxWakeups(24 * 60)
    .withWakeupTimeout(60.seconds)

private val stream1 = Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic_1))
  .buffer(2000, OverflowStrategy.backpressure)
private val stream2 = Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic_2))
  .buffer(1000, OverflowStrategy.backpressure)
private val stream3 = Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic_3))
  .buffer(5000, OverflowStrategy.backpressure)
private val stream4 = Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic_4))
  .buffer(2000, OverflowStrategy.backpressure)
// Relative weights passed to MergePrioritized below – a higher weight means more
// elements are pulled from that source when several sources have data available.
private val PRIORITY_1 = 2
private val PRIORITY_2 = 1
private val PRIORITY_3 = 5
private val PRIORITY_4 = 2
val jobsProcessing: NotUsed = Source.combine(
  stream1,
  stream2,
  stream3,
  stream4
)(_ => MergePrioritized(Seq(PRIORITY_1, PRIORITY_2, PRIORITY_3, PRIORITY_4)))
  .mapAsync(100) { msg =>
    handleMessage(msg.record).recover(PartialFunction(handleError)).map(_ -> msg)
  }
  .map { msg =>
    // commit message
    msg._2.committableOffset.commitScaladsl()
  }
  .withAttributes(supervisionStrategy(stoppingDecider))
  .to(Sink.ignore)
  .run()
private def handleError(t: Throwable) = {
  t match {
    case NonFatal(e) =>
      Logger.warn(e.getLocalizedMessage)
      Future.successful(())
  }
}
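One detail that might be relevant: in the .map stage above, the Future returned by commitScaladsl() just flows on into Sink.ignore, so the stream never waits for a commit to complete before pulling more messages. Below is a minimal sketch of a variant we are considering, where the commit is awaited in a mapAsync stage. The parallelism of 1 is an arbitrary choice, jobsProcessingAwaited is just a placeholder name, and the snippet reuses the settings, sources, priorities, and handlers defined above:

// Sketch only – same pipeline, but the commit Future is awaited, so at most one
// commit is in flight at a time and the stream backpressures on slow commits.
val jobsProcessingAwaited: NotUsed = Source.combine(
  stream1,
  stream2,
  stream3,
  stream4
)(_ => MergePrioritized(Seq(PRIORITY_1, PRIORITY_2, PRIORITY_3, PRIORITY_4)))
  .mapAsync(100) { msg =>
    handleMessage(msg.record).recover(PartialFunction(handleError)).map(_ => msg)
  }
  .mapAsync(1) { msg =>
    // commit is awaited here instead of being fired and forgotten
    msg.committableOffset.commitScaladsl()
  }
  .withAttributes(supervisionStrategy(stoppingDecider))
  .to(Sink.ignore)
  .run()

The idea is that a rebalance should then find far fewer processed-but-uncommitted offsets, at the cost of some throughput on the commit path. I'd be interested to hear whether this is the right direction or whether we are missing something more fundamental.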
Please let me know if any additional info is missing here.