-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
116 lines (106 loc) · 4.38 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# Standard-library imports.
import os
import sys
import time
import re
import ssl
import json
import importlib
import logging
import random
import string
import multiprocessing
# Third-party: pika is the AMQP (RabbitMQ) client used to talk to the broker.
import pika
# Make the sibling 'processors' directory importable so each task's
# 'processor' module can be loaded by name via importlib.import_module().
sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), 'processors'))
# Root-logger configuration; inherited by the per-task worker processes.
log_format = ('%(asctime)s - %(levelname)s - %(name)s - %(funcName)s: %(message)s')
logging.basicConfig(level='INFO', format=log_format)
class ECConsumer(object):
    """Consume AMQP 'v02.post' announcements from Environment Canada's public
    Datamart broker (dd.weather.gc.ca) and hand matching products to a
    task-specific processor module.

    Binds an anonymous, randomly named queue to the public 'xpublic' topic
    exchange using a routing key derived from the task's optional 'topic',
    and optionally filters announced URLs with a regex.
    """

    # Broker-side prefix expected on every topic; it is stripped again
    # before the message is handed to the processor.
    TOPIC_PREFIX = '*.WXO-DD.'

    exchange = 'xpublic'
    exchange_type = 'topic'
    # Anonymous consumers need a unique queue name: append a random
    # 12-character alphanumeric suffix so concurrent instances don't collide.
    queue = 'q_anonymous_PySarra_' + ''.join(
        random.choices(string.ascii_letters + string.digits, k=12))

    def __init__(self, task):
        """Derive routing key, optional URL regex and processor from a task dict.

        task keys:
            processor (required): module name importable from 'processors/'.
            topic (optional): Datamart topic; TOPIC_PREFIX is prepended when
                missing (task['topic'] is updated in place, as callers rely
                on serializing the normalized task).
            regex (optional): pattern a product URL must match to be processed.
        """
        self.task = task
        self.topic = None
        self.regex = None
        self.routing_key = 'v02.post.'
        topic = task.get('topic')
        if topic:
            if not topic.startswith(self.TOPIC_PREFIX):
                topic = self.TOPIC_PREFIX + topic
            task['topic'] = topic
            self.topic = topic
            self.routing_key += topic + '.'
        # '#' matches any remaining topic words.
        self.routing_key += '#'
        if task.get('regex'):
            self.regex = re.compile(task['regex'])
        self.processor = importlib.import_module(task['processor'])

    def connect(self):
        """Open a TLS-secured blocking connection to the public Datamart broker.

        Returns a pika.BlockingConnection; raises on network/auth failure.
        """
        credentials = pika.PlainCredentials('anonymous', 'anonymous')
        context = ssl.create_default_context()
        # The broker's certificate is issued for weather.gc.ca, so override
        # the hostname used for TLS verification.
        parameters = pika.ConnectionParameters(
            'dd.weather.gc.ca',
            5671,
            '/',
            credentials,
            ssl_options=pika.SSLOptions(context, server_hostname='weather.gc.ca'))
        return pika.BlockingConnection(parameters)

    def run(self):
        """Declare and bind the queue, then consume forever; exit(1) on error."""
        connection = None
        try:
            connection = self.connect()
            channel = connection.channel()
            channel.queue_declare(queue=self.queue)
            channel.queue_bind(exchange=self.exchange, queue=self.queue,
                               routing_key=self.routing_key)
            channel.basic_consume(queue=self.queue,
                                  on_message_callback=self.on_message,
                                  auto_ack=False)
            logging.info('starting consumer')
            channel.start_consuming()
        except Exception as e:
            logging.error(f'error: {e}')
            if connection:
                connection.close()
            logging.info('shutting down consumer due to error')
            sys.exit(1)

    def on_message(self, channel, method, properties, body):
        """Handle one v02.post announcement.

        body is b'<timestamp> <base_url> <relative_path> ...'; the product
        URL is base_url + relative_path. Matching announcements are passed
        to the processor as {'topic', 'url', 'filename'}; every message is
        acked regardless of the regex filter.
        """
        logging.debug(f'received message: {body}')
        # BUG FIX: str.lstrip() strips a *character set*, not a prefix
        # (it mangled topics whose payload starts with any of '*.WXO-D'),
        # and self.topic is None when the task declared no topic, which
        # crashed with AttributeError. Strip the prefix explicitly instead.
        topic = self.topic
        if topic and topic.startswith(self.TOPIC_PREFIX):
            topic = topic[len(self.TOPIC_PREFIX):]
        parts = body.split()  # split once instead of three times
        url = (parts[1] + parts[2]).decode()
        filename = url.rsplit('/', 1)[-1]
        if not self.regex or self.regex.search(url):
            message = {
                'topic': topic,
                'url': url,
                'filename': filename
            }
            self.processor.process(message)
        channel.basic_ack(method.delivery_tag)
if __name__ == '__main__':
    # Load the task list from config.json located next to this script.
    script_path = os.path.dirname(os.path.realpath(__file__))
    config_path = os.path.join(script_path, 'config.json')
    with open(config_path) as config_file:
        config = json.load(config_file)

    # Start one consumer process per configured task. The process name is
    # the JSON-serialized task so the task can be recovered on restart.
    processes = []
    retries = 0
    max_retries = 5
    for task in config['tasks']:
        consumer = ECConsumer(task)
        process = multiprocessing.Process(target=consumer.run, name=json.dumps(task))
        processes.append(process)
        process.start()

    # Supervise: poll every 5 seconds and restart dead workers, giving up
    # for good after max_retries restart rounds.
    while True:
        process_restarted = False
        time.sleep(5)
        for index, process in enumerate(processes):
            if not process.is_alive():
                if retries == max_retries:
                    logging.critical(f"all restart attempts for task {process.name} failed. shutting down.")
                    for p in processes:
                        p.terminate()
                    sys.exit(1)
                logging.error(f"task {process.name} died. restarting... (attempt {retries + 1}/{max_retries})")
                # BUG FIX: the original reused the loop variable `task`,
                # which after the startup for-loop always holds the *last*
                # configured task, so every dead worker was restarted with
                # the wrong task. Recover the dead process's own task from
                # its JSON-encoded name instead.
                task = json.loads(process.name)
                consumer = ECConsumer(task)
                process = multiprocessing.Process(target=consumer.run, name=json.dumps(task))
                processes[index] = process
                process.start()
                process_restarted = True
        # The retry budget is shared by all tasks and charged once per
        # polling round, not once per dead process.
        if process_restarted:
            retries += 1