Explicit throughput limiting on part of a stream

Hi all

I have a flow in our system which reads elements from SQS (using Alpakka) and does some preprocessing (< 10 stages, normally << 1 minute in total). The prepared element is then sent to the main processing (a single stage, taking minutes). The whole thing runs on AWS/K8s and we’d like to scale out when the SQS queue grows above a certain threshold. The issue is that the SQS queue takes a long time to grow, since a lot of elements are “idling” in-process, having finished their preprocessing but still waiting for the main stage.

We can’t externalize the preprocessing to a separate queue, since its outcome can’t survive a de/serialization roundtrip. Also, this service and the “main” processor are deeply coupled (this service runs as main’s sidecar) and can’t be scaled independently.

The preprocessing stages are technically async, but the whole thing is already on very low parallelism (internal stages and SQS batch/buffer sizes).

We tried lowering the interstage buffer (akka.stream.materializer.max-input-buffer-size), but that only gives some indirect benefit, no direct control (and is too internal for my taste to be mucking with, anyway).
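For reference, the same buffer limit can also be set on one section of the graph through attributes instead of the global config key. This is a minimal sketch, assuming a hypothetical `preprocessing` flow standing in for the real stages. As far as I understand, the attribute only affects the input buffer at the async boundary, so it still gives indirect control at best.

```scala
import akka.NotUsed
import akka.stream.Attributes
import akka.stream.scaladsl.Flow

object BufferSketch {
  // Hypothetical stand-in for the real preprocessing stages.
  val preprocessing: Flow[String, String, NotUsed] = Flow[String].map(_.trim)

  // Run the section in its own async island and cap that island's
  // input buffer at a single element, leaving the rest of the stream
  // on the default buffer size.
  val tightlyBuffered: Flow[String, String, NotUsed] =
    preprocessing.async.addAttributes(Attributes.inputBuffer(initial = 1, max = 1))
}
```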

I tried implementing a “gate” wrapper which would limit the number of elements allowed inside some arbitrary Flow, looking something like this:

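import akka.NotUsed
import akka.stream.scaladsl.Flow

class LimitingGate[T, U](originalFlow: Flow[T, U, NotUsed], maxInFlight: Int) {
  // Custom flow-shaped stages: `in` admits an element only while fewer than
  // maxInFlight are inside originalFlow; `out` reports each element that leaves.
  private def in: InputGate[T] = ???
  private def out: OutputGate[U] = ???

  def limitedFlow: Flow[T, U, NotUsed] = Flow[T].via(in).via(originalFlow).via(out)
}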

The in/out wrapper stages use callbacks between them to track how many elements are in flight and limit the throughput accordingly.

The implementation partially works (edge cases around stream termination are giving me a hard time), but it feels like a wrong way to go about achieving the actual goal.

Any ideas / notes / enlightening questions are appreciated

Thanks!

A generic solution is hard: many operators do some form of buffering, and a generic flow could also include stages that drop elements or emit multiple elements for one incoming element, so counting elements in and out would not give a reliable in-flight number.

However, for a specific use case where you know the elements are 1:1 in and out, you could likely create a pair of custom flow-shaped GraphStages, one placed upstream of the flow and one downstream, that use some form of sideband communication to control the in-flight element count, somewhat like you suggest.

Since it would not be generally usable, there is no such thing built into Akka and you’d have to implement one yourself. It is probably not super complicated, but also not trivial.
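As a rough illustration of the sideband idea, here is a minimal sketch that swaps the custom GraphStages for built-in operators plus a small shared semaphore. `AsyncSemaphore` and `LimitingGate.apply` are hypothetical names written for this example, not Akka APIs. It assumes Scala 2.13 (for `ExecutionContext.parasitic`) and an inner flow that is strictly 1:1 and never drops elements, otherwise permits leak.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future, Promise}

/** Hypothetical non-blocking semaphore: acquire() completes once a permit is free. */
final class AsyncSemaphore(permits: Int) {
  private var available = permits
  private val waiters = mutable.Queue.empty[Promise[Unit]]

  def acquire(): Future[Unit] = synchronized {
    if (available > 0) { available -= 1; Future.unit }
    else { val p = Promise[Unit](); waiters.enqueue(p); p.future }
  }

  def release(): Unit = {
    // Hand the permit straight to the oldest waiter, or return it to the pool.
    val next = synchronized {
      if (waiters.nonEmpty) Some(waiters.dequeue())
      else { available += 1; None }
    }
    next.foreach(_.success(())) // complete the promise outside the lock
  }
}

object LimitingGate {
  /** Wraps `inner` so that at most `maxInFlight` elements are between the two gates. */
  def apply[T, U](inner: Flow[T, U, NotUsed], maxInFlight: Int): Flow[T, U, NotUsed] = {
    val permits = new AsyncSemaphore(maxInFlight)
    Flow[T]
      // Input gate: hold each element until a permit is free; while it waits,
      // mapAsync(1) stops pulling, which backpressures upstream.
      .mapAsync(1)(elem => permits.acquire().map(_ => elem)(ExecutionContext.parasitic))
      .via(inner)
      // Output gate: give the permit back once the element has left the inner flow.
      .map { out => permits.release(); out }
  }
}
```

Usage would look like `LimitingGate(preprocessingFlow, maxInFlight = 4)`, where `preprocessingFlow` is whatever 1:1 flow you want to cap. Note that the semaphore is created when the flow is built, so materializing the same wrapped flow more than once would share the permits, and one extra element can sit in the input gate waiting for a permit.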