diff --git a/example.py b/example.py
index 3a2dc928b..846feba7f 100644
--- a/example.py
+++ b/example.py
@@ -5,8 +5,8 @@
 from kafka.producer import SimpleProducer
 
 def produce_example(client):
-    producer = SimpleProducer(client, "my-topic")
-    producer.send_messages("test")
+    producer = SimpleProducer(client)
+    producer.send_messages("my-topic", "test")
 
 def consume_example(client):
     consumer = SimpleConsumer(client, "test-group", "my-topic")
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 73aa7603c..1584c4792 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -9,13 +9,13 @@
 from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message
 )
-from kafka.producer import SimpleProducer, KeyedProducer
-from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
-from kafka.consumer import SimpleConsumer, MultiProcessConsumer
+from kafka.producer import ConsoleProducer, KeyedProducer, SimpleProducer
+from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
+from kafka.consumer import ConsoleConsumer, MultiProcessConsumer, SimpleConsumer
 
 __all__ = [
     'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
-    'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
-    'MultiProcessConsumer', 'create_message', 'create_gzip_message',
-    'create_snappy_message'
+    'ConsoleProducer', 'RoundRobinPartitioner', 'HashedPartitioner',
+    'ConsoleConsumer', 'SimpleConsumer', 'MultiProcessConsumer',
+    'create_message', 'create_gzip_message', 'create_snappy_message'
 ]
diff --git a/kafka/_script_utils.py b/kafka/_script_utils.py
new file mode 100644
index 000000000..b784d151b
--- /dev/null
+++ b/kafka/_script_utils.py
@@ -0,0 +1,35 @@
+import logging
+from optparse import OptionParser, Option, OptionValueError
+
+from kafka import KafkaClient
+
+logging.basicConfig()
+
+# Add this attribute to easily check if the option is required
+Option.ATTRS.append("required")
+
+BROKER_OPTION = Option("-b", "--broker", dest="broker", required=True,
+                       help="Required: The address of a kafka broker")
+TOPIC_OPTION = Option("-t", "--topic", dest="topic", required=True,
+                      help="Required: The topic to consume from")
+
+def parse_options(*extra_options):
+    parser = OptionParser()
+    options = [BROKER_OPTION, TOPIC_OPTION] + list(extra_options)
+    parser.add_options(options)
+    (opts, args) = parser.parse_args()
+
+    missing = [o._long_opts[0] for o in options
+               if o.required and getattr(opts, o.dest) is None]
+    if missing:
+        parser.error("Missing required option(s) %s" % ", ".join(missing))
+
+    return opts
+
+def get_client(broker, client_id=KafkaClient.CLIENT_ID):
+    try:
+        (host, port) = broker.split(':')
+    except ValueError:
+        raise OptionValueError("Broker should be in the form 'host:port'")
+
+    return KafkaClient(host, int(port), client_id)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 28b53ec92..5658de2f2 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -674,3 +674,9 @@ def get_messages(self, count=1, block=True, timeout=10):
             self._auto_commit()
 
         return messages
+
+
+class ConsoleConsumer(SimpleConsumer):
+    def run(self):
+        for message in self:
+            print(message.message.value)
diff --git a/kafka/producer.py b/kafka/producer.py
index 12a293401..cf7b8003b 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -255,3 +255,13 @@ def send(self, topic, key, msg):
 
     def __repr__(self):
         return '<KeyedProducer batch=%s>' % self.async
+
+
+class ConsoleProducer(SimpleProducer):
+    def run(self, topic):
+        import readline  # noqa -- side effect: enables line editing for raw_input()
+        while True:
+            try:
+                self.send_messages(topic, raw_input())
+            except EOFError:
+                break
diff --git a/kafka/util.py b/kafka/util.py
index 54052fb03..16f0a6f19 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,6 +1,7 @@
 from collections import defaultdict
 import struct
 from threading import Thread, Event
+from time import sleep, time
 
 from kafka.common import BufferUnderflowError
 
@@ -114,3 +115,23 @@ def stop(self):
         self.active.set()
         self.thread.join(self.t + 1)
         self.timer = None
+
+DEFAULT_TOPIC_CREATION_TIMEOUT_SECONDS = 30
+def ensure_topic_creation(client, topic,
+                          timeout=DEFAULT_TOPIC_CREATION_TIMEOUT_SECONDS):
+    if timeout is not None:
+        max_time = time() + timeout
+
+    while timeout is None or timeout > 0:
+        client.load_metadata_for_topics(topic)
+        if client.has_metadata_for_topic(topic):
+            return
+
+        if timeout is not None:
+            # If we have a timeout, reduce it to the appropriate value
+            timeout = max_time - time()
+
+        sleep(1)
+
+    raise RuntimeError("Unable to create topic %s" % topic)
+
diff --git a/scripts/kp_consumer b/scripts/kp_consumer
new file mode 100755
index 000000000..933b058d1
--- /dev/null
+++ b/scripts/kp_consumer
@@ -0,0 +1,28 @@
+#!/usr/bin/env python
+
+from optparse import Option
+import sys
+
+from kafka import ConsoleConsumer
+from kafka._script_utils import parse_options, get_client
+
+GROUP_OPTION = Option("-g", "--group", dest="group", required=True,
+                      help="Required: The consumer group")
+CLIENT_ID = "kp_consumer"
+
+def main():
+    options = parse_options(GROUP_OPTION)
+    client = get_client(options.broker, CLIENT_ID)
+    consumer = ConsoleConsumer(client, options.group, options.topic,
+                               auto_commit=False)
+    try:
+        consumer.run()
+    except KeyboardInterrupt:
+        consumer.stop()
+    finally:
+        client.close()
+        print("Done!")
+    return 0
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/scripts/kp_create_topic b/scripts/kp_create_topic
new file mode 100755
index 000000000..38cb268f0
--- /dev/null
+++ b/scripts/kp_create_topic
@@ -0,0 +1,22 @@
+#!/usr/bin/env python
+
+import sys
+
+from kafka.util import ensure_topic_creation
+from kafka._script_utils import parse_options, get_client
+
+CLIENT_ID = "kp_create_topic"
+
+def main():
+    options = parse_options()
+    client = get_client(options.broker, CLIENT_ID)
+    try:
+        print("Creating topic %s..." % options.topic)
+        ensure_topic_creation(client, options.topic)
+    finally:
+        client.close()
+        print("Done!")
+    return 0
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/scripts/kp_producer b/scripts/kp_producer
new file mode 100755
index 000000000..1cfd8389c
--- /dev/null
+++ b/scripts/kp_producer
@@ -0,0 +1,25 @@
+#!/usr/bin/env python
+
+import sys
+
+from kafka import ConsoleProducer
+from kafka._script_utils import parse_options, get_client
+
+CLIENT_ID = "kp_producer"
+
+def main():
+    options = parse_options()
+    client = get_client(options.broker, CLIENT_ID)
+    producer = ConsoleProducer(client)
+    try:
+        producer.run(options.topic)
+        producer.stop()
+    except KeyboardInterrupt:
+        producer.stop()
+    finally:
+        client.close()
+        print("Done!")
+    return 0
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/setup.py b/setup.py
index 0869fee3e..1f9ca26c5 100644
--- a/setup.py
+++ b/setup.py
@@ -27,6 +27,7 @@ def run(self):
     cmdclass={"test": Tox},
 
     packages=["kafka"],
+    scripts=["scripts/kp_consumer", "scripts/kp_producer", "scripts/kp_create_topic"],
 
     author="David Arthur",
     author_email="mumrah@gmail.com",
diff --git a/test/test_integration.py b/test/test_integration.py
index d0da523eb..5b317da72 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -9,6 +9,7 @@
 from kafka.common import *  # noqa
 from kafka.codec import has_gzip, has_snappy
 from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
+from kafka.util import ensure_topic_creation
 
 from .fixtures import ZookeeperFixture, KafkaFixture
 
@@ -17,20 +18,6 @@ def random_string(l):
     return s
 
 
-def ensure_topic_creation(client, topic_name):
-    times = 0
-    while True:
-        times += 1
-        client.load_metadata_for_topics(topic_name)
-        if client.has_metadata_for_topic(topic_name):
-            break
-        print "Waiting for %s topic to be created" % topic_name
-        time.sleep(1)
-
-        if times > 30:
-            raise Exception("Unable to create topic %s" % topic_name)
-
-
 class KafkaTestCase(unittest.TestCase):
     def setUp(self):
         self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10))
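
Usage note (not part of the diff): a minimal sketch of driving the new ConsoleProducer and
ConsoleConsumer classes directly, the same way the kp_producer and kp_consumer scripts do.
The broker address, client id, topic, and consumer group below are placeholder values.

    from kafka import KafkaClient, ConsoleConsumer, ConsoleProducer

    # Placeholder broker address and client id; the kp_* scripts build this via get_client().
    client = KafkaClient("localhost", 9092, "console-example")

    # Read lines from stdin and send each one to the topic; returns on EOF (Ctrl-D).
    producer = ConsoleProducer(client)
    producer.run("my-topic")
    producer.stop()

    # Print every message value from the topic to stdout until interrupted (Ctrl-C).
    consumer = ConsoleConsumer(client, "example-group", "my-topic", auto_commit=False)
    try:
        consumer.run()
    except KeyboardInterrupt:
        consumer.stop()

    client.close()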