GroupWhile on Akka Streams?

Ok, this is how you can actually do it - no hacks, just a clean solution. Note that I’m throttling the source for demo purposes:

import akka.actor.ActorSystem
import akka.stream.SubstreamCancelStrategy
import akka.stream.scaladsl.Source

import scala.concurrent.duration._

object Application extends App {

  implicit val system = ActorSystem("Groupygroup")

  case class Groupygroup(key: String, value: Int)

  val items = (1 to 3).map(Groupygroup("foo", _)) ++
      (1 to 3).map(Groupygroup("bar", _)) ++
      (1 to 3).map(Groupygroup("baz", _))

  Source(items)
      .throttle(1, 1.second)
      .splitWhen(SubstreamCancelStrategy.drain) {
        var lastKey: Option[String] = None
        item =>
          lastKey match {
            case Some(item.key) | None =>
              lastKey = Some(item.key)
              false
            case _ =>
              lastKey = Some(item.key)
              true
          }
      }
      .fold(Vector.empty[Groupygroup])(_ :+ _)
      .mergeSubstreams
      .runForeach(println)
}

I’ve ran into similar use cases frequently, so I was thinking about this question from time to time today. Usually I ended up writing my own GraphShape, as stated above.

Not sure if there are any reasonable objections to the var in splitWhen, but I don’t think so.

https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/splitWhen.html

1 Like