From bd70e5f810e954b8a01ffaca1f46615b32adaa0f Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 17 Jul 2024 18:36:07 +0000 Subject: [PATCH] to remove this msg[1]['run_id'] = self.run_id from the monitoring router, the interchange should fully construct these messages - c.f. how the task wrapper knows the run id so add run id as an interchange parameter and populate there ... which means htex needs to know about run_ids ... which is fine as it knows about monitoring generally, in order to send the messages into monitoring... this is tested by test_row_counts which fails without the run_id populated due to: > sqlite3.IntegrityError: NOT NULL constraint failed: node.run_id in the database manager. --- parsl/dataflow/dflow.py | 2 +- parsl/executors/high_throughput/executor.py | 1 + parsl/executors/high_throughput/interchange.py | 4 ++++ parsl/monitoring/monitoring.py | 3 +-- parsl/monitoring/router.py | 7 +------ parsl/tests/test_htex/test_zmq_binding.py | 3 ++- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 88ef063230..344173c4b1 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -116,7 +116,7 @@ def __init__(self, config: Config) -> None: if self.monitoring: if self.monitoring.logdir is None: self.monitoring.logdir = self.run_dir - self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir) + self.monitoring.start(self.run_dir, self.config.run_dir) self.time_began = datetime.datetime.now() self.time_completed: Optional[datetime.datetime] = None diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 6c181cdee7..1a56195c07 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -551,6 +551,7 @@ def _start_local_interchange_process(self) -> None: "logging_level": logging.DEBUG if self.worker_debug else logging.INFO, "cert_dir": self.cert_dir, "manager_selector": self.manager_selector, + "run_id": self.run_id, } config_pickle = pickle.dumps(interchange_config) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 54607f42d9..a080a84956 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -55,6 +55,7 @@ def __init__(self, poll_period: int, cert_dir: Optional[str], manager_selector: ManagerSelector, + run_id: str, ) -> None: """ Parameters @@ -125,6 +126,8 @@ def __init__(self, self.command_channel.connect("tcp://{}:{}".format(client_address, client_ports[2])) logger.info("Connected to client") + self.run_id = run_id + self.hub_address = hub_address self.hub_zmq_port = hub_zmq_port @@ -227,6 +230,7 @@ def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender d: Dict = cast(Dict, manager.copy()) d['timestamp'] = datetime.datetime.now() d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat']) + d['run_id'] = self.run_id monitoring_radio.send((MessageType.NODE_INFO, d)) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index b21ba8953b..e1de80116c 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -106,7 +106,7 @@ def __init__(self, self.resource_monitoring_enabled = resource_monitoring_enabled self.resource_monitoring_interval = resource_monitoring_interval - def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: + def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: logger.debug("Starting MonitoringHub") @@ -154,7 +154,6 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat "zmq_port_range": self.hub_port_range, "logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, - "run_id": run_id }, name="Monitoring-Router-Process", daemon=True, diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 9b6b4fd618..ca535d24b9 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -32,7 +32,6 @@ def __init__(self, monitoring_hub_address: str = "127.0.0.1", logdir: str = ".", - run_id: str, logging_level: int = logging.INFO, atexit_timeout: int = 3, # in seconds resource_msgs: "Queue[AddressedMonitoringMessage]", @@ -69,7 +68,6 @@ def __init__(self, self.hub_address = hub_address self.atexit_timeout = atexit_timeout - self.run_id = run_id self.loop_freq = 10.0 # milliseconds @@ -171,7 +169,6 @@ def start_zmq_listener(self) -> None: msg_0 = (msg, 0) if msg[0] == MessageType.NODE_INFO: - msg[1]['run_id'] = self.run_id self.resource_msgs.put(msg_0) elif msg[0] == MessageType.RESOURCE_INFO or msg[0] == MessageType.BLOCK_INFO: self.resource_msgs.put(msg_0) @@ -214,8 +211,7 @@ def router_starter(*, zmq_port_range: Tuple[int, int], logdir: str, - logging_level: int, - run_id: str) -> None: + logging_level: int) -> None: setproctitle("parsl: monitoring router") try: router = MonitoringRouter(hub_address=hub_address, @@ -223,7 +219,6 @@ def router_starter(*, zmq_port_range=zmq_port_range, logdir=logdir, logging_level=logging_level, - run_id=run_id, resource_msgs=resource_msgs, exit_event=exit_event) except Exception as e: diff --git a/parsl/tests/test_htex/test_zmq_binding.py b/parsl/tests/test_htex/test_zmq_binding.py index 2273443b99..e21c065d0d 100644 --- a/parsl/tests/test_htex/test_zmq_binding.py +++ b/parsl/tests/test_htex/test_zmq_binding.py @@ -25,7 +25,8 @@ def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[s logdir=".", logging_level=logging.INFO, manager_selector=RandomManagerSelector(), - poll_period=10) + poll_period=10, + run_id="test_run_id") @pytest.fixture