I have two microservices running Play. I need to consume a stream of JSON from one application (Source-App) in the other (Sink-App).
In the Source-App I have code like this:
val chunked = items
  .map(Json.toJson(_))
  .map(json => ByteString(json.toString()) ++ ByteString("\n"))
Ok.chunked(chunked).as("application/stream+json")
and in the Sink-App, I’m trying to consume the data like this:
val source: Source[JsObject, NotUsed] = Source
  .fromFuture(
    wSClient
      .url(s"${config.endpoint}/v1/companies/${companyId.id}/userLevels")
      .addHttpHeaders(
        ("Accept", "application/stream+json")
      )
      .get()
      .map(resp => resp.bodyAsSource))
  .flatMapConcat(s => s)
  .map(bs => bs.decodeString("UTF-8"))
  .map(s => {
    s // here, instead of getting multiple ByteString elements, it's just one element, already concatenated
  })
  .map(s => Json.parse(s).as[JsObject]) // Json.parse returns JsValue, so narrow it to JsObject
Is there something wrong with my thinking? It looks like the AhcWSClient is buffering the chunks and concatenating them into a single element. Would akka-http solve this and give me access to the raw stream?
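
For reference, here is a minimal sketch of what I would expect to need on the Sink-App side, assuming HTTP chunk boundaries are not guaranteed to line up with my JSON lines. It re-frames the byte stream on the "\n" delimiter with Akka Streams' Framing.delimiter. The names FramingSketch, lines, demo and maxLineBytes are made up for illustration, and the hard-coded chunks only stand in for resp.bodyAsSource. As far as I understand the Play WS docs, .get() loads the whole body into memory and .stream() is the call meant for streamed responses, so the real code would probably use .stream() and pass resp.bodyAsSource into lines. The sketch assumes Akka 2.6, where an implicit ActorSystem provides the materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object FramingSketch {

  // Turns a byte stream into one UTF-8 string per newline-terminated line,
  // no matter how the bytes were split into chunks on the wire.
  // maxLineBytes is a hypothetical upper bound for a single JSON line.
  def lines[M](bytes: Source[ByteString, M], maxLineBytes: Int = 64 * 1024): Source[String, M] =
    bytes
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = maxLineBytes, allowTruncation = true))
      .map(_.utf8String)
      .filter(_.nonEmpty) // each remaining element could then go through Json.parse

  // Stand-in for resp.bodyAsSource: two chunks that cut the second object in half.
  def demo(): Seq[String] = {
    // Assumption: Akka 2.6, where the implicit ActorSystem supplies the materializer.
    // On Akka 2.5 an implicit ActorMaterializer() would be needed as well.
    implicit val system: ActorSystem = ActorSystem("framing-sketch")
    try {
      val chunks = Source(List(
        ByteString("{\"a\":1}\n{\"b\""),
        ByteString(":2}\n{\"c\":3}\n")
      ))
      Await.result(lines(chunks).runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling FramingSketch.demo() should yield three strings, one per JSON object, even though the input arrived as two unevenly split chunks.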