Akka Streams: recreate the stream when a stage fails

I have a very simple Akka Streams flow that reads messages from Kafka using Alpakka, performs some manipulation on each message, and indexes it into Elasticsearch.

I'm using a committableSource, so I have at-least-once semantics. I commit an offset only when indexing into ES succeeds; if it fails, I will read the message again, because consumption resumes from the last known offset.

val decider: Supervision.Decider = {
    // Restart the failing stage on any exception (a second catch-all case would be unreachable)
    case _: Throwable => Supervision.Restart
  }

  val config: Config = context.system.settings.config.getConfig("akka.kafka.consumer")

  val flow: Flow[CommittableMessage[String, String], Done, NotUsed] =
    Flow[CommittableMessage[String, String]]
      .map(msg => Event(msg.committableOffset, Success(Json.parse(msg.record.value()))))
      .mapAsync(10) { event => indexEvent(event.json.get).map(f => event.copy(json = f)) }
      .mapAsync(10) { f =>
        f.json match {
          case Success(_)  => f.committableOffset.commitScaladsl()
          case Failure(ex) => throw new StreamFailedException(ex.getMessage, ex)
        }
      }

  val r: Flow[CommittableMessage[String, String], Done, NotUsed] = RestartFlow.onFailuresWithBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 3.seconds,
    randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly
    maxRestarts = 20 // limits the amount of restarts to 20
  )(() => {
    println("Creating flow")
    flow
  })

  val consumerSettings: ConsumerSettings[String, String] =
    ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val restartSource: Source[CommittableMessage[String, String], NotUsed] = RestartSource.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly
    maxRestarts = 20 // limits the amount of restarts to 20
  ) {() =>
    Consumer.committableSource(consumerSettings, Subscriptions.topics("test"))
  }


  implicit val mat: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system).withSupervisionStrategy(decider))



  restartSource
    .via(flow)
    .toMat(Sink.ignore)(Keep.both).run()

What I would like to achieve is to restart the entire stream (Source -> Flow -> Sink) if, for any reason, I was not able to index a message into Elasticsearch.

I tried the following:

  • Supervision.Decider - It looks like the flow was recreated, but no message was pulled from Kafka, presumably because the consumer remembers its offset.
  • RestartSource - Doesn't help either, because the exception happens in the flow stage.
  • RestartFlow - Doesn't help either, because it restarts only the Flow, but I need to restart the Source from the last successful offset.

Is there any elegant way to do that?
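
One direction that may fit, offered as a sketch rather than a definitive answer: build the source and the processing stages together inside the RestartSource factory, so that a failure in any of them tears down and recreates the whole chain. The example below does not use Kafka or Elasticsearch. The names RestartWholeStream, committed, failedOnce and this version of indexEvent are hypothetical stand-ins: an AtomicInteger plays the role of the committed offset, and the indexer fails once at offset 3. It assumes the Akka 2.5-style API already used in the question (ActorMaterializer, backoff parameters passed directly).

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink, Source}

import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object RestartWholeStream {

  /** Runs the demo stream and returns the final "committed offset". */
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("restart-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // Hypothetical stand-in for the offset committed to Kafka.
    val committed = new AtomicInteger(0)
    // Hypothetical flaky indexer: fails the first time it sees offset 3.
    val failedOnce = new AtomicBoolean(false)
    def indexEvent(offset: Int): Future[Int] =
      if (offset == 3 && failedOnce.compareAndSet(false, true))
        Future.failed(new RuntimeException(s"indexing failed at offset $offset"))
      else
        Future.successful(offset)

    // The source AND the processing stages are created inside the factory,
    // so a failure anywhere in this chain recreates all of them.
    val restarting = RestartSource.onFailuresWithBackoff(
      minBackoff = 1.second,
      maxBackoff = 5.seconds,
      randomFactor = 0.2,
      maxRestarts = 20
    ) { () =>
      Source(committed.get() until 6)   // "resume" from the last committed offset
        .mapAsync(1)(indexEvent)        // may fail, which fails the inner stream
        .map { offset =>
          committed.set(offset + 1)     // "commit" only after indexing succeeded
          offset
        }
    }

    try {
      val done = restarting.runWith(Sink.foreach(o => println(s"indexed offset $o")))
      Await.result(done, 30.seconds)
      committed.get()
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"final committed offset: ${run()}")
}
```

Applied to the code in the question, the same idea would presumably mean returning Consumer.committableSource(consumerSettings, Subscriptions.topics("test")).via(flow) from the factory, and dropping the restart supervision decider so that the failure actually propagates. Each restart then creates a new consumer, which for the same group id should resume from the last committed offset; messages that were indexed but not yet committed may be processed again, which is consistent with at-least-once delivery.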