I have no real idea about the kafka part, but from the documentation it seems plainPartitionedManualOffsetSource
is a Source[(TopicPartition, Source[ConsumerRecord, NotUsed]), NotUsed]
which could be strange bcs this source generates other sources. flatMapMerge
or mapAsync
could be a good resolver, but probably this is not what you are looking for.
About the slick part;
https://doc.akka.io/docs/alpakka/current/slick.html#flow-with-pass-through
The first function is a (A, Connection) => SQLQuery
like function, basically what SQL should you run for every element.
The second function is an (A, B) => C
where A
is the input element (same as the first function), and B
is what the SQLOutput was.
The whole Flow will emit C
s.
For a more concrete example, if A
is a Commitable message, and the first function is a simple insert, the second function will get a (Commitable, Int)
where the int is the inserted row number, and you can simply return with the Commitable message, and commit it in a mapAsync
. If A
is a product and the first function is some kind of select for the historical prices, your second function will get a (Product, List[Prices])
and could create a png image with the products price history, or return other aggregated data.