Merging Alpakka Kafka consumer with Akka Streams WebSocket client

Hi all,
I have the following scenario and do not know how to solve it with Akka Streams.
I have an image that will help clarify what I am trying to achieve.

I receive data from Kafka with the Alpakka Kafka consumer and would like to send it to the web server via the Akka Streams WebSocket client.
I would then like to broadcast the data received from the web server to two different Sinks.

The difficulties are:
1. How do I combine the Alpakka Kafka consumer and the Akka Streams WebSocket client so that I keep control over both clients when a shutdown happens?
To clarify, let’s take a look at the following code snippets:

val (consumerControl, streamComplete) =
  Consumer
    .plainSource(consumerSettings,
                 Subscriptions.assignmentWithOffset(
                   new TopicPartition(topic, 0) -> offset
                 ))
    .via(businessFlow)
    .toMat(Sink.ignore)(Keep.both)
    .run()

consumerControl.shutdown()

As you can see, the consumer can be shut down through the consumerControl variable.

Let’s take a look at the WebSocket client (I just copied it from the website):

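import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Future

object SingleWebSocketRequest {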
  def main(args: Array[String]) = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // print each incoming strict text message
    val printSink: Sink[Message, Future[Done]] =
      Sink.foreach {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
          // ignore streamed text and binary messages instead of failing with a MatchError
      }

    val helloSource: Source[Message, NotUsed] =
      Source.single(TextMessage("hello world!"))

    // the Future[Done] is the materialized value of Sink.foreach
    // and it is completed when the stream completes
    val flow: Flow[Message, Message, Future[Done]] =
      Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] representing the stream completion from above
    val (upgradeResponse, closed) =
      Http().singleWebSocketRequest(WebSocketRequest("ws://echo.websocket.org"), flow)

    val connected = upgradeResponse.map { upgrade =>
      // just like a regular HTTP request, the response status is available via upgrade.response.status
      // status code 101 (Switching Protocols) indicates that the server supports WebSockets
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Done
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    // and handle errors more carefully
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}

The code below returns a tuple:

val (upgradeResponse, closed) =
      Http().singleWebSocketRequest(WebSocketRequest("ws://echo.websocket.org"), flow)

and the upgradeResponse variable can be used to check whether the connection was successful.

The question now is how best to combine the Alpakka Kafka consumer and the Akka Streams WebSocket client.

2. How do I broadcast the data received from the web server to two different Sinks?

Thanks

Hi @bifunctor,

Take a look at Http().webSocketClientFlow, which returns a Flow[Message, Message, ... ]. With it you can continue the stream with .via(...) after converting each record received from Kafka into a Message.
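
As a rough sketch of that idea (not a definitive implementation), the Kafka source can be run through the WebSocket flow while keeping every materialized value with Keep.both. The broker address, group id, topic name and WebSocket URL below are placeholders I made up, toMessage is a hypothetical helper, and I assume String keys and values:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future
import scala.concurrent.duration._

object KafkaToWebSocket {
  // Hypothetical helper: wraps a Kafka record value in a strict text message.
  def toMessage(value: String): Message = TextMessage(value)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    // Same style as the question; on Akka 2.6+ the implicit system alone is enough.
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Placeholder settings (assumptions, not taken from the question).
    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("websocket-bridge")

    val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
      Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080/ws"))

    // Keep.both twice keeps the consumer control, the upgrade response
    // and the stream completion future.
    val ((consumerControl, upgradeResponse), streamComplete) =
      Consumer
        .plainSource(consumerSettings, Subscriptions.topics("example-topic"))
        .map(record => toMessage(record.value()))
        .viaMat(webSocketFlow)(Keep.both)
        .toMat(Sink.foreach[Message](message => println(message)))(Keep.both)
        .run()

    // Same check as in the single-request example from the question.
    upgradeResponse.foreach { upgrade =>
      if (upgrade.response.status != StatusCodes.SwitchingProtocols)
        println(s"Connection failed: ${upgrade.response.status}")
    }

    // For illustration only: stop the consumer after one minute.
    system.scheduler.scheduleOnce(1.minute) { consumerControl.shutdown() }
    streamComplete.onComplete(_ => system.terminate())
  }
}
```

Shutting down the consumer completes the source, which should in turn close the WebSocket connection and complete streamComplete, so a single control covers both clients.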

The simplest way to broadcast to multiple sinks is .alsoTo(sink1).to(sink2), where each sink filters out the elements it does not want. You could also use groupBy, which gives you multiple substreams, and attach different sinks to them.
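
A minimal sketch of the alsoTo variant, assuming for illustration that the responses are plain strings and that a made-up "order:" / "audit:" prefix decides which sink wants an element. It uses alsoToMat and toMat instead of alsoTo and to only so that both results can be collected and inspected:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BroadcastToTwoSinks {
  def run()(implicit system: ActorSystem): (Seq[String], Seq[String]) = {
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Stand-in for the messages coming back from the web server.
    val responses = Source(List("order:1", "audit:a", "order:2", "audit:b"))

    // Each sink filters out the elements it does not want, then collects the rest.
    val orderSink =
      Flow[String].filter(_.startsWith("order:")).toMat(Sink.seq[String])(Keep.right)
    val auditSink =
      Flow[String].filter(_.startsWith("audit:")).toMat(Sink.seq[String])(Keep.right)

    // Every element is sent to orderSink and also continues downstream to auditSink.
    val (orders, audits) =
      responses
        .alsoToMat(orderSink)(Keep.right)
        .toMat(auditSink)(Keep.both)
        .run()

    (Await.result(orders, 5.seconds), Await.result(audits, 5.seconds))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    try println(run())
    finally system.terminate()
  }
}
```

Because both sinks see every element, a slow sink backpressures the whole stream, which is worth keeping in mind when the two sinks have very different speeds.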


Thanks a lot for your help. I appreciate it.