Recovering from a KafkaSubscriberActor failure caused by ConsumerFailed

Hi,

I’ve been testing my Lagom microservice in isolation, and every now and then I place a malformed message on my Kafka topic (i.e., instead of publishing a properly formatted JSON event compliant with my event POJO, I publish a newline). (Note: with plain Lagom-to-Lagom communication, none of these issues happen.)

That causes the following stack trace:

09:46:16.445 [error] akka.kafka.KafkaConsumerActor - Exception when polling from consumer
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition bitso-order-req-topic-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: com.lightbend.lagom.javadsl.api.deser.DeserializationException: No content to map due to end-of-input
 at [Source: (akka.util.ByteIterator$ByteArrayIterator$$anon$1); line: 1, column: 0]
	at com.lightbend.lagom.javadsl.jackson.JacksonSerializerFactory$JacksonMessageSerializer$JacksonDeserializer.deserialize(JacksonSerializerFactory.java:129)
	at com.lightbend.lagom.javadsl.jackson.JacksonSerializerFactory$JacksonMessageSerializer$JacksonDeserializer.deserialize(JacksonSerializerFactory.java:117)
	at com.lightbend.lagom.internal.javadsl.broker.kafka.JavadslKafkaDeserializer.deserialize(KafkaSerializers.scala:21)
	at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
	at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:918)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1095)
	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:944)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:567)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:528)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
	at akka.kafka.KafkaConsumerActor.tryPoll$1(KafkaConsumerActor.scala:268)
	at akka.kafka.KafkaConsumerActor.poll(KafkaConsumerActor.scala:314)
	at akka.kafka.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:173)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.kafka.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:75)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
	at akka.actor.ActorCell.invoke(ActorCell.scala:557)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: (akka.util.ByteIterator$ByteArrayIterator$$anon$1); line: 1, column: 0]
	at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
	at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1355)
	at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:358)
	at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1596)
	at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1188)
	at com.lightbend.lagom.javadsl.jackson.JacksonSerializerFactory$JacksonMessageSerializer$JacksonDeserializer.deserialize(JacksonSerializerFactory.java:127)
	... 25 common frames omitted
09:46:16.450 [warn] com.mycompany.MySubscriberImpl - Got an error so we are replacing it with a dummy EVENT object
09:46:16.451 [error] com.lightbend.lagom.internal.broker.kafka.KafkaSubscriberActor - Topic subscription interrupted due to failure: [akka.kafka.ConsumerFailed: Consumer actor terminated]
09:46:16.452 [error] akka.actor.OneForOneStrategy - Consumer actor terminated
akka.kafka.ConsumerFailed: Consumer actor terminated
	at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1(SingleSourceLogic.scala:54)
	at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1$adapted(SingleSourceLogic.scala:40)
	at akka.stream.stage.GraphStageLogic$StageActor.internalReceive(GraphStage.scala:230)
	at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1(GraphStage.scala:198)
	at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1$adapted(GraphStage.scala:198)
	at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:454)
	at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:468)
	at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:563)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:745)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:760)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:670)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
	at akka.actor.ActorCell.invoke(ActorCell.scala:557)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The problem is that this invalid Kafka message keeps getting replayed to my subscriber over and over again. What I would want is to skip the message altogether (logging the error) and move on to the next one.

How can I do that?

Thanks,
Franz

Hi,

You should implement a custom serializer that returns null (or a default message) in case of a deserialization error. The processed message then has to be checked for null (or the default) and skipped.
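
A rough sketch of that idea (assuming your event class is MyEvent and that you wrap Lagom’s default Jackson-based serializer as the delegate — the class and constructor here are placeholders, not Lagom-provided names):

import java.util.List;

import akka.util.ByteString;
import com.lightbend.lagom.javadsl.api.deser.DeserializationException;
import com.lightbend.lagom.javadsl.api.deser.MessageSerializer.NegotiatedDeserializer;
import com.lightbend.lagom.javadsl.api.deser.MessageSerializer.NegotiatedSerializer;
import com.lightbend.lagom.javadsl.api.deser.StrictMessageSerializer;
import com.lightbend.lagom.javadsl.api.transport.MessageProtocol;
import com.lightbend.lagom.javadsl.api.transport.NotAcceptable;
import com.lightbend.lagom.javadsl.api.transport.UnsupportedMediaType;

// Wraps the regular serializer and turns a DeserializationException into a
// sentinel event instead of letting the exception kill the consumer actor.
public class TolerantMyEventSerializer implements StrictMessageSerializer<MyEvent> {

  private final StrictMessageSerializer<MyEvent> delegate;
  private final MyEvent defaultEvent;

  public TolerantMyEventSerializer(StrictMessageSerializer<MyEvent> delegate, MyEvent defaultEvent) {
    this.delegate = delegate;
    this.defaultEvent = defaultEvent;
  }

  @Override
  public NegotiatedSerializer<MyEvent, ByteString> serializerForRequest() {
    return delegate.serializerForRequest(); // serialization is unchanged
  }

  @Override
  public NegotiatedSerializer<MyEvent, ByteString> serializerForResponse(
      List<MessageProtocol> acceptedMessageProtocols) throws NotAcceptable {
    return delegate.serializerForResponse(acceptedMessageProtocols);
  }

  @Override
  public NegotiatedDeserializer<MyEvent, ByteString> deserializer(MessageProtocol protocol)
      throws UnsupportedMediaType {
    NegotiatedDeserializer<MyEvent, ByteString> inner = delegate.deserializer(protocol);
    return bytes -> {
      try {
        return inner.deserialize(bytes);
      } catch (DeserializationException e) {
        // Bad payload: hand the subscriber a recognizable default instead of failing.
        return defaultEvent;
      }
    };
  }
}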

Br,
Alan

@aklikic Thanks, that’s an interesting solution.

Btw, I tried using .recover() (which is why you’ll see the line com.mycompany.MySubscriberImpl - Got an error so we are replacing it with a dummy EVENT object in the log above). However, although I can see the exception being caught, I can’t seem to skip the Kafka event, which means this .recover() logic keeps getting hit with the same erroneous Kafka message.

Any way to skip erroneous Kafka events?

Can you share your subscriber code?

@aklikic

I had something like this:

topic.subscribe().atLeastOnce(Flow.<MyEvent>create()
    // on an upstream failure, emit a dummy element instead of failing the stream
    .recover(Throwable.class, () -> {
      LOGGER.warn("Got an error");
      return DUMMY_EVENT_SIGNALING_THERE_IS_AN_ERROR;
    })
    .mapAsync(1, event -> {
      LOGGER.debug("Got a request {}", event);
      // logic here...
      return CompletableFuture.completedFuture(Done.getInstance());
    }));

I did not use stream error handling for this case, but from the documentation:

recover allows you to emit a final element and then complete the stream on an upstream failure.

So after recover fires, the stream completes, and Lagom restarts the subscription from the last committed offset — which is why the same bad message keeps coming back. I also do not believe recoverWithRetries will help, because you need to provide a new upstream.

My advice is to handle it in deserialization instead. You can specify withMessageSerializer on your topic definition, or override the serializer factory.
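
Roughly, wiring it together (TolerantMyEventSerializer is the sketch from my earlier post; defaultMyEventSerializer and ordersTopic are placeholders for your actual Jackson-based serializer and topic method):

// In the service descriptor (API project): plug in the tolerant serializer.
return named("order-service").withTopics(
    topic("bitso-order-req-topic", this::ordersTopic)
        .withMessageSerializer(new TolerantMyEventSerializer(
            defaultMyEventSerializer, DUMMY_EVENT_SIGNALING_THERE_IS_AN_ERROR)));

// In the subscriber: completing the future with Done acknowledges the offset,
// so returning early for the sentinel is what actually skips the bad message.
topic.subscribe().atLeastOnce(
    Flow.<MyEvent>create().mapAsync(1, event -> {
      if (event == DUMMY_EVENT_SIGNALING_THERE_IS_AN_ERROR) {
        LOGGER.warn("Skipping a message that could not be deserialized");
        return CompletableFuture.completedFuture(Done.getInstance());
      }
      // normal processing here...
      return CompletableFuture.completedFuture(Done.getInstance());
    }));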

Thanks @aklikic!