WebSocket Reconnect/Retry - Is It Possible?

Hi all,

I’m getting started with Akka HTTP WebSockets (on the client side), and I’ve run into a scenario where the WebSocket opens and data flows… until it doesn’t. I’m not sure why, but occasionally the WebSocket terminates. Is there a proper way to have Akka HTTP try to reconnect the WebSocket if it fails?

Note: this runs inside an actor whose state is the list of other actors to send data to as it arrives.

This is the code I have for setting up the websocket.

Any tips/suggestions are greatly appreciated.

       val incoming: Sink[Message, Future[Done]] =
         Sink.foreach[Message] {
           case message: TextMessage.Strict =>
             handleIncomingWSMessage(message.text)
           case _ =>
           // ignore other message types
         }

       val subscribeData =
         """{"action": "subscribe", "params": { "symbols": "QQQ" } }""".stripMargin

       val subscribeSource = Source(List(TextMessage(subscribeData)))
       //this sends one message on the outgoing websocket to subscribe, then waits
       val flow: Flow[Message, Message, Promise[Option[Message]]] =
         Flow.fromSinkAndSourceMat(
           incoming,
           subscribeSource.concatMat(Source.maybe[Message])(Keep.right)
         )(Keep.right)

       val (upgradeResponse, closed) =
         Http(context.system).singleWebSocketRequest(
           WebSocketRequest(
             s"wss://ws.twelvedata.com/v1/quotes/price?apikey=$apiKey"
           ),
           flow,
           settings = getClientConnectionSettings()
         )(mat)

       val connected = upgradeResponse.flatMap { upgrade =>
         if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
           Future.successful(Done)
         } else {
           throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
         }
       }

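       // Use `connected` rather than `upgradeResponse`: the latter also succeeds when the server rejects the upgrade
       connected.onComplete {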
         case Success(_) => {
           logger.info("Websocket connected.")
         }
         case Failure(t) => {
           logger.error("Websocket failed.", t)
         }
       }

       closed.future.map(o => {
         logger.error("Closed.") //this is what i'm seeing when it fails
         throw new IllegalStateException("Websocket failed unexpectedly")
       })
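
A side note on the last block above: an exception thrown inside `Future.map` does not reach the actor or its supervisor. It only fails the Future that `map` returns, and here that Future is discarded. The following is a minimal sketch using only the Scala standard library to show this. `ThrowInsideMap` and `outcome` are hypothetical names, and the `Promise[Option[String]]` merely stands in for the `Promise[Option[Message]]` from the question.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

object ThrowInsideMap {
  // Mirrors the shape of `closed.future.map(o => throw ...)` from the question.
  def outcome(): Try[Nothing] = {
    val closed = Promise[Option[String]]()

    // The throw is caught by the Future machinery and turned into a failed Future.
    val mapped: Future[Nothing] = closed.future.map { _ =>
      throw new IllegalStateException("Websocket failed unexpectedly")
    }

    // Simulate the stream ending, which completes the promise.
    closed.success(None)

    // Wait for the mapped Future and return its result without rethrowing.
    Await.ready(mapped, 1.second).value.get
  }
}
```

So if the intent is to make the actor react to the close (for example by reconnecting), one option is to send the actor a message when `closed.future` completes, rather than throwing.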

Yes, it’s possible.
This blog explains how to encapsulate akka-streams websocket clients with Actors:
see chapter: Stream Supervision

An example implementation of this concept is here:

The section which is probably interesting for you is around here:

Hope that helps
Paul
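
As a stream-level alternative to the actor-based supervision described above (so not the blog's method), Akka Streams also ships `RestartSource`, which re-creates a source with exponential backoff whenever it completes or fails. Below is a rough sketch, assuming Akka 2.6.10 or later (for `RestartSettings`) and Akka HTTP 10.2. The object name `ReconnectingWebSocket`, the helpers `messages` and `run`, and the backoff values are all made up for illustration; the URL and subscribe payload would be the ones from the question.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.RestartSettings
import akka.stream.scaladsl.{RestartSource, Sink, Source}
import akka.{Done, NotUsed}

import scala.concurrent.Future
import scala.concurrent.duration._

object ReconnectingWebSocket {
  // Hypothetical values: min backoff, max backoff, random jitter factor.
  val restartSettings: RestartSettings =
    RestartSettings(1.second, 30.seconds, 0.2)

  // Incoming messages as one logical stream that reconnects behind the scenes.
  def messages(url: String, subscribe: String)(
      implicit system: ActorSystem): Source[Message, NotUsed] =
    RestartSource.withBackoff(restartSettings) { () =>
      // This factory runs for every (re)connection, so the subscribe
      // message is sent again each time a new WebSocket is opened.
      val outgoing: Source[Message, _] =
        Source.single[Message](TextMessage(subscribe))
          .concat(Source.maybe[Message]) // keep the outgoing side open
      outgoing.via(Http().webSocketClientFlow(WebSocketRequest(url)))
    }

  // Runs the stream and passes every strict text frame to `handle`.
  def run(url: String, subscribe: String)(handle: String => Unit)(
      implicit system: ActorSystem): Future[Done] =
    messages(url, subscribe).runWith(Sink.foreach[Message] {
      case m: TextMessage.Strict => handle(m.text)
      case _                     => () // other message types ignored, as in the question
    })
}
```

Inside an actor, `handle` could forward each text to `self` or to the subscriber actors. `withBackoff` restarts on normal completion as well as on failure, which matters here because a server-side close typically completes the stream rather than failing it.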

Thank you, Paul! I’ll get a nice big cup of coffee and get to studying. Appreciate it immensely!