It looks like I’m running into a variation on the problem reported earlier this month in the post “Kafka consumer actor stop after Serialization exception”. I’m using version 0.22 of `akka-stream-kafka_2.12` (the Alpakka Kafka connector).
Out of normal diligence, I wanted to make sure my Kafka consumer was correctly handling an `org.apache.kafka.common.errors.SerializationException`. I wanted to make sure that, if it got a bad message that the consumer couldn’t read, it would skip that message and go on to the next one.
I’m adding attributes to my `Source`s and `Flow`s to apply custom exception handling via `Supervision.Decider`. I’m using `Consumer.committablePartitionedSource`, and, not knowing which would be the applicable `Source` in this case, I made the `Supervision.Decider` for both my root source and my per-partition source catch a `SerializationException` and do a `Supervision.Resume`.
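For reference, the wiring looks roughly like this (a sketch, not my actual code — `consumerSettings`, the topic name, the parallelism, and `Sink.ignore` are placeholders for my real configuration and processing):

```scala
import akka.actor.ActorSystem
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.errors.SerializationException

implicit val system: ActorSystem = ActorSystem("FooInboundAlpakkaKafkaConsumer")
implicit val mat: ActorMaterializer = ActorMaterializer()

// Resume (i.e., drop the element and keep going) on a SerializationException;
// fail the stream on anything else.
val decider: Supervision.Decider = {
  case _: SerializationException => Supervision.Resume
  case _                         => Supervision.Stop
}

Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics("FooInbound"))
  .withAttributes(ActorAttributes.supervisionStrategy(decider)) // root source
  .mapAsyncUnordered(parallelism = 8) { case (_, partitionSource) =>
    partitionSource
      .withAttributes(ActorAttributes.supervisionStrategy(decider)) // per-partition source
      .runWith(Sink.ignore) // placeholder for the real per-partition processing
  }
  .runWith(Sink.ignore)
```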
So I went and ran a smoke test that would deliberately trigger a `SerializationException` in my value deserializer.
Much to my surprise, the custom supervision strategy for either `Source` was bypassed, and instead I got an exception that failed the whole stream:
```
2018-09-14 10:41:32.188 ERROR 22879 --- [lt-dispatcher-2] akka.kafka.internal.KafkaConsumerActor : Exception when polling from consumer
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition FooInbound-3 at offset 17. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data … from topic [FooInbound]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'random': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"random text"; line: 1, column: 8]
…
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923) ~[kafka-clients-1.0.2.jar!/:na]
…
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) ~[kafka-clients-1.0.2.jar!/:na]
	at akka.kafka.internal.KafkaConsumerActor.tryPoll$1(KafkaConsumerActor.scala:348) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
	at akka.kafka.internal.KafkaConsumerActor.poll(KafkaConsumerActor.scala:423) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
	at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:311) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
	at akka.kafka.internal.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:191) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
	at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[akka-actor_2.12-2.5.13.jar!/:2.5.13]
	at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[akka-actor_2.12-2.5.13.jar!/:2.5.13]
	at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:120) ~[akka-stream-kafka_2.12-0.22.jar!/:0.22]
…
```
Then, in my parent vanilla actor that runs my Akka Streams `RunnableGraph`, I observed that the Akka Streams execution was failing, due to:

```
akka.kafka.ConsumerFailed: Consumer actor terminated
```
I also observed that the subsequent, valid messages the Kafka consumer had picked up for that partition in the same polling interval failed in the same way, and I got a dead-letter notice for each of them:
```
2018-09-14 10:41:32.195 INFO 22879 --- [t-dispatcher-35] akka.actor.RepointableActorRef : Message [akka.kafka.internal.KafkaConsumerActor$Internal$RequestMessages] from Actor[akka://FooInboundAlpakkaKafkaConsumer/system/StreamSupervisor-0/$$c#-275126390] to Actor[akka://FooInboundAlpakkaKafkaConsumer/system/kafka-consumer-1#696869527] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://FooInboundAlpakkaKafkaConsumer/system/kafka-consumer-1#696869527]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
```
In my book, the ability to gracefully handle a `SerializationException`, and not fail on it, is a basic, table-stakes feature of any Kafka consumer. (I have an alternate Kafka consumer that uses Spring Cloud Stream, which I can switch to via configuration. That implementation handles the `SerializationException` exactly as one would expect.)
At this point, the only way I can think of to justify going to production with the Alpakka Kafka connector is to configure the `akka.kafka.ConsumerSettings` with `ByteArrayDeserializer` for the deserializers and do the real deserialization in my own code. Clearly, this is a hack, and I’m not even sure offhand if it’s going to remain feasible once I start needing to use the `KafkaAvroDeserializer` from Confluent.
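In code, that workaround would look something like this (again a sketch; `Foo`, `parseFoo`, and the connection settings are hypothetical stand-ins):

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.common.serialization.ByteArrayDeserializer

implicit val system: ActorSystem = ActorSystem("FooInboundAlpakkaKafkaConsumer")

// Kafka itself only ever sees raw bytes, so poll() inside the
// KafkaConsumerActor can no longer throw a SerializationException.
val consumerSettings: ConsumerSettings[Array[Byte], Array[Byte]] =
  ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers("localhost:9092") // placeholder
    .withGroupId("foo-inbound")             // placeholder

// The real deserialization then happens in a stream stage, where a resuming
// Supervision.Decider can actually skip the record that fails to parse.
case class Foo(value: String)                                 // hypothetical payload type
def parseFoo(bytes: Array[Byte]): Foo = ??? // e.g. Jackson; may throw
```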
Does anyone have any better ideas for how to deal with a `SerializationException` in an Akka Streams Kafka consumer?