Hi,
I would like to address the issue described in https://github.com/akka/akka-http/issues/1447: in my setup, S3 <- Application <- User, I would like to have retries for PUT requests from the Application to S3.
If I run the application with the default retry settings in the HTTP client, I get “Substream Source cannot be materialized more than once”. If I disable retries, the error is usually either an IOException (broken pipe) or an UnexpectedConnectionClosureException; in both cases the upload fails and the error is propagated to the user. I would like to have retries on the application side as well.
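For reference, I disable the connection pool's automatic retries with the standard client setting (the default is 5):

akka.http.host-connection-pool.max-retries = 0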
When using the AWS SDK, it accepts an InputStream for upload requests. If the input stream supports mark-and-reset with a given read limit, i.e. it can buffer some data in memory, then on error, provided the buffer was big enough, the SDK can reset the input stream and retry the upload.
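In code, the idea is roughly this (a sketch of the mark-and-reset pattern, not the actual SDK code; doUpload stands in for the blocking upload call):

import java.io.{BufferedInputStream, IOException, InputStream}

def uploadWithReset(in: InputStream, readLimit: Int, maxRetries: Int)
                   (doUpload: InputStream => Unit): Unit = {
  val buffered = new BufferedInputStream(in)
  buffered.mark(readLimit) // remember up to readLimit bytes for a possible replay
  var attempt = 0
  var done = false
  while (!done) {
    try {
      doUpload(buffered)
      done = true
    } catch {
      case _: IOException if attempt < maxRetries =>
        attempt += 1
        // rewind to the mark; throws if more than readLimit bytes were consumed
        buffered.reset()
    }
  }
}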
I was looking to achieve something similar with Akka Stream sources, but so far I have failed to come up with a working solution.
I tried prefixAndTail, which does not work: when the request fails, the error/cancel event propagates through the whole graph and I see the same exceptions.
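What I tried looks roughly like this (a sketch; replayable is just my own helper name):

import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Keep the first n chunks in memory so they can be replayed, then glue the
// live tail back on for the actual upload.
def replayable(data: Source[ByteString, Any], n: Int): Source[Source[ByteString, NotUsed], Any] =
  data.prefixAndTail(n).map { case (prefix, tail) =>
    // Works for the first attempt, but tail is a Substream Source: a retry
    // that materializes it a second time fails with "Substream Source cannot
    // be materialized more than once", and a failed request cancels it anyway.
    Source(prefix).concat(tail)
  }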
Another option I tried is creating a custom Source stage with a SubSinkInlet that reads from the request entity. On error the sub-sink does not receive the cancellation event, but I cannot find a way to recover from the source's out port getting the cancel event.
I.e. something like this:
(path("postData") & post) {
  withoutRequestTimeout {
    extractRequestEntity { entity =>
      // ResettableSource is my custom wrapper (sketched below): reset() hands
      // out a Source that replays up to readLimit buffered bytes on a retry.
      val resettableSource = new ResettableSource(
        maximumRetries = 1,
        readLimit = 10 * 1024 * 1024,
        originalSource = entity.dataBytes)
      complete {
        // retry is a custom helper that re-runs this Future-producing block on failure
        retry {
          val dataSource: Source[ByteString, Any] = resettableSource.reset()
          Http().singleRequest(
            HttpRequest(
              method = HttpMethods.PUT,
              uri = Uri("http://localhost:8081/putData"),
              entity = HttpEntity(entity.contentType, dataSource)))
        }
      }
    }
  }
}
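Stripped of the retry/buffer bookkeeping (reset() essentially hands out Source.fromGraph of a stage like this), the wiring of that attempt looks roughly like the following sketch (ResettableStage is a placeholder name; buffering and replay are elided):

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString

// Reads the entity through a SubSinkInlet so it is materialized exactly once.
class ResettableStage(originalSource: Source[ByteString, Any])
    extends GraphStage[SourceShape[ByteString]] {

  val out: Outlet[ByteString] = Outlet("ResettableStage.out")
  override val shape: SourceShape[ByteString] = SourceShape(out)

  override def createLogic(attributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private val subIn = new SubSinkInlet[ByteString]("ResettableStage.subIn")

      subIn.setHandler(new InHandler {
        override def onPush(): Unit = push(out, subIn.grab()) // (buffering elided)
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = subIn.pull()
        // A failed HTTP request cancels out here; the sub-sink keeps running,
        // but I see no way to recover the stage and replay the buffered data.
        override def onDownstreamFinish(): Unit = completeStage()
      })

      override def preStart(): Unit =
        originalSource.runWith(subIn.sink)(subFusingMaterializer)
    }
}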
Maybe somebody has recommendations on how I can implement such a feature, or a hint at what I can try next?
Thanks,
Kyrylo