Hi,
Context:
We use Alpakka Google PubSub GRPC to subscribe to messages from PubSub over GPRC. This is a Streaming subscription which receives messages in a stream.
The problem:
When the service first starts up and connects to PubSub, there is no backpressure yet, so Alpakka/Source starts to pull a ton of messages from PubSub, which we can’t respond to in a timely manner, so they are left hanging in a buffer/queue that takes too long to be processed. Since this is on PubSub, it means that some of the messages which have not been discarded, but rather buffered, are going to be sent to the service again by PubSub (that’s how retries work on PubSub: you get a “loan” on each message you receive, but if you don’t “acknowledge” the message in a specified period, the message is retried). Now since the service already has a queue full of messages to process, the retries will again be in the same state and the cycle continues on and our pubsub queue doesn’t drain. Horizontally scaling the service only makes this worse, each replica will fetch a ton of messages but now messages are sent between services and the effect is amplified.
Backpressure seems to work after that because after a short period of time passes after the startup, the number of requested messages from PubSub decreases significantly (and it matches our expectations of the throughput of our system). But, we have been looking for ways to avoid that initial spike in messages.
The spike is about a rate of ~3000-4000/s in the first 2-3 minutes, and then it goes down to 200/s. This is what it looks like when the service is restarted (or the connection to pubsub is lost and and a new connection is made):
We tried these things:
Using .throttle
on the source, or even a ticking .zip
on the source to slow down consumption: doesn’t work
Using .buffer(1, OverflowStrategy.backpressure)
on the source: doesn’t work
Decreasing values of these configurations (basically reducing buffer capacity and output burst limit significantly):
initial-input-buffer-size = 1
max-input-buffer-size = 2
output-burst-limit = 50
When I say “doesn’t work” I mean it does not reduce the initial spike, although the effect on the pipeline is visible for all these cases: a significant throttle of 1 per 1 second is visible in the processing pipeline and after the initial spike, no more than one item is received from pubsub per second, but the spike in the beginning is left untouched!
We would really appreciate it if someone could help us understand how can we apply a limit/throttling on the source or somehow overcome this issue.
Thank you!