-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmaster.py
141 lines (108 loc) · 4.09 KB
/
master.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Batteries
import contextlib
import os
import signal
import time
from datetime import timedelta
# Third-party
from loguru import logger
# Local imports
from config import Config
from modules.api import PingAPI
from shared.process import UnixProcess
class PingMaster(UnixProcess):
"""
The ping master process will monitor and manage its children.
Args:
shared.process.UnixProcess (class): UnixProcess class.
"""
def __init__(self, configpath):
"""
Creates the main process.
Args:
configpath (str): The configuration file path.
"""
super().__init__('master process')
self._configpath = configpath
self._children = []
@staticmethod
def _loadchildren():
"""
Loads children from configuration into process map.
Raises:
InvalidConfiguration: On invalid configurations.
Returns:
list: Configured children list.
"""
return [
[PingAPI, {}, None],
]
def _monit(self):
"""
Monitors the instances of running processes.
"""
# For each children
for index, (cls, kwargs, proc) in enumerate(self._children):
# Spawn children if required
if proc is None:
try:
self._children[index][2] = cls(**kwargs)
self._children[index][2].start()
except Exception as e:
self._children.pop(index)
logger.error(f'Could not instantiate/spawn child \'{cls.__name__}\' due to: {str(e)}')
logger.info(f'Removed child \'{cls.__name__}\' from processes list.')
elif not (proc.is_alive() or proc.exitcode is None):
# Join process and delete instance
proc.join(timeout=0.1)
self._children[index][2] = None
@logger.catch
def run(self):
"""
This will run in a separate process.
"""
# Daemonize
self.daemonize()
# Create helper sink
logger.add(
Config.getpath('log.file'),
level=Config.get('log.level'), colorize=True, enqueue=True,
format='<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> |'
'<yellow>{process.name: <23}</yellow> | '
'<level>{message}</level> (<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan>)',
rotation=timedelta(days=1), retention=timedelta(days=30), compression='gz')
# Set process title
self.setprocname()
# Set signal handlers
# self.sigreg(signal.SIGHUP, self._reload)
self.sigreg(signal.SIGINT, self.sighandler)
self.sigreg(signal.SIGTERM, self.sighandler)
# Write PID file
with open(Config.getpath('pidfile'), 'w+') as pidfile:
pidfile.write(str(os.getpid()))
# Load children processes
self._children = self._loadchildren()
# While not stopping
while self._stop is False:
# Monit instances
self._monit()
time.sleep(1)
logger.debug('Terminating...')
# Stop all children whose instance is not None
children = [proc for _, _, proc in self._children if proc]
# While children have not stopped
while children:
for index, proc in enumerate(children):
logger.debug(f'Terminating child: {proc.name} with pid {proc.pid}...')
# Send SIGTERM to child process
os.kill(proc.pid, signal.SIGINT if isinstance(proc, PingAPI) else signal.SIGTERM)
# On join fail, SIGKILL child process
proc.join(timeout=1)
# If child has not stopped, give it time
if proc.is_alive() or proc.exitcode is None:
continue
# Remove children
children.pop(index)
# Remove pidfile and socket
with contextlib.suppress(FileNotFoundError):
os.unlink(Config.getpath('pidfile'))