Connect akka websocket flow to a Graph

I am wondering what’s the best way for a WebSocketFlow to be connected to a Graph that can later fan out based on the stream messages. At present, I have tried the following:

val diamond = GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>

    val partition = builder.add(Partition[ReceivedSocketMsg](numPartitions, {
      case ReceivedSocketMsg(_, _, _, _, _, _, sym, _, _) => coinMapper(sym)
    }))

    socketConfig.subscribeFilterIds.foreach { coin =>
      val outFlow = builder.add(Sink.foreach[ReceivedSocketMsg](fanOut(_)))
      partition ~> outFlow
    }

    SinkShape(partition.in)
  }

  val sourceGraph = GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>

    val sendStr = Source(List(TextMessage(compact(render(jsonMsg)))))
    val flow: Flow[Message, Message, Promise[Option[Message]]] =
      Flow.fromSinkAndSourceMat(Sink.foreach[Message](println),
        sendStr.concatMat(Source.maybe[Message])(Keep.right))(Keep.right)

    val (upgradeResponse, _) =
      Http().singleWebSocketRequest(WebSocketRequest(socketUrl), flow)

    val connected = upgradeResponse.map { upgrade =>
      // just like a regular http request we can access response status which is available via upgrade.response.status
      // status code 101 (Switching Protocols) indicates that server support WebSockets
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Done
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    // and handle errors more carefully
    connected.onComplete(println)
    sender ! "Connected"

    val socketData = builder.add(flow.map(streamProcessor(_)))

    SourceShape(FlowShape(socketData.in, socketData.out).out)
  }

  Source.fromGraph(sourceGraph).to(diamond).run()

While the helper function looks like following:

def streamProcessor(message: Message): ReceivedSocketMsg = {
  message match {
  case TextMessage.Strict(text) => 
    parse(text).extract[ReceivedSocketMsg]
}}

(This will need to be improved using better case matches for all other cases)

I am currently getting the following error:

Illegal GraphDSL usage. Inlets [Map.in] were not returned in the resulting shape and not connected. java.lang.IllegalStateException: Illegal GraphDSL usage. Inlets [Map.in] were not returned in the resulting shape and not connected.

Any suggestion would be greatly helpful.

1 Like

Hi Syed,
You shouldn’t do this

SourceShape(FlowShape(socketData.in, socketData.out).out)

You need to pipe the sockedData.in to something and just return the SourceShape withput creating a FlowShape.

val someSource = builder.add(Source.single(1))
someSource ~> sockedData
SourceShape(sockedData.out)

This answer is just about your error message.

1 Like