Skip to content

Commit 2abf2cd

Browse files
authored
KIP-207: Add ListOffsetsRequest v5 / handle OffsetNotAvailableError (#2657)
1 parent 93bcdde commit 2abf2cd

File tree

1 file changed

+6
-4
lines changed

1 file changed

+6
-4
lines changed

kafka/consumer/fetcher.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ def _group_list_offset_requests(self, timestamps):
508508
return dict(timestamps_by_node)
509509

510510
def _send_list_offsets_request(self, node_id, timestamps_and_epochs):
511-
version = self._client.api_version(ListOffsetsRequest, max_version=4)
511+
version = self._client.api_version(ListOffsetsRequest, max_version=5)
512512
if self.config['isolation_level'] == 'read_committed' and version < 2:
513513
raise Errors.UnsupportedVersionError('read_committed isolation level requires ListOffsetsRequest >= v2')
514514
by_topic = collections.defaultdict(list)
@@ -521,14 +521,14 @@ def _send_list_offsets_request(self, node_id, timestamps_and_epochs):
521521
data = (tp.partition, timestamp, 1)
522522
by_topic[tp.topic].append(data)
523523

524-
if version <= 1:
524+
if version >= 2:
525525
request = ListOffsetsRequest[version](
526526
-1,
527+
self._isolation_level,
527528
list(six.iteritems(by_topic)))
528529
else:
529530
request = ListOffsetsRequest[version](
530531
-1,
531-
self._isolation_level,
532532
list(six.iteritems(by_topic)))
533533

534534
# Client returns a future that only fails on network issues
@@ -588,7 +588,9 @@ def _handle_list_offsets_response(self, future, response):
588588
" message format version is before 0.10.0", partition)
589589
elif error_type in (Errors.NotLeaderForPartitionError,
590590
Errors.ReplicaNotAvailableError,
591-
Errors.KafkaStorageError):
591+
Errors.KafkaStorageError,
592+
Errors.OffsetNotAvailableError,
593+
Errors.LeaderNotAvailableError):
592594
log.debug("Attempt to fetch offsets for partition %s failed due"
593595
" to %s, retrying.", error_type.__name__, partition)
594596
partitions_to_retry.add(partition)

0 commit comments

Comments
 (0)