AWS S3 multipart upload with Akka HTTP does not upload the full file

I am uploading a large file to S3 using Akka HTTP together with the low-level API of the AWS SDK.

Routes:

    withoutRequestTimeout {
      withoutSizeLimit {
        fileUpload("file") {
          case (metadata, byteSource) =>
            logger.info(s"Request received for file: ${metadata.fileName}")
            // Bridge the Akka stream to a blocking java.io.InputStream
            val stream = byteSource.runWith(StreamConverters.asInputStream())

            // Run the blocking S3 upload off the routing thread
            val uploadFuture = Future {
              val client = S3Util.getClient("region")
              S3Utility.writeStreamToBucket(client, metadata.fileName, "test", stream)
            }
            onComplete(uploadFuture) {
              case Success(result) =>
                logger.info("Successfully uploaded")
                complete(StatusCodes.OK)
              case Failure(ex) =>
                logger.error("Error in uploading file", ex)
                complete(StatusCodes.FailedDependency, ex.getMessage)
            }
        }
      }
    }

S3Utility:

    public static void writeStreamToBucket(S3Client client,
                                           String filePath,
                                           String bucketName,
                                           InputStream is) {
        CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
                .bucket(bucketName)
                .key(filePath)
                .build();

        CreateMultipartUploadResponse multipartUpload = client.createMultipartUpload(createMultipartUploadRequest);
        String uploadId = multipartUpload.uploadId();

        List<CompletedPart> completedParts = new ArrayList<>();
        int partNumber = 1;
        byte[] buffer = new byte[5 * 1024 * 1024];
        try {
            int bytesRead = is.read(buffer);

            // Divide the input into 5MB chunks and upload
            while (bytesRead > 0) {

                UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
                        .bucket(bucketName)
                        .key(filePath)
                        .uploadId(uploadId)
                        .partNumber(partNumber)
                        .build();

                ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead);
                RequestBody requestBody = RequestBody.fromByteBuffer(byteBuffer);
                String etag = client.uploadPart(uploadPartRequest, requestBody).eTag();

                CompletedPart completedPart = CompletedPart.builder()
                        .partNumber(partNumber)
                        .eTag(etag)
                        .build();
                completedParts.add(completedPart);

                bytesRead = is.read(buffer);
                partNumber += 1;
            }

            CompletedMultipartUpload completedMultipartUpload =
                    CompletedMultipartUpload.builder()
                            .parts(completedParts)
                            .build();

            CompleteMultipartUploadRequest completeMultipartUploadRequest =
                    CompleteMultipartUploadRequest.builder()
                            .bucket(bucketName)
                            .key(filePath)
                            .uploadId(uploadId)
                            .multipartUpload(completedMultipartUpload)
                            .build();

            client.completeMultipartUpload(completeMultipartUploadRequest);
        } catch (IOException e) {

            // cancel the upload
            AbortMultipartUploadRequest abortMultipartUploadRequest = AbortMultipartUploadRequest.builder()
                    .bucket(bucketName)
                    .uploadId(uploadId)
                    .key(filePath)
                    .build();
            client.abortMultipartUpload(abortMultipartUploadRequest);

            throw new RuntimeException("Could not upload file to the server. Exception: " + e.getMessage());
        }
    }

The code above uploads only a few parts of the file, not all of them. It looks like Akka HTTP delivers the data slowly while S3Utility consumes it quickly; once the stream is momentarily empty, the bytesRead > 0 condition becomes false and the remaining data is skipped.
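
One thing worth checking here: InputStream.read(byte[]) is allowed to return fewer bytes than the buffer holds, and it returns -1 only at the real end of the stream. With a slow source, each call may therefore hand back a small fragment instead of a full 5 MB chunk, and S3 requires every part except the last to be at least 5 MiB. The sketch below is a minimal illustration of filling the buffer completely before treating it as one part. The class PartReader and the helpers readFully, partSizes and trickle are hypothetical names made up for this example, and the S3 call is replaced by recording the part size, so this shows the reading pattern only and is not a confirmed fix for the code above.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the AWS SDK or Akka.
final class PartReader {

    /**
     * Keeps reading until the buffer is full or the stream ends.
     * Returns the number of bytes placed in the buffer (0 at end of stream).
     */
    static int readFully(InputStream in, byte[] buffer) throws IOException {
        int total = 0;
        while (total < buffer.length) {
            int n = in.read(buffer, total, buffer.length - total);
            if (n < 0) {
                break; // -1 signals the real end of the stream
            }
            total += n;
        }
        return total;
    }

    /** Splits the stream into parts of partSize bytes and returns each part's size. */
    static List<Integer> partSizes(InputStream in, int partSize) throws IOException {
        List<Integer> sizes = new ArrayList<>();
        byte[] buffer = new byte[partSize];
        int bytesRead = readFully(in, buffer);
        while (bytesRead > 0) {
            // Assumption: the real code would call client.uploadPart(...) here
            // with ByteBuffer.wrap(buffer, 0, bytesRead).
            sizes.add(bytesRead);
            bytesRead = readFully(in, buffer);
        }
        return sizes;
    }

    /** Test stream that returns at most maxPerRead bytes per call, like a slow source. */
    static InputStream trickle(byte[] data, int maxPerRead) {
        return new ByteArrayInputStream(data) {
            @Override
            public synchronized int read(byte[] b, int off, int len) {
                return super.read(b, off, Math.min(len, maxPerRead));
            }
        };
    }

    public static void main(String[] args) throws IOException {
        // 25 bytes delivered 3 at a time, grouped into 10-byte parts
        System.out.println(partSizes(trickle(new byte[25], 3), 10)); // prints [10, 10, 5]
    }
}
```

In writeStreamToBucket, the same idea would mean replacing both is.read(buffer) calls with a fill-the-buffer read such as readFully above. On Java 9 or later, is.readNBytes(buffer, 0, buffer.length) does the same job, returning 0 rather than -1 at end of stream.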

As a workaround, I first write the stream to a file, then open that file as an InputStream and pass it to the uploader; with that change the full file is uploaded. The code that writes to the file waits for the Akka stream to complete.

Can you tell me how to make this code work without the workaround?

Thanks

Hi Rishi,

Are you aware that Alpakka offers great support to connect Akka Streams to AWS S3?
https://doc.akka.io/docs/alpakka/snapshot/s3.html

Regards,
Enno.

Hi Ennru,

Yes, I am aware of it and have already given it a try. It works like a charm. But I want to test this with the AWS SDK low-level API, and that is where I am facing issues…

Regards,
Rishi