I’ve come across this issue too, or something very like it.
I had a downstream bug where my final Sink.actorRef wasn’t sending acks correctly, so nothing was actually reaching it.
Even so, the flow managed to process around 18 HTTP requests before it crashed with the ‘response entity was not subscribed after 1 second’ error.
I can fix the downstream bug and will probably never see the error again, but it raises a worry: any downstream delay that happens to occur in production could crash the flow instead of backpressuring it.
My intuition was that the downstream red light would propagate upstream instantly, so that no HTTP requests would take place at all. But evidently there was a buffer between the HTTP processing and the sink. That buffer initially gave the green light for HTTP requests to start, and the light only changed to red once there were half-processed responses with no room to proceed.
I would like more understanding, visibility and control of those buffers. How big are they? Do they sit between every stage of the explicit flow (like corridors of a certain capacity), or are they attached to the inputs and outputs of particular components? Is there a way of calibrating their size?
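For anyone comparing notes: my current understanding (assuming Akka 2.6 defaults from reference.conf, so treat the numbers as a sketch) is that the buffers are per-operator input buffers rather than one corridor per stage boundary, the global sizes are configurable, and individual stages can be tuned down with `Attributes.inputBuffer(initial, max)`, e.g. `.addAttributes(Attributes.inputBuffer(1, 1))`:

```hocon
# Akka 2.6 defaults (reference.conf). These pre-fetch buffers are what
# hand out the initial "green lights" before backpressure is felt.
akka.stream.materializer {
  initial-input-buffer-size = 4
  max-input-buffer-size = 16
}
```

Shrinking a stage's input buffer to 1 should make backpressure bite sooner, at the cost of throughput.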
I’ve been experimenting with cachedHostConnectionPool because I kept hitting max-open-requests limits when using singleRequest, and I was hoping the pool flow would simply backpressure whenever it risked going over that limit. It would be very nice to have a Flow that won’t crash, so I don’t have to write monitor-and-retry code, and very nice not to have to pull calibration numbers out of the air - to be able to just let the Flow determine its own safe rate.
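For reference, the pool limits that interact with this - and the one-second timer behind the error itself - are all configurable (values below are the akka-http 10.2 defaults from reference.conf, as far as I can tell):

```hocon
# akka-http 10.2 defaults, shown for reference.
akka.http.host-connection-pool {
  max-connections = 4
  max-open-requests = 32   # must be a power of 2
  # The timer behind 'response entity was not subscribed after 1 second':
  response-entity-subscription-timeout = 1.second
}
```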
Back to the response entity subscription delay… There’s a critical section that an element shouldn’t be able to enter unless it can reach the end without any chance of getting stuck. So it needs a green light from both the start and the end of the critical section. Can this be implemented with an alsoTo at the start of the critical section that sends a copy of the request to the end? The documentation says that alsoTo uses backpressure information from both destinations. Downstream of the critical section the requests and responses will be interspersed - another way of putting it is that each request holds a space for its response. The requests have to be filtered out at some point, but I haven’t been able to find a safe way of doing that inside the flow.
Does that reasoning hold? Will it guarantee that the HTTP requests get through the critical section without being held up?
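To check my own mental model: stripped of the streams machinery, the “green light from both ends” idea is just a bounded number of in-flight slots, where a slot is taken on entry and only given back once the element has fully left the critical section. A plain-Java sketch of that invariant (an analogy only, not Akka code; the class and method names are made up):

```java
import java.util.concurrent.Semaphore;

public class CriticalSectionSketch {

    // Allow at most `slots` elements inside the critical section at once.
    static int run(int slots, int requests) throws InterruptedException {
        Semaphore inFlight = new Semaphore(slots);
        int completed = 0;
        for (int n = 1; n <= requests; n++) {
            inFlight.acquire();                 // green light at the start...
            String response = "response-" + n;  // stand-in for the HTTP round trip
            // ...and the slot is only returned once the response has been
            // consumed at the far end, so a stuck downstream stops admissions.
            inFlight.release();
            completed++;
        }
        return completed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(1, 5)); // prints 5
    }
}
```

The alsoTo trick, if I understand it, makes the downstream copy of the request play the role of the unreleased permit.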
Sample code below, running on OpenJDK 11 with Akka 2.6.15 and akka-http 10.2.4. With the alsoTo line commented out, it crashes after 18 HTTP requests get through; with the alsoTo line included, it ran for several minutes without crashing.
import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.CompletionStage;
import akka.actor.ActorSystem;
import akka.event.Logging;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpEntity;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.Uri;
import akka.japi.Pair;
import akka.stream.Attributes;
import akka.stream.Attributes.LogLevels;
import akka.stream.DelayOverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
public class ResponseSubscriptionIssue {
    // https://discuss.akka.io/t/stream-mapasync-1-vs-single-threaded-execution-context-with-stream-mapasync-16-to-maintain-ordering/886
    // https://discuss.akka.io/t/a-lot-requests-results-in-response-entity-was-not-subscribed-after/7797/5
    // https://github.com/akka/alpakka/issues/2296#issuecomment-627897852
    // https://github.com/akka/akka-http/issues/2120
    // https://discuss.akka.io/t/difference-between-http-singlerequest-and-http-superpool/713/2

    private static class CorrelatedRequest {

        private final HttpRequest req;
        private final int n;

        CorrelatedRequest(Uri uri, int n) {
            var pathAmended = uri.path() + "/" + n + "/" + LocalTime.now();
            var uriAmended = uri.path(pathAmended).toString();
            this.req = HttpRequest.GET(uriAmended);
            this.n = n;
        }

        Pair<HttpRequest, Object> toPair() {
            return new Pair<HttpRequest, Object>(req, n);
        }

        public String toString() {
            return String.format("CorrelatedRequest(%d, %s)", n, req);
        }
    }
    private static CompletionStage<String> responseBodyString(ActorSystem system,
            HttpResponse httpResponse) {
        long strictTimeoutMillis = Duration.ofSeconds(10).toMillis();
        CompletionStage<HttpEntity.Strict> strict = httpResponse
                .entity()
                .toStrict(strictTimeoutMillis, system);
        return strict
                .thenApply(HttpEntity.Strict::getData)
                .thenApply(ByteString::utf8String);
    }

    private static Attributes logAttributes() {
        var info = Logging.InfoLevel();
        LogLevels logLevels = new LogLevels(info, info, info);
        return Attributes.none().and(logLevels);
    }
    public static void main(String[] args) {
        var logAttributes = logAttributes();
        var system = ActorSystem.create();
        var log = Logging.getLogger(system, "test");
        var uri = Uri.create("http://127.0.0.1:8080/echo.php");
        var http = Http.get(system);

        var sink = Flow
                .of(Object.class)
                // To simulate whatever downstream delay might happen to come up.
                .delay(Duration.ofSeconds(10), DelayOverflowStrategy.backpressure())
                .to(Sink.foreach(it -> {
                    log.info("SINK: {}", it);
                }));

        var afterCriticalSection = Flow
                .of(Object.class)
                .log("AFTER CRITICAL SECTION", log)
                .addAttributes(logAttributes)
                .to(sink);

        var source = Source
                .range(1, Integer.MAX_VALUE)
                .map(n -> new CorrelatedRequest(uri, n))
                .map(it -> (Object) it);

        // Comment this out to replicate 'response entity was not subscribed'.
        source = source.alsoTo(afterCriticalSection);

        var poolFlow = http
                .cachedHostConnectionPool(uri.getHost().toString() + ":" + uri.getPort());

        var criticalSection = Flow
                .of(Object.class)
                // .detach() // EDIT: thought this might be relevant but maybe not.
                .map(it -> (CorrelatedRequest) it)
                .map(CorrelatedRequest::toPair)
                .via(poolFlow)
                .log("RESPONSE BEFORE PROCESSING BODY", Pair::second, log)
                .map(pair -> pair.first().get())
                .mapAsync(1, res -> responseBodyString(system, res))
                .log("RESPONSE AFTER PROCESSING BODY", log)
                .addAttributes(logAttributes)
                .map(it -> (Object) it);

        source.via(criticalSection).runWith(afterCriticalSection, system);
    }
}