Hi, I'm using Lagom 1.4.11 on Kubernetes. Kafka is working, and all event processors are working fine.
But whenever I use a topic, I keep getting this error:
Consumer interrupted with WakeupException after timeout. Message: null.
Here is my log:
2019-03-12T09:47:29.015Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-29, akkaTimestamp=09:47:29.015UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:29.126Z [warn] akka.remote.EndpointReader [sourceThread=application-akka.remote.default-remote-dispatcher-32, akkaTimestamp=09:47:29.126UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fapplication%40*.*.*.*%3A2552-1/endpointWriter/endpointReader-akka.tcp%3A%2F%2Fapplication%40*.*.*.*%3A2552-0, sourceActorSystem=application] - Discarding inbound message to [unknown] in read-only association to [akka.tcp://application@*.*.*.*:2552]. If this happens often you may consider using akka.remote.use-passive-connections=off or use Artery TCP.
2019-03-12T09:47:32.105Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-36, akkaTimestamp=09:47:32.105UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:35.195Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-52, akkaTimestamp=09:47:35.194UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:38.285Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-54, akkaTimestamp=09:47:38.284UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:41.375Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-56, akkaTimestamp=09:47:41.375UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:44.465Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-58, akkaTimestamp=09:47:44.465UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:47.555Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-59, akkaTimestamp=09:47:47.554UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:50.646Z [warn] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-24, akkaTimestamp=09:47:50.645UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2019-03-12T09:47:53.740Z [error] akka.kafka.KafkaConsumerActor [sourceThread=application-akka.kafka.default-dispatcher-28, akkaTimestamp=09:47:53.738UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/system/kafka-consumer-1, sourceActorSystem=application] - WakeupException limit exceeded, stopping.
2019-03-12T09:47:53.755Z [error] com.lightbend.lagom.internal.broker.kafka.KafkaSubscriberActor [sourceThread=application-akka.actor.default-dispatcher-19, akkaTimestamp=09:47:53.754UTC, akkaSource=akka.tcp://application@10.233.97.135:2552/user/KafkaBackoffConsumer1-topic-item-created/KafkaConsumerActor1-topic-item-created, sourceActorSystem=application] - Topic subscription interrupted due to failure: [akka.kafka.ConsumerFailed: Consumer actor terminated]
2019-03-12T09:47:53.772Z [error] akka.actor.OneForOneStrategy [sourceThread=application-akka.actor.default-dispatcher-19, akkaTimestamp=09:47:53.765UTC, akkaSource=akka://application/user/KafkaBackoffConsumer1-topic-item-created/KafkaConsumerActor1-topic-item-created, sourceActorSystem=application] - Consumer actor terminated
akka.kafka.ConsumerFailed: Consumer actor terminated
    at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1(SingleSourceLogic.scala:54)
    at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1$adapted(SingleSourceLogic.scala:40)
    at akka.stream.stage.GraphStageLogic$StageActor.internalReceive(GraphStage.scala:230)
    at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1(GraphStage.scala:198)
    at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1$adapted(GraphStage.scala:198)
    at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:454)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:468)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:563)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:745)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:760)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:670)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Here is my topic definition:
topic("topic-item-created", createdItemTopic)
  .addProperty(
    KafkaProperties.partitionKeyStrategy,
    PartitionKeyStrategy[StorageItemTopic](_.itemId)
  ),
topic("item-issued-topic", issuedItemTopic)
The topic that fails is “topic-item-created”.
What could be the problem? Can someone help me?
Thank you