Now we need to transform the given source into a Source[Map[String, Seq[String]], NotUsed], where the key of the map is the first value of each line of the CSV file. Once we have this, we need to save the values of each key-value pair in a separate file (actually it needs to be saved to S3), where the name of the file needs to be in the following format:
This is not as trivial as it should be in an ideal world.
So you want to (the first two steps are sketched right after this list):
parse the CSV into some data representation
group the lines by the first field
write each group to a separate CSV file
name the files with a naming convention
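A rough sketch of the first two steps, assuming the Alpakka CSV connector's CsvParsing.lineScanner and a made-up input.csv; the 200 is just an assumed cap on the number of distinct keys:

import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.alpakka.csv.scaladsl.CsvParsing
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem("csv-split") // reused by the later sketches

// one List[ByteString] per CSV line, columns already split
val lines: Source[List[ByteString], _] =
  FileIO.fromPath(Paths.get("input.csv"))
    .via(CsvParsing.lineScanner())

// group by the first column; 200 caps the number of concurrently open substreams
val grouped = lines.groupBy(200, _.head.utf8String)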
The problems are:
you can't write to a file inside a stream with a name given by the data (or at least it's not easy)
if you want to name the file after the number of lines, then you either need to buffer all the lines in memory (which kills the whole streaming idea), or you need to name the file after you have finished writing to it.
Instead of mergeSubstreams I would close all the branches with a file sink using a randomly generated filename, roughly like the sketch below.
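One possible shape for that first stream (a sketch only, reusing the lines source and implicit system from the earlier snippet; Sink.lazyFutureSink is the Akka 2.6+ API, and /tmp/csv-groups is a made-up working directory):

import java.nio.file.{Files, Paths}
import java.util.UUID
import scala.concurrent.Future
import akka.stream.scaladsl.{FileIO, Sink}
import akka.util.ByteString

val tmpDir = Files.createDirectories(Paths.get("/tmp/csv-groups"))

val firstStream =
  lines                                   // parsed CSV lines from the earlier sketch
    .groupBy(200, _.head.utf8String)
    .map(cols => ByteString(cols.map(_.utf8String).mkString(",") + "\n"))
    .to(Sink.lazyFutureSink { () =>
      // the factory runs once per substream, so every group gets its own random file
      Future.successful(FileIO.toPath(tmpDir.resolve(UUID.randomUUID().toString)))
    })

firstStream.run() // runs until the upstream file is fully read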
After the first stream has finished I would start a new stream.
second stream (file naming)
get the filenames from the directory that you used as the output dir of the previous stream, as a Source[String]
open each file
get the first line's first field and count the lines (statefulMapConcat could do this)
rename the files with the gathered information (see the sketch after this list)
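A sketch of this second pass, assuming the Alpakka file connector's Directory.ls, the tmpDir and implicit system from the sketches above, and a made-up "<key>-<lineCount>.csv" target name standing in for the real naming format; it uses a plain runFold per file instead of statefulMapConcat for brevity:

import java.nio.file.Files
import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl.{FileIO, Framing, Sink}
import akka.util.ByteString
import system.dispatcher // ExecutionContext for mapping over the per-file Futures

val secondStream =
  Directory.ls(tmpDir)                 // Source[Path, NotUsed] of the randomly named files
    .mapAsync(parallelism = 4) { path =>
      FileIO.fromPath(path)
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 4096, allowTruncation = true))
        .runFold((Option.empty[String], 0)) { case ((key, count), line) =>
          // remember the first line's first field, count every line
          (key.orElse(Some(line.utf8String.split(",").head)), count + 1)
        }
        .map { case (key, count) =>
          Files.move(path, path.resolveSibling(s"${key.getOrElse("empty")}-$count.csv"))
        }
    }
    .runWith(Sink.ignore)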
The problem with this method is that when you receive the "stream ended" signal in the first stream, the file descriptors have not necessarily been released at the end of the stream, so the next phase does not necessarily see the complete files… You need to wait a bit between the end of the first stream and the start of the second. (This is why I said it's the hacky solution.)
If you have this solution you could try a better one. The main problem in this scenario is that we don't know when the file sinks have finished. You could duct-tape a solution with a custom file sink which is not a "sink" but a flow that passes its materialized value downstream when the upstream has finished. If you could build this stage, then you could use the pre-built second stream with a mergeSubstreams and a map, and you could easily call the uploadToS3 at the end. This is "duct tape" because you need to place this class in the right package hierarchy to reach internal functions, but it is totally doable. Maybe worth a PR to the main library or to the Alpakka file connector.
For the file naming, one trick you could potentially use is to combine groupBy with a lazy sink. It's a bit messy and relies on a deprecated factory method with no replacement, but I think it should work:
import java.nio.file.Paths
import scala.concurrent.Future
import akka.NotUsed
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Flow, Sink}
import akka.util.ByteString

case class Entry(group: String, field1: String, field2: String)

val sink: Sink[Entry, NotUsed] = Flow[Entry]
  .groupBy(200, entry => entry.group)
  .to(Sink.lazyInit(
    first =>
      Future.successful(
        Flow[Entry]
          .map(entry => ByteString(entry.toString)) // make bytes out of it
          .to(FileIO.toPath(Paths.get("/somewhere", first.group)))
      ),
    () => Future.failed[IOResult](new RuntimeException("")) // won't be used so doesn't really matter
  ))
What is the alternative for doing a similar operation in Akka 2.6+? I see the documentation refers to using Sink.lazyFutureSink in combination with Flow.prefixAndTail(1), but I can't figure out how to achieve this.
The idea is to use prefixAndTail(1) to get access to the first element, extract what you need to create the sink, and put the element back into the stream to be written. For example:
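A minimal sketch of that pattern, assuming the Entry case class and the /somewhere directory from the snippet above; re-prepending the peeked element with Source(head).concat(tail) and writing each substream to its file via alsoTo is just one way to realize the idea in Akka 2.6+:

import java.nio.file.Paths
import akka.NotUsed
import akka.stream.scaladsl.{FileIO, Flow, Sink, Source}
import akka.util.ByteString

val sink26: Sink[Entry, NotUsed] =
  Flow[Entry]
    .groupBy(200, _.group)
    .prefixAndTail(1)                    // each substream becomes one (firstElements, remainder) pair
    .flatMapConcat { case (head, tail) =>
      val first = head.head              // safe: a groupBy substream always starts with an element
      Source(head)
        .concat(tail)                    // put the peeked element back in front of the rest
        .map(entry => ByteString(entry.toString)) // make bytes out of it, as above
        .alsoTo(FileIO.toPath(Paths.get("/somewhere", first.group))) // write this group's file
    }
    .to(Sink.ignore)                     // drain what alsoTo passes through

Running a Source[Entry, _] into sink26 then writes each group to /somewhere/<group>.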