
I need help resolving a problem I have with the Java client (version 3.7.0), and I'm not sure what the best approach is.

I'm manually assigning a list of partitions to the consumer, and I want to move to the end of each partition before I start consuming messages. Additionally, I have AUTO_OFFSET_RESET_CONFIG="latest".

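public void subscribe(final String topic) {
  // consumer and numberOfPartitions are fields of the enclosing class
  final ArrayList<TopicPartition> partitions = new ArrayList<>();
  for (int i = 0; i < numberOfPartitions; i++) {
    partitions.add(new TopicPartition(topic, i));
  }
  consumer.assign(partitions);
  // Request a move to the end of every assigned partition before consuming
  consumer.seekToEnd(partitions);
}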

What happens is that the consumer starts consuming, and then after a few moments it resets the offset to the latest position, skipping some messages that the producer has published in the meantime.

The logs look like this:

[main] INFO org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer - [Consumer clientId=consumer-1, groupId=consumer-1] Assigned to partition(s): partition-0, partition-1, partition-2, partition-3
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-1, groupId=consumer-1] Seeking to latest offset of partition partition-0
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-1, groupId=consumer-1] Seeking to latest offset of partition partition-1
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-1, groupId=consumer-1] Seeking to latest offset of partition partition-2
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-1, groupId=consumer-1] Seeking to latest offset of partition partition-3

************************
Messages consumed: 0
************************
Messages consumed: 0

[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-1, groupId=consumer-1] Resetting offset for partition partition-3 to position FetchPosition{offset=11275, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-1, groupId=consumer-1] Resetting offset for partition partition-1 to position FetchPosition{offset=7891, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-1, groupId=consumer-1] Resetting offset for partition partition-2 to position FetchPosition{offset=9996, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-1, groupId=consumer-1] Resetting offset for partition partition-0 to position FetchPosition{offset=11993, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.

Looking at the source code, the first call to .poll(milliseconds) invokes:

public void resetPositionsIfNeeded() {
 Map<TopicPartition, Long> offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();

 if (offsetResetTimestamps.isEmpty())
     return;

 resetPositionsAsync(offsetResetTimestamps);
}

which runs asynchronously.
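The KafkaConsumer#seekToEnd Javadoc states that the seek is evaluated lazily, only when poll(Duration) or position(TopicPartition) is called. Below is a minimal sketch, assuming kafka-clients 3.7 on the classpath, that forces the end-offset lookup immediately after seekToEnd by calling position() on each partition. EagerSeekToEnd, partitionsOf and assignAtEnd are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper class; a sketch, not the asker's actual code.
public final class EagerSeekToEnd {

    private EagerSeekToEnd() {}

    // Builds partitions 0..numberOfPartitions-1 of the given topic.
    static List<TopicPartition> partitionsOf(String topic, int numberOfPartitions) {
        List<TopicPartition> partitions = new ArrayList<>(numberOfPartitions);
        for (int i = 0; i < numberOfPartitions; i++) {
            partitions.add(new TopicPartition(topic, i));
        }
        return partitions;
    }

    // Assigns the partitions, requests a seek to the end, then resolves it eagerly.
    // Returns the offset each partition will start consuming from.
    static Map<TopicPartition, Long> assignAtEnd(Consumer<?, ?> consumer, String topic, int numberOfPartitions) {
        List<TopicPartition> partitions = partitionsOf(topic, numberOfPartitions);
        consumer.assign(partitions);
        // Only marks the partitions for a "latest" reset; nothing is fetched yet.
        consumer.seekToEnd(partitions);

        Map<TopicPartition, Long> resolved = new LinkedHashMap<>();
        for (TopicPartition tp : partitions) {
            // position() blocks (up to default.api.timeout.ms) until the end
            // offset has been looked up, so the start point is fixed here,
            // before the first poll().
            resolved.put(tp, consumer.position(tp));
        }
        return resolved;
    }
}
```

With the positions pinned before the first poll(), messages produced after assignAtEnd returns would not be skipped by a later background reset, assuming no other code triggers a new seek.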

This asynchronous reset to latest occasionally causes me to miss messages, which I can't afford because I'm writing tests. What is the best way to avoid this? Calling consumer.poll(5000) after consumer.seekToEnd() seems to fix it, but it adds a fixed wait, and I'm not sure it will work 100% consistently. Thanks in advance!
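An alternative that avoids the fixed poll(5000) wait is to fetch the end offsets synchronously with Consumer#endOffsets(Collection, Duration) and then seek() to them explicitly, so that no offset reset is left pending for poll() to resolve in the background. This is a sketch under the same assumption (kafka-clients 3.7); ExplicitSeekToEnd and seekToCurrentEnd are hypothetical names.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper class; a sketch, not a library API.
public final class ExplicitSeekToEnd {

    private ExplicitSeekToEnd() {}

    // Assigns the partitions and positions each one at its current end offset.
    // Returns the end offsets that were used as starting positions.
    static Map<TopicPartition, Long> seekToCurrentEnd(
            Consumer<?, ?> consumer, Collection<TopicPartition> partitions, Duration timeout) {
        consumer.assign(partitions);
        // Blocks until the broker returns the end offsets, or throws
        // org.apache.kafka.common.errors.TimeoutException after `timeout`.
        Map<TopicPartition, Long> ends = consumer.endOffsets(partitions, timeout);
        for (Map.Entry<TopicPartition, Long> e : ends.entrySet()) {
            // seek() sets the fetch position directly, so poll() has no
            // pending "latest" reset to resolve later.
            consumer.seek(e.getKey(), e.getValue());
        }
        return ends;
    }
}
```

Because the lookup either completes or throws within the timeout, a test fails loudly rather than silently starting from a later offset.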
