Skip to content

Commit b992bc8

Browse files
committed
split monitoring router into radio-specific receivers: zmq and UDP
this is in preparation for pluggable/configurable radios: see PR #3315 in that future work, these receivers wont be launched unless configured... or at least, the UDP one won't (and similarly the filesystem one) and in pluggable mode we would expect arbitrary new receivers to exist so then theres no reason for these ones to be special this opens up, in a future PR, the ability to not need to poll at such a high frequency - or at all... the poll-timeout is to allow the other poll to happen, but this PR makes those two polls happen in two threads.
1 parent ffbf3d3 commit b992bc8

File tree

1 file changed

+44
-14
lines changed

1 file changed

+44
-14
lines changed

parsl/monitoring/router.py

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import pickle
66
import queue
77
import socket
8+
import threading
89
import time
910
from multiprocessing.synchronize import Event
1011
from typing import Optional, Tuple, Union
@@ -105,7 +106,28 @@ def __init__(self,
105106
self.resource_msgs = resource_msgs
106107
self.exit_event = exit_event
107108

109+
@wrap_with_logs(target="monitoring_router")
108110
def start(self) -> None:
111+
self.logger.info("Starting UDP listener thread")
112+
udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener)
113+
udp_radio_receiver_thread.start()
114+
115+
self.logger.info("Starting ZMQ listener thread")
116+
zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener)
117+
zmq_radio_receiver_thread.start()
118+
119+
# exit when both of those have exiting
120+
# TODO: this is to preserve the existing behaviour of start(), but it
121+
# isn't necessarily the *right* thing to do...
122+
123+
self.logger.info("Joining on ZMQ listener thread")
124+
zmq_radio_receiver_thread.join()
125+
self.logger.info("Joining on UDP listener thread")
126+
udp_radio_receiver_thread.join()
127+
self.logger.info("Joined on both ZMQ and UDP listener threads")
128+
129+
@wrap_with_logs(target="monitoring_router")
130+
def start_udp_listener(self) -> None:
109131
try:
110132
while not self.exit_event.is_set():
111133
try:
@@ -116,6 +138,26 @@ def start(self) -> None:
116138
except socket.timeout:
117139
pass
118140

141+
self.logger.info("UDP listener draining")
142+
last_msg_received_time = time.time()
143+
while time.time() - last_msg_received_time < self.atexit_timeout:
144+
try:
145+
data, addr = self.udp_sock.recvfrom(2048)
146+
msg = pickle.loads(data)
147+
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
148+
self.resource_msgs.put((msg, addr))
149+
last_msg_received_time = time.time()
150+
except socket.timeout:
151+
pass
152+
153+
self.logger.info("UDP listener finishing normally")
154+
finally:
155+
self.logger.info("UDP listener finished")
156+
157+
@wrap_with_logs(target="monitoring_router")
158+
def start_zmq_listener(self) -> None:
159+
try:
160+
while not self.exit_event.is_set():
119161
try:
120162
dfk_loop_start = time.time()
121163
while time.time() - dfk_loop_start < 1.0: # TODO make configurable
@@ -158,21 +200,9 @@ def start(self) -> None:
158200
# thing to do.
159201
self.logger.warning("Failure processing a ZMQ message", exc_info=True)
160202

161-
self.logger.info("Monitoring router draining")
162-
last_msg_received_time = time.time()
163-
while time.time() - last_msg_received_time < self.atexit_timeout:
164-
try:
165-
data, addr = self.udp_sock.recvfrom(2048)
166-
msg = pickle.loads(data)
167-
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
168-
self.resource_msgs.put((msg, addr))
169-
last_msg_received_time = time.time()
170-
except socket.timeout:
171-
pass
172-
173-
self.logger.info("Monitoring router finishing normally")
203+
self.logger.info("ZMQ listener finishing normally")
174204
finally:
175-
self.logger.info("Monitoring router finished")
205+
self.logger.info("ZMQ listener finished")
176206

177207

178208
@wrap_with_logs

0 commit comments

Comments
 (0)