Custom GraphStage

Hi,

I’m thinking about using a custom GraphStage like the following:

case class MapAsyncErrorUnordered[In, Out](parallelism: Int, f: In => Future[Out])
extends GraphStage[FanOutShape2[In, Out, Throwable]]

So if the Future succeeds, an element will be emitted on output 1, and if the Future fails, a Throwable will be emitted on output 2. This would allow me to apply different pipelines to the error and success cases.

If I go the way I sketched here (custom-shapes/src/main/scala/shapes/MapAsyncErrorUnordered.scala at main · bcoromina/custom-shapes · GitHub), it looks like I break the encapsulation of the existing graph components, so I would have to test myself that I'm not breaking things like correct backpressure handling in my custom shape.

If you think this is the way to go, I would appreciate some advice on testing the new shape; otherwise, maybe you could suggest a better approach.
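One way to test a stage like this is to wire it between a publisher probe (TestSource.probe) and two subscriber probes (TestSink.probe), so the test controls upstream elements and the demand on each outlet independently. The sketch below assumes Akka 2.6 with akka-stream-testkit on the classpath and Scala 2.13. The names FanOutStageProbes, standIn and checkRoutingUnderBackpressure are hypothetical; standIn is a small composed stage used only so the example runs on its own, and MapAsyncErrorUnordered could be passed in its place. The assertions encode an assumed contract: a failure that arrives while the error outlet has no demand is held back rather than dropped.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ClosedShape, FanOutShape2, Graph}
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, RunnableGraph}
import akka.stream.testkit.{TestPublisher, TestSubscriber}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Success, Try}

object FanOutStageProbes {
  type Stage = Graph[FanOutShape2[Int, Int, Throwable], NotUsed]
  type Probes = (TestPublisher.Probe[Int], TestSubscriber.Probe[Int], TestSubscriber.Probe[Throwable])

  // Wires the stage between a publisher probe and two subscriber probes, so the test
  // controls upstream elements and the demand on each outlet independently
  def materialize(stage: Stage)(implicit system: ActorSystem): Probes =
    RunnableGraph.fromGraph(
      GraphDSL.create(TestSource.probe[Int], TestSink.probe[Int], TestSink.probe[Throwable])((_, _, _)) {
        implicit b => (src, okSink, errSink) =>
          import GraphDSL.Implicits._
          val s = b.add(stage)
          src ~> s.in
          s.out0 ~> okSink
          s.out1 ~> errSink
          ClosedShape
      }).run()

  // Hypothetical stand-in for the stage under test: doubles non-negative numbers, fails on negative ones
  val standIn: Stage =
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      def f(n: Int): Future[Int] =
        if (n < 0) Future.failed(new IllegalArgumentException(s"negative: $n"))
        else Future.successful(n * 2)
      val lifted = b.add(Flow[Int].mapAsync(1)(n => f(n).transform(t => Success(t))(ExecutionContext.parasitic)))
      val partition = b.add(Partition[Try[Int]](2, t => if (t.isSuccess) 0 else 1))
      lifted.out ~> partition.in
      new FanOutShape2(lifted.in, partition.out(0).map(_.get).outlet, partition.out(1).map(_.failed.get).outlet)
    }

  def checkRoutingUnderBackpressure(stage: Stage)(implicit system: ActorSystem): Unit = {
    val (pub, okSub, errSub) = materialize(stage)
    errSub.ensureSubscription() // consume OnSubscribe without signalling any demand
    okSub.request(2)            // demand on the success outlet only

    pub.sendNext(1)
    okSub.expectNext(2)

    // A failure arrives while the error outlet has no demand: it must be held, not dropped
    pub.sendNext(-1)
    pub.sendNext(3)
    errSub.expectNoMessage(200.millis)
    // Assumed contract (true for Partition): the pending failure also holds back later successes
    okSub.expectNoMessage(200.millis)

    // Once the error outlet signals demand, both elements come through
    errSub.request(1)
    assert(errSub.expectNext().getMessage == "negative: -1")
    okSub.expectNext(6)

    pub.sendComplete()
    okSub.expectComplete()
    errSub.expectComplete()
  }
}
```

A stage with parallelism above 1 may legitimately emit later successes while a failure waits, in which case the okSub.expectNoMessage check would have to be adapted to whatever contract the custom stage is meant to have.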

Thanks in advance,

Bernat

Note that it is relatively straightforward to achieve this without a custom graph stage.

Combine existing stream operators: start with mapAsyncUnordered, but wrap the In => Future[Out] function so that future failures are “lifted” into values, giving In => Future[Try[T]]; then use a Partition that directs failures and successes to different downstreams; and finally add a map in each branch that unwraps the Try[T] into a T in the success branch and into a Throwable in the error branch.
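The “lifting” step on its own needs only the standard library. A minimal sketch, assuming Scala 2.13 (for ExecutionContext.parasitic); LiftErrors and liftErrors are hypothetical names:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object LiftErrors {
  // Hypothetical helper: turns In => Future[Out] into In => Future[Try[Out]].
  // The returned future always succeeds; the original outcome is kept as a Try value.
  def liftErrors[In, Out](f: In => Future[Out]): In => Future[Try[Out]] =
    in =>
      Try(f(in)) match {
        // f returned a future: capture its success or failure as a value
        case Success(future) =>
          future.transform(outcome => Success(outcome))(ExecutionContext.parasitic)
        // f threw (non-fatally) before returning a future: report it like a failed future
        case Failure(e) =>
          Future.successful(Failure(e))
      }
}
```

Passing liftErrors(f) to Flow[In].mapAsyncUnordered(parallelism) then yields a stream of Try[Out] elements that a Partition can split. Catching a synchronous throw from f is an extra choice made here; without it, such an exception would be handled by the stream's supervision strategy (failing the stream by default) instead of going down the error branch.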

A custom graph stage could be an interesting exercise, but I would recommend against it unless a composed solution has been tried and turns out to be a bottleneck or to have too much overhead.

Thanks a lot for your answer!

I would like to build something similar to this:

where CustomFlow1 can produce errors. You suggest modelling CustomFlow1 with one input and one output, like In => Future[Try[T]]. The problem I see with that is that when CustomFlow1 produces an error, I would like CustomFlow2 not to be called. With In => Future[Try[T]], if I've understood correctly, I would be forced to check whether the incoming element is an error in every stage of the pipeline, forcing the message to be sent across multiple async boundaries. If I could implement a graph like the one in the picture, I would have a separate success path at the graph level, and the error-handling logic would be separated from the success path.

Does any of this make sense?

With the suggested composed operator, and probably also if you go with a custom stage, each of those “custom flow” boxes would be one instance of it, passing successes on but routing errors down to the error-handling subflow.

A composed operator could look something like this (it compiles, but I haven't tested it):

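import akka.NotUsed
import akka.stream.{FanOutShape2, Graph}
import akka.stream.scaladsl.{Flow, GraphDSL, Partition}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

// Emits the results of successful futures on out0 and the exceptions of failed ones on out1
def partitionErrors[In, Out](f: In => Future[Out]): Graph[FanOutShape2[In, Out, Throwable], NotUsed] = 
  GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    // Turn every outcome into a successful Future[Try[Out]] so failures flow on as elements
    val liftedFlow =
      builder.add(Flow[In].mapAsync(1)(in => f(in).transform(tryIn => Success(tryIn))(ExecutionContext.parasitic)))
    // Port 0 receives successes, port 1 receives failures
    val partition = builder.add(Partition[Try[Out]](2, {
      case Success(_) => 0
      case Failure(_) => 1
    }))

    liftedFlow.out ~> partition.in
    // Unwrap the Try in each branch
    val successOut = partition.out(0).map(_.get)
    val throwableOut = partition.out(1).map(_.failed.get)

    new FanOutShape2(liftedFlow.in, successOut.outlet, throwableOut.outlet)
  }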
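To connect this with the CustomFlow1/CustomFlow2 picture: each box becomes one instance of the operator, step 2 is wired only to step 1's success outlet, and both error outlets are merged into a single error sink. A sketch assuming Akka 2.6 and Scala 2.13; it repeats a variant of partitionErrors (with mapAsyncUnordered and a parallelism parameter, matching the original question) so it runs on its own, and ChainedErrorRouting, customFlow1, customFlow2 and runExample are hypothetical names:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ClosedShape, FanOutShape2, Graph}
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Success, Try}

object ChainedErrorRouting {
  // Variant of partitionErrors using mapAsyncUnordered with an explicit parallelism
  def partitionErrors[In, Out](parallelism: Int)(
      f: In => Future[Out]): Graph[FanOutShape2[In, Out, Throwable], NotUsed] =
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val lifted = b.add(Flow[In].mapAsyncUnordered(parallelism)(in =>
        f(in).transform(result => Success(result))(ExecutionContext.parasitic)))
      val partition = b.add(Partition[Try[Out]](2, t => if (t.isSuccess) 0 else 1))
      lifted.out ~> partition.in
      new FanOutShape2(lifted.in, partition.out(0).map(_.get).outlet, partition.out(1).map(_.failed.get).outlet)
    }

  // Hypothetical stand-ins for CustomFlow1 (may fail) and CustomFlow2
  def customFlow1(n: Int): Future[Int] =
    if (n % 3 == 0) Future.failed(new IllegalArgumentException(s"step 1 rejected $n"))
    else Future.successful(n * 10)

  def customFlow2(n: Int): Future[String] = Future.successful(s"result $n")

  // Step 2 only ever sees step 1's successes; both error outlets feed one sink
  def runExample()(implicit system: ActorSystem): Future[(Seq[String], Seq[String])] = {
    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[String], Sink.seq[Throwable])((_, _)) { implicit b => (okSink, errSink) =>
        import GraphDSL.Implicits._
        val step1 = b.add(partitionErrors[Int, Int](4)(customFlow1))
        val step2 = b.add(partitionErrors[Int, String](4)(customFlow2))
        val errors = b.add(Merge[Throwable](2))

        Source(1 to 6) ~> step1.in
        step1.out0 ~> step2.in
        step1.out1 ~> errors.in(0)
        step2.out0 ~> okSink
        step2.out1 ~> errors.in(1)
        errors.out ~> errSink
        ClosedShape
      })
    val (okF, errF) = graph.run()
    // mapAsyncUnordered may reorder elements, so sort before reporting
    okF.zip(errF).map { case (ok, errs) => (ok.sorted, errs.map(_.getMessage).sorted) }(ExecutionContext.parasitic)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("chained-error-routing")
    try {
      val (successes, errors) = Await.result(runExample(), 10.seconds)
      println(s"successes: $successes")
      println(s"errors: $errors")
    } finally system.terminate()
  }
}
```

Elements that fail in step 1 never reach customFlow2; they go straight to the merged error sink, which is the separate success path at the graph level described in the question.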

Now I get what ‘partition’ from your first answer means! Thanks!