-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtimestamp_kafka_producer.py
62 lines (47 loc) · 2.11 KB
/
timestamp_kafka_producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from kafka import KafkaProducer
import random
import time
import sys, os
import logging
import csv
import ujson
# Timestamp format of the PUBDATE column. It has minute resolution (no seconds),
# so gaps between rows are (normally) whole multiples of 60 seconds.
PUBDATE_FORMAT = "%d/%m/%Y %H:%M"


def pubdate_epoch(pubdate, fmt=PUBDATE_FORMAT):
    """Return *pubdate* as seconds since the epoch, interpreted in local time.

    Raises:
        ValueError: if *pubdate* does not match *fmt*.
    """
    return time.mktime(time.strptime(pubdate, fmt))


def replay_delay(current_pubdate, next_pubdate, fmt=PUBDATE_FORMAT):
    """Return how many seconds to pause between publishing two consecutive rows.

    The real gap between the two PUBDATE values is compressed for replay:

    * a gap longer than a minute becomes a random pause of 2-20 s;
    * a gap of exactly 0 or 60 s (same or adjacent minute, given the
      minute-resolution format) becomes a random jitter of 0-0.3 s;
    * any other gap is returned unchanged.

    The absolute value of the gap is used, so rows that are out of
    chronological order still produce a non-negative delay.

    Raises:
        ValueError: if either timestamp does not match *fmt*.
    """
    gap = abs(pubdate_epoch(next_pubdate, fmt) - pubdate_epoch(current_pubdate, fmt))
    if gap > 60:
        return random.uniform(2.0, 20.0)
    if gap == 0 or gap == 60:
        return random.uniform(0.0, 0.3)
    return gap


def rows_with_next(rows):
    """Yield ``(row, following_row)`` for every row; the last row is paired with None."""
    iterator = iter(rows)
    try:
        current = next(iterator)
    except StopIteration:
        return  # empty input: nothing to yield
    for following in iterator:
        yield current, following
        current = following
    # The final row is still yielded, so it gets published too.
    yield current, None


def encode_row(row):
    """Serialize one CSV row dict as UTF-8 JSON bytes for a Kafka message value."""
    text = ujson.dumps(row, escape_forward_slashes=False, encode_html_chars=False, ensure_ascii=False)
    return text.encode('utf8')


def main(argv=None):
    """Replay the rows of a tab-separated file to a Kafka topic.

    Usage: ``timestamp_kafka_producer.py TOPIC FILENAME``

    Each row is sent to *TOPIC* on the broker at ``localhost:9092`` as a
    JSON object. Between rows the script sleeps for ``replay_delay`` of
    the two rows' PUBDATE values, which roughly reproduces the original
    pacing. The last row is sent without a trailing sleep.

    Args:
        argv: argument vector; defaults to ``sys.argv``.

    Returns:
        Process exit status: 0 on success, 2 on a usage error.
    """
    if argv is None:
        argv = sys.argv
    if len(argv) < 3:
        # Validate arguments before connecting to Kafka.
        logging.error("Usage: %s TOPIC FILENAME", argv[0] if argv else "timestamp_kafka_producer.py")
        return 2
    topic, filename = argv[1], argv[2]

    logging.info("Initialization...")
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    logging.info("Sending messages to kafka '%s' topic...", topic)

    # newline='' is required by the csv module for files it reads.
    # NOTE(review): assumes the input file is UTF-8 (the payload is sent as UTF-8) - confirm.
    with open(filename, 'rt', encoding='utf-8', newline='') as f:
        reader = csv.DictReader(f, delimiter='\t')
        for row, next_row in rows_with_next(reader):
            logging.info(row)
            delay = 0.0
            if next_row is not None:
                delay = replay_delay(row['PUBDATE'], next_row['PUBDATE'])
                logging.info('Sleeping for %f seconds', delay)
            producer.send(topic, encode_row(row))
            if delay:
                time.sleep(delay)

    logging.info("Waiting to complete delivery...")
    producer.flush()
    producer.close()
    logging.info("End")
    return 0


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        logging.info('Interrupted from keyboard, shutdown')
        # Immediate exit without interpreter cleanup, as before: the producer
        # is not flushed, so messages still buffered are dropped.
        os._exit(0)