I have the following problem quite often:
I have a type A and a function that transform it to type B (but a transformation back is not possible).
Then I want to run a Flow<B,B,NotUsed> (e.g. writing something to a database) but then I want to proceed with type A. I need the guarantee that the further step (for that element in the stream) will be executed after the Flow<B,B,NotUsed> is already executed. Therefore source.alsoTo() is not an option.
I’m aware, that I could use a Pair, but then the Flow<B,B,NotUsed> must be something like Flow<Pair<A,B>,Pair<A,B>,NotUsed>. That seems for me too complicated.
Also, a mapAsync could solve the problem:
source()
.mapAsync(parallelism, a -> Source.single(toB(a)).via(flow()).runWith(Sink.head(),...).thenApply(__ -> a))
That’s still not a good solution because I start a stream just for one element.
SourceWithContext<B, A, Object> withContext =
source()
.asSourceWithContext(a -> a) // make the incoming A the context
.map(toB) // ...and the B the element
Source<A, NotUsed> =
withContext
.map(b -> doStuffWithB())
.asSource()
.map(Pair::second)
The Pairing of A and B within the SourceWithContext is hidden; since the common use-case for this will have restrictions on reordering of elements, none of the operations which can reorder elements are directly supported: you can use a via to a flow from pairs to pairs and handle context propagation yourself.