I want to save each element in the stream to its own file. I’m not sure whether I’m missing something or there is a bug: the file for the last element is always created, but it is empty.
I'm not entirely sure why the LogRotatorSink does that, but it also does not sound like a perfect match for what you want to do. Its purpose is closer to how system logs work on Unix: you append multiple elements (lines) to the same file, while making sure that no individual file grows without bound and that each file is closed after a while so it can be compressed and cleaned up.
To write each element to a separate file, and maintain backpressure based on how fast you can write each file, you could instead use something like:
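import java.nio.file.Path
import akka.stream.scaladsl.{FileIO, Keep, Source}
import akka.util.ByteString
// Assumes an implicit ActorSystem (or Materializer) is in scope.
Source(1 to 4)
  .mapAsync(1) { n =>
    val path = Path.of(s"/tmp/file$n.txt")
    // Each inner stream writes a single file and materializes a Future[IOResult]
    Source.single(ByteString(n.toString))
      .toMat(FileIO.toPath(path))(Keep.right)
      .run()
  }
  .run() // the outer stream only moves on once the inner Future has completed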
Note that a ByteString does not, in general, represent a bounded unit such as a line. It is usually an arbitrary chunk of a longer stream of bytes that together make up a complete thing (for example, bytes from the network stream operators, the file operators, or an Akka HTTP request or response body), so make sure it makes sense in your case to treat each ByteString as a separate element.
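If the incoming ByteStrings really are arbitrary chunks, one option is to re-frame them into bounded elements before writing. The following is only a minimal sketch: the newline delimiter, the 1024-byte maximum frame length, and the names FramingSketch and lines are assumptions for illustration and are not from the original question. It also assumes Akka 2.6 or later, where an implicit ActorSystem can serve as the materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object FramingSketch {
  // Hypothetical helper: re-chunks arbitrary ByteString chunks into one String per line.
  def lines(chunks: List[ByteString])(implicit system: ActorSystem): Seq[String] = {
    val result = Source(chunks)
      // Emits one ByteString per "\n"-terminated frame; allowTruncation = true
      // also emits a final frame that has no trailing delimiter.
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
      .map(_.utf8String)
      .runWith(Sink.seq)
    Await.result(result, 5.seconds)
  }
}
```

For example, the chunks ByteString("a\nb") and ByteString("c\nd") do not line up with line boundaries, and FramingSketch.lines would turn them into the three elements "a", "bc" and "d", each of which could then be written to its own file.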
This is super strange… I checked the code, and it should write the content and only complete the materialized value after the file is closed, or at least this is a really strange corner case that we never found. I think the main problem is with the termination: after the Await.ready, you should give it a bit more time to write the content to the sink before terminating. If that solves the problem, then you have found a bug… (I will check this.)
Writing with mapAsync can be faster, because you can tune the parallelism…
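To make the two points above concrete (tunable parallelism, and terminating only after the writes have finished), here is a rough sketch rather than a definitive fix. The names WriteFilesSketch and writeAll, the temporary directory, and the timeouts are all assumptions for illustration, and it assumes Akka 2.6 or later so that the implicit ActorSystem provides the materializer. It does not claim to explain the empty-file behaviour reported with the LogRotatorSink.

```scala
import java.nio.file.{Files, Path}

import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object WriteFilesSketch {
  // Writes each number to dir/file<n>.txt with up to `parallelism` writes in flight.
  def writeAll(dir: Path, parallelism: Int)(implicit system: ActorSystem): Unit = {
    val done = Source(1 to 4)
      .mapAsync(parallelism) { n =>
        // Inner stream: one element, one file, materialized as Future[IOResult]
        Source.single(ByteString(n.toString))
          .toMat(FileIO.toPath(dir.resolve(s"file$n.txt")))(Keep.right)
          .run()
      }
      .runWith(Sink.ignore)
    // Block until the outer stream, and therefore every inner write, has completed.
    Await.result(done, 10.seconds)
    ()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("write-files")
    // Terminate the actor system only after writeAll has returned.
    try writeAll(Files.createTempDirectory("akka-files"), parallelism = 4)
    finally Await.result(system.terminate(), 10.seconds)
  }
}
```

With parallelism = 1 the files are written strictly one after another; a higher value lets several writes overlap while mapAsync still applies backpressure to the upstream.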