Hi,
I am having trouble consuming messages from a Lagom stream in an Akka WebSocket client.
First, what is working fine:
There is a Lagom 1.6.4 service with the following method:
override def tick(interval: Int): ServiceCall[String, Source[String, NotUsed]] = {
  ServiceCall { msg =>
    Future.successful {
      println(s"Received message: ${msg}: ${interval}")
      Source.tick(
        interval.milliseconds,
        interval.milliseconds,
        s"Hello, $msg!"
      )
      .mapMaterializedValue(_ => NotUsed)
    }
  }
}
When I connect to this method and consume messages via https://www.websocket.org/echo.html, it works fine.
But when I create an Akka WebSocket client, the incoming messages from the Lagom service are not printed.
I have tried both SingleWebSocketRequest and WebSocketClientFlow from the "Client-Side WebSocket Support" page of the Akka HTTP documentation. The only changes I made to the example were setting the service URL to "ws://localhost:9001/tick/1000" and adding println("other message type received") for other message types.
object WebSocketClientFlow {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    import system.dispatcher
    // Future[Done] is the materialized value of Sink.foreach,
    // emitted when the stream completes
    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println("Received: " + message.text)
        case _ =>
          println("other message type received")
      }
    // send this as a message over the WebSocket
    val outgoing = Source.single(TextMessage("hello world!"))
    // flow to use (note: not re-usable!)
    //val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9001/tick/1000"))
    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()
    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }
    // in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}
The only output printed by the client is:
Success(Done)
closed
No incoming message is printed. The service log shows "Received message: hello world!: 1000", so the message sent by the client was received by the Lagom service. I suppose the service responded properly too, but the client did not print any incoming message.
Interestingly, when the same client sends a message to ws://echo.websocket.org, it works fine and the message received from the server is printed.
So the client works and the service works, but they do not work together. What could be the problem here?
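One difference between the two cases that may matter: the echo server replies immediately, while Source.tick in the Lagom service emits its first element only after the interval has passed. The client's outgoing source is Source.single, which completes right after sending, and the Akka HTTP documentation notes that half-closed WebSockets are not supported, so completion of either direction closes the whole connection. Below is a minimal sketch of a variant that keeps the outgoing side open with Source.maybe, assuming this is the cause. It has not been confirmed against the Lagom service, and the object name WebSocketClientKeepOpen is made up for illustration.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Future, Promise}

// Hypothetical name; a sketch, not a verified fix.
object WebSocketClientKeepOpen {

  // Send one message, then stay open: Source.maybe does not complete
  // until its materialized Promise is completed.
  val outgoing: Source[Message, Promise[Option[Message]]] =
    Source.single[Message](TextMessage("hello world!"))
      .concatMat(Source.maybe[Message])(Keep.right)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    import system.dispatcher

    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println("Received: " + message.text)
        case _ =>
          println("other message type received")
      }

    val webSocketFlow =
      Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9001/tick/1000"))

    // keepOpen is the Promise from Source.maybe; completing it ends the outgoing side.
    val ((keepOpen, upgradeResponse), closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.both)
        .toMat(incoming)(Keep.both)
        .run()

    upgradeResponse.foreach(upgrade => println(s"Upgrade status: ${upgrade.response.status}"))
    closed.foreach(_ => println("closed"))

    // To close the connection later: keepOpen.success(None)
  }
}
```

If the assumption holds, the client should keep printing incoming messages until keepOpen.success(None) is called, instead of printing "closed" right after connecting.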
Thank you for any hints.
Ales