Hi all,
I’m getting started with akka http websockets (from the client side), and I’ve encountered a scenario where the websocket opens and data flows… until it doesn’t. I’m not sure why but occasionally the websocket terminates. Is there a proper way to have akka http try to reconnect the websocket if it fails?
Note, this is from within an actor who’s state is the list of other actors to send data as it arrives
This is the code I have for setting up the websocket.
Any tips/suggestions are greatly appreciated.
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
handleIncomingWSMessage(message.text)
case _ =>
// ignore other message types
}
val subscribeData =
"""{"action": "subscribe", "params": { "symbols": "QQQ" } }""".stripMargin
val subscribeSource = Source(List(TextMessage(subscribeData)))
//this sends one message on the outgoing websocket to subscribe, then waits
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
incoming,
subscribeSource.concatMat(Source.maybe[Message])(Keep.right)
)(Keep.right)
val (upgradeResponse, closed) =
Http(context.system).singleWebSocketRequest(
WebSocketRequest(
s"wss://ws.twelvedata.com/v1/quotes/price?apikey=$apiKey"
),
flow,
settings = getClientConnectionSettings()
)(mat)
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
upgradeResponse.onComplete {
case Success(_) => {
logger.info("Websocket connected.")
}
case Failure(t) => {
logger.error("Websocket failed.", t)
}
}
closed.future.map(o => {
logger.error("Closed.") //this is what i'm seeing when it fails
throw new IllegalStateException("Websocket failed unexpectedly")
})