I would like to partition my stream, which starts from an AMQP source that is converted to a SourceWithContext so that I can ack/nack the message at the end of the stream.
It seems the Partition operator is not able to handle a Flow/SourceWithContext, because the compiler cannot resolve ~> when using those xWithContext types.
private[stream] lazy val amqpSource: SourceWithContext[MyProtobufType, CommittableReadResult, NotUsed] = {
  val connection = AmqpLocalConnectionProvider
  val settings = NamedQueueSourceSettings(connection, queue.name)
    .withAckRequired(true)
    .withDeclarations(Seq(exchange, queue, binding))
  AmqpSource.committableSource(settings, 50)
    .asSourceWithContext[CommittableReadResult]((x: CommittableReadResult) => x)
    .map(readResult => MyProtobufType.parseFrom(readResult.message.bytes.toArray))
}
def partitioner(event: MyProtobufType): Int = {
  event.payload match {
    case Payload.A(_) => 0
    case Payload.B(_) => 1
    case Payload.C(_) => 2
    case Payload.Empty => 3 // output ports are 0-based: 4 ports means indices 0 to 3
  }
}
val graph = RunnableGraph.fromGraph(GraphDSL.create() {
  implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._
    val partition = builder.add(new Partition[MyProtobufType](4, partitioner, false))
    source ~> partition.in // ~> is not resolved when source is a SourceWithContext
    partition.out(0) // A
    partition.out(1) // B ~> more steps
    partition.out(2) // C
    partition.out(3) // empty payload
    ClosedShape
})
When I am using a simple Flow on the Partition-operator, ~> gets resolved.
Any ideas?
Well, after working on this for a day, it really looks like the Partition operator is not able to handle a SourceWithContext. When I convert it back to a Source of Tuple2, the GraphDSL accepts the connections and resolves ~>.
The main scenario which drove the addition of WithContext is the use of acknowledgements or commits of received messages. The idea is to pass the handle for those as context so it frees the user from handling it explicitly in the flow. For some technologies (namely Kafka) it is important not to reorder commits, as that would commit messages that might not have been fully processed.
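To illustrate the idea of the context travelling alongside the element, here is a minimal sketch. It assumes Akka Streams 2.6 and uses an in-memory list instead of a broker; `Handle` and `ContextPropagationSketch` are hypothetical names standing in for a real acknowledgement handle such as CommittableReadResult.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

object ContextPropagationSketch {
  // Hypothetical acknowledgement handle; a real one would talk to the broker.
  final case class Handle(id: Int)

  def run()(implicit system: ActorSystem): Future[Seq[(Int, Handle)]] =
    Source(List("a", "bb", "ccc").zipWithIndex)
      // The handle becomes the context of each element.
      .asSourceWithContext { case (_, i) => Handle(i) }
      // Drop the index from the element; the context is kept separately.
      .map { case (text, _) => text }
      // Operators only see the element, the context is carried along untouched.
      .map(_.length)
      // Back to plain (element, context) pairs, e.g. to ack at the end.
      .asSource
      .runWith(Sink.seq)
}
```

At the end of the stream each element is still paired with the handle it arrived with, so the ack/nack step can use it without the intermediate operators ever touching it.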
No, the Graph DSL will not get support for WithContext. In the Graph DSL you have to use flows of tuples instead.
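A minimal sketch of the tuple approach, assuming Akka Streams 2.6: the SourceWithContext is turned back into a Source of (element, context) pairs with asSource before it enters the Graph DSL, and the partitioner looks only at the element. `Event`, `Handle`, `tagA` and `TuplePartitionSketch` are hypothetical stand-ins (for MyProtobufType, CommittableReadResult and the per-branch steps), and an in-memory list replaces the AMQP source.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.SourceShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source, SourceWithContext}

import scala.concurrent.Future

object TuplePartitionSketch {
  // Hypothetical stand-ins for MyProtobufType and CommittableReadResult.
  final case class Event(kind: String)
  final case class Handle(id: Int)

  // Route on the element only; the context rides along in the tuple.
  def partitioner(pair: (Event, Handle)): Int =
    pair._1.kind match {
      case "A" => 0
      case _   => 1
    }

  // In-memory replacement for the AMQP source, with the handle as context.
  val withContext: SourceWithContext[Event, Handle, NotUsed] =
    Source(List("A", "B", "A").zipWithIndex)
      .asSourceWithContext { case (_, i) => Handle(i) }
      .map { case (kind, _) => Event(kind) }

  // Leave the WithContext API before entering the Graph DSL.
  val tuples: Source[(Event, Handle), NotUsed] = withContext.asSource

  val partitioned: Source[(Event, Handle), NotUsed] =
    Source.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val partition = builder.add(Partition[(Event, Handle)](2, partitioner))
      val merge = builder.add(Merge[(Event, Handle)](2))
      // Hypothetical per-branch step: it must pass the handle through itself.
      val tagA = Flow[(Event, Handle)].map { case (e, h) => (e.copy(kind = e.kind + "-handled"), h) }

      tuples ~> partition.in
      partition.out(0) ~> tagA ~> merge.in(0)
      partition.out(1) ~> merge.in(1)
      SourceShape(merge.out)
    })

  // Each emitted pair still carries its handle, so ack/nack can happen here.
  def run()(implicit system: ActorSystem): Future[Seq[(Event, Handle)]] =
    partitioned.runWith(Sink.seq)
}
```

Note that Merge does not preserve the original order across branches, which is fine for per-message acks but matters for technologies where commits must not be reordered.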