Websocket Single JsValue Request Streamed JsValue Response

Hi,

I have been searching for 2 weeks for an example of using akka Streams to implement Websockets in Play 2.6.x I have looked at all the examples and nothing meets my usecase.

This works for a single message in and a single message out:


  def wsSearch =  WebSocket.accept[JsValue, JsValue] { request =>
    Flow[JsValue].map { json =>
      json.validate[TestCity] match {
        case s: JsSuccess[TestCity] => {
          Json.toJson(TestSearchResponse(s.value))
        }
        case e: JsError => JsError.toJson(e)
      }
    }
  }

case class TestCity ... //with implicit Json Format
case class TestSearchResponse ... //with implicit Json Format

What I need is something that returns a stream of results:

I tried this as the constructor of an akka.http.scaladsl.model.ws.TextMessage accepts a Source, but it does not work as there is no transformer for the akka Message:

import akka.http.scaladsl.model.ws.{Message, TextMessage}

val results = Array("{ \"name\": \"Five Hotel\", \"price\": 68 }", "{ \"name\": \"The Granville Hotel\", \"price\": 79 }")
val resultsOut: Source[TextMessage, NotUsed] =  Source.fromIterator{
                              () => results.toIterator.map(jsonString => TextMessage(jsonString))
                            }.throttle(1, 200.millisecond, 1, Shaping ).concat(Source.maybe)
  def wsSearch =  WebSocket.accept[Message, Message] { request =>

    Flow[Message].collect {
      case tm : TextMessage =>
        TextMessage(resultsOut)
      case _ => TextMessage("Message type unsupported")
    }
  }

could not find implicit value for parameter transformer: play.api.mvc.WebSocket.MessageFlowTransformer[akka.http.scaladsl.model.ws.Message,akka.http.scaladsl.model.ws.Message]

I have really struggled to find many people with examples of this kind of usecase, and have found akka Streams have quite a steep learning curve.

Regards

Peter

To answer my own question use Flow.flatmapConcat do something like this:

send the websocket a json message

{ "name": "criteria", "value": "Brighton"}

Use this as the controller method

  val results = Array("{ \"name\": \"Five Hotel\", \"price\": 68 }", "{ \"name\": \"The Granville Hotel\", \"price\": 79 }")
    
  def wsSearch = WebSocket.accept[JsValue, JsValue] { request =>
    Flow[JsValue].flatMapConcat { jsVal =>
      jsVal.validate[TestCriteria] match {
        case _: JsSuccess[TestCriteria] =>
          Source.fromIterator(() => results.toIterator.map(jsonString => Json.parse(jsonString)))
        case e: JsError => Source.single(JsError.toJson(e))
      }
    }
  }

Get two messages streamed back:

{“name”:“Five Hotel”,“price”:68}
{“name”:“The Granville Hotel”,“price”:79}