That’s nice, but I’m unsure how I can now react (restart) to errors that will probably not exist anymore after a restart (connection problems, …).
I would like to use RestartSource, but a DrainingControl is neither a Source nor a CompletionStage.
You’re right, there is no good way to get the materialized value out of the RestartSource. One way to get access to the latest materialized Control is to have an AtomicReference which is populated from the inner stream creation logic via mapMaterializedValue, as shown in the docs: https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html
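Roughly, the inner source populates that reference like this (a minimal sketch along the lines of the docs example; consumerSettings and the topic name are placeholders, not taken from your code):

import java.util.concurrent.atomic.AtomicReference

import scala.concurrent.duration._

import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.kafka.scaladsl.Consumer.Control
import akka.stream.scaladsl.RestartSource

val control = new AtomicReference[Control](Consumer.NoopControl)

val restartingSource = RestartSource
  .onFailuresWithBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () =>
    Consumer
      .plainSource(consumerSettings, Subscriptions.topics("topic"))
      // publish the latest materialized Control so it can be used for shutdown later
      .mapMaterializedValue { c =>
        control.set(c)
        c
      }
  }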
Additionally, I add a shutdown hook via CoordinatedShutdown which terminates the DrainingControl in the service-unbind phase (with the default phase configuration).
I’m a little unsure which phase is best; I now get failed graph stages on shutdown.
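For reference, a hook along those lines might look like this (a sketch; it assumes an ActorSystem named system and the DrainingControl in scope as drainingControl):

import akka.Done
import akka.actor.CoordinatedShutdown

// drain the stream as a task in the service-unbind phase of CoordinatedShutdown
CoordinatedShutdown(system).addTask(
  CoordinatedShutdown.PhaseServiceUnbind,
  "drain-kafka-stream"
) { () =>
  import system.dispatcher
  drainingControl.drainAndShutdown().map(_ => Done)
}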
I am new to Alpakka, and I am trying to use RestartSource with DrainingControl.apply, but I get the compile error below. I can do away with that line, but I’m worried that my flow will not drain properly. Basically my flow is a consumer that goes to a producer. How can I ensure that the flow will drain while wrapping it in a RestartSource?
Are you sure you need a RestartSource? For most use cases it is not necessary.
When using RestartSource you would need to create the DrainingControl more explicitly as the stream doesn’t get access to the “latest” materialized value from the consumer source.
You need to pass in the value from the AtomicReference and the sink’s stream completion future to create the DrainingControl.
I have some business logic in between the Consumer and Producer stages that might cause an exception. I’d like to automatically restart the flow with a backoff if that happens. My output goes to the Producer, so would I still need a materialized value?
I assumed that I still needed it and I did what you recommended and it compiled. Thanks a lot for your help. I still have an issue with getting results for unit tests using testcontainers. But I will post it as a separate ticket. Thanks again.
val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

val result: Future[Foo] = RestartSource
  .onFailuresWithBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () =>
    // your source here, setting control via mapMaterializedValue
  }
  .runWith(dbSink)

val drainingControl = DrainingControl(Tuple2(control.get(), result))
You need to construct the DrainingControl with the latest Control when you want to shut down the stream.
Passing in Consumer.NoopControl as @NullPointerException suggested is useful so there is a valid value until the consumer is established.
But don’t create the DrainingControl immediately after creating the stream, as that will most likely capture the NoopControl. Instead, you may use Control.drainAndShutdown and pass the stream completion signal in to it.
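In terms of the snippet above, that could look like this (a sketch reusing control and result from that code):

import scala.concurrent.{ ExecutionContext, Future }

// read the AtomicReference only at shutdown time, so it holds the latest materialized Control
def shutdown()(implicit ec: ExecutionContext): Future[Foo] =
  control.get().drainAndShutdown(result)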
Am I correct in using RestartSource? I use an explicit commit of the offset, as I am doing something with the message from the Kafka source. I want to repeat in case something fails. Will my code be re-triggered if I fail to commit the offset, even if I don’t use RestartSource?
If I am correct in using RestartSource, isn’t mapMaterializedValue the place where I should create the DrainingControl? I am not sure how to pass the stream completion. Wouldn’t calling drainAndShutdown() in the code below work?
val control = new AtomicReference[Control](Consumer.NoopControl)

val restartSource = RestartSource
  .onFailuresWithBackoff(
    // backoff values as in the earlier example
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () =>
    // logic here
  }

val drainingControl = restartSource
  .toMat(Committer.sink(openedCommitterSettings))(Keep.right)
  .mapMaterializedValue(f => DrainingControl.apply(Tuple2(control.get(), f)))
  .run()

drainingControl.drainAndShutdown()
I haven’t gotten a reply from you. I’m experiencing a very rare problem with my implementation whose cause I can’t pinpoint, so I have returned to this thread. Is this what you are recommending?