Backpressure support with Akka gRPC

Hi.
How can I do back-pressure with akka-grpc?

I want to fetch messages from Kafka and have them consumed by a dynamic number of clients.
I used the Alpakka Kafka connector (akka-stream-kafka).

First, I created a SourceQueue connected to a PartitionHub:

  val (queue, fromProducer): (SourceQueueWithCompleteAndSize[HelloReply],
                              Source[HelloReply, NotUsed]) = MeasurableSourceQueue
    .queue[HelloReply](100, OverflowStrategy.backpressure)
    .toMat(
      PartitionHub.sink((size, elem) ⇒ math.abs(elem.hashCode % size),
                        startAfterNrOfConsumers = 1,
                        bufferSize = 1))(Keep.both)
    .run()

Then I created a Kafka consumer stream and made it offer messages to the queue:

  val control =
    Consumer
      .atMostOnceSource(consumerSettings,
                        Subscriptions.topics("whisk.system-action"))
      .mapAsync(1) { record =>
        queue.offer(record)
      }
      .toMat(Sink.seq)(Keep.both)
      .mapMaterializedValue(DrainingControl.apply)
      .run()

Until a client starts, it works as expected.
No more than 100 records are stored in the queue, since the queue's buffer size is 100.

When a client starts and requests the Source using the following API, it reads one message per second because of the throttle stage.

    def runStreamingReplyExample(): Unit = {
      val responseStream = client.itKeepsReplying(HelloRequest("Alice"))
      val done: Future[Done] =
        responseStream
          .buffer(1, OverflowStrategy.backpressure)
          .throttle(1, 1000 milliseconds)
          .map(reply => {
            println(s"got streaming reply: ${reply.message}")
          })
          .runWith(Sink.ignore)

      done.onComplete {
        case Success(_) =>
          println("streamingReply done")
        case Failure(e) =>
          println(s"Error streamingReply: $e")
      }
    }

But on the Kafka consumer side, messages keep being offered to the queue
(far more than 1 message per second).
Even stranger: when I monitor the size of the queue, it is 0 most of the time.

What I expect is that the Kafka consumer offers 1 message per second once the client starts.

I am replying to the client like this:

  override def itKeepsReplying(
      in: HelloRequest): Source[HelloReply, NotUsed] = {
    Source
      .single(in)
      .ask[Source[HelloReply, NotUsed]](1)(actor)
      .flatMapConcat(x => x)
  }

Does Akka gRPC support backpressure?

Akka gRPC supports backpressure for sure; you can verify that with a simpler reproducer, for example consuming an incoming stream of events with a throttled stream. Something else in your more complicated sample must be causing this.

The parts you have shared look OK, but there are details missing here: the itKeepsReplying call asks an actor, so what does the actor do with that request? Without seeing that, it's impossible to say whether what you put downstream of the PartitionHub is the issue.

You should also be able to test that part yourself by cutting Akka gRPC out of the equation and just testing your stream design against Kafka in isolation.

Thank you for the answer.
This is what the actor does:

  def receive = {
    case request: HelloRequest =>
      sender ! fromProducer

    case Failure(exception: Exception) =>
      // handle the failure
      println(exception)
  }

It just returns the producer side of the PartitionHub.
And I confirmed that backpressure works without Akka gRPC using this stream:

    Source
      .single[HelloRequest](HelloRequest("Work?"))
      .ask[Source[HelloReply, NotUsed]](1)(actor)
      .flatMapConcat(x => x)
      .buffer(1, overflowStrategy = OverflowStrategy.backpressure)
      .throttle(1, 1 seconds)
      .map(reply => {
        println(reply)
      })
      .runWith(Sink.ignore)

As you can see, the sink part is the same as the one in the Akka gRPC client.

Server API

 override def itKeepsReplying(
      in: HelloRequest): Source[HelloReply, NotUsed] = {
    Source
      .single(in)
      .ask[Source[HelloReply, NotUsed]](1)(actor)
      .flatMapConcat(x => x)
  }

Client code

      val responseStream = client.itKeepsReplying(HelloRequest("Alice"))
      val done: Future[Done] =
        responseStream
          .buffer(1, OverflowStrategy.backpressure)
          .throttle(1, 1000 milliseconds)
          .map(reply => {
            println(s"got streaming reply: ${reply.message}")
          })
          .runWith(Sink.ignore)

Is there any specific akka-grpc version that supports backpressure?

This is my build.gradle.

buildscript {
    repositories {
        mavenLocal()
        maven {
            url "https://plugins.gradle.org/m2/"
        }
    }
    dependencies {
        // see https://plugins.gradle.org/plugin/com.lightbend.akka.grpc.gradle
        // for the currently latest version.
        classpath 'gradle.plugin.com.lightbend.akka.grpc:akka-grpc-gradle-plugin:0.4.2'
    }
}

apply plugin: 'scala'
apply plugin: 'application'
apply plugin: 'com.lightbend.akka.grpc.gradle'

// These are the default options for a Scala project (not necessary to define)
akkaGrpc {
    language = "Scala"
    generateClient = true
    generateServer = true
}

repositories {
    mavenLocal()
    mavenCentral()
}

// Define a separate configuration for managing the dependency on Jetty ALPN agent.
configurations {
    alpnagent
}

dependencies {
    compile "org.scala-lang:scala-library:${gradle.scala.version}"

    compile 'com.typesafe.akka:akka-actor_2.12:2.5.19'
    compile 'com.typesafe.akka:akka-stream_2.12:2.5.19'
    compile 'com.typesafe.akka:akka-slf4j_2.12:2.5.19'
    compile 'com.typesafe.akka:akka-http-core_2.12:10.1.6'
    compile 'com.typesafe.akka:akka-http-spray-json_2.12:10.1.6'
    compile 'org.apache.kafka:kafka-clients:2.0.0'
    compile 'com.typesafe.akka:akka-stream-kafka_2.12:1.0-RC1'

    alpnagent 'org.mortbay.jetty.alpn:jetty-alpn-agent:2.0.9'
}

Once the client starts receiving data, the Kafka stream keeps enqueuing data.
I just used an increasing int value as the message.

enqueue 1, queue size: 1
enqueue 2, queue size: 2
enqueue 3, queue size: 3
enqueue 4, queue size: 4
enqueue 5, queue size: 5
enqueue 6, queue size: 6
enqueue 7, queue size: 7
enqueue 8, queue size: 8
enqueue 9, queue size: 9
enqueue 10, queue size: 10
enqueue 11, queue size: 11
enqueue 12, queue size: 12
enqueue 13, queue size: 13
enqueue 14, queue size: 14
enqueue 15, queue size: 15
enqueue 16, queue size: 16
enqueue 17, queue size: 17
.
.
.
enqueue 19482, queue size: 100
enqueue 19483, queue size: 100
enqueue 19484, queue size: 100
enqueue 19485, queue size: 100
enqueue 19486, queue size: 99
enqueue 19487, queue size: 100
enqueue 19488, queue size: 100
enqueue 19489, queue size: 100
enqueue 19490, queue size: 100
enqueue 19491, queue size: 100
enqueue 19492, queue size: 100
enqueue 19493, queue size: 100
enqueue 19494, queue size: 100
enqueue 19495, queue size: 100
enqueue 19496, queue size: 100
enqueue 19497, queue size: 100
enqueue 19498, queue size: 100
enqueue 19499, queue size: 100
enqueue 19500, queue size: 100
enqueue 19501, queue size: 0
enqueue 19502, queue size: 1
enqueue 19503, queue size: 1
enqueue 19504, queue size: 0
enqueue 19505, queue size: 0
enqueue 19506, queue size: 1
enqueue 19507, queue size: 2
enqueue 19508, queue size: 1
enqueue 19509, queue size: 2
enqueue 19510, queue size: 2
enqueue 19511, queue size: 1
enqueue 19512, queue size: 1
enqueue 19513, queue size: 1
enqueue 19514, queue size: 1
enqueue 19515, queue size: 1
enqueue 19516, queue size: 0
enqueue 19517, queue size: 0
enqueue 19518, queue size: 0
enqueue 19519, queue size: 0
enqueue 19520, queue size: 1
enqueue 19521, queue size: 0
enqueue 19522, queue size: 0
enqueue 19523, queue size: 0
enqueue 19524, queue size: 0
enqueue 19525, queue size: 0
enqueue 19526, queue size: 0
enqueue 19527, queue size: 0
enqueue 19528, queue size: 1
enqueue 19529, queue size: 1
enqueue 19530, queue size: 1
enqueue 19531, queue size: 2
enqueue 19532, queue size: 1
enqueue 19533, queue size: 2
enqueue 19534, queue size: 1
enqueue 19535, queue size: 2
enqueue 19536, queue size: 2
enqueue 19537, queue size: 2
enqueue 19538, queue size: 2
enqueue 19539, queue size: 1
enqueue 19540, queue size: 0
enqueue 19541, queue size: 0
enqueue 19542, queue size: 1
enqueue 19543, queue size: 0
enqueue 19544, queue size: 0
enqueue 19545, queue size: 1
enqueue 19546, queue size: 1
enqueue 19547, queue size: 2
enqueue 19548, queue size: 1
enqueue 19549, queue size: 1
enqueue 19550, queue size: 1
enqueue 19551, queue size: 1
enqueue 19552, queue size: 1
enqueue 19553, queue size: 1
enqueue 19554, queue size: 2
enqueue 19555, queue size: 3
enqueue 19556, queue size: 2
enqueue 19557, queue size: 3
enqueue 19558, queue size: 3
enqueue 19559, queue size: 3
enqueue 19560, queue size: 2
enqueue 19561, queue size: 3
enqueue 19562, queue size: 3
enqueue 19563, queue size: 3
enqueue 19564, queue size: 4
enqueue 19565, queue size: 4
enqueue 19566, queue size: 4
enqueue 19567, queue size: 5
enqueue 19568, queue size: 4
enqueue 19569, queue size: 4
enqueue 19570, queue size: 3
enqueue 19571, queue size: 3
enqueue 19572, queue size: 3
enqueue 19573, queue size: 3
enqueue 19574, queue size: 3
enqueue 19575, queue size: 4
enqueue 19576, queue size: 3
enqueue 19577, queue size: 4
enqueue 19578, queue size: 2
enqueue 19579, queue size: 2
enqueue 19580, queue size: 2
enqueue 19581, queue size: 2
enqueue 19582, queue size: 2
enqueue 19583, queue size: 2
enqueue 19584, queue size: 2
enqueue 19585, queue size: 2
enqueue 19586, queue size: 1
enqueue 19587, queue size: 1
enqueue 19588, queue size: 1
enqueue 19589, queue size: 1
enqueue 19590, queue size: 2
enqueue 19591, queue size: 1
enqueue 19592, queue size: 1
enqueue 19593, queue size: 2
enqueue 19594, queue size: 3
enqueue 19595, queue size: 4
enqueue 19596, queue size: 4
enqueue 19597, queue size: 5
enqueue 19598, queue size: 5
enqueue 19599, queue size: 5
enqueue 19600, queue size: 5
enqueue 19601, queue size: 5
enqueue 19602, queue size: 4
enqueue 19603, queue size: 4
enqueue 19604, queue size: 5
enqueue 19605, queue size: 4
enqueue 19606, queue size: 4
enqueue 19607, queue size: 5
enqueue 19608, queue size: 4
enqueue 19609, queue size: 5
enqueue 19610, queue size: 5
enqueue 19611, queue size: 5
enqueue 19612, queue size: 6
enqueue 19613, queue size: 6
enqueue 19614, queue size: 6
enqueue 19615, queue size: 7
enqueue 19616, queue size: 6
enqueue 19617, queue size: 5
enqueue 19618, queue size: 5
enqueue 19619, queue size: 6
enqueue 19620, queue size: 7
enqueue 19621, queue size: 8
enqueue 19622, queue size: 9
enqueue 19623, queue size: 9
enqueue 19624, queue size: 10
enqueue 19625, queue size: 11
enqueue 19626, queue size: 11
enqueue 19627, queue size: 12
enqueue 19628, queue size: 12
enqueue 19629, queue size: 12
enqueue 19630, queue size: 12
enqueue 19631, queue size: 11
enqueue 19632, queue size: 11
enqueue 19633, queue size: 11
enqueue 19634, queue size: 12
enqueue 19635, queue size: 12
enqueue 19636, queue size: 12
enqueue 19637, queue size: 12
enqueue 19638, queue size: 12
enqueue 19639, queue size: 13
enqueue 19640, queue size: 14
enqueue 19641, queue size: 14
enqueue 19642, queue size: 15
enqueue 19643, queue size: 16
enqueue 19644, queue size: 16
enqueue 19645, queue size: 17
enqueue 19646, queue size: 17
enqueue 19647, queue size: 18
enqueue 19648, queue size: 18
enqueue 19649, queue size: 18
enqueue 19650, queue size: 19
enqueue 19651, queue size: 19
enqueue 19652, queue size: 18
enqueue 19653, queue size: 18
enqueue 19654, queue size: 19
enqueue 19655, queue size: 20
enqueue 19656, queue size: 20
enqueue 19657, queue size: 21
enqueue 19658, queue size: 22
enqueue 19659, queue size: 21
enqueue 19660, queue size: 22
enqueue 19661, queue size: 22
enqueue 19662, queue size: 23
enqueue 19663, queue size: 23
enqueue 19664, queue size: 24
enqueue 19665, queue size: 23
enqueue 19666, queue size: 24
enqueue 19667, queue size: 24
enqueue 19668, queue size: 24
enqueue 19669, queue size: 24
enqueue 19670, queue size: 25
enqueue 19671, queue size: 24
enqueue 19672, queue size: 25
enqueue 19673, queue size: 25
enqueue 19674, queue size: 26
enqueue 19675, queue size: 27
enqueue 19676, queue size: 28
enqueue 19677, queue size: 27
enqueue 19678, queue size: 27
enqueue 19679, queue size: 28
enqueue 19680, queue size: 28
enqueue 19681, queue size: 29
enqueue 19682, queue size: 29
enqueue 19683, queue size: 30
enqueue 19684, queue size: 31
enqueue 19685, queue size: 30
enqueue 19686, queue size: 30
enqueue 19687, queue size: 31
enqueue 19688, queue size: 30
enqueue 19689, queue size: 30
enqueue 19690, queue size: 30
enqueue 19691, queue size: 30
enqueue 19692, queue size: 31
enqueue 19693, queue size: 31
enqueue 19694, queue size: 31
enqueue 19695, queue size: 32
enqueue 19696, queue size: 32
enqueue 19697, queue size: 32
enqueue 19698, queue size: 32
enqueue 19699, queue size: 32
enqueue 19700, queue size: 33
enqueue 19701, queue size: 34
enqueue 19702, queue size: 33
enqueue 19703, queue size: 33
enqueue 19704, queue size: 33
enqueue 19705, queue size: 33
enqueue 19706, queue size: 34
enqueue 19707, queue size: 34
enqueue 19708, queue size: 34
.
.
.
enqueue 19962, queue size: 95
enqueue 19963, queue size: 96
enqueue 19964, queue size: 97
enqueue 19965, queue size: 98
enqueue 19966, queue size: 99
enqueue 19967, queue size: 100

After this point, even though I stop the server, the client keeps receiving data.

HelloReply(Backpressure required! 526)
HelloReply(Backpressure required! 527)
HelloReply(Backpressure required! 528)
HelloReply(Backpressure required! 529)
HelloReply(Backpressure required! 530)
HelloReply(Backpressure required! 531)
HelloReply(Backpressure required! 532)
HelloReply(Backpressure required! 533)
HelloReply(Backpressure required! 534)
HelloReply(Backpressure required! 535)
.
.
.

I suspect that the HTTP/2 layer of Akka gRPC (or something else) receives about 20K messages and then feeds them to the sink based on backpressure.

Do I need a specific configuration to enable backpressure in the HTTP layer?

The reason you are seeing elements produced is that per-element backpressure is only applied at the Akka Streams API level; there is no per-element backpressure in gRPC at the protocol level. Over HTTP/2 we get backpressure through whether the client accepts more bytes, and there are per-HTTP/2-substream buffers on both sides which will fit a substantial amount of data before filling up and backpressuring into the server.

A minimal reproducer like this shows that:

// server
def streamStuff(in: Req): Source[Res, NotUsed] = {
  Source.repeat("elem")
    .zipWithIndex
    .map(_.toString())
    .wireTap(s => println(s"server-sent-elem $s"))
    .map(s => Res(s))
}

// client
client.streamStuff(Req(""))
  .throttle(1, 1.second)
  .runForeach(s => println(s"client-saw-elem $s"))

So throttling at the stream-element level from the client is not really possible like that. Especially with small elements, the buffers will fit a lot (~28k in my example), so you would have to put the rate limit on the server side to get the semantics of one element per second per client pulled from Kafka.
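
For example, a minimal sketch of what a server-side rate limit could look like, reusing the fromProducer hub source from your earlier snippet (untested against your code, just to show the shape):

// server (sketch): throttle each client's stream on the server side
// instead of relying on client-side throttling
import scala.concurrent.duration._

override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] =
  fromProducer
    .throttle(1, 1.second) // at most one element per second per connected client stream

Since the hub has bufferSize = 1, the throttle backpressures into the hub, then into the queue, and from there into the Kafka consumer, instead of filling the HTTP/2 buffers.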

I think we could improve the documentation around this so I created an issue to track that: https://github.com/akka/akka-grpc/issues/501

One way you could still achieve per-element throttling is to use a bidirectional gRPC call and only produce an element to the client when you get a request element from it, and then throttle the rate at which you send those requests. Something like this:

// server
def bidi(in: Source[Req, NotUsed]): Source[Res, NotUsed] = 
  in.mapAsync(1)(req =>
    // fetch one element
    Future.successful(Res("elem"))
  )

// client
client.bidi(Source.repeat(Req("")).throttle(1, 1.second))
  .runForeach(s => println(s"client-saw-elem $s"))

Hey, does your suggestion work with an external stream?
For example, if the server receives elements from Kafka using Alpakka and just returns the stream to the client, does it work that way?

I finally ended up not using the Kafka stream.
Instead, I achieved what I wanted with a manual Kafka client, following the bidirectional approach @johanandren suggested.
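
Roughly, the shape is like the sketch below. This is simplified and not my exact code: bidiHello, kafkaConsumer (a plain KafkaConsumer[String, String]) and blockingDispatcher (an ExecutionContext for blocking calls) are placeholder names. One record is handed out per request element, so the client's throttled request stream controls how fast records are read from Kafka.

  import java.time.Duration
  import scala.collection.JavaConverters._
  import scala.concurrent.Future

  // Sketch with placeholder names: a bidi call backed by a plain KafkaConsumer.
  private var buffered: Iterator[String] = Iterator.empty

  private def nextValue(): String = {
    // KafkaConsumer is not thread-safe; mapAsync(1) below keeps access sequential.
    while (!buffered.hasNext)
      buffered = kafkaConsumer.poll(Duration.ofMillis(500)).asScala.map(_.value()).iterator
    buffered.next()
  }

  override def bidiHello(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] =
    in.mapAsync(1) { _ =>
      // poll() blocks, so run it on a dedicated dispatcher for blocking work
      Future(HelloReply(nextValue()))(blockingDispatcher)
    }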

@johanandren
Thank you for the kind explanation.