I’m not asking a question here. I’m sharing a solution to a problem I was about to ask about while I was implementing it, in case someone else comes along with much the same question.
I have an Akka Streams (Alpakka Kafka connector) workflow that is consuming messages off one Kafka topic, persisting to a database, producing messages to another Kafka topic, and then finally committing the offsets back to the original Kafka topic. I needed to introduce some logic to produce to the downstream Kafka topic only if the message being processed is valid according to our business logic. Whether the message is valid or invalid, the Kafka message’s offset still needs to be committed back upon successful processing.
I wouldn’t have been able to piece together this solution if it weren’t for the solutions I found while searching on this problem. I think what I found missing in the existing examples was the minimal case of injecting a Flow without any Source or Sink involved. Or they otherwise hadn’t quite distilled the problem (my problem) down to its essence.
My solution starts with two alternate flows that I want to pursue depending on whether the message is valid. They’re created by the following two methods:
- createFlowToProduceValidMessagesToPublishTopic
- createFlowToSkipInvalidMessages
Both are returning a Flow[ValidatedCommittableMessage, ConsumerMessage.CommittableOffset, NotUsed], where ValidatedCommittableMessage is a domain type that has a valid flag.
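For illustration only, here is a minimal sketch of what two branch flows of this shape could look like. It is not the actual implementation: Offset is a hypothetical stand-in for ConsumerMessage.CommittableOffset, the payload and committableOffset fields are assumed (only the valid flag is described above), and publish is a hypothetical function standing in for the real Kafka producer. It also assumes Akka 2.6 or later, where the ActorSystem supplies the materializer implicitly.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for ConsumerMessage.CommittableOffset.
final case class Offset(value: Long)

// Assumed shape of the domain type; only the valid flag is given in the post.
final case class ValidatedCommittableMessage(
    payload: String,
    valid: Boolean,
    committableOffset: Offset)

object BranchFlows {
  // Invalid branch: publish nothing, just pass the offset on so it can be committed.
  def createFlowToSkipInvalidMessages(): Flow[ValidatedCommittableMessage, Offset, NotUsed] =
    Flow[ValidatedCommittableMessage].map(_.committableOffset)

  // Valid branch: publish first, then emit the offset once publishing has completed.
  // `publish` is a hypothetical placeholder for the real producer call.
  def createFlowToProduceValidMessagesToPublishTopic(publish: String => Future[Unit])(
      implicit ec: ExecutionContext): Flow[ValidatedCommittableMessage, Offset, NotUsed] =
    Flow[ValidatedCommittableMessage].mapAsync(parallelism = 1) { message =>
      publish(message.payload).map(_ => message.committableOffset)
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("branch-flows")
    import system.dispatcher

    val message = ValidatedCommittableMessage("hello", valid = true, Offset(42L))
    val flow = createFlowToProduceValidMessagesToPublishTopic(p => Future(println(s"published $p")))
    val offsets = Await.result(Source.single(message).via(flow).runWith(Sink.seq), 5.seconds)
    println(offsets)
    system.terminate()
  }
}
```

The point of the sketch is that both branches share the same input and output types, which is what lets a Merge stage recombine them later.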
Based on this, I’m able to use the Partition and Merge constructs of the GraphDSL to fan out and then fan back in:
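import akka.NotUsed
import akka.kafka.ConsumerMessage
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}
import scala.concurrent.ExecutionContext

private def createPartitioningFlowBasedOnWhetherMessageIsValid()(
    implicit ec: ExecutionContext): Flow[ValidatedCommittableMessage, ConsumerMessage.CommittableOffset, NotUsed] = {
  val graph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    // Route invalid messages to output 0 and valid messages to output 1.
    val partition = builder.add(Partition[ValidatedCommittableMessage](2, message => if (message.valid) 1 else 0))
    val merge = builder.add(Merge[ConsumerMessage.CommittableOffset](2))
    partition.out(0) ~> createFlowToSkipInvalidMessages() ~> merge
    partition.out(1) ~> createFlowToProduceValidMessagesToPublishTopic() ~> merge
    // Expose the fan-out/fan-in pair as a single Flow-shaped stage.
    FlowShape(partition.in, merge.out)
  }
  Flow.fromGraph(graph)
}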
And then I can just insert that into the overall flow with a via operator so that the conditional forking is encapsulated. Asynchronous semantics are handled elsewhere:
.via(createPartitioningFlowBasedOnWhetherMessageIsValid())
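To show the via wiring end to end without Kafka, here is a small self-contained sketch of the same fan-out/fan-in pattern in generic form. Everything in it is hypothetical and for illustration only: the conditionalFlow helper, the Msg type, and the two trivial branch flows are not part of my code, and it assumes Akka 2.6 or later.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ConditionalVia {
  // Hypothetical stand-in for the domain message: an offset plus a valid flag.
  final case class Msg(offset: Long, valid: Boolean)

  // Hypothetical helper: sends each element through whenTrue or whenFalse,
  // depending on the predicate, and merges both outputs into one stream.
  def conditionalFlow[In, Out](
      predicate: In => Boolean,
      whenTrue: Flow[In, Out, NotUsed],
      whenFalse: Flow[In, Out, NotUsed]): Flow[In, Out, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val partition = builder.add(Partition[In](2, in => if (predicate(in)) 1 else 0))
      val merge = builder.add(Merge[Out](2))
      partition.out(0) ~> whenFalse ~> merge
      partition.out(1) ~> whenTrue ~> merge
      FlowShape(partition.in, merge.out)
    })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("conditional-via")

    // Trivial branches: the "produce" branch logs, the "skip" branch does nothing extra.
    val produce = Flow[Msg].map { m => println(s"producing ${m.offset}"); m.offset }
    val skip = Flow[Msg].map(_.offset)

    val offsets = Await.result(
      Source(List(Msg(1L, valid = true), Msg(2L, valid = false), Msg(3L, valid = true)))
        .via(conditionalFlow[Msg, Long](_.valid, produce, skip))
        .runWith(Sink.seq),
      5.seconds)

    // Merge emits elements as they arrive from either branch, so sort before printing.
    println(offsets.sorted)
    system.terminate()
  }
}
```

Every element comes out of the merged flow exactly once, whichever branch it took. Merge does not guarantee relative ordering between the two branches, so the sketch sorts the result before printing it.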
This works for me. But if anyone has a cleaner way of pulling this off…