Skip to content

Commit cfd34f0

Browse files
committed
split monitoring router into radio-specific receivers: zmq and UDP
this is in preparation for pluggable/configurable radios: see PR #3315 in that future work, these receivers wont be launched unless configured... or at least, the UDP one won't (and similarly the filesystem one) and in pluggable mode we would expect arbitrary new receivers to exist so then theres no reason for these ones to be special this opens up, in a future PR, the ability to not need to poll at such a high frequency - or at all... the poll-timeout is to allow the other poll to happen, but this PR makes those two polls happen in two threads.
1 parent 999b5fb commit cfd34f0

File tree

1 file changed

+44
-14
lines changed

1 file changed

+44
-14
lines changed

parsl/monitoring/router.py

+44-14
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import pickle
66
import queue
77
import socket
8+
import threading
89
import time
910
from multiprocessing.synchronize import Event
1011
from typing import Optional, Tuple, Union
@@ -108,7 +109,28 @@ def __init__(self,
108109
self.resource_msgs = resource_msgs
109110
self.exit_event = exit_event
110111

112+
@wrap_with_logs(target="monitoring_router")
111113
def start(self) -> None:
114+
self.logger.info("Starting UDP listener thread")
115+
udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener)
116+
udp_radio_receiver_thread.start()
117+
118+
self.logger.info("Starting ZMQ listener thread")
119+
zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener)
120+
zmq_radio_receiver_thread.start()
121+
122+
# exit when both of those have exiting
123+
# TODO: this is to preserve the existing behaviour of start(), but it
124+
# isn't necessarily the *right* thing to do...
125+
126+
self.logger.info("Joining on ZMQ listener thread")
127+
zmq_radio_receiver_thread.join()
128+
self.logger.info("Joining on UDP listener thread")
129+
udp_radio_receiver_thread.join()
130+
self.logger.info("Joined on both ZMQ and UDP listener threads")
131+
132+
@wrap_with_logs(target="monitoring_router")
133+
def start_udp_listener(self) -> None:
112134
try:
113135
while not self.exit_event.is_set():
114136
try:
@@ -119,6 +141,26 @@ def start(self) -> None:
119141
except socket.timeout:
120142
pass
121143

144+
self.logger.info("UDP listener draining")
145+
last_msg_received_time = time.time()
146+
while time.time() - last_msg_received_time < self.atexit_timeout:
147+
try:
148+
data, addr = self.udp_sock.recvfrom(2048)
149+
msg = pickle.loads(data)
150+
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
151+
self.resource_msgs.put((msg, addr))
152+
last_msg_received_time = time.time()
153+
except socket.timeout:
154+
pass
155+
156+
self.logger.info("UDP listener finishing normally")
157+
finally:
158+
self.logger.info("UDP listener finished")
159+
160+
@wrap_with_logs(target="monitoring_router")
161+
def start_zmq_listener(self) -> None:
162+
try:
163+
while not self.exit_event.is_set():
122164
try:
123165
dfk_loop_start = time.time()
124166
while time.time() - dfk_loop_start < 1.0: # TODO make configurable
@@ -161,21 +203,9 @@ def start(self) -> None:
161203
# thing to do.
162204
self.logger.warning("Failure processing a ZMQ message", exc_info=True)
163205

164-
self.logger.info("Monitoring router draining")
165-
last_msg_received_time = time.time()
166-
while time.time() - last_msg_received_time < self.atexit_timeout:
167-
try:
168-
data, addr = self.udp_sock.recvfrom(2048)
169-
msg = pickle.loads(data)
170-
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
171-
self.resource_msgs.put((msg, addr))
172-
last_msg_received_time = time.time()
173-
except socket.timeout:
174-
pass
175-
176-
self.logger.info("Monitoring router finishing normally")
206+
self.logger.info("ZMQ listener finishing normally")
177207
finally:
178-
self.logger.info("Monitoring router finished")
208+
self.logger.info("ZMQ listener finished")
179209

180210

181211
@wrap_with_logs

0 commit comments

Comments
 (0)