How does getOffsetsOnAssign in Consumer.plainPartitionedManualOffsetSource actually work (this means we keep offsets externally)?
Will the consumer always wait for getOffsetsOnAssign to complete, or does it have a timeout?
It seems to me that if getOffsetsOnAssign is called after a partition rebalance and takes some time to complete, the consumer starts reading from offset 0.
After that, I can see in the log that getOffsetsOnAssign returns a value, but the consumer has already started from offset 0.
Here are excerpts from the log:
//offset 11 is saved for topic partition (test-user-events,1)
[JVM-2] 2022-02-17 12:56:22,494 a.p.MongoOffsetStore in MultiNodeProducerConsumerTest7-akka.actor.default-dispatcher-4 - insertOrUpdate oid ObjectId(fa6a89c7-e245-4ff2-b00b-76ef93374161) insertOrUpdate offset SingleOffset(ProjectionId(user-events-consumer,test-user-events-1),LNG,11,true)
[JVM-2] 2022-02-17 12:56:22,496 o.a.k.c.c.i.ConsumerCoordinator in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - handle [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Received successful SyncGroup response: SyncGroupResponseData(throttleTimeMs=0, errorCode=0, protocolType='consumer', protocolName='range', assignment=[0, 1, 0, 0, 0, 1, 0, 16, 116, 101, 115, 116, 45, 117, 115, 101, 114, 45, 101, 118, 101, 110, 116, 115, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 1, -1, -1, -1, -1])
[JVM-2] 2022-02-17 12:56:22,496 o.a.k.c.c.i.ConsumerCoordinator in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - handle [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Successfully synced group in generation Generation{generationId=4, memberId='consumer-user-events-consumer-2-afbdf4fb-23fe-44e6-9c3e-040383df4a6e', protocol='range'}
[JVM-2] 2022-02-17 12:56:22,497 o.a.k.c.c.i.ConsumerCoordinator in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - onJoinComplete [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Executing onJoinComplete with generation 4 and memberId consumer-user-events-consumer-2-afbdf4fb-23fe-44e6-9c3e-040383df4a6e
[JVM-2] 2022-02-17 12:56:22,497 o.a.k.c.c.i.ConsumerCoordinator in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - invokeOnAssignment [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Notifying assignor about the new Assignment(partitions=[test-user-events-0, test-user-events-1])
//rebalancing
[JVM-2] 2022-02-17 12:56:22,497 o.a.k.c.c.i.ConsumerCoordinator in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - invokePartitionsAssigned [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Adding newly assigned partitions: test-user-events-0, test-user-events-1
[JVM-2] 2022-02-17 12:56:22,498 o.a.k.c.c.KafkaConsumer in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - pause [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Pausing partitions [test-user-events-0, test-user-events-1]
[JVM-2] 2022-02-17 12:56:22,498 a.p.k.i.KafkaSourcedProviderImpl in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - onAssign oid ObjectId(fa6a89c7-e245-4ff2-b00b-76ef93374161) onAssign called Set(test-user-events-0, test-user-events-1) projectionId ProjectionId(user-events-consumer,2)
[JVM-2] 2022-02-17 12:56:22,498 a.p.k.i.KafkaSourcedProviderImpl in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - onAssign oid ObjectId(fa6a89c7-e245-4ff2-b00b-76ef93374161) onAssign called assignedPartitions Set(test-user-events-0, test-user-events-1) projectionId ProjectionId(user-events-consumer,2)
[JVM-2] 2022-02-17 12:56:22,500 o.a.k.c.c.i.ConsumerCoordinator in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - sendOffsetFetchRequest [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Fetching committed offsets for partitions: [test-user-events-0, test-user-events-1]
[JVM-2] 2022-02-17 12:56:22,500 a.k.i.PlainSubSource$$anon$3$$anon$4 in MultiNodeProducerConsumerTest7-akka.actor.default-dispatcher-43 - $anonfun$applyOrElse$4 [6b5a8#1] filtering out messages from revoked partitions Set(test-user-events-2)
//getOffsetsOnAssign called to load the offsets
[JVM-2] 2022-02-17 12:56:22,501 a.p.k.i.KafkaSourcedProviderImpl in MultiNodeProducerConsumerTest7-akka.actor.default-dispatcher-4 - getOffsetsOnAssign$$anonfun$1 oid ObjectId(fa6a89c7-e245-4ff2-b00b-76ef93374161) getOffsetOnAssign before after assignedTps Set(test-user-events-0, test-user-events-1) call ObjectId(85cc7808-4715-48f7-af6a-741bf5b39777)
[JVM-2] 2022-02-17 12:56:22,501 a.k.i.PlainSubSource$$anon$3$$anon$4 in MultiNodeProducerConsumerTest7-akka.actor.default-dispatcher-4 - $anonfun$applyOrElse$4 [6b5a8#1] filtering out messages from revoked partitions Set(test-user-events-2)
[JVM-2] 2022-02-17 12:56:22,507 o.a.k.c.c.i.ConsumerCoordinator in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - handle [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Found no committed offset for partition test-user-events-1
[JVM-2] 2022-02-17 12:56:22,508 o.a.k.c.c.i.Fetcher in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - sendListOffsetRequest [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Sending ListOffsetRequest ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='test-user-events', partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0, timestamp=-2, maxNumOffsets=1), ListOffsetsPartition(partitionIndex=1, currentLeaderEpoch=0, timestamp=-2, maxNumOffsets=1)])]) to broker localhost:9092 (id: 1001 rack: null)
[JVM-2] 2022-02-17 12:56:22,513 o.a.k.c.c.i.Fetcher in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - handleListOffsetResponse [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Handling ListOffsetResponse response for test-user-events-0. Fetched offset 0, timestamp -1
[JVM-2] 2022-02-17 12:56:22,513 o.a.k.c.c.i.Fetcher in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - handleListOffsetResponse [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Handling ListOffsetResponse response for test-user-events-1. Fetched offset 0, timestamp -1
[JVM-2] 2022-02-17 12:56:22,514 o.a.k.c.c.i.SubscriptionState in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - maybeSeekUnvalidated [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Resetting offset for partition test-user-events-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1001 rack: null)], epoch=0}}.
[JVM-2] 2022-02-17 12:56:22,514 o.a.k.c.c.i.SubscriptionState in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - maybeSeekUnvalidated [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Resetting offset for partition test-user-events-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1001 rack: null)], epoch=0}}.
[JVM-2] 2022-02-17 12:56:22,532 a.k.i.KafkaConsumerActor in MultiNodeProducerConsumerTest7-akka.actor.default-dispatcher-43 - $anonfun$applyOrElse$4 received handled message RequestMessages(0,Set(test-user-events-0)) from Actor[akka://MultiNodeProducerConsumerTest7/system/Materializers/StreamSupervisor-0/$$c#2025299560]
[JVM-2] 2022-02-17 12:56:22,533 a.k.i.KafkaConsumerActor in MultiNodeProducerConsumerTest7-akka.actor.default-dispatcher-43 - $anonfun$applyOrElse$4 received handled message RequestMessages(0,Set(test-user-events-2)) from Actor[akka://MultiNodeProducerConsumerTest7/system/Materializers/StreamSupervisor-0/$$d#1681821493]
[JVM-2] 2022-02-17 12:56:22,533 a.k.i.KafkaConsumerActor in MultiNodeProducerConsumerTest7-akka.actor.default-dispatcher-43 - $anonfun$applyOrElse$4 received handled message Poll(akka.kafka.internal.KafkaConsumerActor@27b8c7b3,false) from Actor[akka://MultiNodeProducerConsumerTest7/system/kafka-consumer-1#961770731]
[JVM-2] 2022-02-17 12:56:22,533 o.a.k.c.c.KafkaConsumer in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - pause [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Pausing partitions []
[JVM-2] 2022-02-17 12:56:22,534 o.a.k.c.c.KafkaConsumer in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - resume [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Resuming partitions [test-user-events-0, test-user-events-1]
[JVM-2] 2022-02-17 12:56:22,534 o.a.k.c.c.i.Fetcher in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - prepareFetchRequests [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Added READ_UNCOMMITTED fetch request for partition test-user-events-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1001 rack: null)], epoch=0}} to node localhost:9092 (id: 1001 rack: null)
[JVM-2] 2022-02-17 12:56:22,534 o.a.k.c.c.i.Fetcher in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - prepareFetchRequests [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Added READ_UNCOMMITTED fetch request for partition test-user-events-1 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1001 rack: null)], epoch=0}} to node localhost:9092 (id: 1001 rack: null)
[JVM-2] 2022-02-17 12:56:22,535 o.a.k.c.c.i.Fetcher in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - sendFetches [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(test-user-events-0, test-user-events-1), toForget=(test-user-events-2), implied=()) to broker localhost:9092 (id: 1001 rack: null)
[JVM-2] 2022-02-17 12:56:22,542 o.a.k.c.c.i.Fetcher in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - onSuccess [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Fetch READ_UNCOMMITTED at offset 0 for partition test-user-events-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=12, lastStableOffset=12, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=2126, buffer=java.nio.HeapByteBuffer[pos=0 lim=2126 cap=4526]))
[JVM-2] 2022-02-17 12:56:22,542 o.a.k.c.c.i.Fetcher in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - onSuccess [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Fetch READ_UNCOMMITTED at offset 0 for partition test-user-events-1 returned fetch data PartitionData(partitionIndex=1, errorCode=0, highWatermark=14, lastStableOffset=14, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=2359, buffer=java.nio.HeapByteBuffer[pos=0 lim=2359 cap=2362]))
[JVM-2] 2022-02-17 12:56:22,545 o.a.k.c.c.i.Fetcher in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - prepareFetchRequests [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Added READ_UNCOMMITTED fetch request for partition test-user-events-0 at position FetchPosition{offset=12, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1001 rack: null)], epoch=0}} to node localhost:9092 (id: 1001 rack: null)
[JVM-2] 2022-02-17 12:56:22,545 o.a.k.c.c.i.Fetcher in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - prepareFetchRequests [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Added READ_UNCOMMITTED fetch request for partition test-user-events-1 at position FetchPosition{offset=14, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1001 rack: null)], epoch=0}} to node localhost:9092 (id: 1001 rack: null)
[JVM-2] 2022-02-17 12:56:22,546 o.a.k.c.c.i.Fetcher in MultiNodeProducerConsumerTest7-akka.kafka.default-dispatcher-27 - sendFetches [Consumer clientId=consumer-user-events-consumer-2, groupId=user-events-consumer] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(test-user-events-0, test-user-events-1), toForget=(), implied=()) to broker localhost:9092 (id: 1001 rack: null)
//the first consumer record is read from offset 0
[JVM-2] 2022-02-17 12:56:22,546 a.p.k.KafkaOffsets$ in MultiNodeProducerConsumerTest7-akka.actor.default-dispatcher-43 - toMergeableOffset consumer record ConsumerRecord(topic = test-user-events, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1645098961719, serialized key size = 36, serialized value size = 70, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2a654a21-1c66-427a-bb0b-c2eb43c41044, value = {"name":"MultiNode Name part=02-id:2-1-node4","age":2,"t":"UserEvent"})
//The moment when external offsets are loaded from Mongo. Topic partition (test-user-events,1) has value 11
[JVM-2] 2022-02-17 12:56:22,626 a.p.MongoOffsetStore in Thread-4 - readOffset$$anonfun$1 oid ObjectId(fa6a89c7-e245-4ff2-b00b-76ef93374161) read offset mergeable Some(MergeableOffset(Map(test-user-events-1 -> 11, test-user-events-0 -> 8, test-user-events-2 -> 8)))
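To make the suspicion concrete: in the log above, the offset lookup starts at 12:56:22,501, the fetch at offset 0 is prepared at 12:56:22,534, and the store only answers at 12:56:22,626. Below is a toy model of that ordering using only the JDK. It is not Alpakka Kafka or kafka-clients code, and the names `OffsetRaceSketch`, `positionWithoutWaiting`, and `positionAfterWaiting` are hypothetical. It only shows the difference between a consumer that fetches while the lookup is still pending and one that waits (with a timeout) before choosing its position. Whether the real source waits is exactly what I am asking.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Toy model only: none of this is Alpakka Kafka or kafka-clients code.
final class OffsetRaceSketch {
    // Position a consumer ends up at after resetting to the earliest offset.
    static final long EARLIEST = 0L;

    // Unguarded variant: if the external lookup has not completed yet,
    // fall back to the reset position instead of the stored offset.
    static long positionWithoutWaiting(CompletableFuture<Map<Integer, Long>> lookup,
                                       int partition) {
        Map<Integer, Long> known = lookup.getNow(Map.of());
        return known.getOrDefault(partition, EARLIEST);
    }

    // Guarded variant: wait for the lookup (failing after the timeout)
    // before deciding where to start reading.
    static long positionAfterWaiting(CompletableFuture<Map<Integer, Long>> lookup,
                                     int partition, Duration timeout) {
        Map<Integer, Long> known =
            lookup.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS).join();
        return known.getOrDefault(partition, EARLIEST);
    }

    public static void main(String[] args) {
        // Stand-in for the pending result of the external offset store.
        CompletableFuture<Map<Integer, Long>> lookup = new CompletableFuture<>();

        // The fetch position is chosen while the lookup is still pending.
        long early = positionWithoutWaiting(lookup, 1);

        // The store answers about 100 ms later with the offsets from the log.
        CompletableFuture.runAsync(
            () -> lookup.complete(Map.of(0, 8L, 1, 11L, 2, 8L)),
            CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS));

        long late = positionAfterWaiting(lookup, 1, Duration.ofSeconds(5));

        System.out.println("without waiting: " + early); // 0
        System.out.println("after waiting: " + late);    // 11
    }
}
```

The model returns the stored value unchanged; whether the real seek position should be the stored offset or the one after it depends on what the store records.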
Also asked here: