Greetings!
I have an http endpoint that I want to protect against bursts of requests so I want to implement an initial solution based on the akka stream throttle.
However, I can’t find a way to obtain a response for each event emitted at Source, it seems that the request/response style doesn’t naturally fit with akka streams.
For example, I’ve tried with MergeHub but the materialized value is NotUsed.
def source[T]: Source[T, Sink[T, NotUsed]]
I’ve found a very simple solution based on promises, but I’m wondering if it is the correct approach.
The solution is create a promise an emit the pair (event,promise) and the stream is responsible of complete the promise with a success response or a failure. Here is the gist.
object RequestResponseWithAkkaStreams {
import scala.concurrent.duration._
type Request = String
type Response = String
def processRequest(request: Request): Future[Response] = ???
val hub =
MergeHub.source[(Request,Promise[Response])]
.throttle(100,1.minute,100,ThrottleMode.Shaping)
.toMat(Sink.foreach { case (request,promise) =>
processRequest(request).onComplete(promise.complete)
})(Keep.left)
.run()
// This should be included in a play filter for example. I use a SourceQueueWithComplete to obtain a Dropped when the throttle applies backpressure.
def receiveRequest(request: Request): Future[Response] = {
val source = Source.queue(1,OverflowStrategy.dropNew).idleTimeout(10.second).toMat(hub)(Keep.left).run()
val promise = Promise[Response]()
source.offer((request,promise)).onComplete {
case Success(Dropped) => promise.failure(new RuntimeException("Limit exceeded"))
case Failure(f) => promise.failure(f)
case _ => ()
}
promise.future
}
}
Thanks.