Skip to content

Commit 40506c2

Browse files
committed
Merge branch 'issue-28'
Closes dpkp#28
2 parents 6704050 + dd109e2 commit 40506c2

File tree

13 files changed

+567
-338
lines changed

13 files changed

+567
-338
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,6 @@
1+
*.egg-info
12
*.pyc
3+
.tox
24
build
5+
dist
6+
MANIFEST

README.md

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -101,16 +101,12 @@ pip install python-snappy
101101

102102
# Tests
103103

104-
Some of the tests will fail if Snappy is not installed. These tests will throw
105-
NotImplementedError. If you see other failures, they might be bugs - so please
106-
report them!
107-
108104
## Run the unit tests
109105

110106
_These are broken at the moment_
111107

112108
```shell
113-
python -m test.unit
109+
tox ./test/test_unit.py
114110
```
115111

116112
## Run the integration tests
@@ -125,15 +121,9 @@ cd kafka-src
125121
./sbt package
126122
```
127123

128-
Next start up a ZooKeeper server on localhost:2181
129-
130-
```shell
131-
/opt/zookeeper/bin/zkServer.sh start
132-
```
133-
134-
And finally run the tests. This will actually start up real Kafka brokers and send messages in using the
135-
client.
124+
And then run the tests. This will actually start up real local Zookeeper
125+
instance and Kafka brokers, and send messages in using the client.
136126

137127
```shell
138-
python -m test.integration
128+
tox ./test/test_integration.py
139129
```

kafka/codec.py

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,48 @@
11
from cStringIO import StringIO
22
import gzip
3-
import logging
4-
5-
log = logging.getLogger("kafka.codec")
63

74
try:
85
import snappy
9-
hasSnappy = True
6+
_has_snappy = True
107
except ImportError:
11-
log.warn("Snappy codec not available")
12-
hasSnappy = False
8+
_has_snappy = False
9+
10+
11+
def has_gzip():
12+
return True
13+
14+
15+
def has_snappy():
16+
return _has_snappy
1317

1418

1519
def gzip_encode(payload):
16-
buf = StringIO()
17-
f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
18-
f.write(payload)
19-
f.close()
20-
buf.seek(0)
21-
out = buf.read()
22-
buf.close()
23-
return out
20+
buffer = StringIO()
21+
handle = gzip.GzipFile(fileobj=buffer, mode="w")
22+
handle.write(payload)
23+
handle.close()
24+
buffer.seek(0)
25+
result = buffer.read()
26+
buffer.close()
27+
return result
2428

2529

2630
def gzip_decode(payload):
27-
buf = StringIO(payload)
28-
f = gzip.GzipFile(fileobj=buf, mode='r')
29-
out = f.read()
30-
f.close()
31-
buf.close()
32-
return out
31+
buffer = StringIO(payload)
32+
handle = gzip.GzipFile(fileobj=buffer, mode='r')
33+
result = handle.read()
34+
handle.close()
35+
buffer.close()
36+
return result
3337

3438

3539
def snappy_encode(payload):
36-
if not hasSnappy:
37-
raise NotImplementedError("Snappy codec not available")
40+
if not _has_snappy:
41+
raise NotImplementedError("Snappy codec is not available")
3842
return snappy.compress(payload)
3943

4044

4145
def snappy_decode(payload):
42-
if not hasSnappy:
43-
raise NotImplementedError("Snappy codec not available")
46+
if not _has_snappy:
47+
raise NotImplementedError("Snappy codec is not available")
4448
return snappy.decompress(payload)

kafka/consumer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ def get_or_init_offset_callback(resp):
8383
for partition in self.client.topic_partitions[topic]:
8484
self.offsets[partition] = 0
8585

86+
def stop(self):
87+
if self.commit_timer is not None:
88+
self.commit_timer.stop()
89+
self.commit()
90+
8691
def seek(self, offset, whence):
8792
"""
8893
Alter the current offset in the consumer, similar to fseek

setup.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,41 @@
1-
from distutils.core import setup
1+
import os.path
2+
import sys
3+
4+
from setuptools import setup, Command
5+
6+
7+
class Tox(Command):
8+
user_options = []
9+
def initialize_options(self):
10+
pass
11+
12+
def finalize_options(self):
13+
pass
14+
15+
def run(self):
16+
import tox
17+
sys.exit(tox.cmdline([]))
18+
219

320
setup(
421
name="kafka-python",
522
version="0.8.1-1",
23+
24+
install_requires=["distribute", "tox"],
25+
tests_require=["tox"],
26+
cmdclass={"test": Tox},
27+
28+
packages=["kafka"],
29+
630
author="David Arthur",
731
author_email="[email protected]",
832
url="https://github.com/mumrah/kafka-python",
9-
packages=["kafka"],
1033
license="Copyright 2012, David Arthur under Apache License, v2.0",
1134
description="Pure Python client for Apache Kafka",
12-
long_description=open("README.md").read(),
35+
long_description="""
36+
This module provides low-level protocol support for Apache Kafka as well as
37+
high-level consumer and producer classes. Request batching is supported by the
38+
protocol as well as broker-aware request routing. Gzip and Snappy compression
39+
is also supported for message sets.
40+
"""
1341
)

0 commit comments

Comments
 (0)