Hello,
I couldn’t find clear answer to my question so I’ve decided to post it here:
I have stream that must preserve order to ack appropriately on Kafka using Commiter
(https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#offset-storage-in-kafka-committing).
This stream to some elements apply conditional flow based on predicate using custom stage described here: https://gist.github.com/davideicardi/2a6e2a93507a731026160c2d383528ab#gistcomment-3109158
As applied action is async I would like to run them similar to function that could be executed in .mapAsync
with given parallelism level but preserve elements order.
Currently my impl look like below:
kafkaMessageSource
.via(doSthWithEachElement)
.via(applyConditionalFlow) // here we're using custom stage that apply flow conditionally https://gist.github.com/davideicardi/2a6e2a93507a731026160c2d383528ab#gistcomment-3109158
.map(res => res.originalMsg.committableOffset)
.via(CommittingFlow())
.runWith(Sink.ignore)
Is there a way to run applyConditionalFlow
parallel with given by parameter parallelism level (similar to .mapAsync
) with preserving order?