-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
Copy path_script_utils.py
35 lines (26 loc) · 1.15 KB
/
_script_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import logging
from optparse import OptionParser, Option, OptionValueError
from kafka import KafkaClient
logging.basicConfig()
# Add this attribute to easily check if the option is required
Option.ATTRS.append("required")
BROKER_OPTION = Option("-b", "--broker", dest="broker", required=True,
help="Required: The address of a kafka broker")
TOPIC_OPTION = Option("-t", "--topic", dest="topic", required=True,
help="Required: The topic to consume from")
def parse_options(*extra_options):
parser = OptionParser()
options = [BROKER_OPTION, TOPIC_OPTION] + list(extra_options)
parser.add_options(options)
(opts, args) = parser.parse_args()
missing = [o._long_opts[0] for o in options
if o.required and getattr(opts, o.dest) is None]
if missing:
parser.error("Missing required option(s) %s" % ", ".join(missing))
return opts
def get_client(broker, client_id=KafkaClient.CLIENT_ID):
try:
(host, port) = broker.split(':')
except ValueError:
raise OptionValueError("Broker should be in the form 'host:port'")
return KafkaClient(host, int(port), client_id)