Skip to content

Commit d2ee35c

Browse files
committed
work on pluggable API for worker-side monitoring radio
patch so far is overloading self.monitoring_radio for two different uses that should be clarified: submit side radio and worker side radio its a bit complicated for having a default radio (UDPRadio or HTEXRadio) even when monitoring is turned off: I should check that the radio receiver does not get activated in this case: for example: * test that broken radio activation causes a startup error * test that configuration with broken radio doesn't cause a startup error when monitoringhub is not configured zmq radio should always listen, and be the place where all radio receivers send their data. udp radio and filesystem radio should turn into separate... something... processes? for prototyping i guess it doesn't matter where I make them live, but the most behaviour preserving would keep them somehow separated?
1 parent dc94b8a commit d2ee35c

File tree

18 files changed

+516
-347
lines changed

18 files changed

+516
-347
lines changed

Diff for: parsl/dataflow/dflow.py

+14-2
Original file line numberDiff line numberDiff line change
@@ -744,16 +744,16 @@ def launch_task(self, task_record: TaskRecord) -> Future:
744744

745745
if self.monitoring is not None and self.monitoring.resource_monitoring_enabled:
746746
wrapper_logging_level = logging.DEBUG if self.monitoring.monitoring_debug else logging.INFO
747+
747748
(function, args, kwargs) = monitor_wrapper(f=function,
748749
args=args,
749750
kwargs=kwargs,
750751
x_try_id=try_id,
751752
x_task_id=task_id,
752-
monitoring_hub_url=self.monitoring.monitoring_hub_url,
753+
radio_config=executor.remote_monitoring_radio_config,
753754
run_id=self.run_id,
754755
logging_level=wrapper_logging_level,
755756
sleep_dur=self.monitoring.resource_monitoring_interval,
756-
radio_mode=executor.radio_mode,
757757
monitor_resources=executor.monitor_resources(),
758758
run_dir=self.run_dir)
759759

@@ -1181,6 +1181,18 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
11811181
executor.hub_address = self.monitoring.hub_address
11821182
executor.hub_zmq_port = self.monitoring.hub_zmq_port
11831183
executor.submit_monitoring_radio = self.monitoring.radio
1184+
# this will modify the radio config object: it will add relevant parameters needed
1185+
# for the particular remote radio sender to communicate back
1186+
logger.info("starting monitoring receiver "
1187+
f"for executor {executor} "
1188+
f"with remote monitoring radio config {executor.remote_monitoring_radio_config}")
1189+
executor.monitoring_receiver = self.monitoring.start_receiver(executor.remote_monitoring_radio_config,
1190+
ip=self.monitoring.hub_address)
1191+
# TODO: this is a weird way to start the receiver.
1192+
# Rather than in executor.start, but there's a tangle here
1193+
# trying to make the executors usable in a non-pure-parsl
1194+
# context where there is no DFK to grab config out of?
1195+
# (and no monitoring...)
11841196
if hasattr(executor, 'provider'):
11851197
if hasattr(executor.provider, 'script_dir'):
11861198
executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts')

Diff for: parsl/executors/base.py

+34-8
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
1+
import logging
12
import os
23
from abc import ABCMeta, abstractmethod
34
from concurrent.futures import Future
45
from typing import Any, Callable, Dict, Optional
56

67
from typing_extensions import Literal, Self
78

8-
from parsl.monitoring.radios import MonitoringRadioSender
9+
from parsl.monitoring.radios.base import (
10+
MonitoringRadioReceiver,
11+
MonitoringRadioSender,
12+
RadioConfig,
13+
)
14+
from parsl.monitoring.radios.udp import UDPRadio
15+
16+
logger = logging.getLogger(__name__)
917

1018

1119
class ParslExecutor(metaclass=ABCMeta):
@@ -19,15 +27,13 @@ class ParslExecutor(metaclass=ABCMeta):
1927
no arguments and re-raises any thrown exception.
2028
2129
In addition to the listed methods, a ParslExecutor instance must always
22-
have a member field:
30+
have these member fields:
2331
2432
label: str - a human readable label for the executor, unique
2533
with respect to other executors.
2634
27-
Per-executor monitoring behaviour can be influenced by exposing:
28-
29-
radio_mode: str - a string describing which radio mode should be used to
30-
send task resource data back to the submit side.
35+
remote_monitoring_radio_config: RadioConfig describing how tasks on this executor
36+
should report task resource status
3137
3238
An executor may optionally expose:
3339
@@ -45,11 +51,16 @@ class ParslExecutor(metaclass=ABCMeta):
4551
"""
4652

4753
label: str = "undefined"
48-
radio_mode: str = "udp"
4954

5055
def __init__(
5156
self,
5257
*,
58+
59+
# TODO: I'd like these two to go away but they're needed right now
60+
# to configure the interchange monitoring radio, that is
61+
# in addition to the submit and worker monitoring radios (!). They
62+
# are effectivley a third monitoring radio config, though, so what
63+
# should that look like for the interchange?
5364
hub_address: Optional[str] = None,
5465
hub_zmq_port: Optional[int] = None,
5566
submit_monitoring_radio: Optional[MonitoringRadioSender] = None,
@@ -58,10 +69,19 @@ def __init__(
5869
):
5970
self.hub_address = hub_address
6071
self.hub_zmq_port = hub_zmq_port
72+
73+
# these are parameters for the monitoring radio to be used on the remote side
74+
# eg. in workers - to send results back, and they should end up encapsulated
75+
# inside a RadioConfig.
6176
self.submit_monitoring_radio = submit_monitoring_radio
77+
self.remote_monitoring_radio_config: RadioConfig = UDPRadio()
78+
6279
self.run_dir = os.path.abspath(run_dir)
6380
self.run_id = run_id
6481

82+
# will be set externally later, which is pretty ugly
83+
self.monitoring_receiver: Optional[MonitoringRadioReceiver] = None
84+
6585
def __enter__(self) -> Self:
6686
return self
6787

@@ -94,7 +114,13 @@ def shutdown(self) -> None:
94114
95115
This includes all attached resources such as workers and controllers.
96116
"""
97-
pass
117+
logger.debug("Starting base executor shutdown")
118+
# logger.error(f"BENC: monitoring receiver on {self} is {self.monitoring_receiver}")
119+
if self.monitoring_receiver is not None:
120+
logger.debug("Starting monitoring receiver shutdown")
121+
self.monitoring_receiver.shutdown()
122+
logger.debug("Done with monitoring receiver shutdown")
123+
logger.debug("Done with base executor shutdown")
98124

99125
def monitor_resources(self) -> bool:
100126
"""Should resource monitoring happen for tasks on running on this executor?

Diff for: parsl/executors/high_throughput/executor.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
)
2727
from parsl.executors.status_handling import BlockProviderExecutor
2828
from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
29+
from parsl.monitoring.radios.base import HTEXRadio, RadioConfig
2930
from parsl.process_loggers import wrap_with_logs
3031
from parsl.providers import LocalProvider
3132
from parsl.providers.base import ExecutionProvider
@@ -262,11 +263,13 @@ def __init__(self,
262263
enable_mpi_mode: bool = False,
263264
mpi_launcher: str = "mpiexec",
264265
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
265-
encrypted: bool = False):
266+
encrypted: bool = False,
267+
remote_monitoring_radio_config: Optional[RadioConfig] = None):
266268

267269
logger.debug("Initializing HighThroughputExecutor")
268270

269271
BlockProviderExecutor.__init__(self, provider=provider, block_error_handler=block_error_handler)
272+
270273
self.label = label
271274
self.worker_debug = worker_debug
272275
self.storage_access = storage_access
@@ -310,6 +313,12 @@ def __init__(self,
310313
self._workers_per_node = 1 # our best guess-- we do not have any provider hints
311314

312315
self._task_counter = 0
316+
317+
if remote_monitoring_radio_config is not None:
318+
self.remote_monitoring_radio_config = remote_monitoring_radio_config
319+
else:
320+
self.remote_monitoring_radio_config = HTEXRadio()
321+
313322
self.worker_ports = worker_ports
314323
self.worker_port_range = worker_port_range
315324
self.interchange_proc: Optional[subprocess.Popen] = None
@@ -341,8 +350,6 @@ def __init__(self,
341350
interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD
342351
self.interchange_launch_cmd = interchange_launch_cmd
343352

344-
radio_mode = "htex"
345-
346353
def _warn_deprecated(self, old: str, new: str):
347354
warnings.warn(
348355
f"{old} is deprecated and will be removed in a future release. "
@@ -845,6 +852,9 @@ def shutdown(self, timeout: float = 10.0):
845852
logger.info("Closing command client")
846853
self.command_client.close()
847854

855+
# TODO: implement this across all executors
856+
super().shutdown()
857+
848858
logger.info("Finished HighThroughputExecutor shutdown attempt")
849859

850860
def get_usage_information(self):

Diff for: parsl/executors/high_throughput/interchange.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from parsl.executors.high_throughput.errors import ManagerLost, VersionMismatch
2121
from parsl.executors.high_throughput.manager_record import ManagerRecord
2222
from parsl.monitoring.message_type import MessageType
23-
from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
23+
from parsl.monitoring.radios.base import MonitoringRadioSender, ZMQRadioSender
2424
from parsl.process_loggers import wrap_with_logs
2525
from parsl.serialize import serialize as serialize_object
2626
from parsl.utils import setproctitle

Diff for: parsl/executors/high_throughput/mpi_executor.py

+12-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
)
1111
from parsl.executors.status_handling import BlockProviderExecutor
1212
from parsl.jobs.states import JobStatus
13+
from parsl.monitoring.radios.base import RadioConfig
1314
from parsl.providers import LocalProvider
1415
from parsl.providers.base import ExecutionProvider
1516

@@ -56,7 +57,8 @@ def __init__(self,
5657
worker_logdir_root: Optional[str] = None,
5758
mpi_launcher: str = "mpiexec",
5859
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
59-
encrypted: bool = False):
60+
encrypted: bool = False,
61+
remote_monitoring_radio_config: Optional[RadioConfig] = None):
6062
super().__init__(
6163
# Hard-coded settings
6264
cores_per_worker=1e-9, # Ensures there will be at least an absurd number of workers
@@ -84,7 +86,15 @@ def __init__(self,
8486
worker_logdir_root=worker_logdir_root,
8587
mpi_launcher=mpi_launcher,
8688
block_error_handler=block_error_handler,
87-
encrypted=encrypted
89+
encrypted=encrypted,
90+
91+
# TODO:
92+
# worker-side monitoring in MPI-style code is probably going to be
93+
# broken - resource monitoring won't see any worker processes
94+
# most likely, as so perhaps it should have worker resource
95+
# monitoring disabled like the thread pool executor has?
96+
# (for related but different reasons...)
97+
remote_monitoring_radio_config=remote_monitoring_radio_config
8898
)
8999

90100
self.max_workers_per_block = max_workers_per_block

Diff for: parsl/executors/taskvine/executor.py

+2
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,8 @@ def shutdown(self, *args, **kwargs):
601601
self._finished_task_queue.close()
602602
self._finished_task_queue.join_thread()
603603

604+
super().shutdown()
605+
604606
logger.debug("TaskVine shutdown completed")
605607

606608
@wrap_with_logs

Diff for: parsl/executors/threads.py

+1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def shutdown(self, block=True):
7272
"""
7373
logger.debug("Shutting down executor, which involves waiting for running tasks to complete")
7474
self.executor.shutdown(wait=block)
75+
super().shutdown()
7576
logger.debug("Done with executor shutdown")
7677

7778
def monitor_resources(self):

Diff for: parsl/executors/workqueue/executor.py

+2
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,8 @@ def shutdown(self, *args, **kwargs):
713713
self.collector_queue.close()
714714
self.collector_queue.join_thread()
715715

716+
super().shutdown()
717+
716718
logger.debug("Work Queue shutdown completed")
717719

718720
@wrap_with_logs

Diff for: parsl/monitoring/monitoring.py

+41-17
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import os
66
import queue
77
import time
8-
from multiprocessing import Event, Process
8+
from multiprocessing import Event
99
from multiprocessing.queues import Queue
1010
from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast
1111

@@ -14,7 +14,7 @@
1414
from parsl.log_utils import set_file_logger
1515
from parsl.monitoring.errors import MonitoringHubStartError
1616
from parsl.monitoring.message_type import MessageType
17-
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
17+
from parsl.monitoring.radios.base import MultiprocessingQueueRadioSender, RadioConfig
1818
from parsl.monitoring.router import router_starter
1919
from parsl.monitoring.types import AddressedMonitoringMessage
2020
from parsl.multiprocessing import ForkProcess, SizedQueue
@@ -129,7 +129,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
129129
# in the future, Queue will allow runtime subscripts.
130130

131131
if TYPE_CHECKING:
132-
comm_q: Queue[Union[Tuple[int, int], str]]
132+
comm_q: Queue[Union[int, str]]
133133
else:
134134
comm_q: Queue
135135

@@ -150,7 +150,6 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
150150
"resource_msgs": self.resource_msgs,
151151
"exit_event": self.router_exit_event,
152152
"hub_address": self.hub_address,
153-
"udp_port": self.hub_port,
154153
"zmq_port_range": self.hub_port_range,
155154
"logdir": self.logdir,
156155
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
@@ -172,13 +171,13 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
172171
self.dbm_proc.start()
173172
logger.info("Started the router process {} and DBM process {}".format(self.router_proc.pid, self.dbm_proc.pid))
174173

175-
self.filesystem_proc = Process(target=filesystem_receiver,
176-
args=(self.logdir, self.resource_msgs, dfk_run_dir),
177-
name="Monitoring-Filesystem-Process",
178-
daemon=True
179-
)
180-
self.filesystem_proc.start()
181-
logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")
174+
# self.filesystem_proc = Process(target=filesystem_receiver,
175+
# args=(self.logdir, self.resource_msgs, dfk_run_dir),
176+
# name="Monitoring-Filesystem-Process",
177+
# daemon=True
178+
# )
179+
# self.filesystem_proc.start()
180+
# logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")
182181

183182
self.radio = MultiprocessingQueueRadioSender(self.resource_msgs)
184183

@@ -194,9 +193,23 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
194193
logger.error(f"MonitoringRouter sent an error message: {comm_q_result}")
195194
raise RuntimeError(f"MonitoringRouter failed to start: {comm_q_result}")
196195

197-
udp_port, zmq_port = comm_q_result
196+
zmq_port = comm_q_result
197+
198+
self.zmq_port = zmq_port
198199

199-
self.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port)
200+
# need to initialize radio configs, perhaps first time a radio config is used
201+
# in each executor? (can't do that at startup because executor list is dynamic,
202+
# don't know all the executors till later)
203+
# self.radio_config.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port)
204+
# How can this config be populated properly?
205+
# There's a UDP port chosen right now by the monitoring router and
206+
# sent back a line above...
207+
# What does that look like for other radios? htexradio has no specific config at all,
208+
# filesystem radio has a path (that should have been created?) for config, and a loop
209+
# that needs to be running, started in this start method.
210+
# so something like... radio_config.receive() generates the appropriate receiver object?
211+
# which has a shutdown method on it for later. and also updates radio_config itself so
212+
# it has the right info to send across the wire? or some state driving like that?
200213

201214
logger.info("Monitoring Hub initialized")
202215

@@ -228,7 +241,7 @@ def close(self) -> None:
228241
)
229242
self.router_proc.terminate()
230243
self.dbm_proc.terminate()
231-
self.filesystem_proc.terminate()
244+
# self.filesystem_proc.terminate()
232245
logger.info("Setting router termination event")
233246
self.router_exit_event.set()
234247
logger.info("Waiting for router to terminate")
@@ -248,9 +261,9 @@ def close(self) -> None:
248261
# should this be message based? it probably doesn't need to be if
249262
# we believe we've received all messages
250263
logger.info("Terminating filesystem radio receiver process")
251-
self.filesystem_proc.terminate()
252-
self.filesystem_proc.join()
253-
self.filesystem_proc.close()
264+
# self.filesystem_proc.terminate()
265+
# self.filesystem_proc.join()
266+
# self.filesystem_proc.close()
254267

255268
logger.info("Closing monitoring multiprocessing queues")
256269
self.exception_q.close()
@@ -259,6 +272,17 @@ def close(self) -> None:
259272
self.resource_msgs.join_thread()
260273
logger.info("Closed monitoring multiprocessing queues")
261274

275+
def start_receiver(self, radio_config: RadioConfig, ip: str) -> Any:
276+
"""somehow start a radio receiver here and update radioconfig to be sent over the wire, without
277+
losing the info we need to shut down that receiver later...
278+
"""
279+
r = radio_config.create_receiver(ip=ip, resource_msgs=self.resource_msgs) # TODO: return a shutdownable...
280+
logger.info(f"BENC: created receiver {r}")
281+
# assert r is not None
282+
return r
283+
# ... that is, a thing we need to do a shutdown call on at shutdown, a "shutdownable"? without
284+
# expecting any more structure on it?
285+
262286

263287
@wrap_with_logs
264288
def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None:

0 commit comments

Comments
 (0)