I am trying to use a kafka topic as a source for SSE events. It does not seem to be able to read the source and keeps closing the SSE connection. What am I missing?
val route =
path("events") {
get {
respondWithHeaders(RawHeader("Access-Control-Allow-Origin", "*")) {
complete {
Consumer
.plainSource(consumerSettings, Subscriptions.topics("my_topic"))
.map(msg => ServerSentEvent(msg.value()))
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
}
}
}
}