Hi, I am trying to use the Producer API as shown in the Alpakka documentation.
I'm able to consume records using Transactional.source, and the producer is created, but no messages are written to the topic.
Producing to the topic through Transactional.sink does not work, even though I can see that the idempotent producer is enabled.
The logs show that execution reaches my logic, but no events are produced to myTopic:
[info] o.a.k.c.p.KafkaProducer - [Producer clientId=producer-7fe8789c-3171-429e-afbf-d8a8ba12700c, transactionalId=7fe8789c-3171-429e-afbf-d8a8ba12700c] the idempotent producer is enabled.
Could you please help me understand why messages might not be produced to the topic?
I'm running my code locally using Docker.
Below is my code:
Transactional.source(consumerSettings, Subscriptions.topics(topicNames))
  .mapMaterializedValue(innerControl = _)
  .map(consumerRecord => {
    handleBusiness(consumerRecord)
      .flatMap(res => Source.single(res)
        /**
         * Instead of this I also tried Source.future
         */
        .runWith(Transactional.sink(producerSettings, UUID.randomUUID().toString)))
  })
}
source.runWith(Sink.ignore)
And my handleBusiness logic looks like this:
private def handleBusiness(
    consumedMessage: ConsumerMessage.TransactionalMessage[String, String]
): Future[Envelope[String, String, PartitionOffset]] = {
  (conversion of consumedMessage after applying ETL gives me Future) map { message =>
    ProducerMessage.single(
      new ProducerRecord("myTopic", consumedMessage.record.key, message),
      consumedMessage.partitionOffset)
  }
}
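For comparison, my understanding of the pattern in the Alpakka documentation is a single stream that ends in one Transactional.sink with a fixed transactional id, rather than a new sink (with a new random id) started per record as in my code above. Below is a minimal sketch of that shape, not a confirmed fix. It assumes Akka 2.6+ (so the implicit ActorSystem provides the materializer) and a broker at localhost:9092. The names sourceTopic, my-group and my-transactional-id, and the upper-casing that stands in for the ETL step, are placeholders and not from my real code.

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerMessage, ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.kafka.ConsumerMessage.PartitionOffset
import akka.kafka.ProducerMessage.Envelope
import akka.kafka.scaladsl.Transactional
import akka.stream.scaladsl.Keep
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

import scala.concurrent.Future

object TransactionalSketch extends App {
  implicit val system: ActorSystem = ActorSystem("transactional-sketch")
  import system.dispatcher // ExecutionContext for the Future below

  // Assumption: broker address, group id and topic names are placeholders.
  val bootstrapServers = "localhost:9092"

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId("my-group")

  val producerSettings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)

  // Stand-in for the real ETL step: upper-cases the value asynchronously and
  // wraps it in an envelope that carries the consumed partition offset.
  def handleBusiness(
      msg: ConsumerMessage.TransactionalMessage[String, String]
  ): Future[Envelope[String, String, PartitionOffset]] =
    Future {
      ProducerMessage.single(
        new ProducerRecord("myTopic", msg.record.key, msg.record.value.toUpperCase),
        msg.partitionOffset)
    }

  // One stream, one sink: every envelope flows into the same transactional
  // producer, which uses a single fixed transactional id (placeholder value).
  val (control, done) =
    Transactional
      .source(consumerSettings, Subscriptions.topics("sourceTopic"))
      .mapAsync(1)(handleBusiness) // parallelism 1 keeps the sketch simple
      .toMat(Transactional.sink(producerSettings, "my-transactional-id"))(Keep.both)
      .run()
}
```

Here mapAsync replaces the map plus inner runWith, so the Future returned by handleBusiness is resolved inside the stream and the resulting envelope reaches the sink directly. The materialized control can be used to shut the stream down, and done completes when the sink finishes.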