Not able to Produce message to myTopic using Transactional.Sink when I'm using Transactional.Source to consume message

Hi I was trying to use Producer api like shown in Alpakka documentation.
I’m able to consume record using Transactional source and Producer is created but not able to put message to topic
Not able to Produce to topic using Transactional.Sink in Alpakka but I see idempotent producer is enabled.
I see logs that it’s comming into logic But it’s not producing events to myTopic

[info] o.a.k.c.p.KafkaProducer - [Producer clientId=producer-7fe8789c-3171-429e-afbf-d8a8ba12700c, transactionalId=7fe8789c-3171-429e-afbf-d8a8ba12700c]  the idempotent producer is enabled.

Could you please help me understand why it might not produce message to topic

I’m running my code locally using docker

Below is my code

Transactional.source(consumerSettings,
                Subscriptions.topics(topicNames))
                .mapMaterializedValue(innerControl = _)
                .map(consumerRecord => {
                  handleBusiness(consumerRecord)
                    .flatMap(res => Source.single(res)
/**
* Instead of this I also tried Source.future
*/
                           .runWith(Transactional.sink(producerSettings, 
                                               UUID.randomUUID().toString)))
                })
            }
              source.runWith(Sink.ignore)

And my handleBusiness logics looks like below:

  private def handleBusiness(consumedMessage: ConsumerMessage.TransactionalMessage[String, String]): Future[Envelope[String, String, PartitionOffset]]  = {
        (conversion of consumedMessage after applying ETL gives me Future ) map { message =>
             ProducerMessage.single(new ProducerRecord("myTopic", consumedMessage.record.key, message ), consumedMessage.partitionOffset)

      }
  }

I have used mapAsync instead of map now because my business logic was returning future

Hi @appy,

The source code formatting went wild, so it is not obvious how it fits together.

For transactional flows you always need to combine a Transactional.source with a Transactional.sink.

Cheers,
Enno.