Hello.
I’m having some issues with one of my Kafka-consuming actors. The actor’s job is to consume from multiple Kafka topics and to store the contents of each message in the database, depending on which topic it came from.
I’m using Play Framework 2.6 and akka-stream-kafka 0.22.
It’s set up like this:
```scala
import akka.kafka.ConsumerSettings
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig

val consumerSettings = ConsumerSettings(config.underlying, deserializer, deserializer)
  .withBootstrapServers(kafkaUrl)
  .withGroupId(groupId)
  .withClientId(clientId)
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

// The sink sends Init first, then waits for Ack before sending each element
val sink = Sink.actorRefWithAck(
  system.actorOf(StoringActor.props),
  onInitMessage = Init,
  ackMessage = Ack,
  onCompleteMessage = Done,
  onFailureMessage = (ex: Throwable) => StreamFailure(ex))

Consumer.plainSource(consumerSettings, Constants.SUBSCRIPTION_ALL_RELEVANT)
  .map(rec => KafkaMessage(rec.topic(), rec.value()))
  .runWith(sink)(materializer)
```
The StoringActor replies with an Ack once each message has been handled.
Storing is done like this:
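```scala
import java.time.LocalDateTime

import akka.actor.{Actor, ActorRef, Props}
import akka.pattern.pipe
import play.api.Logger
import play.api.libs.json.Json

object StoringActor {
  def props: Props = Props[StoringActor]
}

class StoringActor extends Actor {
  import context.dispatcher // ExecutionContext needed by map and pipeTo

  // replyTo is the stream stage that sent the message, not this actor's parent
  private def store(topic: String, value: String, replyTo: ActorRef): Unit = {
    Logger.debug(s"${LocalDateTime.now().toString} - Kafka package on topic: $topic")
    topic match {
      // Backticks compare against the existing value myTopic1; a bare lowercase
      // name would bind a new variable and match every topic
      case `myTopic1` =>
        // Json.parse throws on malformed JSON rather than returning None
        Json.parse(value).asOpt[SomeObject] match {
          case Some(someObject) =>
            // If this Future fails, pipeTo sends Status.Failure instead of Ack
            Database
              .store(someObject)
              .map(_ => Ack)
              .pipeTo(replyTo)
          case None =>
            Logger.error(s"Cannot parse in Topic: $topic. Value: $value")
            replyTo ! Ack
        }
      case _ => replyTo ! Ack
    }
  }

  def receive = {
    case KafkaMessage(topic, value) => store(topic, value, sender())
    case Init =>
      Logger.info("StorageActors Stream Initialized")
      sender() ! Ack
    case StreamFailure(ex) =>
      Logger.error("StorageActors stream failed.", ex)
    case Done =>
      Logger.error("Stream was Completed.")
    case _ =>
      Logger.error("Unexpected message received in StorageActor ")
      sender() ! Ack
  }
}
```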
My Issue
After a while (sometimes up to a week), this actor suddenly stops consuming without any apparent error. It looks as if one message never gets an Ack back, and the stream is stuck waiting for a reply.
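One thing worth ruling out: `Database.store(...).map(_ => Ack).pipeTo(...)` only produces an `Ack` when the Future succeeds. If the Future fails, `pipeTo` sends an `akka.actor.Status.Failure` instead, and as far as I can tell from the Akka 2.5 source, the stage behind `Sink.actorRefWithAck` ignores every reply other than the ack message. That would leave the stream waiting forever without anything being logged. The snippet below uses only the Scala standard library to show the difference between mapping and recovering; `failingStore` is a hypothetical stand-in for a failing `Database.store`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal

object AckDemo {
  case object Ack

  // Hypothetical stand-in for Database.store failing (e.g. a lost connection)
  def failingStore(): Future[Unit] = Future.failed(new RuntimeException("db unavailable"))

  // Returns the awaited result of both variants
  def run(): (Try[Ack.type], Try[Ack.type]) = {
    // Same shape as the original code: map only runs on success, so this Future fails
    val withoutRecover: Future[Ack.type] = failingStore().map(_ => Ack)

    // recover turns any non-fatal failure into an Ack, so a reply is always produced
    val withRecover: Future[Ack.type] =
      failingStore().map(_ => Ack).recover { case NonFatal(_) => Ack }

    (Try(Await.result(withoutRecover, 1.second)), Try(Await.result(withRecover, 1.second)))
  }

  def main(args: Array[String]): Unit = {
    val (r1, r2) = run()
    println(r1) // Failure(java.lang.RuntimeException: db unavailable)
    println(r2) // Success(Ack)
  }
}
```

Whether acking a failed write is acceptable depends on whether that record may be skipped; the point is only that some reply has to reach the sink.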
I have been looking at introducing an actor supervisor, but in my case there is no thrown error for it to react to.
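There may be errors that simply never reach anything that reports them. Besides a failed store Future, `Json.parse` throws on malformed JSON, which would crash the actor; the default supervisor then restarts it, the message that caused the crash is not processed again, and no `Ack` is sent. My reading of the Akka 2.5 source is that the `Sink.actorRefWithAck` stage only reacts to the ack message or to the actor terminating, so a restart would not unblock it. Below is a minimal sketch of a variant that replies on every path. `SafeStoringActor`, `SomeObject`, `MyTopic1`, and the injected `store` function (standing in for `Database.store`) are all hypothetical.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.pattern.pipe
import play.api.libs.json.{Json, Reads}
import scala.concurrent.Future
import scala.util.Try
import scala.util.control.NonFatal

// Hypothetical stand-ins for the message and domain types in the post
case object Init
case object Ack
final case class KafkaMessage(topic: String, value: String)
final case class SomeObject(id: String)
object SomeObject {
  implicit val reads: Reads[SomeObject] = Json.reads[SomeObject]
}

object SafeStoringActor {
  val MyTopic1 = "my-topic-1" // hypothetical topic name

  // Outcome of a store call, piped back to the actor so replies happen in receive
  private final case class Stored(replyTo: ActorRef)
  private final case class StoreFailed(replyTo: ActorRef, topic: String, ex: Throwable)

  // `store` is injected (hypothetical) so the database call can be stubbed
  def props(store: SomeObject => Future[Unit]): Props = Props(new SafeStoringActor(store))
}

class SafeStoringActor(store: SomeObject => Future[Unit]) extends Actor with ActorLogging {
  import SafeStoringActor._
  import context.dispatcher // ExecutionContext for map, recover and pipeTo

  def receive: Receive = {
    case Init => sender() ! Ack

    case KafkaMessage(MyTopic1, value) =>
      val replyTo = sender() // capture now; sender() must not be read inside Future callbacks
      // Wrap Json.parse so malformed input becomes None instead of crashing the actor
      Try(Json.parse(value)).toOption.flatMap(_.asOpt[SomeObject]) match {
        case Some(obj) =>
          store(obj)
            .map(_ => Stored(replyTo))
            .recover { case NonFatal(ex) => StoreFailed(replyTo, MyTopic1, ex) }
            .pipeTo(self)
        case None =>
          log.error("Cannot parse message on topic {}: {}", MyTopic1, value)
          replyTo ! Ack
      }

    case KafkaMessage(_, _) => sender() ! Ack // other topics: nothing to store

    case Stored(replyTo) => replyTo ! Ack

    case StoreFailed(replyTo, topic, ex) =>
      // Still ack so the stream moves on; this record is skipped
      log.error(ex, "Storing message from topic {} failed", topic)
      replyTo ! Ack

    case other => log.warning("Unexpected message: {}", other)
  }
}
```

Piping the outcome back to `self` keeps logging and replying inside `receive`, so nothing actor-related is touched from a Future callback. If failed records must not be lost, retrying or sending them to a separate topic would be needed instead of acking them.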
Is there any way to check whether a running actor is still responsive, and to restart it if it stops responding? I imagine I need some sort of ping system, where a supervisor checks the health of the actor every n minutes.
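A periodic ping might not catch this case: if the stream is stuck waiting for an `Ack`, the actor itself is idle and would still answer a ping. A different technique from the `Sink.actorRefWithAck` setup is to put a timeout on each reply: `ask` the actor for every message inside `mapAsync(1)`, so a missing reply fails the stream with an `AskTimeoutException`, and wrap the source in `RestartSource.withBackoff` so the failed stream is started again. This is a sketch assuming the Akka 2.5 `RestartSource` API; `WatchedConsumer`, `askEach`, and the timeout and backoff values are hypothetical.

```scala
import akka.Done
import akka.actor.ActorRef
import akka.kafka.{ConsumerSettings, Subscription}
import akka.kafka.scaladsl.Consumer
import akka.pattern.ask
import akka.stream.Materializer
import akka.stream.scaladsl.{RestartSource, Sink, Source}
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

// Same shape as the KafkaMessage in the post
final case class KafkaMessage(topic: String, value: String)

object WatchedConsumer {
  // Sends each element to `target` with ask and waits for the reply before sending
  // the next one (parallelism = 1). If no reply arrives within `replyTimeout`, the ask
  // Future fails with AskTimeoutException, and so does the stream.
  def askEach[T](source: Source[T, _], target: ActorRef, replyTimeout: FiniteDuration): Source[Any, _] = {
    implicit val timeout: Timeout = Timeout(replyTimeout)
    source.mapAsync(parallelism = 1)(elem => target ? elem)
  }

  // Starts the consumer stream again, with exponential backoff between 3 s and 30 s,
  // whenever it fails (for example on a reply timeout) or completes.
  def run(settings: ConsumerSettings[String, String], subscription: Subscription, storingActor: ActorRef)(
      implicit mat: Materializer): Future[Done] =
    RestartSource
      .withBackoff(3.seconds, 30.seconds, 0.2) { () =>
        val messages = Consumer
          .plainSource(settings, subscription)
          .map(rec => KafkaMessage(rec.topic(), rec.value()))
        askEach(messages, storingActor, replyTimeout = 30.seconds)
      }
      .runWith(Sink.ignore)
}
```

The existing StoringActor already replies to `sender()`, which works with `ask`. With `plainSource` this code never commits offsets itself, so what a restarted consumer reads next depends on the consumer’s auto-commit setting and `auto.offset.reset`; `Consumer.committableSource` with a commit after each reply is the usual way to make restarts predictable.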
Any help or tips are greatly appreciated.
Thanks!