Hi Arnout,
Actually i have tried before, and also didn’t work
Here is the complete sample code that i use:
AkkaHttpClient.scala
import java.net.URI
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpMethods, _}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueueWithComplete}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContextExecutor, Future, Promise}
import scala.util.{Failure, Success, Try}
class AkkaHttpClient(uri: String, successPredicate: HttpResponse => Boolean) {
var retryable = true
var maxRetryAttempts = 3
private val MAX_QUEUE_SIZE = Int.MaxValue
private val RUN_FUTURE_FIRST_ATTEMPT = 1
private val parsedUri = new URI(uri)
private implicit val system: ActorSystem = ActorSystem()
private implicit val materializer: ActorMaterializer = ActorMaterializer.create(system)
private implicit val executionContext: ExecutionContextExecutor = system.dispatcher
private var terminated = false
private val connectionPool = createCachedConnectionPool(parsedUri)
private val sourceQueue = createSourceQueue(connectionPool)
def submitBody(body: String,
method: HttpMethod = HttpMethods.POST,
contentType: ContentType.NonBinary = ContentTypes.`application/json`) {
val request = HttpRequest(method, uri, entity = HttpEntity(contentType, body))
val responseFuture: Future[HttpResponse] = queueRequest(request)
if (retryable)
retryFuture(responseFuture)
}
def shutdown(timeout: FiniteDuration) {
sourceQueue.complete()
try {
// Does not awaits :(
Await.result(sourceQueue.watchCompletion(), timeout)
} catch {
case _: Throwable =>
} finally {
terminated = true
system.terminate()
}
}
private def queueRequest(request: HttpRequest): Future[HttpResponse] = {
val responsePromise = Promise[HttpResponse]()
sourceQueue.offer(request -> responsePromise).flatMap {
case QueueOfferResult.Enqueued => responsePromise.future
case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
case QueueOfferResult.Failure(ex) => Future.failed(ex)
case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
}
}
def retryFuture(target: Future[HttpResponse]): Unit = {
retryFuture(target, RUN_FUTURE_FIRST_ATTEMPT)
}
private def retryFuture(target: Future[HttpResponse], attempt: Int): Future[HttpResponse] = {
target.recoverWith {
case _ =>
if (!terminated && attempt <= maxRetryAttempts) {
println("retrying..")
retryFuture(target, attempt + 1)
}
else {
println("failed (again)!")
Future.failed(new Exception)
}
}
}
private def createCachedConnectionPool(uri: URI): Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Http.HostConnectionPool] = {
if (uri.getScheme == "http")
Http().cachedHostConnectionPool[Promise[HttpResponse]](uri.getHost, if (uri.getPort == -1) 80 else uri.getPort)
else
Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](uri.getHost, if (uri.getPort == -1) 443 else uri.getPort)
}
private def createSourceQueue(connectionPool: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Http.HostConnectionPool])
: SourceQueueWithComplete[(HttpRequest, Promise[HttpResponse])] = {
Source.queue[(HttpRequest, Promise[HttpResponse])](MAX_QUEUE_SIZE, OverflowStrategy.dropNew)
.via(connectionPool)
.toMat(Sink.foreach({
case (Success(resp), p) =>
if (successPredicate.apply(resp)) {
println("completed!")
p.success(resp)
}
else {
println("really failed!")
p.failure(new Exception("REsult is not satisfied: " + resp.toString()))
}
case (Failure(e), p) => p.failure(e)
}))(Keep.left)
.run()
}
}
AkkaHttpClientTest.scala
import akka.http.scaladsl.model.StatusCodes
import org.apache.commons.lang3.time.StopWatch
import org.scalatest.FunSuite
import scala.concurrent.duration._
class AkkaHttpClientTest extends FunSuite {
test("make a few requests!") {
val stopWatch = StopWatch.createStarted()
val client = new AkkaHttpClient("https://akka.io", resp => resp.status == StatusCodes.Accepted)
(1 to 10).foreach(number => {
client.submitBody(s"test $number!")
})
client.shutdown(2.minutes)
println(s"Time passed: ${stopWatch.getTime} ms")
}
}
When i execute the test, it does not awaits queue completion
test 1!
test 2!
test 3!
test 4!
test 5!
test 6!
test 7!
test 8!
test 9!
test 10!
really failed!
retrying..
retrying..
retrying..
failed (again)!
really failed!
retrying..
retrying..
retrying..
failed (again)!
really failed!
retrying..
retrying..
retrying..
failed (again)!
really failed!
retrying..
retrying..
retrying..
failed (again)!
really failed!
retrying..
retrying..
retrying..
failed (again)!
Time passed: 3158 ms
really failed!
failed (again)!
Process finished with exit code 0
There are 6 matches of failed!
at the log, but there are 10 requests Result is random
(edit: For a note, disabling retry mechanism does not change the result)