val flow = Flow[Int].map(_ * 17)
Source(List(1, 2, 3, 4))
  .map { x =>
    // print only multiples of 3
    if (x % 3 == 0) println(s"incoming: $x")
    x
  }
  .via(overleaping(flow))
  .map { case (x, str) =>
    // print only results derived from multiples of 3
    if (x % 3 == 0) println(s"result for $x: $str")
    str
  }
  .to(Sink.seq)
If there is any other approach I should consider for this, I’d be fine with that too.
Second: If the Flow is not a simple map (for example a mapConcat, a filter, or anything else that can drop or generate elements), you will get a deadlock. Read this or this for more info (by the way, I think the second link mostly covers your case).
@tg44 the real use case is this: I have an akka-http app which is set up something like this:
val routes: Route = ???
val fullFlow = Flow[HttpRequest]
  .map { req =>
    if (!isInternal(req)) logRequest(req)
    req
  }
  .via(routes)
  .map { resp =>
    // HERE
    // I want the request as well as the response
    // to make sure I don't log responses for internal requests.
    // An "internal request" in my case is e.g. a load balancer health check;
    // these happen pretty often, so I don't want to clutter the logs
    // with useless data
  }
As you can see, there is no dropping etc., so it’s pretty straightforward in this case.
The problem here is that Route is a RequestContext ⇒ Future[RouteResult], which is implicitly converted into a Flow[HttpRequest, HttpResponse, NotUsed] by akka-http internals.
In theory I could just reimplement it like this:
val fullFlow = Flow[HttpRequest]
  .map { req =>
    if (!isInternal(req)) logRequest(req)
    req
  }
  .mapAsync(1)(req => handle(req).map(resp => req -> resp))
  .map { case (req, resp) =>
    // log the response here; logResponse is an assumed counterpart of logRequest
    if (!isInternal(req)) logResponse(resp)
    resp
  }
where handle replicates the internal logic of converting a Route into an HttpRequest => Future[HttpResponse], but I don’t like the idea of a) going to a lower level here and b) reimplementing what is already implemented in the internals.
Since Route is a [kind of] “classic” function (at least it’s total in some sense), I don’t see a possibility of anything weird happening, so it should be fine.
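For what it's worth, the conversion described above may not need to be reimplemented by hand. A minimal sketch, assuming akka-http 10.2 or newer, where `Route.toFunction` is public (older 10.1.x releases offered `Route.asyncHandler` for the same purpose). The names `LoggingFlow`, `loggingFlow`, `isInternal` and `log` are hypothetical and only there for illustration:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Flow

import scala.concurrent.Future

object LoggingFlow {
  // Builds a request/response flow that keeps the request next to its response,
  // so the response-side logging can skip internal requests.
  def loggingFlow(
      route: Route,
      isInternal: HttpRequest => Boolean, // hypothetical predicate
      log: String => Unit                 // hypothetical logging hook
  )(implicit system: ActorSystem): Flow[HttpRequest, HttpResponse, NotUsed] = {
    import system.dispatcher

    // Assumption: akka-http 10.2+; seals the route and turns it into a plain function.
    val handle: HttpRequest => Future[HttpResponse] = Route.toFunction(route)

    Flow[HttpRequest]
      .map { req =>
        if (!isInternal(req)) log(s"request: ${req.method.value} ${req.uri}")
        req
      }
      // parallelism 1 keeps the behaviour closest to the snippet above
      .mapAsync(1)(req => handle(req).map(resp => req -> resp))
      .map { case (req, resp) =>
        if (!isInternal(req)) log(s"response for ${req.uri}: ${resp.status}")
        resp
      }
  }
}
```

Because the pairing happens inside `mapAsync`, there is no fan-out and fan-in to keep in sync, so the dropping/generating concern does not arise here.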
Hmm. That’s sad… There has been more talk about solving these kinds of use cases, but nothing has been implemented as far as I know…
I think there will be no better solution for this problem then, and the broadcast-zip will not be a bottleneck in this case.
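For reference, the broadcast-zip approach can be sketched roughly as follows. This is only an illustration of the idea, not necessarily how the `overleaping` helper from the question is implemented; the object name `Overleap` is made up, and the sketch assumes the inner flow emits exactly one element per input, in order:

```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}

object Overleap {
  // Pairs every input element with the result the inner flow produced for it.
  // Assumption: `inner` is strictly one-in/one-out (e.g. a plain map).
  // If it drops or multiplies elements, Zip stalls or mispairs.
  def overleaping[In, Out](inner: Flow[In, Out, _]): Flow[In, (In, Out), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val bcast = builder.add(Broadcast[In](2))
      val zip   = builder.add(Zip[In, Out]())

      bcast.out(0) ~> zip.in0          // original element, untouched
      bcast.out(1) ~> inner ~> zip.in1 // element transformed by the inner flow

      FlowShape(bcast.in, zip.out)
    })
}
```

With something like `Flow[Int].filter(_ % 2 == 0)` as the inner flow, the first dropped element leaves `zip.in0` holding a value while `zip.in1` waits forever, which is the deadlock mentioned earlier in the thread.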
(BTW, if this is HTTP, you can also signal the “internality” with custom headers (for example, every health-check request gets a specific header), so you don’t need the request to answer the isItInternal? question.)
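One possible reading of the header idea, sketched under assumptions: the health-check route marks its own response with a custom header, and the logging stage checks for and strips that marker, so the request is not needed at that point. The header name `X-Internal` and the object `InternalMarker` are hypothetical:

```scala
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.headers.RawHeader

object InternalMarker {
  // Hypothetical header name; pick anything that cannot clash with real headers.
  val HeaderName = "X-Internal"
  private val lowerName = HeaderName.toLowerCase

  // Called by the internal (e.g. health-check) route on its response.
  def mark(resp: HttpResponse): HttpResponse =
    resp.withHeaders(resp.headers :+ RawHeader(HeaderName, "true"))

  // Called in the logging stage instead of isInternal(req).
  def isInternal(resp: HttpResponse): Boolean =
    resp.headers.exists(_.is(lowerName))

  // Removes the marker so it is not sent to the client.
  def strip(resp: HttpResponse): HttpResponse =
    resp.withHeaders(resp.headers.filterNot(_.is(lowerName)))
}
```

In the response-side `map`, this would look something like `if (!InternalMarker.isInternal(resp)) logResponse(resp)` followed by `InternalMarker.strip(resp)`, where `logResponse` is again an assumed helper.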
Oh, cool, that makes sense in the context of committable messages as well. Thanks for the pointer! It definitely won’t hurt to have this in contrib; I’ll probably open a PR with this.