So my problem is when i starting my kafka consumer stream, after consuming some records AbstractCoordinator sends LeaveGroup request to coordinator and then Consumer is stopping, why it happens?
[INFO ] [26/12/2019 13:25:45] [] o.a.k.c.c.i.AbstractCoordinator | [Consumer clientId=consumer-1, groupId=product-projector] Sending LeaveGroup request to coordinator 10.48.2.82:9092 (id: 2147483645 rack: null)
[INFO ] [26/12/2019 13:25:45] [akka://productProjector/system/kafka-consumer-1] a.a.RepointableActorRef | Message [akka.kafka.KafkaConsumerActor$Stop$] from Actor[akka://productProjector/system/StreamSupervisor-0/$$a#246015084] to Actor[akka://productProjector/system/kafka-consumer-1#1371580266] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://productProjector/system/kafka-consumer-1#1371580266]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
My consumer config:
consumer {
# Tuning property of scheduled polls.
# Controls the interval from one scheduled poll to the next.
poll-interval = 50ms
# Tuning property of the `KafkaConsumer.poll` parameter.
# Note that non-zero value means that the thread that
# is executing the stage will be blocked. See also the `wakup-timeout` setting below.
poll-timeout = 50ms
# The stage will delay stopping the internal actor to allow processing of
# messages already in the stream (required for successful committing).
# Prefer use of `DrainingControl` over a large stop-timeout.
stop-timeout = 30s
# Duration to wait for `KafkaConsumer.close` to finish.
close-timeout = 20s
# If offset commit requests are not completed within this timeout
# the returned Future is completed `CommitTimeoutException`.
# The `Transactional.source` waits this ammount of time for the producer to mark messages as not
# being in flight anymore as well as waiting for messages to drain, when rebalance is triggered.
commit-timeout = 15s
# If commits take longer than this time a warning is logged
commit-time-warning = 1s
# Not used anymore (since 1.0-RC1)
# wakeup-timeout = 3s
# Not used anymore (since 1.0-RC1)
# max-wakeups = 10
# If set to a finite duration, the consumer will re-send the last committed offsets periodically
# for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
commit-refresh-interval = infinite
# Not used anymore (since 1.0-RC1)
# wakeup-debug = true
# Fully qualified config path which holds the dispatcher configuration
# to be used by the KafkaConsumerActor. Some blocking may occur.
use-dispatcher = "akka.kafka.default-dispatcher"
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers = ${KAFKA}
# Disable auto-commit by default
enable.auto.commit = false
}
# Time to wait for pending requests when a partition is closed
wait-close-partition = 500ms
# Limits the query to Kafka for a topic's position
position-timeout = 5s
# When using `AssignmentOffsetsForTimes` subscriptions: timeout for the
# call to Kafka's API
offset-for-times-timeout = 5s
# Timeout for akka.kafka.Metadata requests
# This value is used instead of Kafka's default from `default.api.timeout.ms`
# which is 1 minute.
metadata-request-timeout = 5s
# Interval for checking that transaction was completed before closing the consumer.
# Used in the transactional flow for exactly-once-semantics processing.
eos-draining-check-interval = 30ms
}