Hello :)
Can I ask how to do a conditional publish with msg data in the code shown in the following link?
e.g. if msg.record.value contains some string, then publish otherwise skip etc.
Thank you,
Sean
Hello :)
Can I ask how to do a conditional publish with msg data in the code shown in the following link?
e.g. if msg.record.value contains some string, then publish otherwise skip etc.
Thank you,
Sean
How about using filter, sth like:
...
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
.filter(msg=> msg.record.value().nonEmpty)
.map { msg =>
ProducerMessage.single(
new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
msg.committableOffset
)
}
...
Regards¨
Paul
Thank you, Paul for your kind reply
I have two questions.
.toMat(Producer.committableSink(producerSettings, committerSettings))
. Do you know what Producer.committableSink
does? does it commit to the original kafka or the one it publishes to?See: ProducerMessage.PassThroughMessage
pawelkaczor Thanks for the hint about the ProducerMessage.passThrough
I used it in the test mentioned above, like:
val control =
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
.map { msg =>
if (msg.record.value().toInt % 2 == 0) {
// For msg with even numbers: send to the targetTopic
ProducerMessage.single(
new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
// The `passThrough` field may hold any element that is passed through the `Producer.flow`
// In our case it is the [[ConsumerMessage.CommittableOffset]] which is committed later in the flow
msg.committableOffset)
} else {
// For msg with odd numbers: Create a pass-through message not containing any records
ProducerMessage.passThrough[String,String,ConsumerMessage.CommittableOffset](msg.committableOffset)
}
}
.toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
.run()
With this, the targetTopic yields 10 messages containing even numbers (5 from topic1, 5 from topic2 instead of 10 each as in the original test without this conditional filtering). So the messages with the odd numbers are filtered and we have a way to do a conditional publish, yay!
The commits go to the Consumer topic(s). See also answer here: Kafka Transactional Producer
Kind regards
Paul
Thank you, Pawel & Paul.
That was exactly what I was looking for!