Overview
I have two microservices called warehouse-microservice (server) & grocery-microservice (client). And I’m trying to implement a streaming request & streaming response between them using the akka-grpc library. But I’m facing a problem while fetching the streaming response in my grocery-microservice (client). Below is the client code description.
Problem Description:
I have a variable called responseStream
of Source type onto which I’m accumulating a few elements. And then I want to return a future of the elements which are present inside this Source.
Below is the snippet of my function:
def streamingBroadcast(itemIds: List[Long]): Future[List[ItemStockAvailabilityDto]] = {
logger.debug("Performing streaming requests")
val itemIter = itemIds.toIterator
val requestStream: Source[CheckStockRequest, NotUsed] =
Source
.fromIterator(() => itemIter)
.map { itemId => CheckStockRequest(itemId, 1) }
.mapMaterializedValue(_ => NotUsed)
val responseStream: Source[CheckStockReply, NotUsed] = client.checkStockToAll(requestStream)
responseStream.runFoldAsync(List.empty[ItemStockAvailabilityDto])((acc, item) => {
logger.debug(s"got streaming reply for id: ${item.itemId} and qty: ${item.qtyAvailable}")
Future.successful(acc :+ ItemStockAvailabilityDto(item.itemId, item.qtyAvailable))
})
}
And then I have another method called getQuote
from where I call the streamingBroadcast
method which looks something like this -
def getQuote(itemIds: List[Long], quantityRequested: Int): Future[List[GroceryItemAvailabilityDto]] = {
streamingBroadcast(itemIds) map {
case stockList: List[ItemStockAvailabilityDto] if stockList.nonEmpty =>
logger.debug("we have some results for items " + stockList.map(_.itemId).mkString(","))
stockList map { stock =>
catalog.get(stock.itemId) match {
case Some(item) =>
val amt = if (stock.quantityAvailable < quantityRequested) stock.quantityAvailable else quantityRequested
val msg = if (amt < quantityRequested) Some("We are unable to supply all you requested") else Some("We await your order")
GroceryItemAvailabilityDto(stock.itemId, item.name, item.price, item.price * amt, quantityRequested, stock.quantityAvailable, msg)
case None => throw new RuntimeException("item not found in warehouse: " + stock.itemId)
}
}
case Nil => throw new RuntimeException("none of the items were found in warehouse: " + itemIds.mkString(","))
}
}
Problem
The problem I’m facing is that the code execution is not returning me any values from the getQuote
method. And it is just hanging inside getQuote
maybe & another waiting actor is timing out. Maybe because of the pattern matching inside that method? I also tried debugging the code to make sure that there are at least a few elements present inside the list.
Is there something that I’m doing wrong in creating a responseStream
or accessing the elements in it? Please let me know if you need anymore information or the code snippet from the warehouse microservice (server).