Skip to content

Commit ee5436f

Browse files
authored
KafkaProducer: Handle UnknownProducerIdError (#2663)
1 parent 9227674 commit ee5436f

File tree

2 files changed

+11
-0
lines changed

2 files changed

+11
-0
lines changed

kafka/producer/sender.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,13 @@ def _fail_batch(self, batch, partition_response):
473473
# about the previously committed message. Note that this will discard the producer id and sequence
474474
# numbers for all existing partitions.
475475
self._transaction_manager.reset_producer_id()
476+
elif isinstance(exception, Errors.UnknownProducerIdError):
477+
# If we get an UnknownProducerId for a partition, then the broker has no state for that producer. It will
478+
# therefore accept a write with sequence number 0. We reset the sequence number for the partition here so
479+
# that the producer can continue after aborting the transaction. All inflight-requests to this partition
480+
# will also fail with an UnknownProducerId error, so the sequence will remain at 0. Note that if the
481+
# broker supports bumping the epoch, we will later reset all sequence numbers after calling InitProducerId
482+
self._transaction_manager.reset_sequence_for_partition(batch.topic_partition)
476483
elif isinstance(exception, (Errors.ClusterAuthorizationFailedError,
477484
Errors.TransactionalIdAuthorizationFailedError,
478485
Errors.ProducerFencedError,

kafka/producer/transaction_manager.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,10 @@ def increment_sequence_number(self, tp, increment):
330330
else:
331331
self._sequence_numbers[tp] += increment
332332

333+
def reset_sequence_for_partition(self, tp):
334+
with self._lock:
335+
self._sequence_numbers.pop(tp, None)
336+
333337
def next_request_handler(self, has_incomplete_batches):
334338
with self._lock:
335339
if self._new_partitions_in_transaction:

0 commit comments

Comments
 (0)