I am uploading a large file to S3 using Akka HTTP with the AWS SDK low-level API.
Routes:
withoutRequestTimeout {
  withoutSizeLimit {
    fileUpload("file") {
      case (metadata, byteSource) =>
        logger.info(s"Request received for file: ${metadata.fileName}")
        // Bridge the Akka byte source to a blocking InputStream
        val stream = byteSource.runWith(StreamConverters.asInputStream())
        // Run the blocking S3 upload off the request thread
        val uploadFuture = Future {
          val client = S3Util.getClient("region")
          S3Utility.writeStreamToBucket(client, metadata.fileName, "test", stream)
        }
        onComplete(uploadFuture) {
          case Success(result) =>
            logger.info("Successfully uploaded")
            complete(StatusCodes.OK)
          case Failure(ex) =>
            logger.error("Error in uploading file", ex)
            complete(StatusCodes.FailedDependency, ex.getMessage)
        }
    }
  }
}
S3Utility:
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

public static void writeStreamToBucket(S3Client client,
                                       String filePath,
                                       String bucketName,
                                       InputStream is) {
    // Start the multipart upload and remember its id
    CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
            .bucket(bucketName)
            .key(filePath)
            .build();
    CreateMultipartUploadResponse multipartUpload = client.createMultipartUpload(createMultipartUploadRequest);
    String uploadId = multipartUpload.uploadId();

    List<CompletedPart> completedParts = new ArrayList<>();
    int partNumber = 1;
    byte[] buffer = new byte[5 * 1024 * 1024];
    try {
        int bytesRead = is.read(buffer);
        // Divide the input into 5MB chunks and upload each as one part
        while (bytesRead > 0) {
            UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
                    .bucket(bucketName)
                    .key(filePath)
                    .uploadId(uploadId)
                    .partNumber(partNumber)
                    .build();
            ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead);
            RequestBody requestBody = RequestBody.fromByteBuffer(byteBuffer);
            String etag = client.uploadPart(uploadPartRequest, requestBody).eTag();
            CompletedPart completedPart = CompletedPart.builder()
                    .partNumber(partNumber)
                    .eTag(etag)
                    .build();
            completedParts.add(completedPart);
            bytesRead = is.read(buffer);
            partNumber += 1;
        }
        // Tell S3 the upload is finished, listing every part
        CompletedMultipartUpload completedMultipartUpload =
                CompletedMultipartUpload.builder()
                        .parts(completedParts)
                        .build();
        CompleteMultipartUploadRequest completeMultipartUploadRequest =
                CompleteMultipartUploadRequest.builder()
                        .bucket(bucketName)
                        .key(filePath)
                        .uploadId(uploadId)
                        .multipartUpload(completedMultipartUpload)
                        .build();
        client.completeMultipartUpload(completeMultipartUploadRequest);
    } catch (IOException e) {
        // Cancel the upload so partial parts don't linger in the bucket
        AbortMultipartUploadRequest abortMultipartUploadRequest = AbortMultipartUploadRequest.builder()
                .bucket(bucketName)
                .uploadId(uploadId)
                .key(filePath)
                .build();
        client.abortMultipartUpload(abortMultipartUploadRequest);
        throw new RuntimeException("Could not upload file to the server. Exception: " + e.getMessage());
    }
}
The above code uploads only a few parts of the file, not all of them. It looks like Akka HTTP delivers the data a bit slowly while S3Utility consumes it fast, so once the stream is momentarily empty, the bytesRead > 0 condition becomes false and the remaining bytes are skipped.
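If partial reads are the cause, I'm guessing the read loop needs to keep filling the buffer until it is actually full or the stream really ends. Something like this untested sketch (readFully is a name I made up, not an SDK method):

private static int readFully(InputStream is, byte[] buffer) throws IOException {
    int offset = 0;
    while (offset < buffer.length) {
        // read() may return fewer bytes than requested even before EOF
        int n = is.read(buffer, offset, buffer.length - offset);
        if (n == -1) {
            break; // real end of stream
        }
        offset += n;
    }
    // Only the final chunk should come back smaller than the buffer
    return offset;
}

Replacing both is.read(buffer) calls with readFully(is, buffer) would then make bytesRead > 0 fail only at the real end of the stream. Is that the right direction?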
As a hack, I added logic to write the stream to a temporary file first, then read that file back into an InputStream and upload from it; with that, the full file was uploaded (a sketch is below). The file-writing step waits for the Akka stream to complete.
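For reference, this is roughly what that workaround looks like (the method name and temp-file naming are illustrative, not my exact code):

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

import software.amazon.awssdk.services.s3.S3Client;

public static void writeViaTempFile(S3Client client, String fileName, InputStream is) throws IOException {
    Path tempFile = Files.createTempFile("upload-", ".tmp");
    try {
        // Drain the whole request body to disk; this blocks until the
        // Akka-backed InputStream is fully consumed
        Files.copy(is, tempFile, StandardCopyOption.REPLACE_EXISTING);
        try (InputStream fileStream = Files.newInputStream(tempFile)) {
            S3Utility.writeStreamToBucket(client, fileName, "test", fileStream);
        }
    } finally {
        Files.deleteIfExists(tempFile);
    }
}

It works, but buffering the whole upload on disk defeats the point of streaming.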
So can you let me know how I can make this code work without the hack?
Thanks