It appears that when subscribing to a Source
, I get 2 events pulled from it, even if I limit the “input” buffer to its minimum size:
.withAttributes(Attributes.inputBuffer(0, 1))
Minimal sample:
Source.fromPublisher(???)
.mapAsync(1) { x =>
Future {
Thread.sleep(2000)
println(s"Received: $x")
}
}
.withAttributes(Attributes.inputBuffer(0, 1))
I cannot find a way to stop the buffering. From the perspective of the Subscriber
and its Subscription
, this would generate:
request(1)
request(1)
onNext(1)
request(1)
onNext(2)
So there is always an event buffered. I do not want this. I don’t want any kind of buffering. The in-flight event should be the only event pulled from the source, until processing is done.
Full code for reproducing the above:
import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.scaladsl.Source
import monix.execution.rstreams.Subscription
import org.reactivestreams.{ Publisher, Subscriber }
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._
def repeated[A](x: A)(f: A => A)(implicit ec: ExecutionContext): Publisher[A] =
new Publisher[A] {
override def subscribe(s: Subscriber[_ >: A]): Unit =
s.onSubscribe(new Subscription {
private[this] var current = x
private[this] var requestCount = 0
override def cancel(): Unit = ()
override def request(n: Long): Unit = {
requestCount += 1
println(s"Request ($requestCount): $n")
ec.execute(() => {
var i = 0L
while (i < n) {
val c = current
current = f(c)
s.onNext(c)
i += 1
}
})
}
})
}
def run(): Unit = {
implicit val system = ActorSystem("test")
implicit val ec = system.dispatcher
try {
val f = Source
.fromPublisher(repeated(1)(_ + 1))
.mapAsync(1) { x =>
Future {
Thread.sleep(2000)
println(s"Received: $x")
}
}
.withAttributes(Attributes.inputBuffer(0, 1))
.take(5)
.run()
Await.result(f, Duration.Inf); ()
} finally {
Await.result(system.terminate(), 10.seconds); ()
}
}