Hi, I’m using committableSource and I need at-least-once processing guarantees. I need to filter out some of the messages coming from Kafka. How do I commit the offsets of the messages that are filtered out?
My pipeline looks something like this:
source
  .throttle(25, 1.second)
  .filter(predicate)
  .groupedWithin(25, 5.seconds)
  .mapAsync(1) { batch =>
    processAsync(batch)
  }
  .toMat(Committer.sink(CommitterSettings(actorSystem)))(DrainingControl.apply)
  .run()