Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[not for merge] start a monitoring radio plugin API #3315

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 31 additions & 14 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from parsl.jobs.job_status_poller import JobStatusPoller
from parsl.monitoring import MonitoringHub
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
from parsl.monitoring.remote import monitor_wrapper
from parsl.process_loggers import wrap_with_logs
from parsl.usage_tracking.usage import UsageTracker
Expand Down Expand Up @@ -110,8 +111,11 @@ def __init__(self, config: Config) -> None:
self.monitoring: Optional[MonitoringHub]
self.monitoring = config.monitoring

self.monitoring_radio = None

if self.monitoring:
self.monitoring.start(self.run_dir, self.config.run_dir)
self.monitoring_radio = MultiprocessingQueueRadioSender(self.monitoring.resource_msgs)

self.time_began = datetime.datetime.now()
self.time_completed: Optional[datetime.datetime] = None
Expand Down Expand Up @@ -156,9 +160,9 @@ def __init__(self, config: Config) -> None:
'host': gethostname(),
}

if self.monitoring:
self.monitoring.send((MessageType.WORKFLOW_INFO,
workflow_info))
if self.monitoring_radio:
self.monitoring_radio.send((MessageType.WORKFLOW_INFO,
workflow_info))

if config.checkpoint_files is not None:
checkpoint_files = config.checkpoint_files
Expand Down Expand Up @@ -231,9 +235,9 @@ def __exit__(self, exc_type, exc_value, traceback) -> None:
raise InternalConsistencyError(f"Exit case for {mode} should be unreachable, validated by typeguard on Config()")

def _send_task_log_info(self, task_record: TaskRecord) -> None:
if self.monitoring:
if self.monitoring_radio:
task_log_info = self._create_task_log_info(task_record)
self.monitoring.send((MessageType.TASK_INFO, task_log_info))
self.monitoring_radio.send((MessageType.TASK_INFO, task_log_info))

def _create_task_log_info(self, task_record: TaskRecord) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -738,11 +742,10 @@ def launch_task(self, task_record: TaskRecord) -> Future:
kwargs=kwargs,
x_try_id=try_id,
x_task_id=task_id,
monitoring_hub_url=self.monitoring.monitoring_hub_url,
radio_config=executor.remote_monitoring_radio_config,
run_id=self.run_id,
logging_level=wrapper_logging_level,
sleep_dur=self.monitoring.resource_monitoring_interval,
radio_mode=executor.radio_mode,
monitor_resources=executor.monitor_resources(),
run_dir=self.run_dir)

Expand Down Expand Up @@ -1129,6 +1132,19 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
executor.run_dir = self.run_dir
if self.monitoring:
executor.monitoring_messages = self.monitoring.resource_msgs
# this will modify the radio config object: it will add relevant parameters needed
# for the particular remote radio sender to communicate back
logger.info("starting monitoring receiver "
f"for executor {executor} "
f"with remote monitoring radio config {executor.remote_monitoring_radio_config}")
executor.monitoring_receiver = self.monitoring.start_receiver(executor.remote_monitoring_radio_config,
ip=self.monitoring.hub_address,
run_dir=self.run_dir)
# TODO: this is a weird way to start the receiver.
# Rather than in executor.start, but there's a tangle here
# trying to make the executors usable in a non-pure-parsl
# context where there is no DFK to grab config out of?
# (and no monitoring...)
if hasattr(executor, 'provider'):
if hasattr(executor.provider, 'script_dir'):
executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts')
Expand Down Expand Up @@ -1215,15 +1231,16 @@ def cleanup(self) -> None:
logger.info("Terminated executors")
self.time_completed = datetime.datetime.now()

if self.monitoring:
if self.monitoring_radio:
logger.info("Sending final monitoring message")
self.monitoring.send((MessageType.WORKFLOW_INFO,
{'tasks_failed_count': self.task_state_counts[States.failed],
'tasks_completed_count': self.task_state_counts[States.exec_done],
"time_began": self.time_began,
'time_completed': self.time_completed,
'run_id': self.run_id, 'rundir': self.run_dir}))
self.monitoring_radio.send((MessageType.WORKFLOW_INFO,
{'tasks_failed_count': self.task_state_counts[States.failed],
'tasks_completed_count': self.task_state_counts[States.exec_done],
"time_began": self.time_began,
'time_completed': self.time_completed,
'run_id': self.run_id, 'rundir': self.run_dir}))

if self.monitoring:
logger.info("Terminating monitoring")
self.monitoring.close()
logger.info("Terminated monitoring")
Expand Down
29 changes: 23 additions & 6 deletions parsl/executors/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import logging
import os
from abc import ABCMeta, abstractmethod
from concurrent.futures import Future
Expand All @@ -8,8 +9,12 @@

from typing_extensions import Literal, Self

from parsl.monitoring.radios.base import MonitoringRadioReceiver, RadioConfig
from parsl.monitoring.radios.udp import UDPRadio
from parsl.monitoring.types import TaggedMonitoringMessage

logger = logging.getLogger(__name__)


class ParslExecutor(metaclass=ABCMeta):
"""Executors are abstractions that represent available compute resources
Expand All @@ -27,10 +32,8 @@ class ParslExecutor(metaclass=ABCMeta):
label: str - a human readable label for the executor, unique
with respect to other executors.

Per-executor monitoring behaviour can be influenced by exposing:

radio_mode: str - a string describing which radio mode should be used to
send task resource data back to the submit side.
remote_monitoring_radio_config: RadioConfig describing how tasks on this executor
should report task resource status

An executor may optionally expose:

Expand All @@ -55,7 +58,6 @@ class ParslExecutor(metaclass=ABCMeta):
"""

label: str = "undefined"
radio_mode: str = "udp"

def __init__(
self,
Expand All @@ -65,9 +67,18 @@ def __init__(
run_id: Optional[str] = None,
):
self.monitoring_messages = monitoring_messages

# these are parameters for the monitoring radio to be used on the remote side
# eg. in workers - to send results back, and they should end up encapsulated
# inside a RadioConfig.
self.remote_monitoring_radio_config: RadioConfig = UDPRadio()

self.run_dir = os.path.abspath(run_dir)
self.run_id = run_id

# will be set externally later, which is pretty ugly
self.monitoring_receiver: Optional[MonitoringRadioReceiver] = None

def __enter__(self) -> Self:
return self

Expand Down Expand Up @@ -100,7 +111,13 @@ def shutdown(self) -> None:

This includes all attached resources such as workers and controllers.
"""
pass
logger.debug("Starting base executor shutdown")
# logger.error(f"BENC: monitoring receiver on {self} is {self.monitoring_receiver}")
if self.monitoring_receiver is not None:
logger.debug("Starting monitoring receiver shutdown")
self.monitoring_receiver.shutdown()
logger.debug("Done with monitoring receiver shutdown")
logger.debug("Done with base executor shutdown")

def monitor_resources(self) -> bool:
"""Should resource monitoring happen for tasks on running on this executor?
Expand Down
16 changes: 14 additions & 2 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
)
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
from parsl.monitoring.radios.base import RadioConfig
from parsl.monitoring.radios.htex import HTEXRadio
from parsl.monitoring.radios.zmq_router import ZMQRadioReceiver, start_zmq_receiver
from parsl.process_loggers import wrap_with_logs
from parsl.providers import LocalProvider
Expand Down Expand Up @@ -261,11 +263,13 @@ def __init__(self,
worker_logdir_root: Optional[str] = None,
manager_selector: ManagerSelector = RandomManagerSelector(),
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
encrypted: bool = False):
encrypted: bool = False,
remote_monitoring_radio_config: Optional[RadioConfig] = None):

logger.debug("Initializing HighThroughputExecutor")

BlockProviderExecutor.__init__(self, provider=provider, block_error_handler=block_error_handler)

self.label = label
self.worker_debug = worker_debug
self.storage_access = storage_access
Expand Down Expand Up @@ -310,6 +314,12 @@ def __init__(self,
self._workers_per_node = 1 # our best guess-- we do not have any provider hints

self._task_counter = 0

if remote_monitoring_radio_config is not None:
self.remote_monitoring_radio_config = remote_monitoring_radio_config
else:
self.remote_monitoring_radio_config = HTEXRadio()

self.worker_ports = worker_ports
self.worker_port_range = worker_port_range
self.interchange_proc: Optional[subprocess.Popen] = None
Expand Down Expand Up @@ -339,7 +349,6 @@ def __init__(self,
self.zmq_monitoring = None
self.hub_zmq_port = None

radio_mode = "htex"
enable_mpi_mode: bool = False
mpi_launcher: str = "mpiexec"

Expand Down Expand Up @@ -879,6 +888,9 @@ def shutdown(self, timeout: float = 10.0):
if self.zmq_monitoring:
self.zmq_monitoring.close()

# TODO: implement this across all executors
super().shutdown()

logger.info("Finished HighThroughputExecutor shutdown attempt")

def get_usage_information(self):
Expand Down
14 changes: 12 additions & 2 deletions parsl/executors/high_throughput/mpi_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.jobs.states import JobStatus
from parsl.launchers import SimpleLauncher
from parsl.monitoring.radios.base import RadioConfig
from parsl.providers import LocalProvider
from parsl.providers.base import ExecutionProvider

Expand Down Expand Up @@ -67,7 +68,8 @@ def __init__(self,
worker_logdir_root: Optional[str] = None,
mpi_launcher: str = "mpiexec",
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
encrypted: bool = False):
encrypted: bool = False,
remote_monitoring_radio_config: Optional[RadioConfig] = None):
super().__init__(
# Hard-coded settings
cores_per_worker=1e-9, # Ensures there will be at least an absurd number of workers
Expand All @@ -94,7 +96,15 @@ def __init__(self,
address_probe_timeout=address_probe_timeout,
worker_logdir_root=worker_logdir_root,
block_error_handler=block_error_handler,
encrypted=encrypted
encrypted=encrypted,

# TODO:
# worker-side monitoring in MPI-style code is probably going to be
# broken - resource monitoring won't see any worker processes
# most likely, as so perhaps it should have worker resource
# monitoring disabled like the thread pool executor has?
# (for related but different reasons...)
remote_monitoring_radio_config=remote_monitoring_radio_config
)
self.enable_mpi_mode = True
self.mpi_launcher = mpi_launcher
Expand Down
4 changes: 2 additions & 2 deletions parsl/executors/taskvine/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ class TaskVineExecutor(BlockProviderExecutor, putils.RepresentationMixin):
Default is None.
"""

radio_mode = "filesystem"

@typeguard.typechecked
def __init__(self,
label: str = "TaskVineExecutor",
Expand Down Expand Up @@ -601,6 +599,8 @@ def shutdown(self, *args, **kwargs):
self._finished_task_queue.close()
self._finished_task_queue.join_thread()

super().shutdown()

logger.debug("TaskVine shutdown completed")

@wrap_with_logs
Expand Down
1 change: 1 addition & 0 deletions parsl/executors/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def shutdown(self, block=True):
"""
logger.debug("Shutting down executor, which involves waiting for running tasks to complete")
self.executor.shutdown(wait=block)
super().shutdown()
logger.debug("Done with executor shutdown")

def monitor_resources(self):
Expand Down
4 changes: 2 additions & 2 deletions parsl/executors/workqueue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,6 @@ class WorkQueueExecutor(BlockProviderExecutor, putils.RepresentationMixin):
specifiation for each task).
"""

radio_mode = "filesystem"

@typeguard.typechecked
def __init__(self,
label: str = "WorkQueueExecutor",
Expand Down Expand Up @@ -712,6 +710,8 @@ def shutdown(self, *args, **kwargs):
self.collector_queue.close()
self.collector_queue.join_thread()

super().shutdown()

logger.debug("Work Queue shutdown completed")

@wrap_with_logs
Expand Down
Loading
Loading