Commit cfbfe05

KAFKA-12548; Propagate record error messages to application

1 parent 3e61aea

This change threads a per-record exception mapping (a record_exceptions_fn) from the broker's ProduceResponse through ProducerBatch and into each FutureRecordMetadata, so an application's send() future fails with the broker's record-level error message instead of only a generic batch-level error.

6 files changed: +237 additions, −138 deletions

kafka/producer/future.py

Lines changed: 19 additions & 16 deletions
@@ -29,32 +29,35 @@ def wait(self, timeout=None):
 
 
 class FutureRecordMetadata(Future):
-    def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
+    def __init__(self, produce_future, batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
         super(FutureRecordMetadata, self).__init__()
         self._produce_future = produce_future
         # packing args as a tuple is a minor speed optimization
-        self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
+        self.args = (batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
         produce_future.add_callback(self._produce_success)
         produce_future.add_errback(self.failure)
 
-    def _produce_success(self, offset_and_timestamp):
-        offset, produce_timestamp_ms = offset_and_timestamp
+    def _produce_success(self, result):
+        offset, produce_timestamp_ms, record_exceptions_fn = result
 
         # Unpacking from args tuple is minor speed optimization
-        (relative_offset, timestamp_ms, checksum,
+        (batch_index, timestamp_ms, checksum,
          serialized_key_size, serialized_value_size, serialized_header_size) = self.args
 
-        # None is when Broker does not support the API (<0.10) and
-        # -1 is when the broker is configured for CREATE_TIME timestamps
-        if produce_timestamp_ms is not None and produce_timestamp_ms != -1:
-            timestamp_ms = produce_timestamp_ms
-        if offset != -1 and relative_offset is not None:
-            offset += relative_offset
-        tp = self._produce_future.topic_partition
-        metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
-                                  checksum, serialized_key_size,
-                                  serialized_value_size, serialized_header_size)
-        self.success(metadata)
+        if record_exceptions_fn is not None:
+            self.failure(record_exceptions_fn(batch_index))
+        else:
+            # None is when Broker does not support the API (<0.10) and
+            # -1 is when the broker is configured for CREATE_TIME timestamps
+            if produce_timestamp_ms is not None and produce_timestamp_ms != -1:
+                timestamp_ms = produce_timestamp_ms
+            if offset != -1 and batch_index is not None:
+                offset += batch_index
+            tp = self._produce_future.topic_partition
+            metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
+                                      checksum, serialized_key_size,
+                                      serialized_value_size, serialized_header_size)
+            self.success(metadata)
 
     def get(self, timeout=None):
         if not self.is_done and not self._produce_future.wait(timeout):
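
With this change a record future can fail with a record-level exception carried in the produce result tuple. A minimal sketch of what an application might observe; the broker address, topic name, and the specific rejection (a compacted topic refusing a keyless record) are assumptions for illustration:

from kafka import KafkaProducer
from kafka.errors import InvalidRecordError, KafkaError

# Hypothetical setup: a reachable broker with a compacted topic
# that rejects records without a key.
producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send('compacted-topic', value=b'no-key')

try:
    metadata = future.get(timeout=10)
except InvalidRecordError as e:
    # The broker's record-level error message now surfaces here,
    # rather than a generic batch-level failure.
    print('record rejected:', e)
except KafkaError as e:
    print('batch failed:', e)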

kafka/producer/producer_batch.py

Lines changed: 35 additions & 10 deletions
@@ -83,9 +83,37 @@ def abort(self, exception):
         self._final_state = FinalState.ABORTED
 
         log.debug("Aborting batch for partition %s: %s", self.topic_partition, exception)
-        self._complete_future(-1, -1, exception)
+        self._complete_future(-1, -1, lambda _: exception)
 
-    def done(self, base_offset=None, timestamp_ms=None, exception=None):
+    def complete(self, base_offset, log_append_time):
+        """Complete the batch successfully.
+
+        Arguments:
+            base_offset (int): The base offset of the messages assigned by the server
+            log_append_time (int): The log append time or -1 if CreateTime is being used
+
+        Returns: True if the batch was completed as a result of this call, and False
+            if it had been completed previously.
+        """
+        return self.done(base_offset=base_offset, timestamp_ms=log_append_time)
+
+    def complete_exceptionally(self, top_level_exception, record_exceptions_fn):
+        """
+        Complete the batch exceptionally. The provided top-level exception will be used
+        for each record future contained in the batch.
+
+        Arguments:
+            top_level_exception (Exception): top-level partition error.
+            record_exceptions_fn (callable int -> Exception): Record exception function mapping
+                batch_index to the respective record exception.
+
+        Returns: True if the batch was completed as a result of this call, and False
+            if it had been completed previously.
+        """
+        assert isinstance(top_level_exception, Exception)
+        assert callable(record_exceptions_fn)
+        return self.done(top_level_exception=top_level_exception, record_exceptions_fn=record_exceptions_fn)
+
+    def done(self, base_offset=None, timestamp_ms=None, top_level_exception=None, record_exceptions_fn=None):
         """
         Finalize the state of a batch. Final state, once set, is immutable. This function may be called
         once or twice on a batch. It may be called twice if
@@ -100,15 +128,15 @@ def done(self, base_offset=None, timestamp_ms=None, exception=None):
         Attempted transitions from one failure state to the same or a different failed state are ignored.
         Attempted transitions from SUCCEEDED to the same or a failed state throw an exception.
         """
-        final_state = FinalState.SUCCEEDED if exception is None else FinalState.FAILED
+        final_state = FinalState.SUCCEEDED if top_level_exception is None else FinalState.FAILED
         if self._final_state is None:
            self._final_state = final_state
            if final_state is FinalState.SUCCEEDED:
                log.debug("Successfully produced messages to %s with base offset %s", self.topic_partition, base_offset)
            else:
                log.warning("Failed to produce messages to topic-partition %s with base offset %s: %s",
-                           self.topic_partition, base_offset, exception)
-           self._complete_future(base_offset, timestamp_ms, exception)
+                           self.topic_partition, base_offset, top_level_exception)
+           self._complete_future(base_offset, timestamp_ms, record_exceptions_fn)
            return True
 
         elif self._final_state is not FinalState.SUCCEEDED:
@@ -125,13 +153,10 @@ def done(self, base_offset=None, timestamp_ms=None, exception=None):
             raise Errors.IllegalStateError("A %s batch must not attempt another state change to %s" % (self._final_state, final_state))
         return False
 
-    def _complete_future(self, base_offset, timestamp_ms, exception):
+    def _complete_future(self, base_offset, timestamp_ms, record_exceptions_fn):
         if self.produce_future.is_done:
             raise Errors.IllegalStateError('Batch is already closed!')
-        elif exception is None:
-            self.produce_future.success((base_offset, timestamp_ms))
-        else:
-            self.produce_future.failure(exception)
+        self.produce_future.success((base_offset, timestamp_ms, record_exceptions_fn))
 
     def has_reached_delivery_timeout(self, delivery_timeout_ms, now=None):
         now = time.time() if now is None else now
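
A minimal sketch of the new completion API, driving ProducerBatch directly the way the new unit tests below do; the record values and exception messages are invented for illustration:

from kafka.errors import KafkaError
from kafka.producer.producer_batch import ProducerBatch
from kafka.record.memory_records import MemoryRecordsBuilder
from kafka.structs import TopicPartition

records = MemoryRecordsBuilder(magic=2, compression_type=0, batch_size=100000)
batch = ProducerBatch(TopicPartition('foo', 0), records)

f0 = batch.try_append(0, b'key', b'good', [])
f1 = batch.try_append(0, b'key', b'bad', [])

# Fail the batch, giving record index 1 its own exception while the
# remaining records fall back to the top-level partition error.
top_level = KafkaError('partition-level failure')
batch.complete_exceptionally(
    top_level,
    lambda i: KafkaError('record %d rejected' % i) if i == 1 else top_level)

assert f0.failed() and f1.failed()
assert 'record 1 rejected' in str(f1.exception)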

kafka/producer/sender.py

Lines changed: 20 additions & 2 deletions
@@ -436,6 +436,23 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
             for batch in batches:
                 self._complete_batch(batch, PartitionResponse())
 
+    def _record_exceptions_fn(self, top_level_exception, record_errors, error_message):
+        """Returns a fn mapping batch_index to exception"""
+        # When no record_errors, all records resolve to the top-level exception
+        if not record_errors:
+            return lambda _: top_level_exception
+
+        record_errors_dict = dict(record_errors)
+        def record_exceptions_fn(batch_index):
+            if batch_index not in record_errors_dict:
+                return Errors.KafkaError(
+                    "Failed to append record because it was part of a batch which had one or more invalid records")
+            record_error = record_errors_dict[batch_index]
+            err_msg = record_error or error_message or top_level_exception.description
+            exc = top_level_exception.__class__ if len(record_errors) == 1 else Errors.InvalidRecordError
+            return exc(err_msg)
+        return record_exceptions_fn
+
     def _fail_batch(self, batch, partition_response):
         if partition_response.error is Errors.TopicAuthorizationFailedError:
             exception = Errors.TopicAuthorizationFailedError(batch.topic_partition.topic)
@@ -467,7 +484,8 @@ def _fail_batch(self, batch, partition_response):
         if self._sensors:
             self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
 
-        if batch.done(base_offset=partition_response.base_offset, timestamp_ms=partition_response.log_append_time, exception=exception):
+        record_exceptions_fn = self._record_exceptions_fn(exception, partition_response.record_errors, partition_response.error_message)
+        if batch.complete_exceptionally(exception, record_exceptions_fn):
             self._maybe_remove_from_inflight_batches(batch)
             self._accumulator.deallocate(batch)
 
@@ -520,7 +538,7 @@ def _complete_batch(self, batch, partition_response):
             self._metadata.request_update()
 
         else:
-            if batch.done(base_offset=partition_response.base_offset, timestamp_ms=partition_response.log_append_time):
+            if batch.complete(partition_response.base_offset, partition_response.log_append_time):
                 self._maybe_remove_from_inflight_batches(batch)
                 self._accumulator.deallocate(batch)
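
To make the mapping concrete, here is a standalone copy of the closure logic above, evaluated against a hypothetical ProduceResponse that singles out two of five records with (batch_index, message) pairs; the error texts are invented:

import kafka.errors as Errors

top_level_exception = Errors.InvalidRecordError('One or more records were rejected')
record_errors = [(1, 'Record key is missing'), (3, None)]  # (batch_index, message)
error_message = 'Batch contained invalid records'

record_errors_dict = dict(record_errors)

def record_exceptions_fn(batch_index):
    # Records the broker did not single out still fail, but with a generic error.
    if batch_index not in record_errors_dict:
        return Errors.KafkaError(
            'Failed to append record because it was part of a batch '
            'which had one or more invalid records')
    err_msg = record_errors_dict[batch_index] or error_message or top_level_exception.description
    exc = top_level_exception.__class__ if len(record_errors) == 1 else Errors.InvalidRecordError
    return exc(err_msg)

print(record_exceptions_fn(1))  # InvalidRecordError: Record key is missing
print(record_exceptions_fn(3))  # InvalidRecordError: Batch contained invalid records (fallback)
print(record_exceptions_fn(0))  # KafkaError: generic "part of a batch ..." message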

test/test_producer_batch.py

Lines changed: 136 additions & 0 deletions
@@ -0,0 +1,136 @@
+# pylint: skip-file
+from __future__ import absolute_import, division
+
+import pytest
+
+from kafka.errors import IllegalStateError, KafkaError
+from kafka.producer.future import FutureRecordMetadata, RecordMetadata
+from kafka.producer.producer_batch import ProducerBatch
+from kafka.record.memory_records import MemoryRecordsBuilder
+from kafka.structs import TopicPartition
+
+
+@pytest.fixture
+def tp():
+    return TopicPartition('foo', 0)
+
+
+@pytest.fixture
+def memory_records_builder():
+    return MemoryRecordsBuilder(magic=2, compression_type=0, batch_size=100000)
+
+
+@pytest.fixture
+def batch(tp, memory_records_builder):
+    return ProducerBatch(tp, memory_records_builder)
+
+
+def test_producer_batch_producer_id(tp, memory_records_builder):
+    batch = ProducerBatch(tp, memory_records_builder)
+    assert batch.producer_id == -1
+    batch.records.set_producer_state(123, 456, 789, False)
+    assert batch.producer_id == 123
+    memory_records_builder.close()
+    assert batch.producer_id == 123
+
+
+@pytest.mark.parametrize("magic", [0, 1, 2])
+def test_producer_batch_try_append(magic):
+    tp = TopicPartition('foo', 0)
+    records = MemoryRecordsBuilder(
+        magic=magic, compression_type=0, batch_size=100000)
+    batch = ProducerBatch(tp, records)
+    assert batch.record_count == 0
+    future = batch.try_append(0, b'key', b'value', [])
+    assert isinstance(future, FutureRecordMetadata)
+    assert not future.is_done
+    batch.complete(123, 456)
+    assert future.is_done
+    # record-level checksum only provided in v0/v1 formats; payload includes magic-byte
+    if magic == 0:
+        checksum = 592888119
+    elif magic == 1:
+        checksum = 213653215
+    else:
+        checksum = None
+
+    expected_metadata = RecordMetadata(
+        topic=tp[0], partition=tp[1], topic_partition=tp,
+        offset=123, timestamp=456, checksum=checksum,
+        serialized_key_size=3, serialized_value_size=5, serialized_header_size=-1)
+    assert future.value == expected_metadata
+
+
+def test_producer_batch_retry(batch):
+    assert not batch.in_retry()
+    batch.retry()
+    assert batch.in_retry()
+
+
+def test_batch_abort(batch):
+    future = batch.try_append(123, None, b'msg', [])
+    batch.abort(KafkaError())
+    assert future.is_done
+
+    # subsequent completion should be ignored
+    assert not batch.complete(500, 2342342341)
+    assert not batch.complete_exceptionally(KafkaError('top_level'), lambda _: KafkaError('record'))
+
+    assert future.is_done
+    with pytest.raises(KafkaError):
+        future.get()
+
+
+def test_batch_cannot_abort_twice(batch):
+    future = batch.try_append(123, None, b'msg', [])
+    batch.abort(KafkaError())
+    with pytest.raises(IllegalStateError):
+        batch.abort(KafkaError())
+    assert future.is_done
+    with pytest.raises(KafkaError):
+        future.get()
+
+
+def test_batch_cannot_complete_twice(batch):
+    future = batch.try_append(123, None, b'msg', [])
+    batch.complete(500, 10)
+    with pytest.raises(IllegalStateError):
+        batch.complete(1000, 20)
+    record_metadata = future.get()
+    assert record_metadata.offset == 500
+    assert record_metadata.timestamp == 10
+
+
+def _test_complete_exceptionally(batch, record_count, top_level_exception, record_exceptions_fn):
+    futures = []
+    for i in range(record_count):
+        futures.append(batch.try_append(0, b'key', b'value', []))
+
+    assert record_count == batch.record_count
+
+    batch.complete_exceptionally(top_level_exception, record_exceptions_fn)
+    assert batch.is_done
+
+    for i, future in enumerate(futures):
+        assert future.is_done
+        assert future.failed()
+        assert isinstance(future.exception, RuntimeError)
+        assert record_exceptions_fn(i) == future.exception
+
+
+def test_complete_exceptionally_with_record_errors(batch):
+    record_count = 5
+    top_level_exception = RuntimeError()
+
+    record_exceptions_map = {0: RuntimeError(), 3: RuntimeError()}
+    record_exceptions_fn = lambda i: record_exceptions_map.get(i, top_level_exception)
+
+    _test_complete_exceptionally(batch, record_count, top_level_exception, record_exceptions_fn)
+
+
+def test_complete_exceptionally_with_null_record_errors(batch):
+    record_count = 5
+    top_level_exception = RuntimeError()
+
+    with pytest.raises(AssertionError):
+        _test_complete_exceptionally(batch, record_count, top_level_exception, None)
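
These tests drive ProducerBatch directly, so no broker is required; assuming a standard source checkout, they should run with:

pytest test/test_producer_batch.py -v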
