I need to implement rabbitmq + alpakka combination.
I have 2 queues in rabbitmq (sourceQ and destinationQ).Cosume from sourceQ and tranform the message and put out it back the new transformed message to destinationQ.
In this flow after successful posting to destinationQ, i have to acknowledge the message, so that it will be removed from sourceQ.
Below is the Flow:
private val amqpSource = AmqpSource.committableSource(
NamedQueueSourceSettings(connectionProvider, “sourceQ”)
.withDeclaration(queueDeclaration)
.withAckRequired(false),
bufferSize = 10
)
private val amqpFlow = AmqpFlow.withConfirm(AmqpWriteSettings(connectionProvider)
.withRoutingKey(destinationQ)
.withDeclaration(destinationQ)
.withBufferSize(10)
.withConfirmationTimeout(200.millis))
final val result = amqpSource
.map(flow1) // message processing cutsom logic
.map(flow2) // message processing cutsom logic
.via(amqpFlow)
.runWith(Sink.seq)
I need to ACK the message after the amqpFlow. Now please guide me, how i will get the original message for ACK after amqpFlow.