I am currently developing an akka-stream/alpakka application that has the following general logic
- Given a
Flow
, split it into aSubFlow
using thesplitAfter
method. - For each of the
SubFlow
's in point 1, useprefixAndTail(1)
to create a key based on the first element of thatSubflow
- Attach each
Subflow
to aSink
that is given that key as a parameter.
So assuming we have a function defined as following which gives us a Sink
def mySink(key: String): Sink[ByteString, Future[Done]]
Essentially what I want to do is something like this
val source = ???
val subFlows = source.splitAfter(...)
val logic = subflows.prefixAndTail(1).flatMapConcat { case (head, rest) =>
val computedFlow: Flow[ByteString, ByteString] = ???
computedFlow
}
val finished = logic.prefixAndTail(1).to(
Sink.lazySink { (head) =>
val key = computeKey(head)
mySink(key)
}
)
This code evidently doesn’t compile since Sink.lazySink
's thunk takes 0 parameters. I tried other variations however I haven’t gotten anywhere. In the documentation for Sink.lazySink
it says that you can combine it with prefixAndTail(1)
to get the first element out of the flow presumably so you can use the first element as an input to the final Sink
which in my case is mySink(key)
(like with the combination of prefixAndTail(1).flatMapConcat
on a Flow
). The problem is that the .to
command doesn’t accept parameters (only a raw Sink
) so you end up with the following problem
val finished = logic.prefixAndTail(1).to(
Sink.lazySink { () =>
// How do I get the first element out of the sink so I can compute
// the key?
val key = computeKey(???)
mySink(key)
}
)
I tried computing the key in the logic
Flow
however then you have to compose Sink
's of two different types (one which is the key, computed from the head of the subflow and the other which is the original Sink
that accepts a ByteStream
) which doesn’t really work
val logic = subflows.prefixAndTail(1).flatMapConcat { case (head, rest) =>
...
val computedFlow: Flow[ByteString, ByteString] = ???
val key = computeKey(head)
computedFlow.map(something => (something, key) )
}
val finished = logic.to(
Sink.lazySink { () =>
// Here I have to return a Sink[ByteString, String] but
// mySink only returns returns Sink[ByteString].
// I essentially need to do something along the lines of Sink[String].flatMap(key => mySink(key))
// but this doesn't work because Sink's don't have an output
}
)
So hence I am kind of stuck. Since I am working with a SubFlow
I have a limited set of methods and I can’t seem to find a way to pass the key (which you compute with the first element from each SubFlow
) to pass as a parameter to mySink(key: String)
Sink.lazyInit
appears to close to what I want, I managed to get the following to compile
val logic = subflows.prefixAndTail(1).flatMapConcat { case (head, rest) =>
...
val computedFlow: Flow[ByteString, ByteString] = ???
val key = computeKey(head)
computedFlow.map(something => (something, key) )
}
val finished = logic.to(
Sink.lazyInit( { case (_, key} =>
Future.successful(
mySink(key).contramap[(ByteString, String)] { case (content, _) => content }
)
), ???)
)
Although the problem is that Sink.lazyInit
is deprecated and so I would prefer to use the prefixAndTail(1).to(Sink.lazySink)
combination (as stated in the depreciation message). Also I am unsure as to what the point of requiring a Future
is (and whether Future.successful
is appropriate or should I use standard Future.apply
that requires an ExecutionContext
).
Transform a CSV file into multiple CSV files using Akka Stream - #6 by ennru Seems to be related however I haven’t managed to figure out the extract what you need to create the sink
part.