Hi all
I have a flow in our system which reads some elements from SQS (using alpakka) and does some preprocessing (< 10 stages, normally << 1 minute in total). Then the prepared element is sent to the main processing (a single stage whose processing takes minutes). The whole thing runs on AWS/K8S and we’d like to scale out when the SQS queue grows above a certain threshold. The issue is that the SQS queue takes a long time to grow past that threshold, since there are a lot of elements “idling” in-process, having finished their preprocessing but still waiting for the main stage.
We can’t externalize the preprocessing to a separate queue, since its outcome can’t survive a de/serialization roundtrip. Also, this service and the “main” processor are deeply coupled (this service runs as main’s sidecar) and can’t be scaled independently.
The preprocessing stages are technically async, but the whole thing already runs at very low parallelism (both the internal stages and the SQS batch/buffer sizes).
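To make the setup concrete, the stream looks roughly like this (a sketch with made-up names; Prepared, preprocessStepN and callMainProcessor are illustrative placeholders, and the parallelism/buffer/batch numbers just stand in for “low”):

import akka.actor.ActorSystem
import akka.stream.alpakka.sqs.SqsSourceSettings
import akka.stream.alpakka.sqs.scaladsl.SqsSource
import akka.stream.scaladsl.{Flow, Sink}
import akka.{Done, NotUsed}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.Message

import scala.concurrent.Future

object PipelineSketch {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val sqsClient: SqsAsyncClient = SqsAsyncClient.create()

  // the preprocessing result holds state that can't survive de/serialization
  final case class Prepared(message: Message)

  def preprocessStep1(m: Message): Future[Message] = Future.successful(m)
  def preprocessStep2(m: Message): Future[Prepared] = Future.successful(Prepared(m))
  def callMainProcessor(p: Prepared): Future[Done] = Future.successful(Done) // minutes per element in reality

  // < 10 async stages, each at low parallelism, << 1 minute in total
  val preprocessing: Flow[Message, Prepared, NotUsed] =
    Flow[Message]
      .mapAsync(parallelism = 2)(preprocessStep1)
      .mapAsync(parallelism = 2)(preprocessStep2)

  // the single long-running main stage
  val mainProcessing: Flow[Prepared, Done, NotUsed] =
    Flow[Prepared].mapAsync(parallelism = 1)(callMainProcessor)

  def run(): Future[Done] =
    SqsSource("<queue-url>", SqsSourceSettings().withMaxBufferSize(10).withMaxBatchSize(10))
      .via(preprocessing)
      .via(mainProcessing)
      .runWith(Sink.ignore)
}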
We tried lowering the interstage buffer (akka.stream.materializer.max-input-buffer-size), but that only gives some indirect benefit, no direct control (and it’s too internal for my taste to be mucking with, anyway).
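For what it’s worth, the same knob can also be scoped to one section of the graph via attributes instead of the global config (a sketch, reusing the hypothetical preprocessing flow from above; (1, 1) is the smallest buffer Akka Streams allows):

import akka.stream.Attributes

val preprocessingWithTinyBuffers =
  preprocessing.addAttributes(Attributes.inputBuffer(initial = 1, max = 1))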
I tried implementing a “gate” wrapper which would limit the number of elements allowed inside some arbitrary Flow, looking something like this:
import akka.NotUsed
import akka.stream.scaladsl.Flow

class LimitingGate[T, U](originalFlow: Flow[T, U, NotUsed], maxInFlight: Int) {
  // "InputGate": admits at most maxInFlight elements that the "OutputGate" hasn't released yet
  private def in: Flow[T, T, NotUsed] = ???
  private def out: Flow[U, U, NotUsed] = ???

  def limitedFlow: Flow[T, U, NotUsed] = Flow[T].via(in).via(originalFlow).via(out)
}
The in/out wrapper stages coordinate via callbacks to limit the throughput.
The implementation partially works (edge cases around stream termination are giving me a hard time), but it feels like the wrong way to go about achieving the actual goal.
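To make the gate idea more concrete, here is a rough approximation using a shared async semaphore instead of stage-to-stage callbacks (a sketch only; it assumes originalFlow emits exactly one output per input and deliberately ignores failure/cancellation cleanup, which is exactly where those termination edge cases bite):

import akka.NotUsed
import akka.stream.scaladsl.Flow

import scala.concurrent.{ExecutionContext, Future, Promise}

object GateSketch {
  // Minimal async semaphore: acquire() returns a Future that completes once a permit is free.
  final class AsyncSemaphore(permits: Int) {
    private var available = permits
    private var waiters = Vector.empty[Promise[Unit]]

    def acquire(): Future[Unit] = synchronized {
      if (available > 0) { available -= 1; Future.successful(()) }
      else { val p = Promise[Unit](); waiters :+= p; p.future }
    }

    def release(): Unit = {
      val wake = synchronized {
        if (waiters.nonEmpty) { val p = waiters.head; waiters = waiters.tail; Some(p) }
        else { available += 1; None }
      }
      wake.foreach(_.success(()))
    }
  }

  // `in` takes a permit before an element enters originalFlow; `out` returns it once the
  // element comes out the other side, so at most maxInFlight elements are inside at a time.
  def limitedFlow[T, U](originalFlow: Flow[T, U, NotUsed], maxInFlight: Int)(
      implicit ec: ExecutionContext): Flow[T, U, NotUsed] = {
    val semaphore = new AsyncSemaphore(maxInFlight)
    val in  = Flow[T].mapAsync(1)(t => semaphore.acquire().map(_ => t))
    val out = Flow[U].map { u => semaphore.release(); u }
    Flow[T].via(in).via(originalFlow).via(out)
  }
}

The mapAsync(1) in the in-gate means the gate itself backpressures upstream (and ultimately the SQS source) once maxInFlight permits are taken.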
Any ideas / notes / enlightening questions are appreciated
Thanks!