Hi all
I have the following scenario and do not know how to solve it with Akka Streams.
I have an image that will help to clarify what I am trying to achieve.
I will receive data from Kafka with the Alpakka Kafka consumer and would then like to send the received data to the webserver via the Akka Streams websocket client.
The data received back from the webserver should then be broadcast to two different Sinks.
The difficulties are:
1.
How can I combine the Alpakka Kafka consumer and the Akka Streams websocket client so that I keep control over both clients when a shutdown happens?
To clarify, let’s take a look at the following code snippets:
val (consumerControl, streamComplete) =
  Consumer
    .plainSource(consumerSettings,
                 Subscriptions.assignmentWithOffset(
                   new TopicPartition(topic, 0) -> offset
                 ))
    .via(businessFlow)
    .toMat(Sink.ignore)(Keep.both)
    .run()

consumerControl.shutdown()
As you can see, the consumer can be shut down through the variable consumerControl.
Let’s take a look at the websocket client (I just copied it from the website):
import akka.actor.ActorSystem
import akka.{ Done, NotUsed }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

import scala.concurrent.Future

object SingleWebSocketRequest {
  def main(args: Array[String]) = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // print each incoming strict text message
    val printSink: Sink[Message, Future[Done]] =
      Sink.foreach {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
          // ignore other message types (avoids a MatchError)
      }

    val helloSource: Source[Message, NotUsed] =
      Source.single(TextMessage("hello world!"))

    // the Future[Done] is the materialized value of Sink.foreach
    // and it is completed when the stream completes
    val flow: Flow[Message, Message, Future[Done]] =
      Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] representing the stream completion from above
    val (upgradeResponse, closed) =
      Http().singleWebSocketRequest(WebSocketRequest("ws://echo.websocket.org"), flow)

    val connected = upgradeResponse.map { upgrade =>
      // just like a regular http request we can access response status which is available via upgrade.response.status
      // status code 101 (Switching Protocols) indicates that server support WebSockets
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Done
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    // and handle errors more carefully
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}
The following call returns a tuple:
val (upgradeResponse, closed) =
  Http().singleWebSocketRequest(WebSocketRequest("ws://echo.websocket.org"), flow)
With the variable upgradeResponse, it can be checked whether the connection was successful or not.
The question now is: how can the Alpakka Kafka consumer and the Akka Streams websocket client be combined sensibly?
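One direction that might work is to place the websocket inside the same stream as the Kafka source by using Http().webSocketClientFlow instead of singleWebSocketRequest, so that a single run() hands back both the Consumer.Control and the Future[WebSocketUpgradeResponse]. The following is only a sketch under assumptions: the bootstrap server, group id, topic name, start offset and websocket URL are placeholders, the record values are assumed to be plain strings, and the println sink stands in for the real downstream.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{ Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse }
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ ConsumerSettings, Subscriptions }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Keep, RunnableGraph, Sink }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future

object KafkaToWebSocket {
  type Mat = ((Consumer.Control, Future[WebSocketUpgradeResponse]), Future[Done])

  // Builds the blueprint only; nothing connects until run() is called.
  def graph(implicit system: ActorSystem): RunnableGraph[Mat] = {
    // Placeholder connection settings (assumptions, not from the post).
    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("example-group")

    // The websocket as a Flow: messages in go to the server, messages out come from it.
    val wsFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
      Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080/ws"))

    Consumer
      .plainSource(consumerSettings,
                   Subscriptions.assignmentWithOffset(new TopicPartition("example-topic", 0) -> 0L))
      .map[Message](record => TextMessage(record.value()))
      .viaMat(wsFlow)(Keep.both)                        // keep Control and upgrade response
      .toMat(Sink.foreach[Message](println))(Keep.both) // plus stream completion
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val ((consumerControl, upgradeResponse), streamComplete) = graph.run()

    upgradeResponse.foreach(u => println(s"websocket status: ${u.response.status}"))
    streamComplete.onComplete { result =>
      println(s"stream finished: $result")
      system.terminate()
    }
    // Calling consumerControl.shutdown() later stops the Kafka source; the
    // completion should then propagate downstream through the websocket flow.
  }
}
```

With this shape there is one handle for stopping the Kafka side and one for checking the websocket handshake, both coming from the same materialization. Whether the server closes the connection cleanly once the outgoing side completes depends on the server.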
2.
How can I broadcast the data received from the webserver to two different Sinks?
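For the fan-out part, a minimal sketch with alsoToMat is shown here, assuming the websocket messages have already been turned into plain strings. The object name TwoSinks, the helper fanOut and the two Sink.seq sinks are made up for illustration; the real sinks would take their place.

```scala
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, Materializer }
import akka.stream.scaladsl.{ Keep, Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

object TwoSinks {
  // Sends every element to both sinks and returns what each sink collected.
  def fanOut(elements: List[String])(implicit mat: Materializer): (Seq[String], Seq[String]) = {
    val (first, second) =
      Source(elements)
        .alsoToMat(Sink.seq[String])(Keep.right) // first sink, attached on the side
        .toMat(Sink.seq[String])(Keep.both)      // second sink, terminates the stream
        .run()
    // Blocking is only for the demo; a real application would compose the futures.
    (Await.result(first, 3.seconds), Await.result(second, 3.seconds))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val (first, second) = fanOut(List("a", "b", "c"))
    println(s"first sink:  $first")
    println(s"second sink: $second")

    system.terminate()
  }
}
```

alsoToMat backpressures when either sink is slow, so both sinks see every element. A Broadcast stage inside the GraphDSL would be the more general alternative if more than two outputs or different buffering per branch are needed.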
Thanks