Graph-DSL Partition Source/FlowWithContext not recognising ~>

Hello everyone.

I would like to partition my stream which starts from a AMQP source which is converted to a SourceWithContext, in order ack/nack the message at the end of the stream.
It seems the Partition-operator is not able to handle a Flow/SourceWithContext because the compiler cannot resolve ~> when using those xWithContext types.

private[stream] lazy val amqpSource: SourceWithContext[MyProtobufType, CommittableReadResult, NotUsed] = {
  val connection = AmqpLocalConnectionProvider

  val settings = NamedQueueSourceSettings(connection, queue.name)
    .withAckRequired(true)
    .withDeclarations(Seq(exchange, queue, binding))
  AmqpSource.committableSource(settings, 50)
    .asSourceWithContext[CommittableReadResult]((x: CommittableReadResult) => x)
    .map(readResult => MyProtobufType.parseFrom(readResult.message.bytes.toArray))
}

def partitioner(event: MyProtobufType): Int = {
  event.payload match {
	case Payload.A(_) => 0
	case Payload.B(_) => 1
	case Payload.C(_) => 2
	case Payload.Empty => 4
  }
}

val graph = RunnableGraph.fromGraph(GraphDSL.create() {
  implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._

  val partition = builder.add(new Partition[MyProtobufType](4, partitioner, false))

            partition.out(1) // A
  source ~> partition.out(2) // B ~> more steps
            partition.out(3) // C
            partition.out(4) // emtpy payload

  ClosedShape
})

When I am using a simple Flow on the Partition-operator, ~> gets resolved.
Any ideas?

Well, after working around for a day now it really looks like the Partition-Operator is not able to handle a SourceWithContext. When I convert it back to a Source of Tuple2 the GraphDSL accepts the connections and resolves ~>

Hi Marc,

The WithContext sources and flows are a different view on flows of tuples. Underneath they are just that.

You can move in and out of that abstraction with the FlowWithContext.fromTuples and .asFlow.

As you found out, the Graph DSL does not support FlowWithContext you need to drop down to tuple flows.

The WithContext APIs are more restrictive than other Flows as they exist to guarantee message order.

Hope this helps you to find an even better solution…

Cheers,
Enno.

Thanks for the clarrification Enno.

Out of curiosity: when the WithContext types are merely views on Tuples, why are they restrictive and why do they have to guarantee message order?

Since this feature is quite new, are there plans to improve support in the GraphDSL?

The main scenario which drove the addition of WithContext is the use of acknowledgements or commits of received messages. The idea is to pass the handle for those as context so it frees the user from handling it explicitly in the flow. For some technologies (namely Kafka) it is important not to reorder commits, as that would commit messages that might not have been fully processed.

No, the Graph DSL will not get support for WithContext. There it is required to use the tuple flows.

Cheers,
Enno.