I think the answer is to handle the errors inside each flatMapConcat:
import akka.stream.scaladsl.{Flow, Source}

import scala.util.{Failure, Success}
import scala.util.control.NonFatal

val flow = Flow[String]
  .via(forkJoinFlow(
    Flow[String]
      .flatMapConcat { item =>
        Source.single(item)
          .map { str => str.toInt }
          .map(Success.apply)
          .recover { case NonFatal(e) => Failure(e) }
      },
    Flow[String].flatMapConcat { item =>
      Source.single(item)
        .map { _.substring(0, 2) }
        .map(Success.apply)
        .recover { case NonFatal(e) => Failure(e) }
    }))
  .collect { case (Success(int), Success(pre)) => (int, pre) }
and simplify the forkJoin flow itself:
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}

def forkJoinFlow[T, U, V](flow1: Flow[T, U, NotUsed],
                          flow2: Flow[T, V, NotUsed]): Flow[T, (U, V), NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit b: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._
    // broadcast every element to both flows, then zip their outputs pairwise
    val bcast = b.add(Broadcast[T](2))
    val zip = b.add(Zip[U, V]())
    bcast ~> flow1 ~> zip.in0
    bcast ~> flow2 ~> zip.in1
    FlowShape(bcast.in, zip.out)
  })
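For illustration, here is a minimal sketch of running the combined flow, assuming Akka 2.6+ (where an implicit ActorSystem is enough to materialize the stream); the sample inputs, the system name, and the Sink.seq wiring are mine, not part of the original code. "42" satisfies both branches, "7" fails substring(0, 2), and "abc" fails toInt, so only the pair for "42" survives the collect:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem("fork-join-example") // name is arbitrary
import system.dispatcher

Source(List("42", "7", "abc"))
  .via(flow)
  .runWith(Sink.seq)
  .foreach { results =>
    println(results) // prints Vector((42,42))
    system.terminate()
  }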
However, this approach splits each element into its own single-element stream, so the inner flows cannot aggregate results across elements.
Optionally, you can group elements around the forkJoin. But I still think the flows downstream of the broadcast should be able to recover from failed futures and thrown exceptions.
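If you do need aggregation, one possible reading of that grouping option is sketched below; the batch size of 10 and the sum/count aggregations are placeholders I made up, not anything from the question:

import scala.util.Try

// sketch only: batch elements before the fork/join so each branch receives
// a whole Seq[String] per element and can aggregate within that batch
val batchedFlow: Flow[String, (Int, Int), NotUsed] =
  Flow[String]
    .grouped(10) // hypothetical batch size
    .via(forkJoinFlow(
      Flow[Seq[String]].map(_.flatMap(s => Try(s.toInt).toOption).sum), // placeholder: sum of parseable ints
      Flow[Seq[String]].map(_.count(_.length >= 2))                     // placeholder: count of usable prefixes
    ))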