I want to consume SSE events without losing any data when the rate of production is > rate of consumption. Since SSE supports backpressure Akka should be able to do it. I tried a few different ways but the extra messages are being dropped.
This code works but with the following problems:
- Due to
.take(5)
when rate of consumption < rate of production, I am dropping events. - Also I want to process each message as it comes, and don’t want to wait until 5 messages have reached. How can I do that ?
- I have to write the consumer in a while loop. This does not seem event based, rather looks like polling (very similar to calling GET with pagination and limit of 5)
- I am not sure about throttling, tried reading the docs but its very confusing. If I don’t want to lose any events, is throttling the right approach? I am expecting a rate of 5000 req / sec in peak hours and 10 req/sec otherwise. When the production rate is high I would I ideally want to apply backpressure. Is throttling the correct approach for that ? According to docs it seems correct as it says
Backpressures when downstream backpressures or the incoming rate is higher than the speed limit
@Singleton
class SseConsumer @Inject()()(implicit ec: ExecutionContext) {
implicit val system = ActorSystem()
val send: HttpRequest => Future[HttpResponse] = foo
def foo(x: HttpRequest) = {
try {
val authHeader = Authorization(BasicHttpCredentials("user", "pass"))
val newHeaders = x.withHeaders(authHeader)
Http().singleRequest(newHeaders)
} catch {
case e: Exception => {
println("Exceptio12n", e.printStackTrace())
throw e
}
}
}
val eventSource2: Source[ServerSentEvent, NotUsed] =
EventSource(
uri = Uri("https://xyz/a/events/user"),
send,
initialLastEventId = Some("2"),
retryDelay = 1.second
)
def orderStatusEventStable() = {
val events: Future[immutable.Seq[ServerSentEvent]] =
eventSource2
.throttle(elements = 1, per = 3000.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
.take(5)
.runWith(Sink.seq)
events.map(_.foreach(x => {
// TODO: push to sqs
println("456")
println(x.data)
}))
}
Future {
blocking {
while (true) {
try {
Await.result(orderStatusEventStable() recover {
case e: Exception => {
println("exception", e)
throw e
}
}, Duration.Inf)
} catch {
case e: Exception => {
println("Exception", e.printStackTrace())
}
}
}
}
}
}