Skip to content

Commit 9227674

Browse files
authored
KIP-467: Augment ProduceResponse error messaging for specific culprit records (#2661)
1 parent 0a87130 commit 9227674

File tree

9 files changed

+456
-326
lines changed

9 files changed

+456
-326
lines changed

kafka/errors.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ def __str__(self):
1515
return '{0}: {1}'.format(self.__class__.__name__,
1616
super(KafkaError, self).__str__())
1717

18+
def __eq__(self, other):
19+
return self.__class__ == other.__class__ and self.args == other.args
20+
1821

1922
class Cancelled(KafkaError):
2023
retriable = True

kafka/producer/future.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,32 +29,35 @@ def wait(self, timeout=None):
2929

3030

3131
class FutureRecordMetadata(Future):
32-
def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
32+
def __init__(self, produce_future, batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
3333
super(FutureRecordMetadata, self).__init__()
3434
self._produce_future = produce_future
3535
# packing args as a tuple is a minor speed optimization
36-
self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
36+
self.args = (batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
3737
produce_future.add_callback(self._produce_success)
3838
produce_future.add_errback(self.failure)
3939

40-
def _produce_success(self, offset_and_timestamp):
41-
offset, produce_timestamp_ms = offset_and_timestamp
40+
def _produce_success(self, result):
41+
offset, produce_timestamp_ms, record_exceptions_fn = result
4242

4343
# Unpacking from args tuple is minor speed optimization
44-
(relative_offset, timestamp_ms, checksum,
44+
(batch_index, timestamp_ms, checksum,
4545
serialized_key_size, serialized_value_size, serialized_header_size) = self.args
4646

47-
# None is when Broker does not support the API (<0.10) and
48-
# -1 is when the broker is configured for CREATE_TIME timestamps
49-
if produce_timestamp_ms is not None and produce_timestamp_ms != -1:
50-
timestamp_ms = produce_timestamp_ms
51-
if offset != -1 and relative_offset is not None:
52-
offset += relative_offset
53-
tp = self._produce_future.topic_partition
54-
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
55-
checksum, serialized_key_size,
56-
serialized_value_size, serialized_header_size)
57-
self.success(metadata)
47+
if record_exceptions_fn is not None:
48+
self.failure(record_exceptions_fn(batch_index))
49+
else:
50+
# None is when Broker does not support the API (<0.10) and
51+
# -1 is when the broker is configured for CREATE_TIME timestamps
52+
if produce_timestamp_ms is not None and produce_timestamp_ms != -1:
53+
timestamp_ms = produce_timestamp_ms
54+
if offset != -1 and batch_index is not None:
55+
offset += batch_index
56+
tp = self._produce_future.topic_partition
57+
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
58+
checksum, serialized_key_size,
59+
serialized_value_size, serialized_header_size)
60+
self.success(metadata)
5861

5962
def get(self, timeout=None):
6063
if not self.is_done and not self._produce_future.wait(timeout):

kafka/producer/producer_batch.py

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
from __future__ import absolute_import, division
2+
3+
import logging
4+
import time
5+
6+
try:
7+
# enum in stdlib as of py3.4
8+
from enum import IntEnum # pylint: disable=import-error
9+
except ImportError:
10+
# vendored backport module
11+
from kafka.vendor.enum34 import IntEnum
12+
13+
import kafka.errors as Errors
14+
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
15+
16+
17+
log = logging.getLogger(__name__)
18+
19+
20+
class FinalState(IntEnum):
21+
ABORTED = 0
22+
FAILED = 1
23+
SUCCEEDED = 2
24+
25+
26+
class ProducerBatch(object):
27+
def __init__(self, tp, records, now=None):
28+
now = time.time() if now is None else now
29+
self.max_record_size = 0
30+
self.created = now
31+
self.drained = None
32+
self.attempts = 0
33+
self.last_attempt = now
34+
self.last_append = now
35+
self.records = records
36+
self.topic_partition = tp
37+
self.produce_future = FutureProduceResult(tp)
38+
self._retry = False
39+
self._final_state = None
40+
41+
@property
42+
def final_state(self):
43+
return self._final_state
44+
45+
@property
46+
def record_count(self):
47+
return self.records.next_offset()
48+
49+
@property
50+
def producer_id(self):
51+
return self.records.producer_id if self.records else None
52+
53+
@property
54+
def producer_epoch(self):
55+
return self.records.producer_epoch if self.records else None
56+
57+
@property
58+
def has_sequence(self):
59+
return self.records.has_sequence if self.records else False
60+
61+
def try_append(self, timestamp_ms, key, value, headers, now=None):
62+
metadata = self.records.append(timestamp_ms, key, value, headers)
63+
if metadata is None:
64+
return None
65+
66+
now = time.time() if now is None else now
67+
self.max_record_size = max(self.max_record_size, metadata.size)
68+
self.last_append = now
69+
future = FutureRecordMetadata(
70+
self.produce_future,
71+
metadata.offset,
72+
metadata.timestamp,
73+
metadata.crc,
74+
len(key) if key is not None else -1,
75+
len(value) if value is not None else -1,
76+
sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
77+
return future
78+
79+
def abort(self, exception):
80+
"""Abort the batch and complete the future and callbacks."""
81+
if self._final_state is not None:
82+
raise Errors.IllegalStateError("Batch has already been completed in final state: %s" % self._final_state)
83+
self._final_state = FinalState.ABORTED
84+
85+
log.debug("Aborting batch for partition %s: %s", self.topic_partition, exception)
86+
self._complete_future(-1, -1, lambda _: exception)
87+
88+
def complete(self, base_offset, log_append_time):
89+
"""Complete the batch successfully.
90+
91+
Arguments:
92+
base_offset (int): The base offset of the messages assigned by the server
93+
log_append_time (int): The log append time or -1 if CreateTime is being used
94+
95+
Returns: True if the batch was completed as a result of this call, and False
96+
if it had been completed previously.
97+
"""
98+
return self.done(base_offset=base_offset, timestamp_ms=log_append_time)
99+
100+
def complete_exceptionally(self, top_level_exception, record_exceptions_fn):
101+
"""
102+
Complete the batch exceptionally. The provided top-level exception will be used
103+
for each record future contained in the batch.
104+
105+
Arguments:
106+
top_level_exception (Exception): top-level partition error.
107+
record_exceptions_fn (callable int -> Exception): Record exception function mapping
108+
batch_index to the respective record exception.
109+
Returns: True if the batch was completed as a result of this call, and False
110+
if it had been completed previously.
111+
"""
112+
assert isinstance(top_level_exception, Exception)
113+
assert callable(record_exceptions_fn)
114+
return self.done(top_level_exception=top_level_exception, record_exceptions_fn=record_exceptions_fn)
115+
116+
def done(self, base_offset=None, timestamp_ms=None, top_level_exception=None, record_exceptions_fn=None):
117+
"""
118+
Finalize the state of a batch. Final state, once set, is immutable. This function may be called
119+
once or twice on a batch. It may be called twice if
120+
1. An inflight batch expires before a response from the broker is received. The batch's final
121+
state is set to FAILED. But it could succeed on the broker and second time around batch.done() may
122+
try to set SUCCEEDED final state.
123+
124+
2. If a transaction abortion happens or if the producer is closed forcefully, the final state is
125+
ABORTED but again it could succeed if broker responds with a success.
126+
127+
Attempted transitions from [FAILED | ABORTED] --> SUCCEEDED are logged.
128+
Attempted transitions from one failure state to the same or a different failed state are ignored.
129+
Attempted transitions from SUCCEEDED to the same or a failed state throw an exception.
130+
"""
131+
final_state = FinalState.SUCCEEDED if top_level_exception is None else FinalState.FAILED
132+
if self._final_state is None:
133+
self._final_state = final_state
134+
if final_state is FinalState.SUCCEEDED:
135+
log.debug("Successfully produced messages to %s with base offset %s", self.topic_partition, base_offset)
136+
else:
137+
log.warning("Failed to produce messages to topic-partition %s with base offset %s: %s",
138+
self.topic_partition, base_offset, top_level_exception)
139+
self._complete_future(base_offset, timestamp_ms, record_exceptions_fn)
140+
return True
141+
142+
elif self._final_state is not FinalState.SUCCEEDED:
143+
if final_state is FinalState.SUCCEEDED:
144+
# Log if a previously unsuccessful batch succeeded later on.
145+
log.debug("ProduceResponse returned %s for %s after batch with base offset %s had already been %s.",
146+
final_state, self.topic_partition, base_offset, self._final_state)
147+
else:
148+
# FAILED --> FAILED and ABORTED --> FAILED transitions are ignored.
149+
log.debug("Ignored state transition %s -> %s for %s batch with base offset %s",
150+
self._final_state, final_state, self.topic_partition, base_offset)
151+
else:
152+
# A SUCCESSFUL batch must not attempt another state change.
153+
raise Errors.IllegalStateError("A %s batch must not attempt another state change to %s" % (self._final_state, final_state))
154+
return False
155+
156+
def _complete_future(self, base_offset, timestamp_ms, record_exceptions_fn):
157+
if self.produce_future.is_done:
158+
raise Errors.IllegalStateError('Batch is already closed!')
159+
self.produce_future.success((base_offset, timestamp_ms, record_exceptions_fn))
160+
161+
def has_reached_delivery_timeout(self, delivery_timeout_ms, now=None):
162+
now = time.time() if now is None else now
163+
return delivery_timeout_ms / 1000 <= now - self.created
164+
165+
def in_retry(self):
166+
return self._retry
167+
168+
def retry(self, now=None):
169+
now = time.time() if now is None else now
170+
self._retry = True
171+
self.attempts += 1
172+
self.last_attempt = now
173+
self.last_append = now
174+
175+
@property
176+
def is_done(self):
177+
return self.produce_future.is_done
178+
179+
def __str__(self):
180+
return 'ProducerBatch(topic_partition=%s, record_count=%d)' % (
181+
self.topic_partition, self.records.next_offset())
182+
183+
184+

0 commit comments

Comments
 (0)