The idiomatic way to manage shared state with Akka Streams

I need to filter my flow based on a blacklist that can be changed outside of the flow execution. I see two options for doing that:

  1. Encapsulate the blacklist in a separate service
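  import akka.stream.scaladsl.Flow

  class Blacklist(init: Set[String]) {
    // @volatile so that updates made from other threads are visible to the stream
    @volatile private var set: Set[String] = init

    def get: Set[String] = set
    def update(newSet: Set[String]): Unit = {
      set = newSet
    }
  }

  val blacklist = new Blacklist(Set.empty)

  Flow[String]
    .filterNot(str => blacklist.get(str)) // re-read the current blacklist for every element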

  2. Encapsulate the blacklist in an actor
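  import akka.actor.{Actor, ActorSystem, Props}
  import akka.pattern.ask
  import akka.stream.scaladsl.Flow
  import akka.util.Timeout
  import scala.concurrent.duration._

  class Blacklist extends Actor {
    import Blacklist._
    private var set = Set.empty[String]

    override def receive: Receive = {
      case UpdateBlacklist(newSet) =>
        set = newSet
      case GetBlacklist =>
        sender() ! set
    }
  }

  object Blacklist {
    case class UpdateBlacklist(set: Set[String])
    case object GetBlacklist
  }

  implicit val system: ActorSystem = ???
  implicit val timeout: Timeout = Timeout(3.seconds) // required by ?
  import system.dispatcher // ExecutionContext for Future.map

  val parallelism: Int = ???
  val blacklist = system.actorOf(Props(new Blacklist()))

  Flow[String]
    .mapAsync(parallelism) { str =>
      (blacklist ? Blacklist.GetBlacklist)
        .mapTo[Set[String]]
        .map(exclude => str -> exclude)
    }
    .filterNot { case (str, exclude) => exclude(str) }
    .map { case (str, _) => str } // drop the blacklist from the tuple again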
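A detail that matters for option 1: filterNot(blacklist.get) evaluates blacklist.get once, when the flow is built, and uses the resulting Set (which is itself a String => Boolean) as the predicate, so later calls to update are never seen. The plain-Scala sketch below shows the difference; Holder and CaptureDemo are hypothetical names standing in for the Blacklist class above.

```scala
// Hypothetical stand-in for the Blacklist class from option 1
final class Holder(init: Set[String]) {
  @volatile private var set: Set[String] = init
  def get: Set[String] = set
  def update(newSet: Set[String]): Unit = {
    set = newSet
  }
}

object CaptureDemo {
  val holder = new Holder(Set.empty)

  // holder.get is evaluated once here: the empty Set itself becomes the predicate
  val snapshot: String => Boolean = holder.get

  // This lambda re-reads the volatile field on every call
  val live: String => Boolean = s => holder.get(s)

  def demo(): (Boolean, Boolean) = {
    holder.update(Set("exclude"))
    (snapshot("exclude"), live("exclude"))
  }
}
```

demo() should return (false, true): only the lambda that re-reads the field sees the update.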

I'm afraid that the actor-based solution with mapAsync introduces a new asynchronous boundary, which prevents operator fusion.
So which one should I prefer? Or is there a more idiomatic way?


You want to map a string to zero or one strings. That is either a fid: T => Option[T] function or an ff: T => Future[Option[T]] function. The ugly way is Flow[T].mapConcat(fid(_).toList); the less ugly way is Flow[T].map(fid).collect { case Some(x) => x } (this of course also works as Flow[T].mapAsync(par)(ff).collect { case Some(x) => x }).
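A minimal sketch of both variants, assuming Akka 2.6 (where an implicit ActorSystem is enough to run a stream). Unlike the actor version below, the fid here has blacklist semantics: it returns None for blacklisted elements, so only those are dropped. The fixed blacklist and the object name are illustrative only.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object OptionFilterDemo {
  // Hypothetical fixed blacklist, for illustration only
  val blacklist: Set[String] = Set("b")

  // fid: keep the element (Some) unless it is blacklisted (None)
  def fid(x: String): Option[String] = if (blacklist(x)) None else Some(x)

  // Variant 1: zero or one element per input via mapConcat
  val viaMapConcat: Flow[String, String, NotUsed] =
    Flow[String].mapConcat(x => fid(x).toList)

  // Variant 2: map to Option, then keep only the Some values
  val viaCollect: Flow[String, String, NotUsed] =
    Flow[String].map(fid).collect { case Some(x) => x }

  def run(): (Seq[String], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("option-filter-demo")
    try {
      val input = Source(List("a", "b", "c"))
      val a = Await.result(input.via(viaMapConcat).runWith(Sink.seq), 3.seconds)
      val b = Await.result(input.via(viaCollect).runWith(Sink.seq), 3.seconds)
      (a, b)
    } finally {
      system.terminate()
    }
  }
}
```

Both flows should produce a, c for the input a, b, c.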

The ff and fid functions can be built with an actor:

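import akka.actor.Actor
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

class Blacklist extends Actor {
  import Blacklist._
  private var set = Set.empty[String]

  override def receive: Receive = {
    case UpdateBlacklist(newSet) =>
      set = newSet
    case Check(x) =>
      sender() ! set.find(_ == x) // Some(x) if x is in the set
  }
}
object Blacklist {
  case class UpdateBlacklist(set: Set[String])
  case class Check(x: String)
}
implicit val timeout: Timeout = Timeout(3.seconds) // required by ?
// actor: ActorRef of a Blacklist actor
def ff(x: String) = (actor ? Blacklist.Check(x)).mapTo[Option[String]]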

Or with the class method you mentioned above…

def fid(x: String) = blacklist.get.find(_ == x)

I have no idea about the async vs. non-async trade-off, but I would personally choose the async one, because the var and @volatile are easy to mess up. (The sync one would probably be faster than the async one.) But I would choose map+collect or mapConcat rather than filter.

EDIT: I just noticed that my implementations are actually whitelists and not blacklists, but you get the point, I think :D
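A blacklist (rather than whitelist) version of the actor-based ff, plugged into mapAsync and collect, might look like the following. This is a sketch assuming Akka 2.6 classic actors; BlacklistActor, AskFilterDemo, the 3-second timeout and the parallelism of 4 are all made up for the example.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BlacklistActor {
  final case class UpdateBlacklist(set: Set[String])
  final case class Check(x: String)
}

class BlacklistActor extends Actor {
  import BlacklistActor._
  private var set = Set.empty[String]

  override def receive: Receive = {
    case UpdateBlacklist(newSet) => set = newSet
    // Blacklist semantics: None means "drop", Some(x) means "keep"
    case Check(x) => sender() ! (if (set(x)) None else Some(x))
  }
}

object AskFilterDemo {
  import BlacklistActor._

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("ask-filter-demo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val actor: ActorRef = system.actorOf(Props(new BlacklistActor))
      actor ! UpdateBlacklist(Set("b"))

      def ff(x: String): Future[Option[String]] =
        (actor ? Check(x)).mapTo[Option[String]]

      val done = Source(List("a", "b", "c"))
        .mapAsync(parallelism = 4)(ff)
        .collect { case Some(x) => x }
        .runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

mapAsync keeps the input order, so run() should return a, c.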

Thank you, I understand your point.

For good performance, unless you have roughly one blacklist update per stream element, I think you'd want to push the changes into an operator (stage) in the stream and keep the blacklist in a local field, rather than read a volatile or do an ask for every element. That could be done in a few ways. One is a custom GraphStage that you send updates to through getAsyncCallback or getStageActor. Another is a Source[Blacklist] that emits every time there is a new blacklist, combined with something like extrapolate and zip to emit a tuple of the latest seen blacklist and the element, which you then filter on.
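As one possible shape of the getStageActor variant, here is a sketch of a stage that keeps the blacklist in a local field and filters directly, so no tuple is emitted. It assumes Akka 2.6; BlacklistFilter and StageActorDemo are hypothetical names, and because the stage actor only exists once preStart has run, its ActorRef is materialized as a Future.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stage: filters against a blacklist held in a local field;
// new blacklists arrive as messages to the stage actor
final class BlacklistFilter(init: Set[String])
    extends GraphStageWithMaterializedValue[FlowShape[String, String], Future[ActorRef]] {
  val in: Inlet[String] = Inlet("BlacklistFilter.in")
  val out: Outlet[String] = Outlet("BlacklistFilter.out")
  override val shape: FlowShape[String, String] = FlowShape(in, out)

  override def createLogicAndMaterializedValue(attrs: Attributes): (GraphStageLogic, Future[ActorRef]) = {
    // The stage actor is created in preStart, so hand out its ref via a Promise
    val refPromise = Promise[ActorRef]()

    val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
      private var blacklist: Set[String] = init

      override def preStart(): Unit = {
        val stageActor = getStageActor {
          case (_, newSet: Set[String @unchecked]) => blacklist = newSet
          case _                                   => () // ignore other messages
        }
        refPromise.success(stageActor.ref)
      }

      override def onPush(): Unit = {
        val elem = grab(in)
        // A dropped element must still request the next one from upstream
        if (blacklist(elem)) pull(in) else push(out, elem)
      }

      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }

    (logic, refPromise.future)
  }
}

object StageActorDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("stage-actor-demo")
    try {
      val (_, result) = Source(List("a", "b", "c"))
        .viaMat(new BlacklistFilter(Set("b")))(Keep.right)
        .toMat(Sink.seq)(Keep.both)
        .run()
      Await.result(result, 3.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

To change the blacklist while the stream runs, send a new Set[String] to the ActorRef once the materialized Future completes, for example refFuture.foreach(_ ! Set("x")) with an ExecutionContext in scope.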


If I get it right, it should look something like this:
Zipped sources:

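import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem()

val initBlacklist = Set.empty[String]

// Emits each new blacklist; extrapolate repeats the latest one on demand
val blacklistSource = Source.queue[Set[String]](100, OverflowStrategy.backpressure)
  .extrapolate((bl: Set[String]) => Iterator.continually(bl), Some(initBlacklist))

val elementsSource = Source.repeat("")

// zip pairs each element with the latest blacklist; the queue is the
// materialized value of the left-most source (Keep.left is the default)
val queue = blacklistSource
  .zip(elementsSource)
  .filterNot { case (blacklist, elem) => blacklist(elem) }
  .to(Sink.ignore)
  .run()

val newBlacklist = Set("exclude")
queue.offer(newBlacklist)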
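To see what extrapolate contributes in the zipped version, the following sketch (assuming Akka 2.6; ExtrapolateDemo is an illustrative name) feeds it a single blacklist from an upstream that then stays silent. Every downstream pull still gets the latest blacklist, which is what lets zip keep pairing elements with it.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ExtrapolateDemo {
  def run(): Seq[Set[String]] = {
    implicit val system: ActorSystem = ActorSystem("extrapolate-demo")
    try {
      // One blacklist, then an upstream that never emits again or completes
      val blacklists = Source.single(Set("exclude")).concat(Source.never[Set[String]])
      val done = blacklists
        .extrapolate((bl: Set[String]) => Iterator.continually(bl)) // repeat the latest blacklist on demand
        .take(3)
        .runWith(Sink.seq)
      Await.result(done, 3.seconds)
    } finally {
      system.terminate()
    }
  }
}
```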

Custom stage:

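import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

implicit val system: ActorSystem = ActorSystem()

val initBlacklist = Set.empty[String]

val service =
  Source.repeat("")
    .viaMat(new ZipWithState[Set[String], String](initBlacklist))(Keep.right)
    .filterNot { case (blacklist, elem) => blacklist(elem) }
    .to(Sink.ignore)
    .run()

val newBlacklist = Set("exclude")
service.update(newBlacklist)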


import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}

trait StateService[A] {
  def update(state: A): Unit
}

class StateServiceCallback[A](callback: AsyncCallback[A]) extends StateService[A] {
  override def update(state: A): Unit = callback.invoke(state)
}

class ZipWithState[S, I](initState: S) extends GraphStageWithMaterializedValue[FlowShape[I, (S, I)], StateService[S]] {
  val in = Inlet[I]("ZipWithState.in")
  val out = Outlet[(S, I)]("ZipWithState.out")

  override val shape: FlowShape[I, (S, I)] = FlowShape.of(in, out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, StateService[S]) = {
    val logic = new ZipWithStateLogic
    (logic, new StateServiceCallback(logic.updateStateCallback))
  }

  // A named class, so updateStateCallback is accessible without reflective calls
  private class ZipWithStateLogic extends GraphStageLogic(shape) {
    private[this] var state: S = initState

    // The callback runs inside the stage, so state needs no @volatile
    val updateStateCallback: AsyncCallback[S] =
      getAsyncCallback[S](newState => state = newState)

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        push(out, (state, grab(in)))
      }
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        pull(in)
      }
    })
  }
}