I have an HTTP service that streams each response for at most a fixed amount of time. When this timeout is reached, the stream stops even if not all of the desired data has been retrieved yet. To obtain all of the desired data, my application must use the last object returned in the stream to decide whether, and how, to issue a new request.
Presently, I have a client-side Reactive Streams publisher that needs to be integrated into my Akka Streams-based business logic. I am struggling to find a clean way to compose multiple streams of paged data, where each stream depends on the last element emitted by the previous one, into a single stream of paged data.
The following is an example of what I’m trying to do.
```scala
// A simplified response object containing all the info needed for this paging algorithm
case class Page(maxTime: Timestamp, size: Int)

// This is the client-side API I'd prefer to use, as developing directly against Reactive Streams is painful
def fetchFromServer(startTime: Timestamp, targetPageSize: Int): Source[Page, NotUsed]

// This is the method the business logic would like to use
def fetch(startTime: Timestamp): Source[Page, NotUsed] = {
  val targetPageSize = 1000
  // TODO: What do I do here? I know this is completely bogus:
  // a Source has no `last` element until it is run, and Scala has no `break` keyword.
  var source = fetchFromServer(startTime, targetPageSize)
  while (true) {
    val lastPage = source.last
    if (lastPage.size < targetPageSize) break
    // concat returns a new Source, so the result has to be kept
    source = source.concat(fetchFromServer(lastPage.maxTime + 1, targetPageSize))
  }
  source
}
```
The stream returned by `fetchFromServer` completes when the server stops sending data (no more data is available, OR the timeout is reached); however, the client-side business logic should keep requesting data from the server until `Page.size < targetPageSize`.
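One possible way to express this rule as a single `Source` is sketched below. It is a sketch under several assumptions: `Timestamp` is modeled as a `Long`, `fetchFromServer` is replaced by a hypothetical in-memory stub (4500 items, at most two pages per response to mimic the server timeout), and the code targets Akka 2.6 (`Source.unfold`, `statefulMapConcat`, `flatMapConcat`). Instead of asking the stream for its last element, each response is followed by an end-of-response marker; `statefulMapConcat` remembers the last page it has seen and, when the marker arrives, emits either nothing or the start time of the next request, which `flatMapConcat` turns into a recursive call to `fetch`.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object PagedFetch {
  type Timestamp = Long // assumption: the real Timestamp type is modeled as a Long
  final case class Page(maxTime: Timestamp, size: Int)

  val targetPageSize = 1000

  // Hypothetical in-memory stand-in for the real client: items have timestamps
  // 0 until TotalItems, and each response stops after MaxPagesPerCall pages to
  // simulate the server-side time limit.
  val TotalItems = 4500L
  val MaxPagesPerCall = 2

  def fetchFromServer(startTime: Timestamp, targetPageSize: Int): Source[Page, NotUsed] =
    Source
      .unfold(startTime) { t =>
        if (t >= TotalItems) None
        else {
          val size = math.min(targetPageSize.toLong, TotalItems - t).toInt
          Some((t + size, Page(t + size - 1, size)))
        }
      }
      .take(MaxPagesPerCall)

  def fetch(startTime: Timestamp): Source[Page, NotUsed] =
    fetchFromServer(startTime, targetPageSize)
      .map(Option(_))
      .concat(Source.single(Option.empty[Page])) // marker: this response has completed
      .statefulMapConcat[Either[Page, Timestamp]] { () =>
        var last: Option[Page] = None // last page seen in the current response
        (elem: Option[Page]) =>
          elem match {
            case Some(page) =>
              last = Some(page)
              List(Left(page)) // pass the page downstream unchanged
            case None => // response ended: decide whether another request is needed
              last match {
                case Some(p) if p.size >= targetPageSize => List(Right(p.maxTime + 1))
                case _                                   => Nil
              }
          }
      }
      .flatMapConcat {
        case Left(page)       => Source.single(page)
        case Right(nextStart) => fetch(nextStart) // only built once the marker arrives
      }

  def runDemo(): Seq[Page] = {
    implicit val system: ActorSystem = ActorSystem("paged-fetch")
    try Await.result(fetch(0L).runWith(Sink.seq), 10.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    runDemo().foreach(println)
}
```

With this stub, `main` prints `Page(999,1000)`, `Page(1999,1000)`, `Page(2999,1000)`, `Page(3999,1000)` and `Page(4499,500)`: three server responses merged into one stream. Each follow-up request is materialized inside the previous request's `flatMapConcat`, so nesting grows by one level per request, which may matter for very long chains.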
I suspect that using a Materialized Value might provide me with the magic sauce I need here, but I can’t seem to wrap my head around how to do so in a looping or recursive fashion.
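For comparison, a materialized value can drive the loop, but only from outside the stream. The sketch below assumes `Timestamp` is a `Long`, stubs `fetchFromServer` with a hypothetical in-memory server (4500 items, at most two pages per response), and targets Akka 2.6. `Sink.lastOption` materializes a `Future[Option[Page]]` holding the last page of one response; the hypothetical `fetchAll` helper runs each response into a caller-supplied callback and recurses on that future. It works, but the result is a `Future[Done]` rather than a single `Source[Page, NotUsed]`, because the materialized value only exists once a response has actually been run.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MaterializedLoop {
  type Timestamp = Long // assumption: the real Timestamp type is modeled as a Long
  final case class Page(maxTime: Timestamp, size: Int)

  val targetPageSize = 1000

  // Hypothetical in-memory stand-in for the real client: items have timestamps
  // 0 until TotalItems, and each response stops after MaxPagesPerCall pages to
  // simulate the server-side time limit.
  val TotalItems = 4500L
  val MaxPagesPerCall = 2

  def fetchFromServer(startTime: Timestamp, targetPageSize: Int): Source[Page, NotUsed] =
    Source
      .unfold(startTime) { t =>
        if (t >= TotalItems) None
        else {
          val size = math.min(targetPageSize.toLong, TotalItems - t).toInt
          Some((t + size, Page(t + size - 1, size)))
        }
      }
      .take(MaxPagesPerCall)

  // Hypothetical helper: runs one response into `handle`, then uses the materialized
  // last page (from Sink.lastOption) to decide whether to issue another request.
  def fetchAll(startTime: Timestamp, handle: Page => Unit)(implicit system: ActorSystem): Future[Done] = {
    implicit val ec: ExecutionContext = system.dispatcher
    val (lastPage, done) =
      fetchFromServer(startTime, targetPageSize)
        .alsoToMat(Sink.lastOption[Page])(Keep.right) // Future[Option[Page]]
        .toMat(Sink.foreach[Page](handle))(Keep.both) // (Future[Option[Page]], Future[Done])
        .run()
    for {
      _    <- done
      last <- lastPage
      next <- last match {
                case Some(p) if p.size >= targetPageSize => fetchAll(p.maxTime + 1, handle)
                case _                                   => Future.successful(Done)
              }
    } yield next
  }

  def runDemo(): List[Page] = {
    implicit val system: ActorSystem = ActorSystem("materialized-loop")
    val seen = new ConcurrentLinkedQueue[Page]()
    try {
      Await.result(fetchAll(0L, page => { seen.add(page); () }), 10.seconds)
      seen.toArray(Array.empty[Page]).toList
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    runDemo().foreach(println)
}
```

Each response here is a separate materialization, so the pages cannot be handed to the business logic as one composable `Source`; keeping the "should I request again?" decision inside the stream avoids that limitation.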