Hello all
I am having a bit of a hard time finding a appropriate title, if anyone has a better idea after reading my problem description feel free to suggest it
Right now I am having a Source<FtpFile, ?> fileSource
that I want to transform. More specifically I want to download those files and once the download is complete I would like to continue working with the file (but now the local file). I am using Alpakka for the I/O operations. Using fileSource.flatMapConcat(file -> Sftp.fromFile(file, settings))
I can transform all files that are emitted by the Source
into a continuous stream of ByteString
. So this is not very helpful, since I need to know the boundaries of each file.
In some sort of pseudo code what I would like to do is the following:
Source<FtpFile, ?> fileSource = Source.repeat(this)
.throttle(1, frequency)
.flatMapConcat(elem -> Sftp.ls(scanFolder, settings));
// pseudo code:
// fileSource.fanOut(file -> Sftp.download(file.path(), settings))
// .fanIn(file -> FileIO.toPath(getTempFile(file.path()))
// .map(file -> Paths.get(getTempFile(file.path());
// ...
I hope it is understandable what I am trying to achieve. Right now I am always facing problems, that I have a source and a sink that has to be created for each element passing through the stream and there I always fail to create a graph, because it expects the Sink to be created when modelling the graph and not on a per element basis.
What are alternative ways to achieve this?
Thank you and BR
Yanick