Hi, I am new to akka-grpc and have a question on streaming API.
In the below example, I am trying to return Source[HelloReply, NotUsed]
as a response for the API.
service GreeterService {
rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}
}
Since new messages will be offered to the Source, I am trying to use Source.queue
and return it(Source[HelloReply, NotUsed])
) to the client.
val (queue, source): (SourceQueueWithCompleteAndSize[HelloReply],
Source[HelloReply, NotUsed]) = Source
.queue[HelloReply](bufferSize, OverflowStrategy.backpressure)
.toMat(
PartitionHub.sink((size, elem) ⇒ math.abs(elem.hashCode % size),
startAfterNrOfConsumers = 1,
bufferSize = 256))(Keep.both)
.run()
The problem is, there could be multiple queues and I want to create a corresponding actor for each queue to wrap it.
Then the GRPC service implementation class will have the pool(Map
) of those actors and “ask” the Source
to each actor.
override def ItKeepsReplying(
request: HelloRequest): Source[HelloReply, NotUsed] = {
(someActor ? Fetch(request))
.mapTo[...] // this will be Future
...
But the return type of “ask” API is Future
. No matter how many transformations I performed, the final type would be Future(internal type would change over transformation).
Future[Message] => Future[Intermediate] => Future[HelloReply]
But as per the proto spec, what I need is Source
.
override def ItKeepsReplying(
request: HelloRequest): Source[HelloReply, NotUsed]
I looked into Source.fromFuture
, Source.fromFutureSource
, or Source.actorRef
but it seems neither of them fit with this situation.
Is there any good way to provide streaming API using Actor?