Skip to content

Commit 3e61aea

Browse files
committed
Move ProducerBatch to separate file
1 parent b7f9af2 commit 3e61aea

File tree

4 files changed

+164
-149
lines changed

4 files changed

+164
-149
lines changed

kafka/producer/producer_batch.py

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
from __future__ import absolute_import, division
2+
3+
import logging
4+
import time
5+
6+
try:
7+
# enum in stdlib as of py3.4
8+
from enum import IntEnum # pylint: disable=import-error
9+
except ImportError:
10+
# vendored backport module
11+
from kafka.vendor.enum34 import IntEnum
12+
13+
import kafka.errors as Errors
14+
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
15+
16+
17+
log = logging.getLogger(__name__)
18+
19+
20+
class FinalState(IntEnum):
    """Terminal state of a ProducerBatch; set at most once via done()/abort()."""
    ABORTED = 0
    FAILED = 1
    SUCCEEDED = 2
24+
25+
26+
class ProducerBatch(object):
    """A batch of records being accumulated for a single TopicPartition.

    Wraps an append-side records builder together with the produce future and
    the retry / final-state bookkeeping used on the send side. The final state
    (see FinalState) is immutable once set by done() or abort().
    """

    def __init__(self, tp, records, now=None):
        # `now` is injectable for deterministic tests; defaults to wall clock.
        now = time.time() if now is None else now
        self.max_record_size = 0   # largest single appended record, in bytes
        self.created = now         # creation time; basis for delivery timeout
        self.drained = None
        self.attempts = 0          # send attempts; incremented by retry()
        self.last_attempt = now
        self.last_append = now
        self.records = records
        self.topic_partition = tp
        self.produce_future = FutureProduceResult(tp)
        self._retry = False
        self._final_state = None   # a FinalState value, or None until finalized

    @property
    def final_state(self):
        """FinalState of this batch, or None if not yet finalized."""
        return self._final_state

    @property
    def record_count(self):
        """Number of records appended so far."""
        return self.records.next_offset()

    @property
    def producer_id(self):
        return self.records.producer_id if self.records else None

    @property
    def producer_epoch(self):
        return self.records.producer_epoch if self.records else None

    @property
    def has_sequence(self):
        return self.records.has_sequence if self.records else False

    def try_append(self, timestamp_ms, key, value, headers, now=None):
        """Try to append a record to the underlying records builder.

        Returns a FutureRecordMetadata on success, or None if the builder has
        no room left (the caller should then allocate a new batch).
        """
        metadata = self.records.append(timestamp_ms, key, value, headers)
        if metadata is None:
            return None

        now = time.time() if now is None else now
        self.max_record_size = max(self.max_record_size, metadata.size)
        self.last_append = now
        future = FutureRecordMetadata(
            self.produce_future,
            metadata.offset,
            metadata.timestamp,
            metadata.crc,
            len(key) if key is not None else -1,
            len(value) if value is not None else -1,
            # serialized header size; -1 mirrors the key/value "absent" convention
            sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
        return future

    def abort(self, exception):
        """Abort the batch and complete the future and callbacks."""
        # Unlike done(), abort() is never legal on an already-finalized batch.
        if self._final_state is not None:
            raise Errors.IllegalStateError("Batch has already been completed in final state: %s" % self._final_state)
        self._final_state = FinalState.ABORTED

        log.debug("Aborting batch for partition %s: %s", self.topic_partition, exception)
        self._complete_future(-1, -1, exception)

    def done(self, base_offset=None, timestamp_ms=None, exception=None):
        """
        Finalize the state of a batch. Final state, once set, is immutable. This function may be called
        once or twice on a batch. It may be called twice if
        1. An inflight batch expires before a response from the broker is received. The batch's final
        state is set to FAILED. But it could succeed on the broker and second time around batch.done() may
        try to set SUCCEEDED final state.

        2. If a transaction abortion happens or if the producer is closed forcefully, the final state is
        ABORTED but again it could succeed if broker responds with a success.

        Attempted transitions from [FAILED | ABORTED] --> SUCCEEDED are logged.
        Attempted transitions from one failure state to the same or a different failed state are ignored.
        Attempted transitions from SUCCEEDED to the same or a failed state throw an exception.
        """
        final_state = FinalState.SUCCEEDED if exception is None else FinalState.FAILED
        if self._final_state is None:
            # First finalization: record the state and complete the future.
            self._final_state = final_state
            if final_state is FinalState.SUCCEEDED:
                log.debug("Successfully produced messages to %s with base offset %s", self.topic_partition, base_offset)
            else:
                log.warning("Failed to produce messages to topic-partition %s with base offset %s: %s",
                            self.topic_partition, base_offset, exception)
            self._complete_future(base_offset, timestamp_ms, exception)
            return True

        elif self._final_state is not FinalState.SUCCEEDED:
            if final_state is FinalState.SUCCEEDED:
                # Log if a previously unsuccessful batch succeeded later on.
                log.debug("ProduceResponse returned %s for %s after batch with base offset %s had already been %s.",
                          final_state, self.topic_partition, base_offset, self._final_state)
            else:
                # FAILED --> FAILED and ABORTED --> FAILED transitions are ignored.
                log.debug("Ignored state transition %s -> %s for %s batch with base offset %s",
                          self._final_state, final_state, self.topic_partition, base_offset)
        else:
            # A SUCCESSFUL batch must not attempt another state change.
            raise Errors.IllegalStateError("A %s batch must not attempt another state change to %s" % (self._final_state, final_state))
        return False

    def _complete_future(self, base_offset, timestamp_ms, exception):
        # Resolve the shared produce future exactly once; success carries
        # (base_offset, timestamp_ms), failure carries the exception.
        if self.produce_future.is_done:
            raise Errors.IllegalStateError('Batch is already closed!')
        elif exception is None:
            self.produce_future.success((base_offset, timestamp_ms))
        else:
            self.produce_future.failure(exception)

    def has_reached_delivery_timeout(self, delivery_timeout_ms, now=None):
        """Return True if delivery_timeout_ms has elapsed since batch creation."""
        now = time.time() if now is None else now
        return delivery_timeout_ms / 1000 <= now - self.created

    def in_retry(self):
        """Return True if this batch has been marked for retry at least once."""
        return self._retry

    def retry(self, now=None):
        """Mark the batch as retried and refresh the attempt timestamps."""
        now = time.time() if now is None else now
        self._retry = True
        self.attempts += 1
        self.last_attempt = now
        self.last_append = now

    @property
    def is_done(self):
        return self.produce_future.is_done

    def __str__(self):
        return 'ProducerBatch(topic_partition=%s, record_count=%d)' % (
            self.topic_partition, self.records.next_offset())

kafka/producer/record_accumulator.py

Lines changed: 1 addition & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,8 @@
66
import threading
77
import time
88

9-
try:
10-
# enum in stdlib as of py3.4
11-
from enum import IntEnum # pylint: disable=import-error
12-
except ImportError:
13-
# vendored backport module
14-
from kafka.vendor.enum34 import IntEnum
15-
169
import kafka.errors as Errors
17-
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
10+
from kafka.producer.producer_batch import ProducerBatch
1811
from kafka.record.memory_records import MemoryRecordsBuilder
1912
from kafka.structs import TopicPartition
2013

@@ -41,145 +34,6 @@ def get(self):
4134
return self._val
4235

4336

44-
class FinalState(IntEnum):
45-
ABORTED = 0
46-
FAILED = 1
47-
SUCCEEDED = 2
48-
49-
50-
class ProducerBatch(object):
51-
def __init__(self, tp, records, now=None):
52-
now = time.time() if now is None else now
53-
self.max_record_size = 0
54-
self.created = now
55-
self.drained = None
56-
self.attempts = 0
57-
self.last_attempt = now
58-
self.last_append = now
59-
self.records = records
60-
self.topic_partition = tp
61-
self.produce_future = FutureProduceResult(tp)
62-
self._retry = False
63-
self._final_state = None
64-
65-
@property
66-
def final_state(self):
67-
return self._final_state
68-
69-
@property
70-
def record_count(self):
71-
return self.records.next_offset()
72-
73-
@property
74-
def producer_id(self):
75-
return self.records.producer_id if self.records else None
76-
77-
@property
78-
def producer_epoch(self):
79-
return self.records.producer_epoch if self.records else None
80-
81-
@property
82-
def has_sequence(self):
83-
return self.records.has_sequence if self.records else False
84-
85-
def try_append(self, timestamp_ms, key, value, headers, now=None):
86-
metadata = self.records.append(timestamp_ms, key, value, headers)
87-
if metadata is None:
88-
return None
89-
90-
now = time.time() if now is None else now
91-
self.max_record_size = max(self.max_record_size, metadata.size)
92-
self.last_append = now
93-
future = FutureRecordMetadata(
94-
self.produce_future,
95-
metadata.offset,
96-
metadata.timestamp,
97-
metadata.crc,
98-
len(key) if key is not None else -1,
99-
len(value) if value is not None else -1,
100-
sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
101-
return future
102-
103-
def abort(self, exception):
104-
"""Abort the batch and complete the future and callbacks."""
105-
if self._final_state is not None:
106-
raise Errors.IllegalStateError("Batch has already been completed in final state: %s" % self._final_state)
107-
self._final_state = FinalState.ABORTED
108-
109-
log.debug("Aborting batch for partition %s: %s", self.topic_partition, exception)
110-
self._complete_future(-1, -1, exception)
111-
112-
def done(self, base_offset=None, timestamp_ms=None, exception=None):
113-
"""
114-
Finalize the state of a batch. Final state, once set, is immutable. This function may be called
115-
once or twice on a batch. It may be called twice if
116-
1. An inflight batch expires before a response from the broker is received. The batch's final
117-
state is set to FAILED. But it could succeed on the broker and second time around batch.done() may
118-
try to set SUCCEEDED final state.
119-
120-
2. If a transaction abortion happens or if the producer is closed forcefully, the final state is
121-
ABORTED but again it could succeed if broker responds with a success.
122-
123-
Attempted transitions from [FAILED | ABORTED] --> SUCCEEDED are logged.
124-
Attempted transitions from one failure state to the same or a different failed state are ignored.
125-
Attempted transitions from SUCCEEDED to the same or a failed state throw an exception.
126-
"""
127-
final_state = FinalState.SUCCEEDED if exception is None else FinalState.FAILED
128-
if self._final_state is None:
129-
self._final_state = final_state
130-
if final_state is FinalState.SUCCEEDED:
131-
log.debug("Successfully produced messages to %s with base offset %s", self.topic_partition, base_offset)
132-
else:
133-
log.warning("Failed to produce messages to topic-partition %s with base offset %s: %s",
134-
self.topic_partition, base_offset, exception)
135-
self._complete_future(base_offset, timestamp_ms, exception)
136-
return True
137-
138-
elif self._final_state is not FinalState.SUCCEEDED:
139-
if final_state is FinalState.SUCCEEDED:
140-
# Log if a previously unsuccessful batch succeeded later on.
141-
log.debug("ProduceResponse returned %s for %s after batch with base offset %s had already been %s.",
142-
final_state, self.topic_partition, base_offset, self._final_state)
143-
else:
144-
# FAILED --> FAILED and ABORTED --> FAILED transitions are ignored.
145-
log.debug("Ignored state transition %s -> %s for %s batch with base offset %s",
146-
self._final_state, final_state, self.topic_partition, base_offset)
147-
else:
148-
# A SUCCESSFUL batch must not attempt another state change.
149-
raise Errors.IllegalStateError("A %s batch must not attempt another state change to %s" % (self._final_state, final_state))
150-
return False
151-
152-
def _complete_future(self, base_offset, timestamp_ms, exception):
153-
if self.produce_future.is_done:
154-
raise Errors.IllegalStateError('Batch is already closed!')
155-
elif exception is None:
156-
self.produce_future.success((base_offset, timestamp_ms))
157-
else:
158-
self.produce_future.failure(exception)
159-
160-
def has_reached_delivery_timeout(self, delivery_timeout_ms, now=None):
161-
now = time.time() if now is None else now
162-
return delivery_timeout_ms / 1000 <= now - self.created
163-
164-
def in_retry(self):
165-
return self._retry
166-
167-
def retry(self, now=None):
168-
now = time.time() if now is None else now
169-
self._retry = True
170-
self.attempts += 1
171-
self.last_attempt = now
172-
self.last_append = now
173-
174-
@property
175-
def is_done(self):
176-
return self.produce_future.is_done
177-
178-
def __str__(self):
179-
return 'ProducerBatch(topic_partition=%s, record_count=%d)' % (
180-
self.topic_partition, self.records.next_offset())
181-
182-
18337
class RecordAccumulator(object):
18438
"""
18539
This class maintains a dequeue per TopicPartition that accumulates messages

test/test_record_accumulator.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
from kafka.cluster import ClusterMetadata
77
from kafka.errors import IllegalStateError, KafkaError
88
from kafka.producer.future import FutureRecordMetadata, RecordMetadata
9-
from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch
9+
from kafka.producer.producer_batch import ProducerBatch
10+
from kafka.producer.record_accumulator import RecordAccumulator
1011
from kafka.record.default_records import DefaultRecordBatchBuilder
1112
from kafka.record.memory_records import MemoryRecordsBuilder
1213
from kafka.structs import TopicPartition

test/test_sender.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
2020
from kafka.producer.kafka import KafkaProducer
2121
from kafka.protocol.produce import ProduceRequest
22-
from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch
22+
from kafka.producer.producer_batch import ProducerBatch
23+
from kafka.producer.record_accumulator import RecordAccumulator
2324
from kafka.producer.sender import PartitionResponse, Sender
2425
from kafka.producer.transaction_manager import TransactionManager
2526
from kafka.record.memory_records import MemoryRecordsBuilder

0 commit comments

Comments
 (0)