I need to collect a metric on how much time each event spent in a certain flow. The main use-case is to collect the time the ElasticsearchFlow spent writing each element.
I’ve had a look at the contrib module, which has a timing flow, but unfortunately it does not cover my use case. Then I came across this LatencyMonitor, which is almost what I need, except that it collects averages instead of actual “response” times.
Furthermore, I only need to monitor response times for elements that have been written successfully (because only those are relevant for the metric histogram).
So I started my own custom flow which wraps another flow. Now I am facing two challenges:
1. Flow-API: how to pass some TimerContext from a startFlow to the endFlow
2. GraphStage: how to get the actual output-value from the wrapped flow in order to check the condition
Regarding 1: I first thought of just passing a Tuple2 that includes the Timer, but since I have no control over the types in the ElasticsearchFlow, this doesn't work.
Regarding 2: just using the handlers onPush and onPull would not yield a correct result, since onPull is only called once the sink has signaled demand (which can be much later than when the wrapped flow finished), and I haven't found a way to get the out-value of the wrapped flow.
Another idea might be to use the Graph-DSL to split and merge the flow, meaning that the TimerContext would move in parallel to the wrapped flow and the merge would do the logic. But this seems quite heavy for a “simple” timer.
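For reference, the Graph-DSL idea above could be sketched roughly as follows. This is only a minimal sketch, not the implementation discussed in this thread: `TimedFlowSketch`, `Timed`, `timed` and `maxInFlight` are made-up names, it assumes Akka Streams 2.6 on the classpath, and it assumes the wrapped flow emits exactly one output per input, in order. Under backpressure the measured time can still include some waiting for downstream demand.

```scala
import akka.NotUsed
import akka.stream.{FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}

object TimedFlowSketch {
  // Hypothetical result type: the wrapped flow's output plus the elapsed time.
  final case class Timed[Out](value: Out, elapsedNanos: Long)

  // Assumption: `wrapped` emits exactly one element per input, in input order.
  // `maxInFlight` (hypothetical) bounds how many start timestamps are buffered.
  def timed[In, Out](wrapped: Flow[In, Out, _], maxInFlight: Int = 16): Flow[In, Timed[Out], NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val bcast = builder.add(Broadcast[In](2))
      val zip   = builder.add(Zip[Long, (Out, Long)]())

      // Branch 1: record the start time when the element is handed out.
      val start = builder.add(
        Flow[In].map(_ => System.nanoTime()).buffer(maxInFlight, OverflowStrategy.backpressure))

      // Branch 2: run the wrapped flow and stamp its output as soon as it is emitted.
      val inner = builder.add(wrapped)
      val end   = builder.add(Flow[Out].map(out => (out, System.nanoTime())))

      // Pair the n-th start time with the n-th output and compute the difference.
      val finish = builder.add(Flow[(Long, (Out, Long))].map {
        case (startedAt, (out, endedAt)) => Timed(out, endedAt - startedAt)
      })

      bcast.out(0) ~> start ~> zip.in0
      bcast.out(1) ~> inner ~> end ~> zip.in1
      zip.out ~> finish

      FlowShape(bcast.in, finish.out)
    })
}
```

Something like `TimedFlowSketch.timed(someFlow)` could then be followed by a `map` or `filter` that inspects `value` and only reports `elapsedNanos` for successful writes. Without the buffer on the timestamp branch, the Broadcast/Zip pair would, as far as I can tell, allow only about one element in flight, which would defeat any batching inside the wrapped flow.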
Did I miss anything in the existing API? Is this even possible, or am I conceptually wrong here?
I was able to finish my implementation, though I am not 100% certain that my logic handles backpressure correctly. I am willing to contribute my solution to the Akka Contrib module if the community is interested. At the moment I only support a Scala API and would need to add the Java counterpart.
At the moment the usage looks like this:

    import com.stuff.akka.streams.Implicits._

    Elasticsearch.prepareElasticsearchFlow
      .measureLatency { result =>
        if (result.outcome.success) {
          // ... push result to metrics backend with "result.measuredTime"
        }
      }
      .map .... // continue with result from wrapped flow (Elasticsearch result)

In addition to the timer, the result also contains the wrapped flow's result, so that additional logic can be applied to it.
I just saw that akka-stream-contrib got a commit at the end of January saying it will no longer be released.
I think it is a valuable addition to the Akka ecosystem. For instance, I am using the Retry operator, which works pretty well.
Can anyone comment on this? @ennru?
Still, feedback on the operator would be welcome, even though I will have to provide it as an internal library in my company.
Note: I already talked to Lightbend about the subscription (since it includes Telemetry), but we are currently at a very early stage, and paying the bill for just one PoC project is not going to happen yet. Though we do have another project that might use Akka Streams for real…