I have come across an interesting use case regarding stream boundaries and the detection of EOF. The context is an asynchronous JSON parser that is specifically designed to lazily parse elements of a JSON stream. I am using https://github.com/mdedetrich/akka-streams-json for this; it solves the same problem as Alpakka’s own JSON framing library (JSON • Alpakka Documentation), it just uses Jawn/Circe underneath for the parsing/serializing.
Akka-streams-json uses Jawn under the hood, which also supports asynchronous parsing (see https://github.com/typelevel/jawn#parsing). In my case I am dealing with JSON streams that each contain a massive JSON array, so to parse each element of the JSON array with akka-streams-json you would use AsyncParser.UnwrapArray,
i.e.
val endingNullArrayJson = """[1,2,3,null]"""

"AsyncParser" should {
  val entity = Source.single(ByteString(endingNullArrayJson))

  "produce all values" in {
    val parsed = entity.via(decode[Option[Int]](AsyncParser.UnwrapArray)).runWith(Sink.seq)
    parsed.map {
      _ shouldBe Seq(
        Some(1),
        Some(2),
        Some(3),
        None
      )
    }
  }
}
This code works as expected, however let’s say we are dealing with a Source that contains multiple JSON arrays, where the Source correctly emits a single complete JSON array each time, i.e.
val endingNullArrayJson = """[1,2,3,null]"""

"AsyncParser" should {
  val entity = Source(List.fill(3)(ByteString(endingNullArrayJson)))

  "produce all values" in {
    val parsed = entity.via(decode[Option[Int]](AsyncParser.UnwrapArray)).runWith(Sink.seq)
    parsed.map {
      _ shouldBe Seq(
        Some(1),
        Some(2),
        Some(3),
        None,
        Some(1),
        Some(2),
        Some(3),
        None,
        Some(1),
        Some(2),
        Some(3),
        None
      )
    }
  }
}
This then fails at runtime, producing the following error:
expected eof got '[1,2,3...' (line 1, column 13)
org.typelevel.jawn.ParseException: expected eof got '[1,2,3...' (line 1, column 13)
at org.typelevel.jawn.Parser.die(Parser.scala:131)
at org.typelevel.jawn.Parser.die(Parser.scala:89)
at org.typelevel.jawn.AsyncParser.churn(AsyncParser.scala:191)
at org.typelevel.jawn.AsyncParser.absorb(AsyncParser.scala:98)
at org.mdedetrich.akka.json.stream.JsonStreamParser$ParserLogic.emitOrPullLoop(JsonStreamParser.scala:107)
at org.mdedetrich.akka.json.stream.JsonStreamParser$ParserLogic.org$mdedetrich$akka$json$stream$JsonStreamParser$ParserLogic$$upstreamPush(JsonStreamParser.scala:88)
at org.mdedetrich.akka.json.stream.JsonStreamParser$ParserLogic$$anon$2.onPush(JsonStreamParser.scala:80)
The exception points to this spot in Jawn: jawn/AsyncParser.scala at 8d802a67cd0e7fe45284d514597eac5d7ae78034 · typelevel/jawn · GitHub.
Herein lies the core issue: when you use AsyncParser.UnwrapArray, Jawn strictly expects there to be only a single JSON array per bytestream, so when you reach the end of the JSON array (i.e. just after the closing ]) it expects to hit the EOF case via atEof (jawn/AsyncParser.scala at 8d802a67cd0e7fe45284d514597eac5d7ae78034 · typelevel/jawn · GitHub). In other words the bytestream actually needs to be finished to trigger the EOF case, which is detected by checking whether the current cursor index ends up greater than the current length of bytes. What happens above instead is that a [ (the start of the second JSON array) arrives after the ].
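For what it’s worth, the same behavior can be reproduced with Jawn alone, without Akka Streams in the picture. A minimal sketch (assuming the jawn-ast module purely so I can pass its JawnFacade explicitly; any other facade should behave the same):

import org.typelevel.jawn.AsyncParser
import org.typelevel.jawn.ast.{JValue, JawnFacade}

val parser = AsyncParser[JValue](AsyncParser.UnwrapArray)

// The first array is absorbed fine and its elements are emitted lazily,
// i.e. roughly Right(Seq(1, 2, 3, null)) as JValues.
println(parser.absorb("""[1,2,3,null]""")(JawnFacade))

// The second array starts where the parser is already waiting for EOF, so this
// returns a Left with the same "expected eof got '[1,2,3...'" ParseException.
println(parser.absorb("""[1,2,3,null]""")(JawnFacade))

// Each array would need its own parser instance (with finish() signalling the
// EOF) for both arrays to parse successfully.

So the problem isn’t specific to akka-streams-json; it is how the async parser’s UnwrapArray mode is defined.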
Note that if you parse this example without using the framing/lazy parsing then it works fine, i.e.
val endingNullArrayJson = """[1,2,3,null]"""

"AsyncParser" should {
  val entity = Source(List.fill(3)(ByteString(endingNullArrayJson)))

  "produce all values" in {
    val parsed = entity.via(decode[List[Option[Int]]]).runWith(Sink.seq)
    parsed.map {
      _ shouldBe Seq(
        Some(1),
        Some(2),
        Some(3),
        None,
        Some(1),
        Some(2),
        Some(3),
        None,
        Some(1),
        Some(2),
        Some(3),
        None
      )
    }
  }
}
So my question here is how to handle this, and what the expectations are. One can argue that Jawn is being “correct” here, because when you use AsyncParser.UnwrapArray it really expects a single valid JSON array per stream, and so the only way Jawn knows that the byte array is finished is via EOF. The annoying thing about this reasoning is its usability implications. For example, the above is a minimization of a real use case where I want to parse large S3 objects, each containing a JSON array, one after another. The initial implementation looked like this
def restore: Future[Done] =
  Source
    .future(finalKeys)
    .flatMapConcat(Source.apply)
    .via(downloadFlow)
    .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray))
    .collect {
      case Some(reducedConsumerRecord) =>
        reducedConsumerRecord
    }
    .runWith(kafkaProducerInterface.getSink)
Nothing terribly fancy here: finalKeys is just a Future[List[String]] containing the S3 keys, and for each S3 key we want to download it (with downloadFlow) and then parse the JSON. This however doesn’t work because of the previously mentioned problem, so now I have to do this
object Utils {
  // Runs the futures one after another, starting each only after the previous one completes.
  def runSequentially[A](
      lazyFutures: List[() => Future[A]]
  )(implicit ec: ExecutionContext): Future[List[A]] =
    lazyFutures
      .foldLeft(Future.successful(List.empty[A])) { (acc, curr) =>
        for {
          a <- acc
          c <- curr()
        } yield c :: a
      }
      .map(_.reverse) // the fold prepends, so restore the original ordering
}
def restoreKey(key: String): Future[Done] =
  Source
    .single(key)
    .via(downloadFlow)
    .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray))
    .collect {
      case Some(reducedConsumerRecord) =>
        reducedConsumerRecord
    }
    .runWith(kafkaProducerInterface.getSink)
def restore: Future[Done] = {
  implicit val ec: ExecutionContext = system.dispatcher
  for {
    keys <- finalKeys
    _    <- Utils.runSequentially(keys.map(key => () => restoreKey(key)))
  } yield Done
}
This is undesirable since I would ideally want to treat it as one single flow (at least to me this looks like a workaround, because you shouldn’t need to materialize the stream multiple times).
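One idea I have been toying with, though I haven’t verified it, is to keep a single materialized graph but move the decode stage into a per-key sub-stream via flatMapConcat, so every key gets its own parser instance and the end of each sub-stream acts as the EOF for that parser. A rough sketch, reusing the same finalKeys, downloadFlow and kafkaProducerInterface as above:

def restore: Future[Done] =
  Source
    .future(finalKeys)
    .flatMapConcat(Source.apply)
    .flatMapConcat { key =>
      // Each key becomes its own sub-source, so the decode stage (and hence
      // Jawn's AsyncParser) is materialized fresh per key and sees upstream
      // completion, i.e. EOF, at the end of that key's bytes.
      Source
        .single(key)
        .via(downloadFlow)
        .via(CirceStreamSupport.decode[Option[ReducedConsumerRecord]](AsyncParser.UnwrapArray))
    }
    .collect {
      case Some(reducedConsumerRecord) => reducedConsumerRecord
    }
    .runWith(kafkaProducerInterface.getSink)

That would at least avoid materializing the stream once per key, but it still feels like working around the parser rather than answering what the right behavior should be.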
Another solution would be to somehow intersperse the ByteString Source directly with an EOF so that Jawn’s atEof gets triggered, however I don’t think this is possible right now? Or is the better solution to handle this explicitly in akka-streams-json as an edge case for AsyncParser.UnwrapArray (akka-streams-json/JsonStreamParser.scala at master · mdedetrich/akka-streams-json · GitHub), i.e. manually terminate the stream in the GraphStageLogic?
Finally, I guess it would be ideal to mimic the behavior of Alpakka’s JSON framing (JSON • Alpakka Documentation) for consistency reasons. Also, if there are any code examples of dealing with the same issue, that would be greatly appreciated!