I am trying to process HTTP requests in a way that will not result in
Exceeded configured max-open-requests value of [32]
My idea for this is to use a cachedHostConnectionPool
that will open a limited amount of connections, and while the connections are in use, the other queries will be buffered. I will pass requests to the flow via a Source.queue
My code currently looks like this -
private Source<Pair<HttpRequest, Object>, SourceQueueWithComplete<Pair<HttpRequest, Object>>> sourceQueue = Source.queue(4, OverflowStrategy.backpressure(), 4);
private final Flow<Pair<HttpRequest, Object>, Pair<Try<HttpResponse>, Object>, HostConnectionPool> flow
= Http.get(system).cachedHostConnectionPool(host);
private final Materializer materializer = SystemMaterializer.get(system).materializer();
private SourceQueueWithComplete<Pair<HttpRequest, Object>> queue = sourceQueue.via(flow).toMat(Sink.foreach(p -> {
// processing ?
}), Keep.left()).run(materializer);
My question is how should I process the requests and get a response out of it? Particularly, what should I be doing here -
(Sink.foreach(p -> {
// processing ?
})
If I have understood correctly, the Sink.foreach
construct is responsible for consuming the elements that are queued, so the processing, i.e getting the HttpResponse should be completed here? If someone can provide an example based off this - that would be very helpful.
Here, p.first()
gives a Try<HttpResponse>
, So is this the result of the HttpRequest
that is offered to the queue? I’m unsure of this as well.