Hi,
I am trying to use Source.queue with cachedHostConnectionPool in Java, but I am not getting the correct output. Below is my attempt. I always get a success response. I understand why that happens, but I am not sure what the right way to do it is.
// Method that returns the queue
public SourceQueueWithComplete<Object> httpGetRequestWithPool(Materializer materializer) {
    final Http http = Http.get(system);
    final Flow<Pair<HttpRequest, Object>, Pair<Try<HttpResponse>, Object>, HostConnectionPool> flow =
        http.cachedHostConnectionPool(ConnectHttp.toHost("localhost:9091"));
    final SourceQueueWithComplete<Object> queue = Source.queue(3, OverflowStrategy.dropNew())
        // every offered element is paired with the same fixed request
        .map(d -> Pair.create(HttpRequest.create("/itm/ee/er/events/"), d))
        .via(flow)
        // the pool's Try<HttpResponse> is read here but never handed back to the caller
        .toMat(Sink.foreach(p -> p.first()), Keep.left())
        .run(materializer);
    return queue;
}
// Here is where I am trying to use it.
SourceQueueWithComplete<Object> queue = util.httpGetRequestWithPool(materializer);
Source<Object, NotUsed> source = Source.from(ListOfObject);
source.mapAsyncUnordered(3, x -> {
    String url = util.getServiceUrl("servicename").append(x.getId()).toString();
    HttpRequest httpRequest = HttpRequest.create(url);
    // this Try wraps a response I create myself, not the one coming back from the pool
    Try<HttpResponse> promise = Try.apply(() -> HttpResponse.create());
    queue.offer(Pair.create(httpRequest, promise))
        .thenCompose(result -> {
            if (result instanceof QueueOfferResult.Enqueued$) {
                return CompletableFuture.completedFuture(promise)
                    .thenApply(res -> {
                        if (res.get().status().intValue() == 200) {
                            System.out.println("success");
                        }
                        return res;
                    });
            } else {
                return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.SERVICE_UNAVAILABLE));
            }
        });
    // the future composed above is discarded here
    return null;
}).run(materializer);
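
For reference, the idea I think I am missing is a promise per request: each queued element carries its own CompletableFuture, the consumer side completes that future with the real response, and the caller waits on that same future instead of on a value it created itself. Below is a minimal sketch of that idea using only the JDK, with no Akka classes. Everything in it is hypothetical and for illustration only: the class PromiseQueueSketch, the Pending holder, the submit method, and the backend function that stands in for the connection pool. Requests are plain strings and responses are plain status codes. In the Akka version, the queue element would presumably be a Pair<HttpRequest, CompletableFuture<HttpResponse>> and the Sink.foreach would complete the future from the pool's Try.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for "queue in front of a connection pool"; not Akka API.
class PromiseQueueSketch {

    // One queued item: the request plus the promise that will carry its response.
    static final class Pending {
        final String request;
        final CompletableFuture<Integer> promise = new CompletableFuture<>();

        Pending(String request) {
            this.request = request;
        }
    }

    private final BlockingQueue<Pending> queue;

    // backend is an assumed stand-in for the pool: it maps a URL to a status code.
    PromiseQueueSketch(int bufferSize, Function<String, Integer> backend) {
        this.queue = new ArrayBlockingQueue<>(bufferSize);
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Pending p = queue.take();
                    try {
                        // Complete the caller's promise with the real result.
                        p.promise.complete(backend.apply(p.request));
                    } catch (RuntimeException e) {
                        p.promise.completeExceptionally(e);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive for the worker
        worker.start();
    }

    // Offer the request; if the buffer is full, answer 503 right away
    // (this mirrors the dropNew branch in the attempt above).
    CompletableFuture<Integer> submit(String url) {
        Pending p = new Pending(url);
        if (!queue.offer(p)) {
            return CompletableFuture.completedFuture(503);
        }
        return p.promise;
    }

    public static void main(String[] args) {
        PromiseQueueSketch client =
            new PromiseQueueSketch(3, url -> url.endsWith("/missing") ? 404 : 200);
        client.submit("/events/1")
            .thenAccept(status -> System.out.println("/events/1 -> " + status))
            .join();
        client.submit("/events/missing")
            .thenAccept(status -> System.out.println("/events/missing -> " + status))
            .join();
    }
}
```

The point of the sketch is that submit returns the same future the worker completes, so the status the caller sees depends on the backend rather than on a response constructed on the caller side.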