
Commit cfd9f86

Merge pull request dpkp#59 from mrtheb/master

flake8 (pep8 and pyflakes) clean-up

2 parents b0cacc9 + 59af614, commit cfd9f86

7 files changed, +100 -82 lines changed

kafka/client.py (+21 -21)
@@ -1,14 +1,10 @@
-import base64
 from collections import defaultdict
 from functools import partial
-from itertools import count, cycle
+from itertools import count
 import logging
-from operator import attrgetter
-import struct
 import time
-import zlib
 
-from kafka.common import *
+from kafka.common import ErrorMapping, TopicAndPartition
 from kafka.conn import KafkaConnection
 from kafka.protocol import KafkaProtocol
 
@@ -212,8 +208,10 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
         order of input payloads
         """
 
-        encoder = partial(KafkaProtocol.encode_produce_request,
-                          acks=acks, timeout=timeout)
+        encoder = partial(
+            KafkaProtocol.encode_produce_request,
+            acks=acks,
+            timeout=timeout)
 
         if acks == 0:
             decoder = None
@@ -226,10 +224,10 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
         for resp in resps:
             # Check for errors
             if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("ProduceRequest for %s failed with "
-                                "errorcode=%d" % (
-                                TopicAndPartition(resp.topic, resp.partition),
-                                resp.error))
+                raise Exception(
+                    "ProduceRequest for %s failed with errorcode=%d" %
+                    (TopicAndPartition(resp.topic, resp.partition),
+                     resp.error))
 
             # Run the callback
             if callback is not None:
@@ -251,17 +249,18 @@ def send_fetch_request(self, payloads=[], fail_on_error=True,
                           max_wait_time=max_wait_time,
                           min_bytes=min_bytes)
 
-        resps = self._send_broker_aware_request(payloads, encoder,
-                KafkaProtocol.decode_fetch_response)
+        resps = self._send_broker_aware_request(
+            payloads, encoder,
+            KafkaProtocol.decode_fetch_response)
 
         out = []
         for resp in resps:
             # Check for errors
             if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("FetchRequest for %s failed with "
-                                "errorcode=%d" % (
-                                TopicAndPartition(resp.topic, resp.partition),
-                                resp.error))
+                raise Exception(
+                    "FetchRequest for %s failed with errorcode=%d" %
+                    (TopicAndPartition(resp.topic, resp.partition),
+                     resp.error))
 
             # Run the callback
             if callback is not None:
@@ -272,9 +271,10 @@ def send_fetch_request(self, payloads=[], fail_on_error=True,
 
     def send_offset_request(self, payloads=[], fail_on_error=True,
                             callback=None):
-        resps = self._send_broker_aware_request(payloads,
-                KafkaProtocol.encode_offset_request,
-                KafkaProtocol.decode_offset_response)
+        resps = self._send_broker_aware_request(
+            payloads,
+            KafkaProtocol.encode_offset_request,
+            KafkaProtocol.decode_offset_response)
 
         out = []
         for resp in resps:
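
A note on the pattern these hunks reflow: functools.partial pre-binds the request options (acks, timeout) onto the protocol encoder, and the bound callable is then handed to _send_broker_aware_request alongside a decoder. A minimal, self-contained sketch of that binding, using a stand-in encoder rather than the real KafkaProtocol.encode_produce_request:

from functools import partial

# Stand-in for KafkaProtocol.encode_produce_request: per-request
# arguments first, request options as keyword arguments.
def encode_produce_request(client_id, correlation_id, payloads=None,
                           acks=1, timeout=1000):
    return "produce(client=%s, corr=%d, acks=%d, timeout=%d, payloads=%r)" % (
        client_id, correlation_id, acks, timeout, payloads)

# Pre-bind the options; the result still accepts the per-request
# arguments, so callers never need to know about acks or timeout.
encoder = partial(encode_produce_request, acks=0, timeout=500)

print(encoder("my-client", 42, payloads=["msg1"]))
# produce(client=my-client, corr=42, acks=0, timeout=500, payloads=['msg1'])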

kafka/conn.py (+14 -6)
@@ -3,6 +3,8 @@
 import struct
 from threading import local
 
+from kafka.common import BufferUnderflowError
+
 
 log = logging.getLogger("kafka")
 
@@ -12,7 +14,7 @@ class KafkaConnection(local):
     A socket connection to a single Kafka broker
 
     This class is _not_ thread safe. Each call to `send` must be followed
-    by a call to `recv` in order to get the correct response. Eventually, 
+    by a call to `recv` in order to get the correct response. Eventually,
     we can do something in here to facilitate multiplexed requests/responses
     since the Kafka API includes a correlation id.
     """
@@ -43,7 +45,7 @@ def _consume_response(self):
 
     def _consume_response_iter(self):
         """
-        This method handles the response header and error messages. It 
+        This method handles the response header and error messages. It
         then returns an iterator for the chunks of the response
         """
         log.debug("Handling response from Kafka")
@@ -57,13 +59,15 @@ def _consume_response_iter(self):
         messagesize = size - 4
         log.debug("About to read %d bytes from Kafka", messagesize)
 
-        # Read the remainder of the response 
+        # Read the remainder of the response
         total = 0
         while total < messagesize:
             resp = self._sock.recv(self.bufsize)
             log.debug("Read %d bytes from Kafka", len(resp))
             if resp == "":
-                raise BufferUnderflowError("Not enough data to read this response")
+                raise BufferUnderflowError(
+                    "Not enough data to read this response")
+
             total += len(resp)
             yield resp
 
@@ -75,9 +79,13 @@ def _consume_response_iter(self):
 
     def send(self, request_id, payload):
         "Send a request to Kafka"
-        log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
+
+        log.debug(
+            "About to send %d bytes to Kafka, request %d" %
+            (len(payload), request_id))
+
         sent = self._sock.sendall(payload)
-        if sent != None:
+        if sent is not None:
             raise RuntimeError("Kafka went away")
 
     def recv(self, request_id):
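
Two substantive changes ride along with the reflowing here: the new from kafka.common import BufferUnderflowError makes the exception raised in the read loop an actual defined name in this module (the kind of thing pyflakes flags), and sent != None becomes the idiomatic identity test sent is not None (socket.sendall returns None on success). The read loop itself is plain length-prefixed framing; below is a simplified, self-contained sketch of the idea, assuming a blocking socket (the real method also peels a 4-byte correlation id out of the advertised size, which is omitted here):

import struct

def read_framed(sock, bufsize=4096):
    # Length-prefixed framing: a 4-byte big-endian size header,
    # then that many bytes of payload.
    header = sock.recv(4)
    if len(header) != 4:
        raise IOError("Not enough data to read the response size")
    (size,) = struct.unpack('>i', header)

    chunks = []
    total = 0
    while total < size:
        chunk = sock.recv(min(bufsize, size - total))
        if chunk == b"":
            # Peer closed mid-message: the short read reported above
            # as BufferUnderflowError.
            raise IOError("Not enough data to read this response")
        chunks.append(chunk)
        total += len(chunk)
    return b"".join(chunks)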

kafka/consumer.py (+30 -23)
@@ -8,7 +8,7 @@
 
 from kafka.common import (
     ErrorMapping, FetchRequest,
-    OffsetRequest, OffsetFetchRequest, OffsetCommitRequest,
+    OffsetRequest, OffsetCommitRequest,
     ConsumerFetchSizeTooSmall, ConsumerNoMoreData
 )
 
@@ -223,11 +223,12 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
         self.fetch_min_bytes = fetch_size_bytes
         self.fetch_started = defaultdict(bool)  # defaults to false
 
-        super(SimpleConsumer, self).__init__(client, group, topic,
-                                             partitions=partitions,
-                                             auto_commit=auto_commit,
-                                             auto_commit_every_n=auto_commit_every_n,
-                                             auto_commit_every_t=auto_commit_every_t)
+        super(SimpleConsumer, self).__init__(
+            client, group, topic,
+            partitions=partitions,
+            auto_commit=auto_commit,
+            auto_commit_every_n=auto_commit_every_n,
+            auto_commit_every_t=auto_commit_every_t)
 
     def provide_partition_info(self):
         """
@@ -275,8 +276,8 @@ def seek(self, offset, whence):
 
             resps = self.client.send_offset_request(reqs)
             for resp in resps:
-                self.offsets[resp.partition] = resp.offsets[0] + \
-                    deltas[resp.partition]
+                self.offsets[resp.partition] = \
+                    resp.offsets[0] + deltas[resp.partition]
         else:
             raise ValueError("Unexpected value for `whence`, %d" % whence)
 
@@ -364,9 +365,10 @@ def __iter_partition__(self, partition, offset):
                 req = FetchRequest(
                     self.topic, partition, offset, self.client.bufsize)
 
-                (resp,) = self.client.send_fetch_request([req],
-                        max_wait_time=self.fetch_max_wait_time,
-                        min_bytes=fetch_size)
+                (resp,) = self.client.send_fetch_request(
+                    [req],
+                    max_wait_time=self.fetch_max_wait_time,
+                    min_bytes=fetch_size)
 
                 assert resp.topic == self.topic
                 assert resp.partition == partition
@@ -376,18 +378,22 @@ def __iter_partition__(self, partition, offset):
                 for message in resp.messages:
                     next_offset = message.offset
 
-                    # update the offset before the message is yielded. This is
-                    # so that the consumer state is not lost in certain cases.
-                    # For eg: the message is yielded and consumed by the caller,
-                    # but the caller does not come back into the generator again.
-                    # The message will be consumed but the status will not be
-                    # updated in the consumer
+                    # update the offset before the message is yielded. This
+                    # is so that the consumer state is not lost in certain
+                    # cases.
+                    #
+                    # For eg: the message is yielded and consumed by the
+                    # caller, but the caller does not come back into the
+                    # generator again. The message will be consumed but the
+                    # status will not be updated in the consumer
                     self.fetch_started[partition] = True
                     self.offsets[partition] = message.offset
                     yield message
             except ConsumerFetchSizeTooSmall, e:
-                log.warn("Fetch size is too small, increasing by 1.5x and retrying")
                 fetch_size *= 1.5
+                log.warn(
+                    "Fetch size too small, increasing to %d (1.5x) and retry",
+                    fetch_size)
                 continue
             except ConsumerNoMoreData, e:
                 log.debug("Iteration was ended by %r", e)
@@ -429,11 +435,12 @@ def __init__(self, client, group, topic, auto_commit=True,
                  num_procs=1, partitions_per_proc=0):
 
         # Initiate the base consumer class
-        super(MultiProcessConsumer, self).__init__(client, group, topic,
-                                                   partitions=None,
-                                                   auto_commit=auto_commit,
-                                                   auto_commit_every_n=auto_commit_every_n,
-                                                   auto_commit_every_t=auto_commit_every_t)
+        super(MultiProcessConsumer, self).__init__(
+            client, group, topic,
+            partitions=None,
+            auto_commit=auto_commit,
+            auto_commit_every_n=auto_commit_every_n,
+            auto_commit_every_t=auto_commit_every_t)
 
         # Variables for managing and controlling the data flow from
         # consumer child process to master
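
The expanded comment in __iter_partition__ documents a genuine generator pitfall: code placed after a yield only runs if the caller resumes the generator, so bookkeeping left there can be silently skipped. Updating the offset before yielding sidesteps this; relatedly, fetch_size *= 1.5 now runs before the warning so the log message can report the new size. A minimal illustration of the yield-after-bookkeeping principle, with hypothetical names unrelated to the library:

def consume(messages, state):
    for i, message in enumerate(messages):
        # Record progress *before* yielding: if the caller never
        # resumes the generator, the message is still accounted for.
        state['offset'] = i
        yield message

state = {'offset': None}
it = consume(['a', 'b', 'c'], state)
print(next(it))         # the caller consumes 'a' ...
del it                  # ... and never re-enters the generator
print(state['offset'])  # 0: progress was recorded anyway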

kafka/protocol.py (+19 -19)
@@ -25,12 +25,12 @@ class KafkaProtocol(object):
     This class does not have any state associated with it, it is purely
     for organization.
     """
-    PRODUCE_KEY       = 0
-    FETCH_KEY         = 1
-    OFFSET_KEY        = 2
-    METADATA_KEY      = 3
+    PRODUCE_KEY = 0
+    FETCH_KEY = 1
+    OFFSET_KEY = 2
+    METADATA_KEY = 3
     OFFSET_COMMIT_KEY = 6
-    OFFSET_FETCH_KEY  = 7
+    OFFSET_FETCH_KEY = 7
 
     ATTRIBUTE_CODEC_MASK = 0x03
     CODEC_NONE = 0x00
@@ -120,8 +120,8 @@ def _decode_message_set_iter(cls, data):
                 yield OffsetAndMessage(offset, message)
             except BufferUnderflowError:
                 if read_message is False:
-                    # If we get a partial read of a message, but haven't yielded anyhting
-                    # there's a problem
+                    # If we get a partial read of a message, but haven't
+                    # yielded anyhting there's a problem
                     raise ConsumerFetchSizeTooSmall()
                 else:
                     raise StopIteration()
@@ -274,14 +274,14 @@ def decode_fetch_response(cls, data):
 
         for i in range(num_partitions):
             ((partition, error, highwater_mark_offset), cur) = \
-                    relative_unpack('>ihq', data, cur)
+                relative_unpack('>ihq', data, cur)
 
             (message_set, cur) = read_int_string(data, cur)
 
             yield FetchResponse(
-                    topic, partition, error,
-                    highwater_mark_offset,
-                    KafkaProtocol._decode_message_set_iter(message_set))
+                topic, partition, error,
+                highwater_mark_offset,
+                KafkaProtocol._decode_message_set_iter(message_set))
 
     @classmethod
     def encode_offset_request(cls, client_id, correlation_id, payloads=None):
@@ -321,7 +321,7 @@ def decode_offset_response(cls, data):
 
         for i in range(num_partitions):
             ((partition, error, num_offsets,), cur) = \
-                    relative_unpack('>ihi', data, cur)
+                relative_unpack('>ihi', data, cur)
 
             offsets = []
             for j in range(num_offsets):
@@ -383,17 +383,17 @@ def decode_metadata_response(cls, data):
 
             for j in range(num_partitions):
                 ((partition_error_code, partition, leader, numReplicas), cur) = \
-                        relative_unpack('>hiii', data, cur)
+                    relative_unpack('>hiii', data, cur)
 
-                (replicas, cur) = relative_unpack('>%di' % numReplicas,
-                        data, cur)
+                (replicas, cur) = relative_unpack(
+                    '>%di' % numReplicas, data, cur)
 
                 ((num_isr,), cur) = relative_unpack('>i', data, cur)
                 (isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
 
                 partition_metadata[partition] = \
-                    PartitionMetadata(topic_name, partition, leader,
-                                      replicas, isr)
+                    PartitionMetadata(
+                        topic_name, partition, leader, replicas, isr)
 
             topic_metadata[topic_name] = partition_metadata
 
@@ -531,7 +531,7 @@ def create_gzip_message(payloads, key=None):
     key: bytes, a key used for partition routing (optional)
     """
     message_set = KafkaProtocol._encode_message_set(
-            [create_message(payload) for payload in payloads])
+        [create_message(payload) for payload in payloads])
 
     gzipped = gzip_encode(message_set)
     codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP
@@ -552,7 +552,7 @@ def create_snappy_message(payloads, key=None):
     key: bytes, a key used for partition routing (optional)
     """
     message_set = KafkaProtocol._encode_message_set(
-            [create_message(payload) for payload in payloads])
+        [create_message(payload) for payload in payloads])
 
     snapped = snappy_encode(message_set)
     codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY
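
Every decoder hunk above threads a cursor through the response buffer with the same convention: relative_unpack(fmt, data, cur) unpacks a struct format at offset cur and returns the values together with the advanced offset. A hypothetical reimplementation of that helper, inferred purely from the call sites above (the real helper lives elsewhere in the package and raises its own error type):

import struct

def relative_unpack(fmt, data, cur):
    # Unpack `fmt` from `data` at offset `cur`; return the values
    # plus the new offset so calls can be chained.
    size = struct.calcsize(fmt)
    if len(data) < cur + size:
        raise ValueError("buffer too short for %r at offset %d" % (fmt, cur))
    return struct.unpack(fmt, data[cur:cur + size]), cur + size

# Decode a (partition, error, highwater_mark_offset) triple, as in
# decode_fetch_response above.
data = struct.pack('>ihq', 7, 0, 1234)
((partition, error, hw_offset), cur) = relative_unpack('>ihq', data, 0)
print((partition, error, hw_offset, cur))  # (7, 0, 1234, 14)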

kafka/queue.py (+11 -8)
@@ -25,8 +25,9 @@ def __init__(self, client, topic, partition, out_queue, barrier,
         Process.__init__(self)
 
     def __str__(self):
-        return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % \
-            (self.topic, self.partition, self.consumer_sleep)
+        return "[KafkaConsumerProcess: topic=%s, \
+            partition=%s, sleep=%s]" % \
+            (self.topic, self.partition, self.consumer_sleep)
 
     def run(self):
         self.barrier.wait()
@@ -70,10 +71,12 @@ def __init__(self, client, topic, in_queue, barrier,
         Process.__init__(self)
 
     def __str__(self):
-        return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, \
-            flush_timeout=%s, timeout=%s]" % (
-            self.topic, self.producer_flush_buffer,
-            self.producer_flush_timeout, self.producer_timeout)
+        return "[KafkaProducerProcess: topic=%s, \
+            flush_buffer=%s, flush_timeout=%s, timeout=%s]" % \
+            (self.topic,
+             self.producer_flush_buffer,
+             self.producer_flush_timeout,
+             self.producer_timeout)
 
     def run(self):
         self.barrier.wait()
@@ -104,8 +107,8 @@ def flush(messages):
                 last_produce = time.time()
 
             try:
-                msg = KafkaClient.create_message(self.in_queue.get(True,
-                                                 self.producer_timeout))
+                msg = KafkaClient.create_message(
+                    self.in_queue.get(True, self.producer_timeout))
                 messages.append(msg)
 
             except Empty:
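
The try/except Empty reflowed above is the heart of the producer's batching loop: block on Queue.get with a timeout, and let Empty double as the signal that a quiet period has passed and the buffer should be flushed. A condensed sketch of that pattern with illustrative names (standard library only; the Queue module is spelled queue on Python 3):

from multiprocessing import Queue
from Queue import Empty  # Python 2 spelling, matching this codebase

def drain(in_queue, flush_buffer=3, timeout=0.1):
    messages = []
    while True:
        try:
            # Wait up to `timeout` seconds for the next message.
            messages.append(in_queue.get(True, timeout))
        except Empty:
            break  # quiet period: flush whatever we have
        if len(messages) >= flush_buffer:
            break  # buffer full: flush eagerly
    return messages

q = Queue()
q.put("m1")
q.put("m2")
print(drain(q))  # ['m1', 'm2'] once the queue goes quiet for `timeout`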
