I´m using Kakfa-client with Akka stream, and even having a Supervisor Strategy defined in my materializer
materializer = ActorMaterializer(createActorMaterializerSettings(strategyDecider, inputBuffer, system))
private def createActorMaterializerSettings(strategyDecider: function.Function[Throwable, Supervision.Directive], inputBuffer: Int, system: ActorSystem) = {
ActorMaterializerSettings(system)
.withDispatcher("akka.stream-sink-dispatcher")
.withSupervisionStrategy(toScalaFunction(strategyDecider))
.withInputBuffer(initialSize = 1, maxSize = inputBuffer)
}
When I receive something that I cannot deserialize
akka stream die org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition
My stream dont use the supervisor strategy and die.
Consumer.plainSource(buildConsumerSettings(consumerConfig),Subscriptions.topics(topicName))
As a workaround how about back off strategy. https://doc.akka.io/docs/akka-stream-kafka/current/errorhandling.html#restarting-the-stream-with-a-backoff-stage
or create the KafkaConusumerActor by myself and the subscribe in the pipeline to the topic as they do here https://github.com/kciesielski/reactive-kafka/blob/7dd29719a5d1c6eb70584e67bf9f4dbd7b6d2b39/docs/src/test/scala/sample/scaladsl/ConsumerExample.scala#L328
Then maybe the supervisor decision works.