Skip to content

Commit 71d9c71

Browse files
authored
Don't copy monitoring address/port parameters into the DFK. (#3522)
Prior to this PR, monitoring hub address and ZMQ port were stored as attributes of the DFK. The address also existed as an attribute on dfk.monitoring, and the ZMQ port was returned by dfk.monitoring.start Afte this PR, those values are not added to the DFK, but instead are accessed via dfk.monitoring. These two attributes are now only set on a new executor when monitoring is enabled, rather than always being intialised by the DFK. Default values now come from the executor __init__ method, which is a more usual style in Python for providing default values. See PR #3361 This is part of ongoing work to introduce more pluggable monitoring network connectivity - see PR #3315
1 parent 878889b commit 71d9c71

File tree

3 files changed

+7
-11
lines changed

3 files changed

+7
-11
lines changed

parsl/dataflow/dflow.py

+3-7
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,10 @@ def __init__(self, config: Config) -> None:
113113
self.monitoring: Optional[MonitoringHub]
114114
self.monitoring = config.monitoring
115115

116-
# hub address and port for interchange to connect
117-
self.hub_address = None # type: Optional[str]
118-
self.hub_zmq_port = None # type: Optional[int]
119116
if self.monitoring:
120117
if self.monitoring.logdir is None:
121118
self.monitoring.logdir = self.run_dir
122-
self.hub_address = self.monitoring.hub_address
123-
self.hub_zmq_port = self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir)
119+
self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir)
124120

125121
self.time_began = datetime.datetime.now()
126122
self.time_completed: Optional[datetime.datetime] = None
@@ -1181,9 +1177,9 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
11811177
for executor in executors:
11821178
executor.run_id = self.run_id
11831179
executor.run_dir = self.run_dir
1184-
executor.hub_address = self.hub_address
1185-
executor.hub_zmq_port = self.hub_zmq_port
11861180
if self.monitoring:
1181+
executor.hub_address = self.monitoring.hub_address
1182+
executor.hub_zmq_port = self.monitoring.hub_zmq_port
11871183
executor.monitoring_radio = self.monitoring.radio
11881184
if hasattr(executor, 'provider'):
11891185
if hasattr(executor.provider, 'script_dir'):

parsl/monitoring/monitoring.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def __init__(self,
105105
self.resource_monitoring_enabled = resource_monitoring_enabled
106106
self.resource_monitoring_interval = resource_monitoring_interval
107107

108-
def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> int:
108+
def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:
109109

110110
logger.debug("Starting MonitoringHub")
111111

@@ -207,7 +207,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
207207

208208
logger.info("Monitoring Hub initialized")
209209

210-
return zmq_port
210+
self.hub_zmq_port = zmq_port
211211

212212
# TODO: tighten the Any message format
213213
def send(self, mtype: MessageType, message: Any) -> None:

parsl/tests/test_monitoring/test_fuzz_zmq.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ def test_row_counts():
4444
# the latter is what i'm most suspicious of in my present investigation
4545

4646
# dig out the interchange port...
47-
hub_address = parsl.dfk().hub_address
48-
hub_zmq_port = parsl.dfk().hub_zmq_port
47+
hub_address = parsl.dfk().monitoring.hub_address
48+
hub_zmq_port = parsl.dfk().monitoring.hub_zmq_port
4949

5050
# this will send a string to a new socket connection
5151
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

0 commit comments

Comments
 (0)