Hi,
I am doing experiments with async + inputBuffers and I can not understand why “manual” batching is faster than using inputBuffer attribute (in my laptop in five times faster). Can someone explain ?
Code:
import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
object TimedUtil {
def timed[T](prefix: String)(cmd: => T): T = {
val t0 = System.currentTimeMillis()
val res = cmd
println(s"duration of $prefix: ${System.currentTimeMillis() - t0}")
res
}
}
object Async extends App {
implicit val system = ActorSystem()
val numberOfIterations = 10000000
TimedUtil.timed("async") {
val source = Source
.fromIterator(() => (0 to numberOfIterations).iterator)
.async
.withAttributes(Attributes.inputBuffer(1024, 1024))
.map(a => a + 1)
.runWith(Sink.ignore)
Await.result(source, Duration.Inf)
}
TimedUtil.timed("async-batch") {
val source = Source
.fromIterator(() => (0 to numberOfIterations).iterator)
.batch(1024, el => Vector.newBuilder += el)({ (acc, el) =>
acc += el
})
.map(_.result())
.async
.mapConcat(identity)
.map(a => a + 1)
.runWith(Sink.ignore)
Await.result(source, Duration.Inf)
}
system.terminate()
}