I'm about to deploy my Lagom services to a Kubernetes cluster. I'm using Lagom version 1.4.0.
I have two services: one which produces records to a Kafka topic and another which consumes records from that topic.
The producer service can start successfully, connect to Kafka (I can see the following message in the logs: KafkaSubscriberActor - Kafka service [_server._tcp.kafka-svc.default.svc.cluster.local] located at URI [10.100.118.126:9092]) and produce some records.
The consumer service can also start successfully and connect to Kafka (I can see the following message in the logs: KafkaSubscriberActor - Kafka service [_server._tcp.kafka-svc.default.svc.cluster.local] located at URI [10.100.118.126:9092]). But in the logs of the consumer service I'm getting the following errors:
[error] a.k.KafkaConsumerActor - WakeupException limit exceeded, stopping.
[error] c.l.l.i.b.k.KafkaSubscriberActor - Topic subscription interrupted due to failure: [akka.kafka.ConsumerFailed: Consumer actor terminated]
[error] a.a.OneForOneStrategy - Consumer actor terminated
akka.kafka.ConsumerFailed: Consumer actor terminated
at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1(SingleSourceLogic.scala:54)
at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1$adapted(SingleSourceLogic.scala:40)
at akka.stream.stage.GraphStageLogic$StageActor.internalReceive(GraphStage.scala:219)
at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1(GraphStage.scala:186)
at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1$adapted(GraphStage.scala:186)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:447)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:465)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:560)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:742)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:757)
I have been investigating this for quite a while already, but I don't understand what the problem is.
Any ideas what the problem could be and how to solve it?
Both services, the producer and the consumer, can connect to the Kafka endpoint (as shown by the logs), but are unable to produce or consume records to/from Kafka.
Here is what I see in the logs of the producer service:
[warn] o.a.k.c.NetworkClient - Error while fetching metadata with correlation id 2 : {UserAccountDeletedTopic=LEADER_NOT_AVAILABLE}
[info] o.a.k.c.p.KafkaProducer - Closing the Kafka producer with timeoutMillis = 0 ms.
[info] o.a.k.c.p.KafkaProducer - Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
[info] o.a.k.c.p.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
[error] a.a.OneForOneStrategy - Failed to update metadata after 60000 ms.
I have seen someone with a similar error recently. It was exactly the same context: Kafka on Kubernetes.
I don't have the details, but I remember that it was a configuration problem in the Kafka installation. It was solved by restarting the installation from scratch.
Can you confirm you can publish and consume from Kafka directly using kafka-console-producer and kafka-console-consumer?
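For example, something along these lines run from a pod inside the cluster (the broker address and topic name are taken from your logs; the exact script names depend on your Kafka distribution):
kafka-console-producer.sh --broker-list 10.100.118.126:9092 --topic UserAccountDeletedTopic
kafka-console-consumer.sh --bootstrap-server 10.100.118.126:9092 --topic UserAccountDeletedTopic --from-beginning
If typing a line into the producer makes it show up in the consumer, the brokers themselves are reachable and working.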
The main problem with WakeupException is that it's too high-level an exception and in itself doesn't provide enough information. I find it relevant that both Lagom's producer and subscriber fail while both Kafka CLI tools work successfully.
I suspect the error could be caused by how the Kafka brokers have their networking configured. There are a couple of settings, host.name and advertised.host.name (not 100% sure those are the actual setting names), which cause exactly this trace when misconfigured:
Error while fetching metadata with correlation id 2 : {UserAccountDeletedTopic=LEADER_NOT_AVAILABLE}
You can also have a look at https://github.com/akka/reactive-kafka (and add akka-specific categories to this topic) since I think the error is not specific to Lagom.
Interesting. It would be useful if we could have the original config you were using, to compare and better understand why this is happening. It may help our future selves and others.
The Kafka client library is rather difficult to use because the poll method doesn't always honor the given poll timeout and instead just blocks the call. The "workaround" prescribed by the library is to use this interrupt mechanism to trigger the WakeupException.
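To make that mechanism concrete, here is a minimal sketch using the plain Kafka consumer (the broker address and topic name are taken from the logs above; the other settings are placeholders):
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization.StringDeserializer

object WakeupSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "10.100.118.126:9092") // broker URI from the logs above
  props.put("group.id", "wakeup-sketch")
  props.put("key.deserializer", classOf[StringDeserializer].getName)
  props.put("value.deserializer", classOf[StringDeserializer].getName)

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("UserAccountDeletedTopic"))

  // A watchdog thread decides poll() has been blocking too long and calls wakeup(),
  // the only thread-safe way to interrupt a blocked poll.
  new Thread(new Runnable {
    def run(): Unit = { Thread.sleep(3000); consumer.wakeup() }
  }).start()

  try consumer.poll(1000L) // may block well past 1000 ms when the broker can't be reached
  catch {
    case _: WakeupException =>
      // akka-stream-kafka counts these; after too many in a row the consumer actor gives up,
      // which is the "WakeupException limit exceeded, stopping." error shown earlier.
  }
  consumer.close()
}
The point is that the WakeupException only tells you that poll was stuck, not why it was stuck.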
We have added additional logging in the latest versions of akka-stream-kafka, e.g. full stack dumps when this happens.
Do you use the latest version, and what do the logs look like?
Unfortunately I didn't see log messages like the ones you describe.
I spent two days trying to understand what the problem was and I definitely didn't see logs like the ones you suggest.
Regarding the error I posted in my first comment, I suspect that the consumer actor was not able to connect to Kafka. What I also saw in the logs was an error message saying NO_LEADER_AVAILABLE, but I couldn't really find out why that was happening.
That message usually means that the client and brokers can't establish communication. I'm far from a Kafka expert, but it's common to see NO_LEADER_AVAILABLE in Lagom dev mode (sbt runAll) because the service could have started faster than the embedded Kafka broker.
Some users reported seeing the NO_LEADER_AVAILABLE trace indefinitely in their staging or PROD environments, and that was usually a Kafka setup error (often related to how binding and advertising are set up in the brokers → host.name or advertised.host.name).
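In broker terms, newer Kafka versions express that through listeners / advertised.listeners rather than the old host.name settings. On Kubernetes the relevant part of server.properties would look roughly like this (the advertised DNS name here is only an example built from the service name in your logs):
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-0.kafka-svc.default.svc.cluster.local:9092
If advertised.listeners points to an address that the client pods cannot resolve or reach, the initial bootstrap connection can still succeed while producing and consuming keep failing, which matches the symptom above (connects fine, but cannot produce or consume).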
The Kafka configuration that I'm using now is working.
But the strange thing is that if I start only one pod/Kafka broker, I get an error saying NO_LEADER_AVAILABLE, while if I start 3 pods/Kafka brokers everything works fine.
I don't quite know what is causing this either; it may be that the configuration I'm using for Kafka doesn't work with only one broker.
If you don't see that logging, I suspect you are using an older version of akka-stream-kafka. The latest is now 0.20. Can you try to override that dependency in your build? I think that logging can help in understanding the problem.
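In sbt, overriding it would look roughly like this (these are the coordinates akka-stream-kafka 0.20 is published under):
// build.sbt
dependencyOverrides += "com.typesafe.akka" %% "akka-stream-kafka" % "0.20"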