Akka Streams: Force just one initial Publisher#request(1), no buffering

It appears that when subscribing to a Source, I get 2 events pulled from it, even if I limit the “input” buffer to its minimum size:

.withAttributes(Attributes.inputBuffer(0, 1))

Minimal sample:

Source.fromPublisher(???)
  .mapAsync(1) { x =>
    Future {
      Thread.sleep(2000)
      println(s"Received: $x")
    }
  }
  .withAttributes(Attributes.inputBuffer(0, 1))

I cannot find a way to stop the buffering. From the perspective of the Subscriber and its Subscription, this would generate:

request(1)
request(1)
onNext(1)
request(1)
onNext(2)

So there is always an event buffered. I do not want this. I don’t want any kind of buffering. The in-flight event should be the only event pulled from the source, until processing is done.

Full code for reproducing the above:

import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.scaladsl.Source
import monix.execution.rstreams.Subscription
import org.reactivestreams.{ Publisher, Subscriber }

import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._

def repeated[A](x: A)(f: A => A)(implicit ec: ExecutionContext): Publisher[A] =
  new Publisher[A] {
    override def subscribe(s: Subscriber[_ >: A]): Unit =
      s.onSubscribe(new Subscription {
        private[this] var current = x
        private[this] var requestCount = 0

        override def cancel(): Unit = ()
        override def request(n: Long): Unit = {
          requestCount += 1
          println(s"Request ($requestCount): $n")

          ec.execute(() => {
            var i = 0L
            while (i < n) {
              val c = current
              current = f(c)
              s.onNext(c)
              i += 1
            }
          })
        }
      })
  }

def run(): Unit = {
  implicit val system = ActorSystem("test")
  implicit val ec = system.dispatcher
  try {
    val f = Source
      .fromPublisher(repeated(1)(_ + 1))
      .mapAsync(1) { x =>
        Future {
          Thread.sleep(2000)
          println(s"Received: $x")
        }
      }
      .withAttributes(Attributes.inputBuffer(0, 1))
      .take(5)
      .run()

    Await.result(f, Duration.Inf); ()
  } finally {
    Await.result(system.terminate(), 10.seconds); ()
  }
}

As I mentioned in the other thread, you cannot expect to know/control that. If you need that level of control Akka streams, and even Reactive Streams may not be the right tool for what you are trying to do. What an RS publisher/source can do in the face of another request is to not emit anything until a later point in time, based on something else than the RS backpressure.

In this specific case I think the additional request is because the stream subscriber will be able to emit one element downstream inside the stream to mapAsync, and that means no elements are queued at the 1 element buffer in “front” of the stream and it will therefore immediately request another element from the publisher.