Hello there,
May I suggest an addition to Flow operation with akka streams?
case class IfThenElseMapFlow[T, U, V](condition: T => Boolean, thn: T => U, els: T => V) extends GraphStage[FanOutShape2[T, U, V]]{
val in: Inlet[T] = Inlet[T]("Input")
val outThen: Outlet[U] = Outlet("Then")
val outElse: Outlet[V] = Outlet("Else")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) with InHandler {
override def onPush(): Unit = {
val data = grab(in)
if (condition(data)) {
//isAvailable(outThen) check required?
push(outThen, thn(data))
} else {
//isAvailable(outElse) check required?
push(outElse, els(data))
}
}
override def toString: String = "IfThenElseFlow"
setHandler(in, this)
Seq(outThen, outElse).foreach { o=>
setHandler(o, new OutHandler {
override def onPull(): Unit = {
if (!hasBeenPulled(in)) pull(in)
}
})
}
}
}
override def shape: FanOutShape2[T, U, V] = new FanOutShape2(in, outThen, outElse)
}
with a an example usage like…
def mapAsyncWithAcceptOrErrorFlow[T, R](parallelism: Int)(effect: T => Future[R])(implicit ec: ExecutionContext): Graph[FanOutShape2[T, (T, R), (T, Throwable)], NotUsed] = {
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val flowOperation: FlowShape[T, (T, Either[R, Throwable] with Product with Serializable)] = b.add(Flow[T].mapAsync(parallelism) { t => effect(t).map(r => (t, Left(r))).recoverWith{ case error: Throwable => Future(t, Right(error))} })
val ifThen: FanOutShape2[(T, Either[R, Throwable]), (T, R), (T, Throwable)] = b.add(new IfThenElseMapFlow[(T, Either[R, Throwable]), (T, R), (T, Throwable)]( _._2.isLeft, {case (t, Left(r)) => (t, r)}, {case (t, Right(r)) => (t, r)}))
flowOperation ~> ifThen.in
new FanOutShape2(flowOperation.in, ifThen.out0, ifThen.out1)
}
}
Perhaps there is a better way to do this with streams?
Please advice,
Muthu