Start consuming Source(s) elements before Sink pull (Archive.zip flow)

Hello,

I’m using Alpakka Archive.zip flow.
(As a reminder, the signature of the flow is Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed].)

The flow consumes the inner sources one by one.
I would like to start consuming the other inner sources ahead of time in order to speed up the process (at least until a buffer reaches a given size).

Code example:

val objectKeys: Seq[String] = ...

val zipObjects: Seq[(ArchiveMetadata, Source[ByteString, NotUsed])] = objectKeys.map { obj =>  
  val filename = trimPrefixFromKey(obj)
  ArchiveMetadata(filename) -> S3.download("myBucket", obj)
    .flatMapConcat {  
      case Some((dataSource, _)) =>
        dataSource
          .prefixAndTail(1)  
          .flatMapConcat { case (head, tail) =>  
            Source(head)  
              .wireTap(_ => logger.info("Start consuming {}", obj))  
              .concat(tail)  
          }
          /* How could I start consuming, let's say, 50 MB ahead of time here ??? */
      case None => Source.single(ByteString.empty)
    }
}

val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = ...

val uploadResultFuture = Source(zipObjects)  
  .via(Archive.zip())   
  .runWith(s3Sink)
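
The closest idea I have so far is to pre-materialize each inner source behind a bounded queue, so the download starts running as soon as the tuple is built and chunks accumulate up to a limit. This is only a rough sketch of what I have in mind: the limit is a chunk count rather than a byte count, prefetch and maxChunks are just names I made up, and an implicit Materializer is assumed to be in scope.

import akka.NotUsed
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.util.{Failure, Success}

// Eagerly runs `inner`, buffering up to `maxChunks` ByteString chunks
// (chunks, not bytes) before backpressuring the download.
def prefetch(inner: Source[ByteString, Any], maxChunks: Int)(
    implicit mat: Materializer): Source[ByteString, NotUsed] = {
  val (queue, prefetched) =
    Source.queue[ByteString](maxChunks, OverflowStrategy.backpressure).preMaterialize()

  inner
    // mapAsync(1) keeps a single offer in flight, which the backpressure
    // strategy requires; the offer completes once the chunk fits in the buffer.
    .mapAsync(1)(queue.offer)
    .runWith(Sink.onComplete {
      case Success(_)  => queue.complete()
      case Failure(ex) => queue.fail(ex)
    })

  prefetched
}

Each entry would then be built as ArchiveMetadata(filename) -> prefetch(S3.download(...)..., maxChunks = 512), at the cost of one extra running stream per entry.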

Same question

I created a ticket (and PR): Add supports for Byte-Range Fetches when downloading from S3 · Issue #2981 · akka/alpakka · GitHub

Buffering as described above turned out to be pointless.
I preferred to improve the download speed directly instead.
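
For reference, the idea is roughly the following. This is only a sketch on my side, built on the existing S3.getObjectMetadata and the range parameter of S3.download, not the actual implementation from the PR; rangedDownload, chunkSize and parallelism are names I made up, and an implicit Materializer is assumed.

import akka.NotUsed
import akka.http.scaladsl.model.headers.ByteRange
import akka.stream.Materializer
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

// Downloads `key` as a series of ranged GETs of `chunkSize` bytes,
// fetching up to `parallelism` ranges concurrently while still
// emitting the bytes in order.
def rangedDownload(bucket: String, key: String, chunkSize: Long, parallelism: Int)(
    implicit mat: Materializer): Source[ByteString, NotUsed] =
  S3.getObjectMetadata(bucket, key)
    .collect { case Some(meta) => meta.contentLength }
    .flatMapConcat { size =>
      val ranges = (0L until size by chunkSize)
        .map(start => ByteRange(start, math.min(start + chunkSize, size) - 1))
      Source(ranges.toVector)
        .mapAsync(parallelism) { range =>
          // Each range is drained into memory so the following ranges can start
          // downloading ahead of the consumer; keep chunkSize modest.
          S3.download(bucket, key, range = Some(range))
            .collect { case Some((data, _)) => data }
            .flatMapConcat(identity)
            .runWith(Sink.fold(ByteString.empty)(_ ++ _))
        }
    }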