Hello there,
I am trying to get a better understanding of how parallelism works in Akka Streams. To that end, I built a simple stream in both a sequential and a parallel version so I could compare them. However, the sequential version turns out to be faster than the parallel one, and I wonder why.
For the parallel version I used a Balance and a Merge stage; my computer has 4 cores. To give the machine enough work, I process a large range of numbers (1 to 100,000,000) and apply several mathematical operations to each element.
I also added a warm-up run before the timed runs so that the comparison is fair.
The sequential version is consistently much faster: it usually takes around 16 seconds, whereas the parallel version takes between 70 and 100 seconds.
Here is my code:
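import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape, Materializer}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.math.{exp, log}

object Main extends App {
  implicit val system: ActorSystem = ActorSystem("QuickStart")
  implicit val materializer: Materializer = ActorMaterializer()

  val source: Source[Long, NotUsed] = Source(1L to 100000000L)

  // Per-element work: keep the even numbers, apply log and exp, then sum everything up.
  val sequential: Flow[Long, Long, NotUsed] =
    Flow.apply[Long]
      .filter(x => x % 2 == 0)
      .map(x => x.toDouble)
      .map(x => log(x))
      .map(x => exp(x))
      .map(x => x.toLong)
      .fold(0L)((accu, value) => accu + value)

  // Warm-up run (not part of the comparison).
  {
    println("starting warm up")
    val start = System.currentTimeMillis()
    val done = source
      .via(sequential)
      .runForeach(i => println(s"warm up: result = $i, time = ${System.currentTimeMillis() - start}"))
    Await.result(done, 100000.millisecond)
  }

  // Timed sequential run.
  {
    println("starting sequential")
    val start = System.currentTimeMillis()
    val done = source
      .via(sequential)
      .runForeach(i => println(s"sequential: result = $i, time = ${System.currentTimeMillis() - start}"))
    Await.result(done, 100000.millisecond)
  }

  // Fans the input out to `parallelism` branches and merges their outputs again.
  // Each branch runs the whole sequential flow (including its fold) behind an
  // async boundary, so it emits a single partial sum.
  def parallel(parallelism: Int): Flow[Long, Long, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._ // provides the ~> operator

      val balancer = builder.add(Balance[Long](parallelism))
      val merger = builder.add(Merge[Long](parallelism))

      for (i <- 0 until parallelism) {
        balancer.out(i) ~> sequential.async ~> merger.in(i)
      }

      FlowShape(balancer.in, merger.out)
    })

  // Timed parallel run.
  {
    println("starting parallel")
    val start = System.currentTimeMillis()
    val done = source
      .via(parallel(4))
      // Add up the partial sums emitted by the branches.
      .fold(0L)((accu, value) => accu + value)
      .runForeach(i => println(s"parallel: result = $i, time = ${System.currentTimeMillis() - start}"))
    Await.result(done, 100000.millisecond)
  }
}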
Do you have any idea why the sequential code is so much faster than the parallel code? Is the workload too small, or did I write the parallel code incorrectly?
Cheers