Skip to content

Commit ffbf3d3

Browse files
committed
make some parameters be included as self attributes
this is to avoid wiring them around multiple method calls when self is already there these are for use by upcoming threads they are all multiprocessing objects, so should all be thread-safe, so it should be fine to make them accessible as self attributes from multiple threads
1 parent 7c12fed commit ffbf3d3

File tree

1 file changed

+29
-17
lines changed

1 file changed

+29
-17
lines changed

parsl/monitoring/router.py

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,12 @@ def __init__(self,
3232
logdir: str = ".",
3333
run_id: str,
3434
logging_level: int = logging.INFO,
35-
atexit_timeout: int = 3 # in seconds
35+
atexit_timeout: int = 3, # in seconds
36+
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
37+
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
38+
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
39+
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
40+
exit_event: Event,
3641
):
3742
""" Initializes a monitoring configuration class.
3843
@@ -52,6 +57,7 @@ def __init__(self,
5257
atexit_timeout : float, optional
5358
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
5459
60+
TODO: documentation of new parameters
5561
"""
5662
os.makedirs(logdir, exist_ok=True)
5763
self.logger = set_file_logger("{}/monitoring_router.log".format(logdir),
@@ -93,19 +99,20 @@ def __init__(self,
9399
min_port=zmq_port_range[0],
94100
max_port=zmq_port_range[1])
95101

96-
def start(self,
97-
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
98-
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
99-
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
100-
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
101-
exit_event: Event) -> None:
102+
self.priority_msgs = priority_msgs
103+
self.node_msgs = node_msgs
104+
self.block_msgs = block_msgs
105+
self.resource_msgs = resource_msgs
106+
self.exit_event = exit_event
107+
108+
def start(self) -> None:
102109
try:
103-
while not exit_event.is_set():
110+
while not self.exit_event.is_set():
104111
try:
105112
data, addr = self.udp_sock.recvfrom(2048)
106113
resource_msg = pickle.loads(data)
107114
self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg))
108-
resource_msgs.put((resource_msg, addr))
115+
self.resource_msgs.put((resource_msg, addr))
109116
except socket.timeout:
110117
pass
111118

@@ -125,15 +132,15 @@ def start(self,
125132

126133
if msg[0] == MessageType.NODE_INFO:
127134
msg[1]['run_id'] = self.run_id
128-
node_msgs.put(msg_0)
135+
self.node_msgs.put(msg_0)
129136
elif msg[0] == MessageType.RESOURCE_INFO:
130-
resource_msgs.put(msg_0)
137+
self.resource_msgs.put(msg_0)
131138
elif msg[0] == MessageType.BLOCK_INFO:
132-
block_msgs.put(msg_0)
139+
self.block_msgs.put(msg_0)
133140
elif msg[0] == MessageType.TASK_INFO:
134-
priority_msgs.put(msg_0)
141+
self.priority_msgs.put(msg_0)
135142
elif msg[0] == MessageType.WORKFLOW_INFO:
136-
priority_msgs.put(msg_0)
143+
self.priority_msgs.put(msg_0)
137144
else:
138145
# There is a type: ignore here because if msg[0]
139146
# is of the correct type, this code is unreachable,
@@ -158,7 +165,7 @@ def start(self,
158165
data, addr = self.udp_sock.recvfrom(2048)
159166
msg = pickle.loads(data)
160167
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
161-
resource_msgs.put((msg, addr))
168+
self.resource_msgs.put((msg, addr))
162169
last_msg_received_time = time.time()
163170
except socket.timeout:
164171
pass
@@ -191,7 +198,12 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
191198
zmq_port_range=zmq_port_range,
192199
logdir=logdir,
193200
logging_level=logging_level,
194-
run_id=run_id)
201+
run_id=run_id,
202+
priority_msgs=priority_msgs,
203+
node_msgs=node_msgs,
204+
block_msgs=block_msgs,
205+
resource_msgs=resource_msgs,
206+
exit_event=exit_event)
195207
except Exception as e:
196208
logger.error("MonitoringRouter construction failed.", exc_info=True)
197209
comm_q.put(f"Monitoring router construction failed: {e}")
@@ -200,7 +212,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
200212

201213
router.logger.info("Starting MonitoringRouter in router_starter")
202214
try:
203-
router.start(priority_msgs, node_msgs, block_msgs, resource_msgs, exit_event)
215+
router.start()
204216
except Exception as e:
205217
router.logger.exception("router.start exception")
206218
exception_q.put(('Hub', str(e)))

0 commit comments

Comments
 (0)