Hi all,
I was looking into reading JSON and came across Alpakka. I thought I would try reading some JSON and printing it, but nothing is printed to the console. The JSON is just a simple string defined in the code.
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.alpakka.json.scaladsl.JsonReader
import akka.stream.scaladsl.{Broadcast, GraphDSL, JsonFraming, Keep, RunnableGraph, Sink, Source}
import akka.util.ByteString
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
object Main extends App {
  implicit val actorSystem: ActorSystem = ActorSystem("custom-system")
  implicit val executionContext: ExecutionContext = actorSystem.getDispatcher

  val data =
    """
      |[
      |  {
      |    "id": 1,
      |    "name": "Bulbasaur",
      |    "type": [
      |      "Grass",
      |      "Poison"
      |    ],
      |    "HP": 45,
      |    "Attack": 49,
      |    "Defense": 49,
      |    "Sp. Attack": 65,
      |    "Sp. Defense": 65,
      |    "Speed": 45
      |  },
      |  {
      |    "id": 2,
      |    "name": "Ivysaur",
      |    "type": [
      |      "Grass",
      |      "Poison"
      |    ],
      |    "HP": 60,
      |    "Attack": 62,
      |    "Defense": 63,
      |    "Sp. Attack": 80,
      |    "Sp. Defense": 80,
      |    "Speed": 60
      |  }
      |]
      |""".stripMargin

  Source.single(ByteString.fromString(data))
    .via(JsonReader.select("$.rows[*].name"))
    .via(JsonFraming.objectScanner(100))
    .map(_.utf8String)
    .runForeach(x => println(x))
    .map { _ =>
      println("Finished !")
      actorSystem.terminate()
    }
    .recover {
      case e: Exception =>
        println(s"Failed ! ${e.getMessage}")
        actorSystem.terminate()
    }
}
The program ends with "Finished !" and everything shuts down, but nothing else is printed to the console, and I can’t see why.
So I thought I would try a different approach:
val source = Source.single(data)
val printSink = Sink.foreach[String](println)
val transformPrintSink = JsonReader.select("$.rows[*].name")
  .map(byteString => byteString.utf8String)
  .toMat(printSink)(Keep.right)

val graph = GraphDSL.createGraph(transformPrintSink) { implicit builder =>
  (console) =>
    import GraphDSL.Implicits._
    val broadCast = builder.add(Broadcast[String](1))
    source ~> broadCast ~> console
    ClosedShape
}

val materialized = RunnableGraph.fromGraph(graph).run()
materialized.onComplete {
  case Success(_) =>
    actorSystem.terminate()
  case Failure(e) =>
    println(s"Failure: ${e.getMessage}")
    actorSystem.terminate()
}
If I use GraphDSL.createGraph(printSink) instead, the entire JSON is printed. With transformPrintSink, however, I get a compilation error:
overloaded method ~> with alternatives:
(to: akka.stream.SinkShape[String])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_]): Unit <and>
(to: akka.stream.Graph[akka.stream.SinkShape[String], _])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_]): Unit <and>
[Out](flow: akka.stream.FlowShape[String,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_]): akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
[Out](junction: akka.stream.UniformFanOutShape[String,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_]): akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
[Out](junction: akka.stream.UniformFanInShape[String,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_]): akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
[Out](via: akka.stream.Graph[akka.stream.FlowShape[String,Out],Any])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_]): akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
[U >: String](to: akka.stream.Inlet[U])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_]): Unit
cannot be applied to (akka.stream.SinkShape[akka.util.ByteString])
source ~> broadCast ~> console
Is this essentially telling me that, for some reason, I can’t connect a Sink that takes a ByteString as its input to this graph?
I wrote it this way the second time because I was planning to extend it to write the result of the transformation to a file as well as print the transformed JSON to the console, but that is for another time.
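One possible reading of the compiler message is that the element types do not line up: source and Broadcast[String] both carry String, while a sink built from JsonReader.select takes ByteString as input. The following is a minimal, untested sketch of the same wiring with ByteString used throughout. It assumes JsonReader.select is a Flow from ByteString to ByteString and that an Akka version providing GraphDSL.createGraph is on the classpath. The object name TypedGraph and the trimmed data are made up for illustration. The JSON path is kept exactly as in the post, so this only addresses the compile error, not whether anything gets printed.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.alpakka.json.scaladsl.JsonReader
import akka.stream.scaladsl.{Broadcast, GraphDSL, Keep, RunnableGraph, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

// Hypothetical object name, used only for this sketch.
object TypedGraph extends App {
  implicit val system: ActorSystem = ActorSystem("typed-graph")
  import system.dispatcher

  // Trimmed copy of the data from the post.
  val data = """[{"id": 1, "name": "Bulbasaur"}, {"id": 2, "name": "Ivysaur"}]"""

  // The source now emits ByteString instead of String.
  val source = Source.single(ByteString(data))
  val printSink = Sink.foreach[String](println)

  // Sink input type is ByteString because JsonReader.select consumes ByteString.
  // Path kept as in the post; whether it matches the data is a separate question.
  val transformPrintSink: Sink[ByteString, Future[Done]] =
    JsonReader.select("$.rows[*].name")
      .map(_.utf8String)
      .toMat(printSink)(Keep.right)

  val graph = GraphDSL.createGraph(transformPrintSink) { implicit builder => console =>
    import GraphDSL.Implicits._
    // The broadcast element type matches the sink's input type.
    val broadcast = builder.add(Broadcast[ByteString](1))
    source ~> broadcast ~> console
    ClosedShape
  }

  RunnableGraph.fromGraph(graph).run().onComplete(_ => system.terminate())
}
```

With a single output the Broadcast is not strictly needed. It is kept here only because the plan is to add a second branch (the file sink) later.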
Is there a reason nothing was printed in my first attempt at reading the JSON?
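One thing that may be worth checking is the JSON path itself. The data is a top-level array with no "rows" field, so "$.rows[*].name" may simply match nothing, in which case the stream would complete without emitting any element. The sketch below is untested and rests on two assumptions: that "$[*].name" is accepted by the underlying JsonPath implementation for a top-level array, and that JsonFraming.objectScanner expects whole objects rather than bare string values, which is why it is left out. The object name PathCheck and the trimmed data are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.json.scaladsl.JsonReader
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Hypothetical object name, used only for this sketch.
object PathCheck extends App {
  implicit val system: ActorSystem = ActorSystem("path-check")
  import system.dispatcher

  // Trimmed copy of the data from the post: a top-level array, no "rows" field.
  val data = """[{"id": 1, "name": "Bulbasaur"}, {"id": 2, "name": "Ivysaur"}]"""

  Source.single(ByteString(data))
    // Assumed path for "the name field of every element of the root array".
    .via(JsonReader.select("$[*].name"))
    .map(_.utf8String)
    .runForeach(println)
    .onComplete { result =>
      // Prints Success or Failure, so a failing stream is not silently swallowed.
      println(s"Stream ended: $result")
      system.terminate()
    }
}
```

If this prints the two names, the path in the first attempt would be the likely culprit. If it still prints nothing, the cause is somewhere else.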
Regards.