I need to filter my flow based on a blacklist which can be changed outside of the flow execution. I see two options to do that:
Encapsulate the blacklist in a separate service
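import akka.stream.scaladsl.Flow

class Blacklist(init: Set[String]) {
  @volatile private var set: Set[String] = init
  def get: Set[String] = set
  def update(newSet: Set[String]): Unit = {
    set = newSet
  }
}
val blacklist = new Blacklist(Set.empty)
Flow[String]
  // look the set up per element; filterNot(blacklist.get) would capture the set only once
  .filterNot(str => blacklist.get.contains(str))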
Encapsulate the blacklist in an actor
import akka.actor.{Actor, Props}
import akka.pattern.ask
import akka.stream.scaladsl.Flow

class Blacklist extends Actor {
  import Blacklist._
  private var set = Set.empty[String]
  override def receive: Receive = {
    case UpdateBlacklist(newset) =>
      set = newset
    case GetBlacklist =>
      sender() ! set
  }
}
object Blacklist {
  case class UpdateBlacklist(set: Set[String])
  case object GetBlacklist
}
// assumes an implicit akka.util.Timeout and ExecutionContext are in scope
val parallelism: Int = ???
val blacklist = system.actorOf(Props(new Blacklist()))
Flow[String]
  .mapAsync(parallelism) { str =>
    // not named `ask`, which would shadow the imported ask pattern
    val reply = blacklist ? Blacklist.GetBlacklist
    reply.mapTo[Set[String]] map { str -> _ }
  } filterNot { case (str, exclude) =>
    exclude(str)
  }
I’m afraid that the actor-based solution with mapAsync introduces a new asynchronous boundary, which prevents operator fusion.
So, which one should I prefer? Or is there a more idiomatic way?
You want to map each string to zero or one string. That is a function fid: T => Option[T], or ff: T => Future[Option[T]]. The ugly way is Flow[T].mapConcat(fid(_).toList); the less ugly way is Flow[T].map(fid).collect { case Some(x) => x } (this of course also works as Flow[T].mapAsync(par)(ff).collect { case Some(x) => x }).
The ff and fid functions can be built using actors:
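To make the two variants concrete, here is a minimal sketch. The helper names keepSome and keepSomeAsync are hypothetical (they are not part of Akka), and fid and ff are whatever functions you supply:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.concurrent.Future

// Hypothetical helper: keeps the elements for which fid returns Some, drops the rest.
def keepSome[T](fid: T => Option[T]): Flow[T, T, NotUsed] =
  Flow[T].map(fid).collect { case Some(x) => x }

// Hypothetical helper: same idea with an asynchronous check and bounded parallelism.
def keepSomeAsync[T](par: Int)(ff: T => Future[Option[T]]): Flow[T, T, NotUsed] =
  Flow[T].mapAsync(par)(ff).collect { case Some(x) => x }
```

Since mapAsync preserves element order, both variants should emit the surviving elements in their original order.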
class Blacklist extends Actor {
  import Blacklist._
  private var set = Set.empty[String]
  override def receive: Receive = {
    case UpdateBlacklist(newset) =>
      set = newset
    // assumes a `case class Check(x: String)` in the Blacklist companion object
    case Check(x) =>
      sender() ! set.find(_ == x)
  }
}
// `actor` is assumed to be the ActorRef of a running Blacklist actor
def ff(x: String) = (actor ? Check(x)).mapTo[Option[String]]
I have no idea about the async vs. non-async way, but I would personally choose the async one because the var and volatile are easy to mess up. (The sync version would probably be faster than the async one…) But I would choose map+collect or mapConcat rather than filter.
EDIT: I just noticed that my implementations are actually whitelists and not blacklists, but I think you get the point :D
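For completeness, a blacklist variant of the same idea might look roughly like this. It is only a sketch: the name BlacklistActor, the Check message, and the extra parameters on ff are assumptions made for illustration, not something from the posts above.

```scala
import akka.actor.{Actor, ActorRef}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future

object BlacklistActor {
  final case class UpdateBlacklist(set: Set[String])
  final case class Check(x: String)
}

class BlacklistActor extends Actor {
  import BlacklistActor._
  private var set = Set.empty[String]
  override def receive: Receive = {
    case UpdateBlacklist(newSet) =>
      set = newSet
    case Check(x) =>
      // Some(x) means "keep the element"; None means x is blacklisted
      sender() ! (if (set.contains(x)) None else Some(x))
  }
}

// The timeout bounds how long each ask waits for the actor's reply.
def ff(actor: ActorRef)(x: String)(implicit timeout: Timeout): Future[Option[String]] =
  (actor ? BlacklistActor.Check(x)).mapTo[Option[String]]
```

It would then plug in as Flow[String].mapAsync(par)(ff(actor)).collect { case Some(x) => x }, with an implicit Timeout in scope.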
For good performance, unless you have roughly one update of the blacklist per element in the stream, I think you’d want to push the changes to an operator/stage in the stream and keep the blacklist in a local field there, rather than touch a volatile or do an ask for every element. That could be done in a few ways. One is a custom GraphStage which you send updates to through getAsyncCallback or getStageActor. Another option would be to have a Source[blacklist] that emits every time there is a new blacklist, and then use something like extrapolate combined with zip to emit a tuple of the latest seen blacklist and the element, and then filter using that.
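The extrapolate-plus-zip option could be sketched roughly as follows. The helper name filterWithLatest and its parameters are hypothetical, and the sketch assumes an Akka version that provides the extrapolate operator:

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

// Hypothetical helper: pairs each element with the latest seen blacklist, then filters.
def filterWithLatest(
    blacklists: Source[Set[String], Any],
    initial: Set[String] = Set.empty
): Flow[String, String, NotUsed] = {
  // Repeat the most recent blacklist whenever downstream asks for one;
  // `initial` is used until the first update arrives.
  val latest = blacklists.extrapolate[Set[String]](
    bl => Iterator.continually(bl),
    initial = Some(initial))

  Flow[String]
    .zip(latest)
    .collect { case (str, bl) if !bl.contains(str) => str }
}
```

Two things to keep in mind with this approach: zip completes as soon as either side completes, so the blacklist source should stay open for as long as the element stream runs, and because of internal buffering an update may only become visible after an element or two has passed.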