Skip to content

Commit 7132279

Browse files
committed
work on pluggable API for worker-side monitoring radio
The patch so far overloads self.monitoring_radio for two different uses that should be clarified: the submit-side radio and the worker-side radio. It's a bit complicated to have a default radio (UDPRadio or HTEXRadio) even when monitoring is turned off: I should check that the radio receiver does not get activated in this case. For example: * test that broken radio activation causes a startup error; * test that a configuration with a broken radio doesn't cause a startup error when MonitoringHub is not configured. The ZMQ radio should always listen, and be the place where all radio receivers send their data. The UDP radio and filesystem radio should turn into separate... something... processes? For prototyping I guess it doesn't matter where I make them live, but the most behaviour-preserving approach would keep them somehow separated?
1 parent 521971e commit 7132279

File tree

18 files changed

+510
-341
lines changed

18 files changed

+510
-341
lines changed

parsl/dataflow/dflow.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -744,16 +744,16 @@ def launch_task(self, task_record: TaskRecord) -> Future:
744744

745745
if self.monitoring is not None and self.monitoring.resource_monitoring_enabled:
746746
wrapper_logging_level = logging.DEBUG if self.monitoring.monitoring_debug else logging.INFO
747+
747748
(function, args, kwargs) = monitor_wrapper(f=function,
748749
args=args,
749750
kwargs=kwargs,
750751
x_try_id=try_id,
751752
x_task_id=task_id,
752-
monitoring_hub_url=self.monitoring.monitoring_hub_url,
753+
radio_config=executor.remote_monitoring_radio_config,
753754
run_id=self.run_id,
754755
logging_level=wrapper_logging_level,
755756
sleep_dur=self.monitoring.resource_monitoring_interval,
756-
radio_mode=executor.radio_mode,
757757
monitor_resources=executor.monitor_resources(),
758758
run_dir=self.run_dir)
759759

@@ -1181,6 +1181,18 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
11811181
executor.hub_address = self.monitoring.hub_address
11821182
executor.hub_zmq_port = self.monitoring.hub_zmq_port
11831183
executor.submit_monitoring_radio = self.monitoring.radio
1184+
# this will modify the radio config object: it will add relevant parameters needed
1185+
# for the particular remote radio sender to communicate back
1186+
logger.info("starting monitoring receiver "
1187+
f"for executor {executor} "
1188+
f"with remote monitoring radio config {executor.remote_monitoring_radio_config}")
1189+
executor.monitoring_receiver = self.monitoring.start_receiver(executor.remote_monitoring_radio_config,
1190+
ip=self.monitoring.hub_address)
1191+
# TODO: this is a weird way to start the receiver.
1192+
# Rather than in executor.start, but there's a tangle here
1193+
# trying to make the executors usable in a non-pure-parsl
1194+
# context where there is no DFK to grab config out of?
1195+
# (and no monitoring...)
11841196
if hasattr(executor, 'provider'):
11851197
if hasattr(executor.provider, 'script_dir'):
11861198
executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts')

parsl/executors/base.py

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
1+
import logging
12
import os
23
from abc import ABCMeta, abstractmethod
34
from concurrent.futures import Future
45
from typing import Any, Callable, Dict, Optional
56

67
from typing_extensions import Literal, Self
78

8-
from parsl.monitoring.radios import MonitoringRadioSender
9+
from parsl.monitoring.radios.base import (
10+
MonitoringRadioReceiver,
11+
MonitoringRadioSender,
12+
RadioConfig,
13+
)
14+
from parsl.monitoring.radios.udp import UDPRadio
15+
16+
logger = logging.getLogger(__name__)
917

1018

1119
class ParslExecutor(metaclass=ABCMeta):
@@ -19,15 +27,13 @@ class ParslExecutor(metaclass=ABCMeta):
1927
no arguments and re-raises any thrown exception.
2028
2129
In addition to the listed methods, a ParslExecutor instance must always
22-
have a member field:
30+
have these member fields:
2331
2432
label: str - a human readable label for the executor, unique
2533
with respect to other executors.
2634
27-
Per-executor monitoring behaviour can be influenced by exposing:
28-
29-
radio_mode: str - a string describing which radio mode should be used to
30-
send task resource data back to the submit side.
35+
remote_monitoring_radio_config: RadioConfig describing how tasks on this executor
36+
should report task resource status
3137
3238
An executor may optionally expose:
3339
@@ -45,11 +51,16 @@ class ParslExecutor(metaclass=ABCMeta):
4551
"""
4652

4753
label: str = "undefined"
48-
radio_mode: str = "udp"
4954

5055
def __init__(
5156
self,
5257
*,
58+
59+
# TODO: I'd like these two to go away but they're needed right now
60+
# to configure the interchange monitoring radio, that is
61+
# in addition to the submit and worker monitoring radios (!). They
62+
# are effectively a third monitoring radio config, though, so what
63+
# should that look like for the interchange?
5364
hub_address: Optional[str] = None,
5465
hub_zmq_port: Optional[int] = None,
5566
submit_monitoring_radio: Optional[MonitoringRadioSender] = None,
@@ -58,10 +69,19 @@ def __init__(
5869
):
5970
self.hub_address = hub_address
6071
self.hub_zmq_port = hub_zmq_port
72+
73+
# these are parameters for the monitoring radio to be used on the remote side
74+
# eg. in workers - to send results back, and they should end up encapsulated
75+
# inside a RadioConfig.
6176
self.submit_monitoring_radio = submit_monitoring_radio
77+
self.remote_monitoring_radio_config: RadioConfig = UDPRadio()
78+
6279
self.run_dir = os.path.abspath(run_dir)
6380
self.run_id = run_id
6481

82+
# will be set externally later, which is pretty ugly
83+
self.monitoring_receiver: Optional[MonitoringRadioReceiver] = None
84+
6585
def __enter__(self) -> Self:
6686
return self
6787

@@ -94,7 +114,13 @@ def shutdown(self) -> None:
94114
95115
This includes all attached resources such as workers and controllers.
96116
"""
97-
pass
117+
logger.debug("Starting base executor shutdown")
118+
# logger.error(f"BENC: monitoring receiver on {self} is {self.monitoring_receiver}")
119+
if self.monitoring_receiver is not None:
120+
logger.debug("Starting monitoring receiver shutdown")
121+
self.monitoring_receiver.shutdown()
122+
logger.debug("Done with monitoring receiver shutdown")
123+
logger.debug("Done with base executor shutdown")
98124

99125
def monitor_resources(self) -> bool:
100126
"""Should resource monitoring happen for tasks on running on this executor?

parsl/executors/high_throughput/executor.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
)
3131
from parsl.executors.status_handling import BlockProviderExecutor
3232
from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
33+
from parsl.monitoring.radios.base import HTEXRadio, RadioConfig
3334
from parsl.process_loggers import wrap_with_logs
3435
from parsl.providers import LocalProvider
3536
from parsl.providers.base import ExecutionProvider
@@ -267,11 +268,13 @@ def __init__(self,
267268
mpi_launcher: str = "mpiexec",
268269
manager_selector: ManagerSelector = RandomManagerSelector(),
269270
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
270-
encrypted: bool = False):
271+
encrypted: bool = False,
272+
remote_monitoring_radio_config: Optional[RadioConfig] = None):
271273

272274
logger.debug("Initializing HighThroughputExecutor")
273275

274276
BlockProviderExecutor.__init__(self, provider=provider, block_error_handler=block_error_handler)
277+
275278
self.label = label
276279
self.worker_debug = worker_debug
277280
self.storage_access = storage_access
@@ -316,6 +319,12 @@ def __init__(self,
316319
self._workers_per_node = 1 # our best guess-- we do not have any provider hints
317320

318321
self._task_counter = 0
322+
323+
if remote_monitoring_radio_config is not None:
324+
self.remote_monitoring_radio_config = remote_monitoring_radio_config
325+
else:
326+
self.remote_monitoring_radio_config = HTEXRadio()
327+
319328
self.worker_ports = worker_ports
320329
self.worker_port_range = worker_port_range
321330
self.interchange_proc: Optional[subprocess.Popen] = None
@@ -347,8 +356,6 @@ def __init__(self,
347356
interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD
348357
self.interchange_launch_cmd = interchange_launch_cmd
349358

350-
radio_mode = "htex"
351-
352359
def _warn_deprecated(self, old: str, new: str):
353360
warnings.warn(
354361
f"{old} is deprecated and will be removed in a future release. "
@@ -852,6 +859,9 @@ def shutdown(self, timeout: float = 10.0):
852859
logger.info("Closing command client")
853860
self.command_client.close()
854861

862+
# TODO: implement this across all executors
863+
super().shutdown()
864+
855865
logger.info("Finished HighThroughputExecutor shutdown attempt")
856866

857867
def get_usage_information(self):

parsl/executors/high_throughput/interchange.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from parsl.executors.high_throughput.manager_record import ManagerRecord
2121
from parsl.executors.high_throughput.manager_selector import ManagerSelector
2222
from parsl.monitoring.message_type import MessageType
23-
from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
23+
from parsl.monitoring.radios.base import MonitoringRadioSender, ZMQRadioSender
2424
from parsl.process_loggers import wrap_with_logs
2525
from parsl.serialize import serialize as serialize_object
2626
from parsl.utils import setproctitle

parsl/executors/high_throughput/mpi_executor.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
)
1111
from parsl.executors.status_handling import BlockProviderExecutor
1212
from parsl.jobs.states import JobStatus
13+
from parsl.monitoring.radios.base import RadioConfig
1314
from parsl.providers import LocalProvider
1415
from parsl.providers.base import ExecutionProvider
1516

@@ -56,7 +57,8 @@ def __init__(self,
5657
worker_logdir_root: Optional[str] = None,
5758
mpi_launcher: str = "mpiexec",
5859
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
59-
encrypted: bool = False):
60+
encrypted: bool = False,
61+
remote_monitoring_radio_config: Optional[RadioConfig] = None):
6062
super().__init__(
6163
# Hard-coded settings
6264
cores_per_worker=1e-9, # Ensures there will be at least an absurd number of workers
@@ -84,7 +86,15 @@ def __init__(self,
8486
worker_logdir_root=worker_logdir_root,
8587
mpi_launcher=mpi_launcher,
8688
block_error_handler=block_error_handler,
87-
encrypted=encrypted
89+
encrypted=encrypted,
90+
91+
# TODO:
92+
# worker-side monitoring in MPI-style code is probably going to be
93+
# broken - resource monitoring won't see any worker processes
94+
# most likely, so perhaps it should have worker resource
95+
# monitoring disabled like the thread pool executor has?
96+
# (for related but different reasons...)
97+
remote_monitoring_radio_config=remote_monitoring_radio_config
8898
)
8999

90100
self.max_workers_per_block = max_workers_per_block

parsl/executors/taskvine/executor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,8 @@ def shutdown(self, *args, **kwargs):
603603
self._finished_task_queue.close()
604604
self._finished_task_queue.join_thread()
605605

606+
super().shutdown()
607+
606608
logger.debug("TaskVine shutdown completed")
607609

608610
@wrap_with_logs

parsl/executors/threads.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def shutdown(self, block=True):
7272
"""
7373
logger.debug("Shutting down executor, which involves waiting for running tasks to complete")
7474
self.executor.shutdown(wait=block)
75+
super().shutdown()
7576
logger.debug("Done with executor shutdown")
7677

7778
def monitor_resources(self):

parsl/executors/workqueue/executor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,8 @@ def shutdown(self, *args, **kwargs):
715715
self.collector_queue.close()
716716
self.collector_queue.join_thread()
717717

718+
super().shutdown()
719+
718720
logger.debug("Work Queue shutdown completed")
719721

720722
@wrap_with_logs

parsl/monitoring/monitoring.py

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import os
66
import queue
77
import time
8-
from multiprocessing import Event, Process
8+
from multiprocessing import Event
99
from multiprocessing.queues import Queue
1010
from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast
1111

@@ -14,7 +14,7 @@
1414
from parsl.log_utils import set_file_logger
1515
from parsl.monitoring.errors import MonitoringHubStartError
1616
from parsl.monitoring.message_type import MessageType
17-
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
17+
from parsl.monitoring.radios.base import MultiprocessingQueueRadioSender, RadioConfig
1818
from parsl.monitoring.router import router_starter
1919
from parsl.monitoring.types import AddressedMonitoringMessage
2020
from parsl.multiprocessing import ForkProcess, SizedQueue
@@ -129,7 +129,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
129129
# in the future, Queue will allow runtime subscripts.
130130

131131
if TYPE_CHECKING:
132-
comm_q: Queue[Union[Tuple[int, int], str]]
132+
comm_q: Queue[Union[int, str]]
133133
else:
134134
comm_q: Queue
135135

@@ -150,7 +150,6 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
150150
"resource_msgs": self.resource_msgs,
151151
"exit_event": self.router_exit_event,
152152
"hub_address": self.hub_address,
153-
"udp_port": self.hub_port,
154153
"zmq_port_range": self.hub_port_range,
155154
"logdir": self.logdir,
156155
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
@@ -172,13 +171,13 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
172171
self.dbm_proc.start()
173172
logger.info("Started the router process {} and DBM process {}".format(self.router_proc.pid, self.dbm_proc.pid))
174173

175-
self.filesystem_proc = Process(target=filesystem_receiver,
176-
args=(self.logdir, self.resource_msgs, dfk_run_dir),
177-
name="Monitoring-Filesystem-Process",
178-
daemon=True
179-
)
180-
self.filesystem_proc.start()
181-
logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")
174+
# self.filesystem_proc = Process(target=filesystem_receiver,
175+
# args=(self.logdir, self.resource_msgs, dfk_run_dir),
176+
# name="Monitoring-Filesystem-Process",
177+
# daemon=True
178+
# )
179+
# self.filesystem_proc.start()
180+
# logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")
182181

183182
self.radio = MultiprocessingQueueRadioSender(self.resource_msgs)
184183

@@ -194,9 +193,23 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
194193
logger.error(f"MonitoringRouter sent an error message: {comm_q_result}")
195194
raise RuntimeError(f"MonitoringRouter failed to start: {comm_q_result}")
196195

197-
udp_port, zmq_port = comm_q_result
196+
zmq_port = comm_q_result
197+
198+
self.zmq_port = zmq_port
198199

199-
self.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port)
200+
# need to initialize radio configs, perhaps first time a radio config is used
201+
# in each executor? (can't do that at startup because executor list is dynamic,
202+
# don't know all the executors till later)
203+
# self.radio_config.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port)
204+
# How can this config be populated properly?
205+
# There's a UDP port chosen right now by the monitoring router and
206+
# sent back a line above...
207+
# What does that look like for other radios? htexradio has no specific config at all,
208+
# filesystem radio has a path (that should have been created?) for config, and a loop
209+
# that needs to be running, started in this start method.
210+
# so something like... radio_config.receive() generates the appropriate receiver object?
211+
# which has a shutdown method on it for later. and also updates radio_config itself so
212+
# it has the right info to send across the wire? or some state driving like that?
200213

201214
logger.info("Monitoring Hub initialized")
202215

@@ -228,7 +241,7 @@ def close(self) -> None:
228241
)
229242
self.router_proc.terminate()
230243
self.dbm_proc.terminate()
231-
self.filesystem_proc.terminate()
244+
# self.filesystem_proc.terminate()
232245
logger.info("Setting router termination event")
233246
self.router_exit_event.set()
234247
logger.info("Waiting for router to terminate")
@@ -248,9 +261,9 @@ def close(self) -> None:
248261
# should this be message based? it probably doesn't need to be if
249262
# we believe we've received all messages
250263
logger.info("Terminating filesystem radio receiver process")
251-
self.filesystem_proc.terminate()
252-
self.filesystem_proc.join()
253-
self.filesystem_proc.close()
264+
# self.filesystem_proc.terminate()
265+
# self.filesystem_proc.join()
266+
# self.filesystem_proc.close()
254267

255268
logger.info("Closing monitoring multiprocessing queues")
256269
self.exception_q.close()
@@ -259,6 +272,17 @@ def close(self) -> None:
259272
self.resource_msgs.join_thread()
260273
logger.info("Closed monitoring multiprocessing queues")
261274

275+
def start_receiver(self, radio_config: RadioConfig, ip: str) -> Any:
276+
"""somehow start a radio receiver here and update radioconfig to be sent over the wire, without
277+
losing the info we need to shut down that receiver later...
278+
"""
279+
r = radio_config.create_receiver(ip=ip, resource_msgs=self.resource_msgs) # TODO: return a shutdownable...
280+
logger.info(f"BENC: created receiver {r}")
281+
# assert r is not None
282+
return r
283+
# ... that is, a thing we need to do a shutdown call on at shutdown, a "shutdownable"? without
284+
# expecting any more structure on it?
285+
262286

263287
@wrap_with_logs
264288
def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None:

0 commit comments

Comments
 (0)