From what you are describing, you want to aggregate the stream of elements into batches grouped by a key, where each batch ends when the next one starts (i.e. elements with an equal key arrive directly after one another).
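To pin down those semantics, here is a minimal plain-Scala sketch on an in-memory List, with no Akka involved. The names ConsecutiveGrouping and groupConsecutive are hypothetical and not part of any library. A key that shows up again later starts a new batch, because only directly adjacent elements are grouped.

```scala
// Hypothetical helper, not part of Akka: groups directly adjacent elements with equal keys.
object ConsecutiveGrouping {
  // A new batch starts whenever an element's key differs from its neighbour's key.
  def groupConsecutive[A, K](items: List[A])(key: A => K): List[List[A]] =
    items.foldRight(List.empty[List[A]]) {
      // foldRight walks right to left: if the batch to the right has the same key, prepend to it.
      case (item, (head :: rest) :: later) if key(head) == key(item) =>
        (item :: head :: rest) :: later
      // Nothing to the right yet, or the key changes: open a new batch.
      case (item, acc) =>
        List(item) :: acc
    }

  def main(args: Array[String]): Unit = {
    val keys = List("foo", "foo", "bar", "bar", "foo")
    println(groupConsecutive(keys)(identity))
    // prints List(List(foo, foo), List(bar, bar), List(foo))
  }
}
```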
EDIT: You can ignore all of this and go to my last post. I'm leaving the other posts here because they may still be helpful in other scenarios.
statefulMapConcat won't know when the stream ends, so you have to emit a special termination element. Otherwise the last group is never emitted, because the operation only closes a group when it sees the grouping key change. If you don't want to do that (using a termination element), you'll have to write your own GraphStage Flow implementation, which is notified of stream termination and can react appropriately: https://doc.akka.io/docs/akka/current/stream/stream-customize.html (that's what I do in such cases). Alternatively, you may also use groupBy and fold on the substreams, but that has the negative side effect that aggregated batches only become available once the stream completes.
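A minimal sketch of the custom GraphStage approach, assuming Akka 2.6+ (so an implicit ActorSystem also provides the Materializer). GroupConsecutive and GroupConsecutiveDemo are hypothetical names; the stage API used here (GraphStage, GraphStageLogic, InHandler, OutHandler, emit, completeStage) is the one described on the page linked above. The interesting part is onUpstreamFinish: it flushes the last batch, so no termination element is needed.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stage: emits a batch whenever the key changes and
// flushes the last batch when upstream completes (no End marker needed).
final class GroupConsecutive[A, K](keyOf: A => K) extends GraphStage[FlowShape[A, List[A]]] {
  val in: Inlet[A] = Inlet("GroupConsecutive.in")
  val out: Outlet[List[A]] = Outlet("GroupConsecutive.out")
  override val shape: FlowShape[A, List[A]] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Current batch, newest element first; mutable state lives inside the logic.
      private var batch: List[A] = Nil

      override def onPush(): Unit = {
        val elem = grab(in)
        if (batch.isEmpty || keyOf(batch.head) == keyOf(elem)) {
          batch = elem :: batch
          pull(in) // batch not finished yet, request the next element
        } else {
          val finished = batch.reverse
          batch = elem :: Nil
          push(out, finished) // safe: we only pull upstream while downstream is waiting
        }
      }

      override def onPull(): Unit = pull(in)

      override def onUpstreamFinish(): Unit =
        if (batch.isEmpty) completeStage()
        else emit(out, batch.reverse, () => completeStage()) // waits for demand if needed

      setHandlers(in, out, this)
    }
}

object GroupConsecutiveDemo {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("GroupConsecutive")
    val items = List("foo" -> 1, "foo" -> 2, "bar" -> 1, "baz" -> 1, "baz" -> 2)
    val batches = Source(items)
      .via(new GroupConsecutive[(String, Int), String](_._1))
      .runWith(Sink.seq)
    Await.result(batches, 5.seconds).foreach(println)
    system.terminate()
  }
}
```

Because the stage only pulls upstream after downstream has signalled demand, it backpressures like any other Flow and buffers at most the one batch currently being built.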
I've written a working example of a statefulMapConcat implementation for you. You're welcome ;)
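import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

// Assumes Akka 2.6+, where an implicit ActorSystem also provides the Materializer.
object Application extends App {
  implicit val system: ActorSystem = ActorSystem("Groupygroup")
  import system.dispatcher // ExecutionContext for onComplete below

  case class Groupygroup(key: String, value: Int)

  // Special termination element: its key differs from every real key,
  // so it forces the last real batch to be emitted.
  val End = Groupygroup("--END--", 0)

  val items = (1 to 3).map(Groupygroup("foo", _)) ++
    (1 to 3).map(Groupygroup("bar", _)) ++
    (1 to 3).map(Groupygroup("baz", _)) :+
    End

  Source(items).statefulMapConcat[Seq[Groupygroup]](() => {
    var batch = List.empty[Groupygroup] // current batch, newest element first
    var lastKey: Option[String] = None

    item =>
      lastKey match {
        // Same key as before (or the very first element): keep collecting, emit nothing.
        case Some(item.key) | None =>
          lastKey = Some(item.key)
          batch ::= item
          Nil
        // Key changed: emit the finished batch and start a new one with this item.
        case _ =>
          lastKey = Some(item.key)
          val result = batch.reverse
          batch = item :: Nil
          result :: Nil
      }
  }).runForeach(println)
    .onComplete(_ => system.terminate()) // shut down the actor system so the app exits
}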
Result:
List(Groupygroup(foo,1), Groupygroup(foo,2), Groupygroup(foo,3))
List(Groupygroup(bar,1), Groupygroup(bar,2), Groupygroup(bar,3))
List(Groupygroup(baz,1), Groupygroup(baz,2), Groupygroup(baz,3))
Of course, you could make the code nicer with sealed traits etc. This is just a starting point.
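One way the sealed-trait idea could look, sketched under the assumption of Akka 2.6+: the terminator becomes its own case object, so it can never collide with a real key, and the End case flushes the last batch explicitly. Event, End, SealedGrouping and batching are hypothetical names; Groupygroup mirrors the case class from the example above, now extending the sealed trait.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Source}

// Hypothetical ADT: the terminator is a case object, so it cannot clash with a real key.
sealed trait Event
final case class Groupygroup(key: String, value: Int) extends Event
case object End extends Event

object SealedGrouping {
  // Same statefulMapConcat idea as the original example, packaged as a reusable Flow.
  val batching: Flow[Event, List[Groupygroup], NotUsed] =
    Flow[Event].statefulMapConcat[List[Groupygroup]] { () =>
      var batch = List.empty[Groupygroup] // current batch, newest element first
      event =>
        event match {
          // Terminator: flush whatever is still buffered.
          case End =>
            val rest = batch.reverse
            batch = Nil
            if (rest.isEmpty) Nil else List(rest)
          // Key changed: emit the finished batch and start a new one.
          case item: Groupygroup if batch.nonEmpty && batch.head.key != item.key =>
            val finished = batch.reverse
            batch = item :: Nil
            List(finished)
          // Same key (or first element): keep collecting.
          case item: Groupygroup =>
            batch = item :: batch
            Nil
        }
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("Groupygroup")
    import system.dispatcher

    val items: List[Event] =
      List("foo", "bar", "baz").flatMap(k => (1 to 3).map(Groupygroup(k, _))) :+ End

    Source(items)
      .via(batching)
      .runForeach(println)
      .onComplete(_ => system.terminate())
  }
}
```

Run via main, this should print the same three batches as the Result above; the compiler now also checks that every Event case is handled.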