Kafka Draining Control with restart mechanism

Hey guys,

I’m working with Alpakka (Slick, Cassandra, Kafka).
For consuming a Kafka topic I use the Draining Control combined with the new Committer Sink.
https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#committer-sink

That’s nice, but I’m unsure how to react to (restart on) errors that will probably no longer exist after a restart (connection problems, …).
I would like to use the RestartSource, but a DrainingControl is neither a Source nor a CompletionStage.

What’s the trick? :smiley:

Best
Sigurd

Hi Sigurd,

Sorry for not answering earlier.

You’re right, there is no good way to get the materialized value out of the RestartSource. One way of getting access to the latest materialized control is to have an AtomicReference which is populated from the inner stream creation logic in mapMaterializedValue as shown in the docs: https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html
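
A minimal sketch of that pattern, assuming Akka 2.6+ (where an implicit ActorSystem provides the materializer) and the `onFailuresWithBackoff` overload used elsewhere in this thread, which newer Akka versions deprecate in favour of `RestartSettings`. The object name, the `run` method and the `topic` parameter are made up for illustration, and the `map` stage stands in for whatever business logic the stream has.

```scala
import java.util.concurrent.atomic.AtomicReference

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.stream.scaladsl.RestartSource

import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical wrapper object, only here to keep the sketch self-contained.
object RestartingConsumerSketch {

  // Holds the Control of the most recently started inner consumer.
  // NoopControl is a placeholder until the first consumer has materialized.
  val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

  def run(
      consumerSettings: ConsumerSettings[String, String],
      committerSettings: CommitterSettings,
      topic: String // hypothetical: topic name supplied by the caller
  )(implicit system: ActorSystem): Future[Done] =
    RestartSource
      .onFailuresWithBackoff(
        minBackoff = 3.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2
      ) { () =>
        Consumer
          .committableSource(consumerSettings, Subscriptions.topics(topic))
          // Runs on every (re)start, so the reference follows the live consumer.
          .mapMaterializedValue(c => control.set(c))
          // Business logic would go here; the sketch only forwards the offset.
          .map(msg => msg.committableOffset)
      }
      // The sink's materialized value is the stream completion signal.
      .runWith(Committer.sink(committerSettings))
}
```

The `Future[Done]` returned by `run` is the stream completion, and `control.get()` returns whichever consumer is current at the moment it is called.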

I hope that helps,
Enno.

Thank you for that hint, that’s interesting.

I currently use a decider function to restart the source.

akka.japi.function.Function<Throwable, Supervision.Directive> decider = exc -> {
    if (exc instanceof CoordinatorNotAvailableException) {
        return Supervision.restart();
    } else if (exc instanceof RetriableException) {
        return Supervision.restart();
    } else if (exc instanceof java.util.concurrent.TimeoutException) {
        return Supervision.restart();
    } else {
        return Supervision.stop();
    }
};

Additionally, I add a shutdown hook with CoordinatedShutdown which terminates the Draining Control in phase service-unbind (with the default phase configuration).
I’m a little unsure which phase is best. I now get failed graph stages on shutdown.

Hi Enno,

I am new to Alpakka, and I am trying to use RestartSource with DrainingControl.apply, but I get the compile error below. I can do away with that line, but I’m worried that my flow will not drain properly. Basically, my flow is a consumer that feeds a producer. How can I ensure that the flow will drain while wrapping it in a RestartSource?

[error] found   : akka.stream.scaladsl.RunnableGraph[akka.kafka.scaladsl.Consumer.DrainingControl[akka.Done]]
[error] required: akka.stream.scaladsl.Source[?, _]
[error] .mapMaterializedValue(DrainingControl.apply)

Hi @nur

Are you sure you need a RestartSource? For most use cases it is not necessary.

When using RestartSource you would need to create the DrainingControl more explicitly as the stream doesn’t get access to the “latest” materialized value from the consumer source.
You need to pass in the value from the AtomicReference and the sink’s stream completion future to create the DrainingControl.

Cheers,
Enno.

Hi @Enno,

I have some business logic between the Consumer and Producer stages that might throw an exception. I would like to automatically restart the flow with a backoff if that happens. My output goes to the Producer, so would I still need a materialized value?

I assumed that I still needed it and I did what you recommended and it compiled. Thanks a lot for your help. I still have an issue with getting results for unit tests using testcontainers. But I will post it as a separate ticket. Thanks again.

Regards,

Are you looking for something like this?

  val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

  val result: Future[Foo] = RestartSource
    .onFailuresWithBackoff(
      minBackoff = 3.seconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    ) { () =>
      // your source here
    }
    .runWith(dbSink)

  val drainingControl = DrainingControl(Tuple2(control.get(), result))

I use a committable source. So this is what I did.

restartSource
      .toMat({
        Committer.sink(committerSettings)
      })(
        Keep.right
      )
      .mapMaterializedValue(f => {
        DrainingControl.apply(Tuple2(control.get(), f))
      })
      .run()

You need to construct the DrainingControl with the latest Control when you want to shut down the stream.
Passing in Consumer.NoopControl as @NullPointerException suggested is useful so there is a valid value until the consumer is established.

But don’t create the DrainingControl immediately after creating the stream as that will most likely catch the NoopControl. Instead, you may use Control.drainAndShutdown and pass in the stream completion signal to it.
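
A rough sketch of that idea: read the AtomicReference only at the moment you want to stop, and hand the stream completion to `drainAndShutdown`. The object and method names below are invented for illustration; `control` is assumed to be the reference populated in `mapMaterializedValue`, and `streamCompletion` the future materialized by the sink.

```scala
import java.util.concurrent.atomic.AtomicReference

import akka.Done
import akka.kafka.scaladsl.Consumer

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper, only here to keep the sketch self-contained.
object DrainLaterSketch {

  /** Call this at shutdown time, not right after starting the stream. */
  def shutdown(
      control: AtomicReference[Consumer.Control], // set via mapMaterializedValue
      streamCompletion: Future[Done]              // materialized by the sink
  )(implicit ec: ExecutionContext): Future[Done] =
    // control.get() is evaluated here, so it sees the consumer created by the
    // latest restart rather than the initial NoopControl placeholder.
    control.get().drainAndShutdown(streamCompletion)
}
```

Because the reference is dereferenced inside `shutdown`, it does not matter how many times the inner source was restarted before the call.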

Cheers,
Enno.

Hi @ennru,

Am I correct in using RestartSource? I commit offsets explicitly because I am doing something with each message from the Kafka source, and I want to retry if something fails. Will my code be re-triggered if I fail to commit the offset, even if I don’t use RestartSource?

If I am correct in using RestartSource, isn’t mapMaterializedValue the place where I should create the DrainingControl? I am not sure how to pass the stream completion. Wouldn’t calling drainAndShutdown() in the code below work?

    val control = new AtomicReference[Control](Consumer.NoopControl)

    val restartSource = 
      RestartSource
        .onFailuresWithBackoff() { 
          // logic here
        }
    

    val drainingControl = restartSource
      .toMat({
        Committer.sink(openedCommitterSettings)
      })(
        Keep.right
      )
      .mapMaterializedValue(f => {
        DrainingControl.apply(Tuple2(control.get(), f))
      })
      .run()

    drainingControl.drainAndShutdown()

Hi @ennru,

I haven’t gotten a reply from you. I am seeing a very rare problem with my implementation whose cause I can’t pinpoint, so I returned to this thread. Is this what you are recommending?

    val drainingControl = restartSource
      .toMat({
        Committer.sink(openedCommitterSettings)
      })(
        Keep.right
      )
      .mapMaterializedValue(f => {
        control.get().drainAndShutdown(f)
      })
      .run()

Regards,

Hi @ennru / @nur,
I am getting a ClassCastException when using the above solution:

streamCompletion =
(CompletionStage) RestartSource.onFailuresWithBackoff()

Caused by: java.lang.ClassCastException: akka.NotUsed$ cannot be cast to java.util.concurrent.CompletionStage

If I don’t cast, I get a compile-time error instead.
Could you please share a correct example?