Hello,
I’m using the Alpakka Archive.zip flow.
(As a reminder, its signature is Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed].)
The flow consumes the inner sources one at a time.
I would like it to start consuming the other inner sources ahead of time to speed up the process (at least until the buffer reaches a given size).
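A minimal sketch of one possible approach, assuming Akka 2.6 and Scala 2.13 (akka-stream only, no S3 or Alpakka involved): start running each entry's source before Archive.zip reaches it, and put a batchWeighted stage in front of it so it keeps pulling while nothing consumes it, until roughly maxBytes are held in memory. prefetch is a hypothetical helper, not an Alpakka API; it hands the already-running stream back as a Source via Sink.queue and Source.unfoldAsync. fakeDownload is an in-memory stand-in for an S3 download.

```scala
//> using scala "2.13"
//> using dep "com.typesafe.akka::akka-stream:2.6.20"

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("prefetch-demo")

// Hypothetical helper (not an Alpakka API): starts running `src` immediately and lets it
// get ahead of its eventual consumer by roughly `maxBytes` (plus a chunk or so).
def prefetch(src: Source[ByteString, Any], maxBytes: Long)(
    implicit system: ActorSystem): Source[ByteString, NotUsed] = {
  import system.dispatcher
  val queue = src
    // batchWeighted keeps pulling upstream while nobody consumes its output,
    // concatenating chunks until their total size reaches maxBytes, then backpressures.
    .batchWeighted[ByteString](maxBytes, _.size.toLong, identity)(_ ++ _)
    // Materializing into a queue with a one-element buffer starts the download now,
    // instead of when Archive.zip reaches this entry.
    .runWith(Sink.queue[ByteString]().withAttributes(Attributes.inputBuffer(1, 1)))
  // Expose the running stream as a Source again; the buffered bytes come out first.
  Source.unfoldAsync(queue)(q => q.pull().map(_.map(bytes => (q, bytes))))
}

// In-memory stand-in for an S3 download: 100 chunks of 1000 bytes, counting pulled chunks.
val pulled = new AtomicInteger(0)
val fakeDownload: Source[ByteString, NotUsed] =
  Source(1 to 100).map { _ =>
    pulled.incrementAndGet()
    ByteString(Array.fill[Byte](1000)(1))
  }

val prefetched = prefetch(fakeDownload, maxBytes = 5000)
Thread.sleep(1000) // nothing consumes `prefetched` yet
val pulledBeforeConsumption = pulled.get() // some chunks already pulled, but not all 100

val totalBytes =
  Await.result(prefetched.runWith(Sink.fold[Int, ByteString](0)(_ + _.size)), 5.seconds)
Await.result(system.terminate(), 5.seconds)
```

Calling prefetch for every key up front would start all downloads at once. One way to bound that, again only a suggestion, is to call it lazily inside a .map on Source(objectKeys) and add .buffer(n, OverflowStrategy.backpressure) before Archive.zip(), so that only about n entries download ahead. A download that sits buffered for a long time may also run into the HTTP client's idle timeout.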
Code example:
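import akka.NotUsed
import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

val objectKeys: Seq[String] = ...
val zipObjects: Seq[(ArchiveMetadata, Source[ByteString, NotUsed])] = objectKeys.map { obj =>
  val filename = trimPrefixFromKey(obj)
  ArchiveMetadata(filename) -> S3.download("myBucket", obj)
    .flatMapConcat {
      case Some((dataSource, _)) =>
        dataSource
          .prefixAndTail(1)
          .flatMapConcat { case (head, tail) =>
            // Log once, when the first chunk of this object comes through
            Source(head)
              .wireTap(_ => logger.info("Start consuming {}", obj))
              .concat(tail)
          }
      /* How to start consuming let's say 50 MB ??? */
      case None => Source.single(ByteString.empty)
    }
}

val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = ...
val uploadResultFuture = Source(zipObjects)
  .via(Archive.zip())
  .runWith(s3Sink)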
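If the individual objects are small enough to hold in memory, a simpler alternative (a different technique from the per-source 50 MB bound asked about) is to download whole entries ahead of the zip stage with mapAsync, which runs up to parallelism inner sources concurrently and still emits them in their original order. downloadAhead is a hypothetical helper, assuming Akka 2.6 and Scala 2.13; in the code above it would replace Source(zipObjects), as in downloadAhead(zipObjects, 4).via(Archive.zip()). The demo uses in-memory sources instead of S3 and reads the entries back instead of zipping them.

```scala
//> using scala "2.13"
//> using dep "com.typesafe.akka::akka-stream:2.6.20"

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("download-ahead-demo")

// Hypothetical helper: a drop-in replacement for Source(zipObjects). It runs up to
// `parallelism` entry sources at the same time, collects each one fully in memory,
// and re-emits the entries in their original order as single-chunk sources.
def downloadAhead[M](entries: Seq[(M, Source[ByteString, Any])], parallelism: Int)(
    implicit system: ActorSystem): Source[(M, Source[ByteString, NotUsed]), NotUsed] = {
  import system.dispatcher
  Source(entries)
    .mapAsync(parallelism) { case (metadata, data) =>
      data
        .runWith(Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _))
        .map(bytes => metadata -> Source.single(bytes))
    }
}

// In-memory stand-ins for the S3 downloads: each entry arrives as two chunks.
val entries: Seq[(String, Source[ByteString, NotUsed])] =
  Seq("a.txt", "b.txt", "c.txt").map(name => name -> Source(List(ByteString(name), ByteString("!"))))

// With Alpakka, .via(Archive.zip()) would go here; the demo just reads the entries back.
val contents: Seq[String] = Await.result(
  downloadAhead(entries, parallelism = 2)
    .flatMapConcat { case (name, data) => data.map(bytes => s"$name:${bytes.utf8String}") }
    .runWith(Sink.seq),
  5.seconds)
Await.result(system.terminate(), 5.seconds)
```

The trade-off is that memory use is on the order of parallelism whole files rather than a fixed byte budget, so this only fits when each object is reasonably small.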