I am having trouble figuring out how to transport the CommittableReadResult through the stream so that I can ack or nack the message accordingly. In my case I want to write RabbitMQ events to Elasticsearch.
I’ve managed to lay out the happy path, including the ack:
private val source = RabbitMq.prepareAmqpSource // little helper which creates a Source of CommittableReadResult
private val sink = Elasticsearch.prepareElasticsearchSink // also a little helper

val convertStage = Flow[CommittableReadResult]
  .map(readResult => RabbitMq.convertFromReadResult(readResult))
  // ... some more mapping steps
  .map(x => WriteMessage.createUpsertMessage(x.hashCode().toString, x)) // a WriteMessage is what the Elasticsearch sink expects

val rabbitToElasticStage = Flow[CommittableReadResult].alsoTo(convertStage.to(sink))

val done: Future[Done] = source
  .via(rabbitToElasticStage)
  .mapAsync(1)(cm => cm.ack())
  .runWith(Sink.ignore)
The alsoTo method was the only way I could find to keep a reference to the CommittableReadResult so that I can ack it. Is this even the correct way to do something like this?
What I currently can't figure out is how to handle errors that occur when writing to the sink. Using recover only gives me the Throwable, but I need the CommittableReadResult to call nack. I also could not figure out how a supervision strategy would help me here.
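To illustrate the dead end, a minimal sketch (reusing the pipeline from above; NonFatal comes from scala.util.control):

source
  .via(rabbitToElasticStage)
  .mapAsync(1)(cm => cm.ack())
  .recover {
    case NonFatal(_) =>
      // only the Throwable is available here; the CommittableReadResult that
      // failed is already gone, so there is nothing left to call nack() on
      Done
  }
  .runWith(Sink.ignore)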
Turns out that my understanding of Akka Streams was not sufficient. I thought that I always needed a sink to write data, but after a closer look at the Alpakka Elasticsearch API I realized that I can also model the writes as a flow which produces a WriteResult, and that result can be used to decide between ack and nack. The last missing piece was how to transport a value from the source through my stream.
I found that a type called FlowWithContext was added recently, which is exactly what I needed. But since not all operators are supported on it yet, I went back to returning a tuple from every step instead.
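For reference, a rough sketch of what the context-based variant could have looked like (assuming asSourceWithContext is available in your Akka version, and assuming for simplicity that RabbitMq.convertFromReadResult yields the ElasticsearchUmsatz directly):

val withContext = source.asSourceWithContext(identity) // carry the message itself as context
withContext
  .map(RabbitMq.convertFromReadResult) // the CommittableReadResult rides along untouched
  .map(umsatz => WriteMessage.createUpsertMessage(umsatz.getKontoId, umsatz))
  .asSource // back to a Source of (WriteMessage, CommittableReadResult) tuples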
Furthermore, Alpakka Elasticsearch allows creating a flow with a pass-through value, which in my case is the CommittableReadResult from RabbitMQ.
val source = AmqpSource.committableSource(settings, bufferSize = 50)
val writeFlow = ElasticsearchFlow.createWithPassThrough[ElasticsearchData, CommittableReadResult](
  "streampoc1", typeName = "_doc", writeSettings)

val convertStage = Flow[CommittableReadResult]
  .map(readResult => (RabbitMq.convertFromReadResult(readResult), readResult))
  // ... more steps, each returning a tuple that carries the readResult along
  .map {
    case (eUmsatz: ElasticsearchUmsatz, cm: CommittableReadResult) =>
      WriteMessage.createUpsertMessage(eUmsatz.getKontoId, eUmsatz).withPassThrough(cm)
  }

val rabbitToElasticStage = Flow[CommittableReadResult]
  .via(convertStage)
  .via(writeFlow)
  .mapAsync(1) { writeResult => // mapAsync so the ack/nack Futures are actually awaited
    if (writeResult.success) writeResult.message.passThrough.ack()
    else writeResult.message.passThrough.nack(requeue = true)
  }

source.via(rabbitToElasticStage).runWith(Sink.ignore)
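For completeness, this is roughly the setup the snippet above assumes to be in scope (Alpakka 1.x style; the actor system name, host, and queue name are placeholders):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.amqp.{AmqpLocalConnectionProvider, NamedQueueSourceSettings}
import akka.stream.alpakka.elasticsearch.ElasticsearchWriteSettings
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient

implicit val system: ActorSystem = ActorSystem("rabbit-to-es")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// ElasticsearchFlow.createWithPassThrough resolves the REST client implicitly,
// along with a spray.json.JsonWriter for the document type:
implicit val esClient: RestClient =
  RestClient.builder(new HttpHost("localhost", 9200)).build()

val settings = NamedQueueSourceSettings(AmqpLocalConnectionProvider, "my-queue")
val writeSettings = ElasticsearchWriteSettings() // defaults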