Thanks! Using conflate, I was able to avoid overflowing the buffer.
I use a Map[String, String] for the updates, but that gives me another problem: I have no control over the size of that map.
Ideally, I want to send each element of the map individually while still being able to update the elements that have not been sent yet.
For example, if my consumer is consuming over the network, then sending one huge map will not really fix the problem. I would rather send small updates, and if a new update is produced while I am waiting for the consumer, I want to update the buffered element in place.
In summary:
conflate allows me to batch everything up, but I don't want to send it all as one big chunk.
I realize that flattening after conflate may do what I just described. I'll do some testing!
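Something like this is what I have in mind: conflate the updates into a map while the consumer is slow, then flatten that map back into individual entries with mapConcat. A minimal sketch (the updates source and system name are hypothetical stand-ins, assuming Akka 2.6+ where an implicit ActorSystem materializes the stream):

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem("conflate-flatten-demo")

// Hypothetical stand-in for my real producer of key/value updates.
val updates = Source(List("a" -> "1", "b" -> "2", "a" -> "3"))

updates
  // While the consumer is slow, fold updates into a Map so the latest value per key wins.
  .conflateWithSeed(el => Map(el))((acc, el) => acc + el)
  // When demand returns, flatten the accumulated map back into individual entries.
  .mapConcat(identity)
  .runWith(Sink.foreach(println))

One caveat I can already see: once the accumulated map has been handed to mapConcat, its remaining entries can no longer be updated while they wait for the consumer.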
I couldn’t find any existing functions to do this, so I ended up creating my own GraphStage:
import akka.stream._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.collection.mutable

case class ConflateBatchBuffer[A, B](max: Int) extends GraphStage[FlowShape[(A, B), Map[A, B]]] {
  type In = (A, B)
  type Out = Map[A, B]

  val in = Inlet[In]("ConflateBatchBuffer.in")
  val out = Outlet[Out]("ConflateBatchBuffer.out")
  override val shape: FlowShape[In, Out] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Pending updates, at most one entry per key, in order of first arrival.
      private val buf = mutable.Queue.empty[In]

      // Emit up to `max` buffered entries as one Map. Only called when buf is non-empty.
      private def flush(): Unit = {
        val batch = (1 to math.min(max, buf.size)).map(_ => buf.dequeue()).toMap
        push(out, batch)
      }

      override def preStart(): Unit = pull(in)

      override def onPull(): Unit = {
        // Answer demand only when there is something to send; pushing an empty
        // Map would just busy-loop. If the buffer is empty, onPush will flush
        // once data arrives, because demand stays open.
        if (buf.nonEmpty) flush()
        // Upstream is done and the buffer is drained: we are finished too.
        if (isClosed(in) && buf.isEmpty) completeStage()
      }

      override def onPush(): Unit = {
        val el = grab(in)
        val existingIdx = buf.indexWhere(_._1 == el._1)
        if (existingIdx == -1) buf.enqueue(el) // new key: append to the queue
        else buf(existingIdx) = el             // known key: conflate in place
        pull(in)
        if (isAvailable(out)) flush() // the consumer is already waiting
      }

      // Do not drop buffered updates on completion; keep draining via onPull.
      override def onUpstreamFinish(): Unit =
        if (buf.isEmpty) completeStage()

      setHandlers(in, out, this)
    }
}
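For completeness, this is roughly how I plug the stage into a stream (again a minimal sketch with a hypothetical updates source, assuming Akka 2.6+):

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem("conflate-batch-demo")

// Hypothetical stand-in for my real producer of key/value updates.
val updates: Source[(String, String), NotUsed] =
  Source(List("a" -> "1", "b" -> "2", "a" -> "3", "c" -> "4"))

updates
  .via(ConflateBatchBuffer[String, String](max = 2)) // at most 2 entries per emitted Map
  .runWith(Sink.foreach(println))

When the consumer keeps up, each emitted map holds a single entry; when it lags, newer updates overwrite the buffered value for their key and at most max entries go out per batch.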