Proper Akka websocket client to receive infinite incoming stream

I’m trying to connect to WebSocket incoming infinite stream as described here. I see only couple of messages

[info] -----------------------
[info] 2024-03-28T22:20:57.023+08:00
[info] TextMessage.Strict({"topic":"orderbook.100.ETH-29MAR24-3575-C","ts":1711635657023,"type":"snapshot","id":"orderbook.100.ETH-29MAR24-3575-C-9136793338-1711635657022","data":{"s":"ETH-29MAR24-3575-C","b":[["46.6","74.7"],["46.5","120"],["45.8","52.1"],["44.2","240"]],"a":[["51.4","120"],["52.5","47.3"],["52.8","34.7"],["53.7","52.1"],["53.8","240"],["81.2","40"],["210","0.2"],["253.2","0.2"]],"u":667407,"seq":9136793338},"cts":1711635654863})
[info] -----------------------
[info] 2024-03-28T22:20:57.023+08:00
[info] TextMessage.Strict({"topic":"orderbook.100.ETH-29MAR24-3575-P","ts":1711635657023,"type":"snapshot","id":"orderbook.100.ETH-29MAR24-3575-P-9136792306-1711635657022","data":{"s":"ETH-29MAR24-3575-P","b":[["39.4","1.6"],["39.3","214"],["38.5","140.4"],["37.2","40"],["37.1","240.8"]],"a":[["44.1","93.6"],["44.2","120.4"],["44.6","40"],["44.9","140.4"],["46.9","240.8"],["100","0.2"],["238.8","0.2"]],"u":402912,"seq":9136792306},"cts":1711635650375})

Then the stream stops. Please, help to fix it.

source code
import akka.util.ByteString
import akka.actor.ActorSystem
import akka.{Done, NotUsed}
import akka.http.scaladsl.Http
import akka.stream.scaladsl.*
import akka.http.scaladsl.model.*
import akka.http.scaladsl.settings.*
import akka.http.scaladsl.model.ws.*
import org.json4s.*
import org.json4s.native.JsonMethods.*
import com.github.nscala_time.time.Imports.{DateTime, DateTimeZone}
import scala.concurrent.duration._

import scala.concurrent.{Future, Promise}

object WebSocketClientFlow {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    import system.dispatcher

    val msg =
      """
        |{
        |    "op": "subscribe",
        |    "args": [
        |        "orderbook.100.ETH-29MAR24-3575-C",
        |        "orderbook.100.ETH-29MAR24-3575-P"
        |    ]
        |}
        |""".stripMargin

    val printSink: Sink[Message, Future[Done]] =
      Sink.foreach {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
        // ignore other message types
      }

    def handle(m: Message): Unit =
      m match
        case TextMessage.Strict(txt) =>
          implicit val formats = org.json4s.DefaultFormats

          val jsonVal   = parse(txt)
          val timestamp = (jsonVal \ "ts").extract[Long]
          println(s"""-----------------------
                     |${new DateTime(timestamp).withZone(DateTimeZone.forID("..."))}
                     |$m
                     |""".stripMargin)
        case x =>
          println(s"---> $x")
          ???

    val flow =
      Flow
        .fromSinkAndSourceMat(
          Sink.foreach[Message](handle),
          Source(List(TextMessage(msg))).concatMat(Source.maybe[Message])(Keep.right)
        )(Keep.right)

    val defaultSettings = ClientConnectionSettings(system)
    val customWebsocketSettings =
      defaultSettings.websocketSettings
        .withPeriodicKeepAliveData { () =>
          println("")
          ByteString(s"debug")
        }
    val customSettings =
      defaultSettings.withWebsocketSettings(customWebsocketSettings)

    val (upgradeResponse, closed) =
      Http().singleWebSocketRequest(
        request = WebSocketRequest("wss://stream.bybit.com/v5/public/option"),
        clientFlow = flow,
        log = system.log,
        settings = customSettings
      )

    val connected = upgradeResponse.map { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Done
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    while true do Thread.sleep(1000)

    connected.onComplete(x => println(s"====== $x"))
  }
}