
Commit 3493380

Add optional timeout_ms kwarg to remaining consumer/coordinator methods (#2544)
1 parent 1bd6573 commit 3493380

File tree: 6 files changed (+101, -70 lines)

kafka/consumer/fetcher.py

+18 -8

@@ -135,17 +135,21 @@ def send_fetches(self):
         self._clean_done_fetch_futures()
         return futures
 
-    def reset_offsets_if_needed(self, partitions):
+    def reset_offsets_if_needed(self, partitions, timeout_ms=None):
         """Lookup and set offsets for any partitions which are awaiting an
         explicit reset.
 
         Arguments:
             partitions (set of TopicPartitions): the partitions to reset
+
+        Raises:
+            KafkaTimeoutError if timeout_ms provided
         """
+        inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout resetting offsets')
         for tp in partitions:
             # TODO: If there are several offsets to reset, we could submit offset requests in parallel
             if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=inner_timeout_ms())
 
     def _clean_done_fetch_futures(self):
         while True:

@@ -160,7 +164,7 @@ def in_flight_fetches(self):
         self._clean_done_fetch_futures()
         return bool(self._fetch_futures)
 
-    def update_fetch_positions(self, partitions):
+    def update_fetch_positions(self, partitions, timeout_ms=None):
         """Update the fetch positions for the provided partitions.
 
         Arguments:

@@ -169,7 +173,9 @@ def update_fetch_positions(self, partitions):
         Raises:
             NoOffsetForPartitionError: if no offset is stored for a given
                 partition and no reset policy is available
+            KafkaTimeoutError if timeout_ms provided.
         """
+        inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions')
         # reset the fetch position to the committed position
         for tp in partitions:
             if not self._subscriptions.is_assigned(tp):

@@ -182,12 +188,12 @@ def update_fetch_positions(self, partitions):
                 continue
 
             if self._subscriptions.is_offset_reset_needed(tp):
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=inner_timeout_ms())
             elif self._subscriptions.assignment[tp].committed is None:
                 # there's no committed position, so we need to reset with the
                 # default strategy
                 self._subscriptions.need_offset_reset(tp)
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=inner_timeout_ms())
             else:
                 committed = self._subscriptions.assignment[tp].committed.offset
                 log.debug("Resetting offset for partition %s to the committed"

@@ -216,14 +222,15 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
             offsets[tp] = offsets[tp].offset
         return offsets
 
-    def _reset_offset(self, partition):
+    def _reset_offset(self, partition, timeout_ms=None):
         """Reset offsets for the given partition using the offset reset strategy.
 
         Arguments:
             partition (TopicPartition): the partition that needs reset offset
 
         Raises:
             NoOffsetForPartitionError: if no offset reset strategy is defined
+            KafkaTimeoutError if timeout_ms provided
         """
         timestamp = self._subscriptions.assignment[partition].reset_strategy
         if timestamp is OffsetResetStrategy.EARLIEST:

@@ -235,7 +242,7 @@ def _reset_offset(self, partition):
 
         log.debug("Resetting offset for partition %s to %s offset.",
                   partition, strategy)
-        offsets = self._retrieve_offsets({partition: timestamp})
+        offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms=timeout_ms)
 
         if partition in offsets:
             offset = offsets[partition].offset

@@ -263,11 +270,14 @@ def _retrieve_offsets(self, timestamps, timeout_ms=None):
             retrieved offset, timestamp, and leader_epoch. If offset does not exist for
             the provided timestamp, that partition will be missing from
             this mapping.
+
+        Raises:
+            KafkaTimeoutError if timeout_ms provided
         """
         if not timestamps:
             return {}
 
-        inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout attempting to find coordinator')
+        inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout fetching offsets')
         timestamps = copy.copy(timestamps)
         while True:
             if not timestamps:
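Every method converted above follows the same pattern: wrap the caller's timeout_ms in timeout_ms_fn once at the top, then call the returned inner_timeout_ms() to get the remaining budget for each blocking sub-call. Below is a minimal sketch of such a helper, consistent with how it is used in this diff; the actual kafka.util.timeout_ms_fn may differ in detail.

    import time

    from kafka.errors import KafkaTimeoutError

    def timeout_ms_fn(timeout_ms, error_message):
        # Capture the start time once; each call to inner_timeout_ms()
        # reports how much of the original budget is left.
        begin = time.time()

        def inner_timeout_ms(fallback=None):
            # No deadline at all: defer to the sub-call's own fallback (or None)
            if timeout_ms is None:
                return fallback
            remaining = timeout_ms - (time.time() - begin) * 1000
            if remaining <= 0:
                # Budget exhausted: raise only if a message was supplied
                if error_message is not None:
                    raise KafkaTimeoutError(error_message)
                remaining = 0
            # Never hand a sub-call more than its own cap
            # (e.g. time_to_next_poll in _poll_once below)
            return remaining if fallback is None else min(remaining, fallback)

        return inner_timeout_ms

Passing error_message=None (as poll() does below) makes expiry non-raising: inner_timeout_ms() simply returns 0 and the caller checks for that.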

kafka/consumer/group.py

+45 -39

@@ -5,7 +5,7 @@
 import socket
 import time
 
-from kafka.errors import KafkaConfigurationError, UnsupportedVersionError
+from kafka.errors import KafkaConfigurationError, KafkaTimeoutError, UnsupportedVersionError
 
 from kafka.vendor import six
 

@@ -18,6 +18,7 @@
 from kafka.metrics import MetricConfig, Metrics
 from kafka.protocol.list_offsets import OffsetResetStrategy
 from kafka.structs import OffsetAndMetadata, TopicPartition
+from kafka.util import timeout_ms_fn
 from kafka.version import __version__
 
 log = logging.getLogger(__name__)

@@ -521,7 +522,7 @@ def commit_async(self, offsets=None, callback=None):
             offsets, callback=callback)
         return future
 
-    def commit(self, offsets=None):
+    def commit(self, offsets=None, timeout_ms=None):
         """Commit offsets to kafka, blocking until success or error.
 
         This commits offsets only to Kafka. The offsets committed using this API

@@ -545,9 +546,9 @@ def commit(self, offsets=None):
         assert self.config['group_id'] is not None, 'Requires group_id'
         if offsets is None:
             offsets = self._subscription.all_consumed_offsets()
-        self._coordinator.commit_offsets_sync(offsets)
+        self._coordinator.commit_offsets_sync(offsets, timeout_ms=timeout_ms)
 
-    def committed(self, partition, metadata=False):
+    def committed(self, partition, metadata=False, timeout_ms=None):
         """Get the last committed offset for the given partition.
 
         This offset will be used as the position for the consumer

@@ -564,6 +565,9 @@ def committed(self, partition, metadata=False):
 
         Returns:
             The last committed offset (int or OffsetAndMetadata), or None if there was no prior commit.
+
+        Raises:
+            KafkaTimeoutError if timeout_ms provided
         """
         assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
         assert self.config['group_id'] is not None, 'Requires group_id'

@@ -572,10 +576,10 @@ def committed(self, partition, metadata=False):
         if self._subscription.is_assigned(partition):
             committed = self._subscription.assignment[partition].committed
             if committed is None:
-                self._coordinator.refresh_committed_offsets_if_needed()
+                self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms)
                 committed = self._subscription.assignment[partition].committed
         else:
-            commit_map = self._coordinator.fetch_committed_offsets([partition])
+            commit_map = self._coordinator.fetch_committed_offsets([partition], timeout_ms=timeout_ms)
             if partition in commit_map:
                 committed = commit_map[partition]
             else:

@@ -670,17 +674,13 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
         assert not self._closed, 'KafkaConsumer is closed'
 
         # Poll for new data until the timeout expires
-        start = time.time()
-        remaining = timeout_ms
+        inner_timeout_ms = timeout_ms_fn(timeout_ms, None)
         while not self._closed:
-            records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
+            records = self._poll_once(inner_timeout_ms(), max_records, update_offsets=update_offsets)
             if records:
                 return records
 
-            elapsed_ms = (time.time() - start) * 1000
-            remaining = timeout_ms - elapsed_ms
-
-            if remaining <= 0:
+            if inner_timeout_ms() <= 0:
                 break
 
         return {}

@@ -695,14 +695,14 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         Returns:
             dict: Map of topic to list of records (may be empty).
         """
-        begin = time.time()
-        if not self._coordinator.poll(timeout_ms=timeout_ms):
+        inner_timeout_ms = timeout_ms_fn(timeout_ms, None)
+        if not self._coordinator.poll(timeout_ms=inner_timeout_ms()):
             return {}
 
         # Fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
         if not self._subscription.has_all_fetch_positions():
-            self._update_fetch_positions(self._subscription.missing_fetch_positions())
+            self._update_fetch_positions(self._subscription.missing_fetch_positions(), timeout_ms=inner_timeout_ms())
 
         # If data is available already, e.g. from a previous network client
         # poll() call to commit, then just return it immediately

@@ -723,9 +723,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         if len(futures):
             self._client.poll(timeout_ms=0)
 
-        timeout_ms -= (time.time() - begin) * 1000
-        timeout_ms = max(0, min(timeout_ms, self._coordinator.time_to_next_poll() * 1000))
-        self._client.poll(timeout_ms=timeout_ms)
+        self._client.poll(timeout_ms=inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000))
         # after the long poll, we should check whether the group needs to rebalance
         # prior to returning data so that the group can stabilize faster
         if self._coordinator.need_rejoin():

@@ -734,7 +732,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
         return records
 
-    def position(self, partition):
+    def position(self, partition, timeout_ms=None):
         """Get the offset of the next record that will be fetched
 
         Arguments:

@@ -748,7 +746,7 @@ def position(self, partition):
         assert self._subscription.is_assigned(partition), 'Partition is not assigned'
         position = self._subscription.assignment[partition].position
         if position is None:
-            self._update_fetch_positions([partition])
+            self._update_fetch_positions([partition], timeout_ms=timeout_ms)
             position = self._subscription.assignment[partition].position
         return position.offset if position else None
 

@@ -1103,35 +1101,43 @@ def _use_consumer_group(self):
             return False
         return True
 
-    def _update_fetch_positions(self, partitions):
+    def _update_fetch_positions(self, partitions, timeout_ms=None):
         """Set the fetch position to the committed position (if there is one)
         or reset it using the offset reset policy the user has configured.
 
         Arguments:
             partitions (List[TopicPartition]): The partitions that need
                 updating fetch positions.
 
+        Returns True if fetch positions updated, False if timeout
+
         Raises:
             NoOffsetForPartitionError: If no offset is stored for a given
                 partition and no offset reset policy is defined.
         """
-        # Lookup any positions for partitions which are awaiting reset (which may be the
-        # case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do
-        # this check first to avoid an unnecessary lookup of committed offsets (which
-        # typically occurs when the user is manually assigning partitions and managing
-        # their own offsets).
-        self._fetcher.reset_offsets_if_needed(partitions)
-
-        if not self._subscription.has_all_fetch_positions():
-            # if we still don't have offsets for all partitions, then we should either seek
-            # to the last committed position or reset using the auto reset policy
-            if (self.config['api_version'] >= (0, 8, 1) and
-               self.config['group_id'] is not None):
-                # first refresh commits for all assigned partitions
-                self._coordinator.refresh_committed_offsets_if_needed()
-
-                # Then, do any offset lookups in case some positions are not known
-                self._fetcher.update_fetch_positions(partitions)
+        inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions')
+        try:
+            # Lookup any positions for partitions which are awaiting reset (which may be the
+            # case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do
+            # this check first to avoid an unnecessary lookup of committed offsets (which
+            # typically occurs when the user is manually assigning partitions and managing
+            # their own offsets).
+            self._fetcher.reset_offsets_if_needed(partitions, timeout_ms=inner_timeout_ms())
+
+            if not self._subscription.has_all_fetch_positions():
+                # if we still don't have offsets for all partitions, then we should either seek
+                # to the last committed position or reset using the auto reset policy
+                if (self.config['api_version'] >= (0, 8, 1) and
+                        self.config['group_id'] is not None):
+                    # first refresh commits for all assigned partitions
+                    self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=inner_timeout_ms())
+
+                    # Then, do any offset lookups in case some positions are not known
+                    self._fetcher.update_fetch_positions(partitions, timeout_ms=inner_timeout_ms())
            return True
 
+        except KafkaTimeoutError:
+            return False
 
     def _message_generator_v2(self):
         timeout_ms = 1000 * max(0, self._consumer_timeout - time.time())
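At the public API level, these changes let callers bound operations that previously could block indefinitely. A hypothetical usage sketch (topic, group, and broker address are placeholder values):

    from kafka import KafkaConsumer, TopicPartition
    from kafka.errors import KafkaTimeoutError

    consumer = KafkaConsumer('my-topic',
                             group_id='my-group',
                             bootstrap_servers='localhost:9092',
                             enable_auto_commit=False)

    records = consumer.poll(timeout_ms=1000)  # already bounded before this commit
    try:
        consumer.commit(timeout_ms=5000)  # new: give up after ~5s
        offset = consumer.committed(TopicPartition('my-topic', 0),
                                    timeout_ms=5000)  # new kwarg
    except KafkaTimeoutError:
        pass  # the operation did not complete within its budget

Omitting timeout_ms (or passing None) preserves the old behavior of blocking until success or error.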

kafka/coordinator/base.py

+3 -2

@@ -163,7 +163,7 @@ def group_protocols(self):
         pass
 
     @abc.abstractmethod
-    def _on_join_prepare(self, generation, member_id):
+    def _on_join_prepare(self, generation, member_id, timeout_ms=None):
         """Invoked prior to each group join or rejoin.
 
         This is typically used to perform any cleanup from the previous

@@ -415,7 +415,8 @@ def join_group(self, timeout_ms=None):
         # while another rebalance is still in progress.
         if not self.rejoining:
             self._on_join_prepare(self._generation.generation_id,
-                                  self._generation.member_id)
+                                  self._generation.member_id,
+                                  timeout_ms=inner_timeout_ms())
             self.rejoining = True
 
         # fence off the heartbeat thread explicitly so that it cannot
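The coordinator change chains the same budget one level deeper: join_group's inner_timeout_ms() (created from its own timeout_ms parameter earlier in the method, outside this hunk) hands _on_join_prepare whatever time is left, so the whole join sequence shares one deadline. A toy illustration of that chaining, reusing the timeout_ms_fn sketch above (blocking_step is a hypothetical stand-in, not a kafka-python API):

    import time

    def blocking_step(duration_s, timeout_ms=None):
        # stand-in for a sub-call that would honor its own timeout_ms
        time.sleep(duration_s)

    def outer(timeout_ms=None):
        inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout in outer')
        blocking_step(0.2, timeout_ms=inner_timeout_ms())  # sees the full budget
        blocking_step(0.2, timeout_ms=inner_timeout_ms())  # sees ~200ms less

    outer(timeout_ms=1000)

Each callee repeats the pattern with its own timeout_ms_fn, so a single user-supplied deadline propagates through nested blocking calls without any manual elapsed-time arithmetic.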
