I am currently working on a PR to add SinkWithContext to akka-streams, and I came across a blocker when creating a helper function called SinkWithContext.fromDataAndContext. Below is the definition of the function:
def fromDataAndContext[In, CtxIn, MatIn, MatCtx, Mat3](
    dataSink: Graph[SinkShape[In], MatIn],
    contextSink: Graph[SinkShape[CtxIn], MatCtx],
    combine: (MatIn, MatCtx) => Mat3,
    dataComesFirst: Boolean = true)(
    strategy: Int => Graph[UniformFanInShape[Either[In, CtxIn], Either[In, CtxIn]], NotUsed]): SinkWithContext[In, CtxIn, Mat3] = {
  SinkWithContext.fromTuples(Sink.fromGraph(GraphDSL.createGraph(dataSink, contextSink)(combine) {
    implicit b => (dSink, ctxSink) =>
      import GraphDSL.Implicits._
      // split the (data, context) tuple into its two halves
      val unzip = b.add(Unzip[In, CtxIn]())
      // the user-supplied fan-in strategy decides the ordering of the two halves
      val c = b.add(strategy(2))
      // route the merged Either stream back out to the data/context sinks
      val p = b.add(new Partition[Either[In, CtxIn]](outputPorts = 2, { in =>
        if (in.isLeft) 0
        else 1
      }, eagerCancel = true))

      if (dataComesFirst) {
        unzip.out0.map(Left.apply) ~> c.in(0)
        unzip.out1.map(Right.apply) ~> c.in(1)
      } else {
        unzip.out1.map(Right.apply) ~> c.in(0)
        unzip.out0.map(Left.apply) ~> c.in(1)
      }

      c.out ~> p.in
      p.out(0).map(_.asInstanceOf[Left[In, CtxIn]].value) ~> dSink.in
      p.out(1).map(_.asInstanceOf[Right[In, CtxIn]].value) ~> ctxSink.in
      SinkShape(unzip.in)
  }))
}
The general idea is this: given that you have a SourceWithContext/FlowWithContext and you want to pipe it to a SinkWithContext, you may want to create that SinkWithContext from two already existing Sinks.
An immediate example that comes to mind: if you have a Kafka consumer with explicit offset committing (i.e. Consumer.sourceWithOffsetContext(consumerSettings, subscriptions)), you may want to persist each Kafka message to a file and, only if that succeeds, commit the offset. Using SinkWithContext.fromDataAndContext this would be quite easy, i.e. you would do something like:
val sourceWithContext = Consumer.sourceWithOffsetContext(consumerSettings, subscriptions)
val sinkWithContext = SinkWithContext.fromDataAndContext(FileIO.toPath(...), Committer.sink(committerSettings), Keep.none)(Concat(_))
sourceWithContext.runWith(sinkWithContext)
The purpose of Concat(_) (i.e. the strategy parameter) is to make the first Sink, FileIO.toPath(...), run before Committer.sink(committerSettings), so that Committer.sink(committerSettings) will only execute if FileIO.toPath(...) is successful.
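For reference, Concat consumes its first input completely before switching to the second, which is the ordering guarantee being relied on here. As a quick standalone illustration (plain Sources, nothing PR-specific):

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem("concat-demo")

// Concat drains the first source completely before pulling the second
Source(List(1, 2)).concat(Source(List(3, 4))).runWith(Sink.foreach(println))
// prints 1, 2, 3, 4, in that order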
The problem I am getting is that for some reason the above implementation doesn't work, i.e. if I create a simple test:
val dataSink = Sink.collection[Int, List[Int]]
val contextSink = Sink.collection[Int, List[Int]]
val sink = SinkWithContext.fromDataAndContext(
  dataSink,
  contextSink,
  (left: Future[List[Int]], right: Future[List[Int]]) => {
    implicit val ec = akka.dispatch.ExecutionContexts.parasitic
    for {
      l <- left
      r <- right
    } yield l zip r // zip the collected data and context lists pairwise
  })(Concat(_))
val source = SourceWithContext.fromTuples(Source(List(
  (1, 2),
  (3, 4),
  (5, 6))))

source.runWith(sink).futureValue shouldEqual List(
  (1, 2),
  (3, 4),
  (5, 6))
The materialized stream from the SinkWithContext.fromDataAndContext function never finishes (the test actually times out because the Future from source.runWith(sink) never completes). Interestingly, if I remove the entire strategy portion from SinkWithContext.fromDataAndContext (i.e. the Concat(_)) then the stream completes as expected; however, the problem then is that it's not possible to control the ordering/strategy of the dataSink and the contextSink (while in this specific Kafka example you want to execute the dataSink before the contextSink, there can be situations where you want to do the opposite, or use a strategy like Merge if it's fine for dataSink/contextSink to execute concurrently).
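To be concrete, this is roughly what the working "no strategy" variant of the graph body looks like, with the Concat/Partition detour removed and Unzip feeding the two sinks directly (a sketch of the modified fromDataAndContext body, not tested code):

SinkWithContext.fromTuples(Sink.fromGraph(GraphDSL.createGraph(dataSink, contextSink)(combine) {
  implicit b => (dSink, ctxSink) =>
    import GraphDSL.Implicits._
    val unzip = b.add(Unzip[In, CtxIn]())
    // each half goes straight to its sink, so the stream completes,
    // but there is no way to express an ordering between the two sinks
    unzip.out0 ~> dSink.in
    unzip.out1 ~> ctxSink.in
    SinkShape(unzip.in)
}))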
I have tried using different parameters for eagerCancel/detachedInputs on Concat/Partition and it doesn't have any effect. From some initial debugging/print statements, what appears to happen is that the data part of the first element of the source (i.e. 1 in the test) goes through correctly all the way to the data sink, but the context part of the first element (i.e. 2) never gets propagated through.
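If anyone wants to reproduce that observation, the three ~> lines in the graph body above can be swapped for instrumented versions along these lines (a hypothetical debug variant; unzip, c and p as in the definition above):

unzip.out0.map { d => println(s"data out: $d"); Left(d) } ~> c.in(0)
unzip.out1.map { ctx => println(s"ctx out: $ctx"); Right(ctx) } ~> c.in(1)
c.out.map { e => println(s"after strategy: $e"); e } ~> p.in
// per the debugging above, the first data element flows through to the data sink,
// while the matching context element is never seen downstream of the strategy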
Note that in the above example SinkWithContext doesn't actually exist yet; instead you can modify the example to just deal with a tuple + Sink. Alternatively, you can check the in-progress PR here that contains the work for SinkWithContext, which you can just check out to run the test.
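For convenience, here is a sketch of that tuple + Sink reproduction: the same wiring as fromDataAndContext, but producing a plain Sink[(In, Ctx), M3] so it runs without the PR (the combinedSink name is made up for this sketch):

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{Graph, SinkShape, UniformFanInShape}
import akka.stream.scaladsl._
import scala.concurrent.Future

// same graph as fromDataAndContext, but materializing a plain Sink over tuples
def combinedSink[In, Ctx, M1, M2, M3](
    dataSink: Sink[In, M1],
    contextSink: Sink[Ctx, M2],
    combine: (M1, M2) => M3)(
    strategy: Int => Graph[UniformFanInShape[Either[In, Ctx], Either[In, Ctx]], NotUsed]): Sink[(In, Ctx), M3] =
  Sink.fromGraph(GraphDSL.createGraph(dataSink, contextSink)(combine) { implicit b => (dSink, ctxSink) =>
    import GraphDSL.Implicits._
    val unzip = b.add(Unzip[In, Ctx]())
    val c = b.add(strategy(2))
    val p = b.add(Partition[Either[In, Ctx]](2, e => if (e.isLeft) 0 else 1, eagerCancel = true))
    unzip.out0.map(Left.apply) ~> c.in(0)
    unzip.out1.map(Right.apply) ~> c.in(1)
    c.out ~> p.in
    p.out(0).map(_.asInstanceOf[Left[In, Ctx]].value) ~> dSink.in
    p.out(1).map(_.asInstanceOf[Right[In, Ctx]].value) ~> ctxSink.in
    SinkShape(unzip.in)
  })

implicit val system: ActorSystem = ActorSystem("repro")

val sink: Sink[(Int, Int), Future[(Seq[Int], Seq[Int])]] =
  combinedSink(Sink.seq[Int], Sink.seq[Int],
    (d: Future[Seq[Int]], c: Future[Seq[Int]]) => d.zip(c))(Concat(_))

// this Future never completes, reproducing the hang described above
Source(List((1, 2), (3, 4), (5, 6))).runWith(sink)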