I am wondering what the best way is to connect a WebSocket flow to a graph that later fans out based on the stream messages. So far, I have tried the following:
val diamond = GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  val partition = builder.add(Partition[ReceivedSocketMsg](numPartitions, {
    case ReceivedSocketMsg(_, _, _, _, _, _, sym, _, _) => coinMapper(sym)
  }))
  socketConfig.subscribeFilterIds.foreach { coin =>
    val outFlow = builder.add(Sink.foreach[ReceivedSocketMsg](fanOut(_)))
    partition ~> outFlow
  }
  SinkShape(partition.in)
}
val sourceGraph = GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  val sendStr = Source(List(TextMessage(compact(render(jsonMsg)))))
  val flow: Flow[Message, Message, Promise[Option[Message]]] =
    Flow.fromSinkAndSourceMat(Sink.foreach[Message](println),
      sendStr.concatMat(Source.maybe[Message])(Keep.right))(Keep.right)
  val (upgradeResponse, _) =
    Http().singleWebSocketRequest(WebSocketRequest(socketUrl), flow)
  val connected = upgradeResponse.map { upgrade =>
    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Done
    } else {
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
    }
  }
  // in a real application you would not side effect here
  // and handle errors more carefully
  connected.onComplete(println)
  sender ! "Connected"
  val socketData = builder.add(flow.map(streamProcessor(_)))
  SourceShape(FlowShape(socketData.in, socketData.out).out)
}
Source.fromGraph(sourceGraph).to(diamond).run()
The helper function looks like this:
def streamProcessor(message: Message): ReceivedSocketMsg = {
  message match {
    case TextMessage.Strict(text) =>
      parse(text).extract[ReceivedSocketMsg]
  }
}
(The match is not exhaustive yet; it still needs cases for the other message types, such as streamed text and binary messages.)
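The missing cases could be handled roughly as in the sketch below. It assumes that streamed text frames should be concatenated into one string and that binary frames should be drained and skipped; `messageText` is a hypothetical helper name that does not appear in the code above.

```scala
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper (an assumption, not part of the original code):
// yields the complete text of a WebSocket message, or None for binary messages.
def messageText(message: Message)(
    implicit mat: Materializer, ec: ExecutionContext): Future[Option[String]] =
  message match {
    // The whole text is already in memory.
    case TextMessage.Strict(text) =>
      Future.successful(Some(text))
    // Streamed text: collect all chunks into a single string.
    case tm: TextMessage =>
      tm.textStream.runFold("")(_ + _).map(Some(_))
    // Binary data: drain the stream so it does not stall the connection, then skip it.
    case bm: BinaryMessage =>
      bm.dataStream.runWith(Sink.ignore).map(_ => None)
  }
```

With such a helper, the parsing step might become `.mapAsync(1)(m => messageText(m)).collect { case Some(text) => parse(text).extract[ReceivedSocketMsg] }` instead of `.map(streamProcessor(_))`, so that a non-strict message no longer causes a `MatchError`.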
I am currently getting the following error:
java.lang.IllegalStateException: Illegal GraphDSL usage. Inlets [Map.in] were not returned in the resulting shape and not connected.
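The message seems to point at the `flow.map(...)` stage added to the builder: a Flow has both an inlet and an outlet, and `SourceShape` only returns the outlet, so the inlet stays unconnected. A minimal sketch of one way to avoid the dangling inlet, assuming the GraphDSL is not needed on the source side, is to feed the outgoing messages through `Http().webSocketClientFlow` and treat the result as a plain Source. The name `socketSource` and its parameters are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.scaladsl.{Keep, Source}
import scala.concurrent.Future

// Hypothetical helper (an assumption, not part of the original code):
// a Source of incoming WebSocket messages that materializes the upgrade response.
def socketSource(socketUrl: String, subscribeJson: String)(
    implicit system: ActorSystem): Source[Message, Future[WebSocketUpgradeResponse]] = {
  // Send the subscription message once, then keep the outgoing side open
  // so that the connection is not closed after the first message.
  val outgoing: Source[Message, NotUsed] =
    Source.single[Message](TextMessage(subscribeJson)).concat(Source.maybe[Message])

  // Routing the outgoing messages through the client flow connects its inlet,
  // so only the incoming messages remain as an open outlet.
  outgoing.viaMat(Http().webSocketClientFlow(WebSocketRequest(socketUrl)))(Keep.right)
}
```

Under these assumptions, and with an implicit materializer in scope, the wiring would be `socketSource(socketUrl, compact(render(jsonMsg))).map(streamProcessor).to(diamond).run()`, which returns the `Future[WebSocketUpgradeResponse]` that can be checked for `StatusCodes.SwitchingProtocols` as before. Nothing connects until the graph is run.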
Any suggestions would be greatly appreciated.