I have a kafka consumer, that gets shutdown after 20s, when it does not receive any messages.
After then, it is no more possible to receive any messages, because the actor got destroyed.
How to prevent, that the consumer gets destroyed after particular time?
I am using the following consumer configuration, that is copied from alpakka website:
# Properties for akka.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.consumer {
# Tuning property of scheduled polls.
# Controls the interval from one scheduled poll to the next.
poll-interval = 50ms
# Tuning property of the `KafkaConsumer.poll` parameter.
# Note that non-zero value means that the thread that
# is executing the stage will be blocked. See also the `wakup-timeout` setting below.
poll-timeout = 50ms
# The stage will delay stopping the internal actor to allow processing of
# messages already in the stream (required for successful committing).
# Prefer use of `DrainingControl` over a large stop-timeout.
stop-timeout = 30s
# Duration to wait for `KafkaConsumer.close` to finish.
close-timeout = 20s
# If offset commit requests are not completed within this timeout
# the returned Future is completed `CommitTimeoutException`.
commit-timeout = 15s
# If commits take longer than this time a warning is logged
commit-time-warning = 1s
# Not used anymore (since 1.0-RC1)
# wakeup-timeout = 3s
# Not used anymore (since 1.0-RC1)
# max-wakeups = 10
# If set to a finite duration, the consumer will re-send the last committed offsets periodically
# for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
commit-refresh-interval = infinite
# Not used anymore (since 1.0-RC1)
# wakeup-debug = true
# Fully qualified config path which holds the dispatcher configuration
# to be used by the KafkaConsumerActor. Some blocking may occur.
use-dispatcher = "akka.kafka.default-dispatcher"
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# can be defined in this configuration section.
kafka-clients {
# Disable auto-commit by default
enable.auto.commit = false
}
# Time to wait for pending requests when a partition is closed
wait-close-partition = 500ms
# Limits the query to Kafka for a topic's position
position-timeout = 5s
# When using `AssignmentOffsetsForTimes` subscriptions: timeout for the
# call to Kafka's API
offset-for-times-timeout = 5s
# Timeout for akka.kafka.Metadata requests
# This value is used instead of Kafka's default from `default.api.timeout.ms`
# which is 1 minute.
metadata-request-timeout = 5s
}
20:15:08.378 [SAP-SENDER-akka.actor.default-dispatcher-32] DEBUG akka.kafka.internal.KafkaConsumerActor - Received Stop from Actor[akka://SAP-SENDER/system/StreamSupervisor-0/$$a#-730090264], stopping
20:15:08.380 [SAP-SENDER-akka.actor.default-dispatcher-32] DEBUG akka.actor.TimerScheduler - Cancel all timers
20:15:08.380 [kafka-coordinator-heartbeat-thread | SAP-SENDER-GROUP)] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAP-SENDER-GROUP)] Heartbeat thread has closed
20:15:08.380 [SAP-SENDER-akka.kafka.default-dispatcher-42] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAP-SENDER-GROUP)] Sending LeaveGroup request to coordinator sweetsoft:9092 (id: 2147483647 rack: null)
20:15:08.382 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAP-SENDER-GROUP)] LeaveGroup request returned successfully
20:15:08.383 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:
20:15:08.383 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:
20:15:08.383 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-authentication:
20:15:08.383 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name failed-authentication:
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-sent
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-received
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.latency
20:15:08.384 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-2147483647.bytes-sent
20:15:08.385 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-2147483647.bytes-received
20:15:08.385 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-2147483647.latency
20:15:08.385 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-sent
20:15:08.385 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-received
20:15:08.385 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.latency
20:15:08.385 [SAP-SENDER-akka.kafka.default-dispatcher-42] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=SAP-SENDER-GROUP)] Kafka consumer has been closed
20:15:08.387 [SAP-SENDER-akka.actor.default-dispatcher-32] INFO akka.actor.RepointableActorRef - Message [akka.kafka.KafkaConsumerActor$Stop$] from Actor[akka://SAP-SENDER/system/StreamSupervisor-0/$$a#-730090264] to Actor[akka://SAP-SENDER/system/kafka-consumer-1#363756220] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://SAP-SENDER/system/kafka-consumer-1#363756220]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.