Hi,
I’ve been testing my lagom microservice in insolation, and every now and then, I place an incorrect message into my kafka event (i.e instead of publishing a properly formatted JSON event compliant to my event POJO, i publish a newline). (Note: If it’s just regular lagom to lagom communication, none of these issues happen)
That causes the following stacktrace
09:46:16.445 [error] akka.kafka.KafkaConsumerActor - Exception when polling from consumer
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition bitso-order-req-topic-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: com.lightbend.lagom.javadsl.api.deser.DeserializationException: No content to map due to end-of-input
at [Source: (akka.util.ByteIterator$ByteArrayIterator$$anon$1); line: 1, column: 0]
at com.lightbend.lagom.javadsl.jackson.JacksonSerializerFactory$JacksonMessageSerializer$JacksonDeserializer.deserialize(JacksonSerializerFactory.java:129)
at com.lightbend.lagom.javadsl.jackson.JacksonSerializerFactory$JacksonMessageSerializer$JacksonDeserializer.deserialize(JacksonSerializerFactory.java:117)
at com.lightbend.lagom.internal.javadsl.broker.kafka.JavadslKafkaDeserializer.deserialize(KafkaSerializers.scala:21)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:918)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1095)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:944)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:567)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:528)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at akka.kafka.KafkaConsumerActor.tryPoll$1(KafkaConsumerActor.scala:268)
at akka.kafka.KafkaConsumerActor.poll(KafkaConsumerActor.scala:314)
at akka.kafka.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:173)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.kafka.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:75)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
at akka.actor.ActorCell.invoke(ActorCell.scala:557)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
at [Source: (akka.util.ByteIterator$ByteArrayIterator$$anon$1); line: 1, column: 0]
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1355)
at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:358)
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1596)
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1188)
at com.lightbend.lagom.javadsl.jackson.JacksonSerializerFactory$JacksonMessageSerializer$JacksonDeserializer.deserialize(JacksonSerializerFactory.java:127)
... 25 common frames omitted
09:46:16.450 [warn] com.mycompany.MySubscriberImpl - Got an error so we are replacing it with a dummy EVENT object
09:46:16.451 [error] com.lightbend.lagom.internal.broker.kafka.KafkaSubscriberActor - Topic subscription interrupted due to failure: [akka.kafka.ConsumerFailed: Consumer actor terminated]
09:46:16.452 [error] akka.actor.OneForOneStrategy - Consumer actor terminated
akka.kafka.ConsumerFailed: Consumer actor terminated
at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1(SingleSourceLogic.scala:54)
at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1$adapted(SingleSourceLogic.scala:40)
at akka.stream.stage.GraphStageLogic$StageActor.internalReceive(GraphStage.scala:230)
at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1(GraphStage.scala:198)
at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1$adapted(GraphStage.scala:198)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:454)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:468)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:563)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:745)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:760)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:670)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
at akka.actor.ActorCell.invoke(ActorCell.scala:557)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
The problem then is that this invalid kafka message keeps getting replayed over and over again to my subscriber. What I would want to happen is to skip the message altogether (and log the error) and start processing the new one.
How can I do that?
Thanks,
Franz