Ok, this is how you can actually do it - no hacks, just a clean solution. Note that I’m throttling the source for demo purposes:
    import akka.actor.ActorSystem
    import akka.stream.SubstreamCancelStrategy
    import akka.stream.scaladsl.Source
    import scala.concurrent.duration._

    object Application extends App {
      implicit val system: ActorSystem = ActorSystem("Groupygroup")

      case class Groupygroup(key: String, value: Int)

      val items = (1 to 3).map(Groupygroup("foo", _)) ++
        (1 to 3).map(Groupygroup("bar", _)) ++
        (1 to 3).map(Groupygroup("baz", _))

      Source(items)
        .throttle(1, 1.second) // demo only: slow the source down to one element per second
        .splitWhen(SubstreamCancelStrategy.drain) {
          // Key of the previous element; None until the first element has arrived.
          var lastKey: Option[String] = None
          item =>
            lastKey match {
              // Same key as before, or very first element: stay in the current substream.
              case Some(item.key) | None =>
                lastKey = Some(item.key)
                false
              // Key changed: this element starts a new substream.
              case _ =>
                lastKey = Some(item.key)
                true
            }
        }
        .fold(Vector.empty[Groupygroup])(_ :+ _) // collect each substream into one Vector
        .mergeSubstreams
        .runForeach(println)
    }
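To make it easier to see what the stream should emit, here is a rough plain-collection sketch of the same grouping, with no Akka involved. groupConsecutive is a made-up helper for illustration, not part of any library, and the object name is arbitrary:

```scala
object ConsecutiveGroups {
  final case class Groupygroup(key: String, value: Int)

  // Hypothetical helper: groups adjacent items that share a key.
  // A key change starts a new group, which is where the splitWhen
  // predicate in the stream version returns true.
  def groupConsecutive(items: Seq[Groupygroup]): Vector[Vector[Groupygroup]] =
    items.foldLeft(Vector.empty[Vector[Groupygroup]]) { (groups, item) =>
      groups.lastOption match {
        case Some(last) if last.head.key == item.key =>
          groups.init :+ (last :+ item) // same key: extend the current group
        case _ =>
          groups :+ Vector(item)        // first item or new key: open a new group
      }
    }

  val items: Seq[Groupygroup] =
    (1 to 3).map(Groupygroup("foo", _)) ++
      (1 to 3).map(Groupygroup("bar", _)) ++
      (1 to 3).map(Groupygroup("baz", _))

  // Three groups of three elements each: foo, bar, baz.
  val groups: Vector[Vector[Groupygroup]] = groupConsecutive(items)
}
```

The stream version should print the same three vectors, one per substream, each as soon as its substream completes.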
I’ve run into similar use cases frequently, so I kept coming back to this question today. Usually I ended up writing my own GraphStage, as stated above.
I'm not sure whether there are any reasonable objections to the var in splitWhen, but I don’t think so.
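One thing that might be worth keeping in mind, though: the block passed to splitWhen is evaluated once, when the stream is defined, so the var belongs to the stream definition rather than to a single run. In the snippet above the stream is run exactly once, so it shouldn't matter there, but if the same definition were run a second time, the predicate would still remember the last key of the first run. A minimal plain-Scala sketch of that effect (no Akka involved; keyChanged is a hypothetical helper that hands out a fresh predicate on every call):

```scala
object SharedPredicateState {
  final case class Groupygroup(key: String, value: Int)

  // Hypothetical helper: every call returns a predicate with its own lastKey.
  def keyChanged(): Groupygroup => Boolean = {
    var lastKey: Option[String] = None
    item => {
      val changed = lastKey.exists(_ != item.key) // false for the very first item
      lastKey = Some(item.key)
      changed
    }
  }

  val items: Vector[Groupygroup] =
    Vector(Groupygroup("foo", 1), Groupygroup("foo", 2), Groupygroup("bar", 1))

  // One predicate instance reused for two passes over the same data.
  val shared: Groupygroup => Boolean = keyChanged()
  val firstPass: Vector[Boolean] = items.map(shared)  // Vector(false, false, true)
  val secondPass: Vector[Boolean] = items.map(shared) // Vector(true, false, true)

  // A fresh predicate per pass starts from a clean state again.
  val freshPass: Vector[Boolean] = items.map(keyChanged()) // Vector(false, false, true)
}
```

The second pass reports a split on the very first element because lastKey is still Some("bar") from the first pass. If the stream definition is meant to be reused, building the predicate (and the stream) inside a def, so each run gets its own var, is one way to sidestep this.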
https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/splitWhen.html