Alpakka Kafka Conditional Publish

Hello :)

Can I ask how to do a conditional publish based on the message data in the code shown in the following link?

For example: if msg.record.value contains some string, publish it; otherwise skip it.

Thank you,
Sean

How about using filter, something like:

...
Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
  .filter(msg => msg.record.value().nonEmpty)
  .map { msg =>
    ProducerMessage.single(
      new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
      msg.committableOffset
    )
  }
...

Regards,
Paul

Thank you, Paul, for your kind reply :smile:
I have two questions.

  1. If we ‘filter’ the stream, will it skip the offset commit to the original Kafka topic on the consumer side?
  2. The GitHub code does .toMat(Producer.committableSink(producerSettings, committerSettings)). Do you know what Producer.committableSink does? Does it commit to the original Kafka topic or to the one it publishes to?

See: ProducerMessage.PassThroughMessage

pawelkaczor, thanks for the hint about ProducerMessage.passThrough.

I used it in the test mentioned above, like:

val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
    .map { msg =>
      if (msg.record.value().toInt % 2 == 0) {
        // For msg with even numbers: send to the targetTopic
        ProducerMessage.single(
          new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
          // The `passThrough` field may hold any element that is passed through the `Producer.flow`
          // In our case it is the [[ConsumerMessage.CommittableOffset]] which is committed later in the flow
          msg.committableOffset)
      } else {
        // For msg with odd numbers: create a pass-through message not containing any records
        ProducerMessage.passThrough[String, String, ConsumerMessage.CommittableOffset](msg.committableOffset)
      }
    }
    .toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
    .run()

With this, the targetTopic yields 10 messages containing even numbers (5 from topic1 and 5 from topic2, instead of 10 each as in the original test without this conditional filtering). So the messages with odd numbers are filtered out and we have a way to do a conditional publish, yay!
The offsets are committed against the consumer topic(s), not the target topic. See also the answer here: Kafka Transactional Producer
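
To make the difference between the filter and the pass-through approach a bit more concrete, here is a minimal, library-free sketch in plain Scala. Everything in it is a hypothetical stand-in: Offset, Msg, Single, PassThroughOnly and ConditionalPublishModel are not Alpakka Kafka classes, and the "keep" keyword just mirrors the "contains some string" case from the original question. It also assumes that committing an offset for a partition covers everything before it, so it only looks at the highest offset per partition that would reach the committer.

```scala
// Simplified stand-ins for illustration only; these are NOT the Alpakka Kafka types.
final case class Offset(partition: Int, offset: Long)
final case class Msg(value: String, offset: Offset)

// Every envelope carries the consumer offset, whether or not it carries a record.
sealed trait Envelope { def passThrough: Offset }
final case class Single(topic: String, value: String, passThrough: Offset) extends Envelope
final case class PassThroughOnly(passThrough: Offset) extends Envelope

object ConditionalPublishModel {
  // Hypothetical predicate: publish only values that contain the keyword.
  def shouldPublish(value: String, keyword: String): Boolean =
    value.contains(keyword)

  // Pass-through approach: always emit an envelope, with or without a record.
  def toEnvelope(msg: Msg, targetTopic: String, keyword: String): Envelope =
    if (shouldPublish(msg.value, keyword)) Single(targetTopic, msg.value, msg.offset)
    else PassThroughOnly(msg.offset)

  // Highest offset per partition among the offsets that reach the committer.
  def highestOffsets(offsets: Seq[Offset]): Map[Int, Long] =
    offsets.groupBy(_.partition).map { case (p, os) => p -> os.map(_.offset).max }

  // Four messages on one partition; the last one does not match the keyword.
  val input: Seq[Msg] = Seq(
    Msg("keep-a", Offset(0, 0L)),
    Msg("skip-b", Offset(0, 1L)),
    Msg("keep-c", Offset(0, 2L)),
    Msg("skip-d", Offset(0, 3L))
  )

  // Filter approach: offsets of dropped messages never travel downstream.
  val filteredOffsets: Seq[Offset] =
    input.filter(m => shouldPublish(m.value, "keep")).map(_.offset)

  // Pass-through approach: every offset travels downstream.
  val envelopes: Seq[Envelope] = input.map(toEnvelope(_, "target", "keep"))

  // Only Single envelopes result in a record on the target topic.
  val published: Seq[String] = envelopes.collect { case Single(_, value, _) => value }

  def main(args: Array[String]): Unit = {
    println(s"published:           $published")
    println(s"filter reaches:      ${highestOffsets(filteredOffsets)}")
    println(s"passThrough reaches: ${highestOffsets(envelopes.map(_.passThrough))}")
  }
}
```

In this model both approaches publish the same two records, but with filter the trailing skipped message (offset 3) never reaches the committer, while with the pass-through envelope it does. As far as I understand, that is the practical reason to prefer the pass-through variant when the skipped messages should still count as processed.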

Kind regards
Paul

Thank you, Pawel & Paul.

That was exactly what I was looking for!