I have a server-streaming method. Inside it, I send requests to an actor in a loop. I expected each response to be returned to the caller as soon as it arrives, but instead the application waits for all responses to complete before returning anything. Is this normal?
// The code is as follows; I added a sleep to make the behavior easier to observe
override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
  Source(1 to 10).map(index => {
    Thread.sleep(5000)
    HelloReply(index)
  })
}
To confirm that gRPC itself supports this, I tried the Java implementation:
To send data to the caller: responseObserver.onNext(responseBuilder.build());
To end the call: responseObserver.onCompleted();
There, data is sent to the caller every time onNext is called.
So my questions are:
Is my assumption correct? Can akka-grpc do this?
If it can, please give an example.
In general, you should never do blocking operations in a stream operator unless you configure it to run on a separate dispatcher dedicated to blocking work. Doing so also makes the operator an async island, so the blocking only limits element throughput. There is more about the general problem in the docs here
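As a rough illustration of the "separate dispatcher" option, here is a minimal sketch. It assumes Akka 2.6 or later and the stock akka.actor.default-blocking-io-dispatcher from Akka's reference configuration; blockingLookup is a hypothetical stand-in for a call that really has to block, and in a real application you would probably configure your own dedicated dispatcher instead.

```scala
import akka.NotUsed
import akka.stream.ActorAttributes
import akka.stream.scaladsl.{Flow, Source}

object BlockingIsland {
  // Hypothetical stand-in for a call that genuinely has to block
  def blockingLookup(index: Int): String = {
    Thread.sleep(100)
    s"reply-$index"
  }

  // Only this map operator runs on the blocking dispatcher; setting a
  // dispatcher attribute also puts the operator in its own async island,
  // so the rest of the stream keeps running on the default dispatcher.
  val lookups: Source[String, NotUsed] =
    Source(1 to 10).via(
      Flow[Int]
        .map(blockingLookup)
        .withAttributes(
          ActorAttributes.dispatcher("akka.actor.default-blocking-io-dispatcher")
        )
    )
}
```

Elements still pass through the blocking operator one at a time, so throughput is limited by the blocking call, but the threads shared by the rest of the stream are not held up.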
If you just want to delay the response for experimentation, use the asynchronous .delay operator instead, which is non-blocking and allows the stream to do other work during the delay.
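A minimal sketch of what that could look like, assuming Akka 2.6 or later. The throttle variant is an addition of mine, not part of the suggestion above: as far as I understand, delay shifts every element by the same amount, so elements produced together also arrive together, while throttle spaces them out, which may be closer to what the Thread.sleep experiment was trying to show.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

object NonBlockingDelay {
  // Each element is held back for 200 ms without blocking a thread
  val delayed: Source[Int, NotUsed] =
    Source(1 to 10).delay(200.millis)

  // At most one element per 200 ms, also without blocking a thread
  val paced: Source[Int, NotUsed] =
    Source(1 to 10).throttle(1, 200.millis)
}
```

In the gRPC service method, either source could be followed by a .map that builds the HelloReply for each element.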
The gory details: for performance reasons, Akka HTTP/gRPC “sub-materializes” the returned stream into the HTTP server logic, so blocking in your stream blocks the entire HTTP connection stream from doing any processing.
Both Akka actors and streams are highly optimized for asynchronous logic, which is why extra care must be taken either not to block a thread or to handle such cases safely.
For a server streaming out, without any such blocking logic, Akka gRPC (and HTTP) will emit the first element as soon as it has been produced and it is possible to hand it off to the network, just as you expect.
That must be my problem. Here is what my real code does:
I send a request to the actorRef and pass the response back to the caller.
I use Await.result to extract the value from the future.
Is the problem caused by using this method?
If so, how should I extract the value from the future in the response without blocking? I have read the documentation for a long time, but I could not work out how to apply it to my situation.
Await.result blocks until the future completes. Instead, use one of the actor ask operators built into streams (see ActorFlow.ask in the Akka documentation), or do the ask inside a mapAsync, which waits for the reply without blocking.
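To make the mapAsync option concrete, here is a small self-contained sketch. It assumes Akka 2.6 or later and a classic actor; Lookup, LookupResult, and Worker are hypothetical names standing in for your own messages and actor, since the real code is not shown.

```scala
import akka.NotUsed
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical messages and actor, standing in for the real ones
final case class Lookup(index: Int)
final case class LookupResult(index: Int)

class Worker extends Actor {
  def receive: Receive = {
    case Lookup(i) => sender() ! LookupResult(i)
  }
}

object AskInStream {
  // One ask per element; mapAsync emits each reply downstream when its
  // Future completes, without parking a thread the way Await.result does.
  def replies(worker: ActorRef)(implicit timeout: Timeout): Source[LookupResult, NotUsed] =
    Source(1 to 10).mapAsync(parallelism = 1) { i =>
      (worker ? Lookup(i)).mapTo[LookupResult]
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("ask-demo")
    implicit val timeout: Timeout = 3.seconds
    val worker = system.actorOf(Props(new Worker), "worker")
    val done = replies(worker).runWith(Sink.foreach(println))
    // Blocking here is only to keep the demo alive until the stream ends;
    // this is the main thread, not a stream operator.
    Await.result(done, 10.seconds)
    system.terminate()
  }
}
```

In the gRPC service, the method would return something like the replies source followed by a .map that converts each reply into a HelloReply, so each element can be sent to the caller as soon as the actor answers. With a typed actor, ActorFlow.ask plays the same role as the mapAsync plus ask combination shown here.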