diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index ad88702744..4c65f36843 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -56,6 +56,8 @@ "--mpi-launcher={mpi_launcher} " "--available-accelerators {accelerators}") +DEFAULT_INTERCHANGE_LAUNCH_CMD = "interchange.py" + GENERAL_HTEX_PARAM_DOCS = """provider : :class:`~parsl.providers.base.ExecutionProvider` Provider to access computation resources. Can be one of :class:`~parsl.providers.aws.aws.EC2Provider`, :class:`~parsl.providers.cobalt.cobalt.Cobalt`, @@ -76,6 +78,10 @@ cores_per_worker, nodes_per_block, heartbeat_period ,heartbeat_threshold, logdir). For example: launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}" + interchange_launch_cmd : str + Custom command line string to launch the interchange process from the executor. If undefined, + the executor will use the default "interchange.py" command. + address : string An address to connect to the main Parsl process which is reachable from the network in which workers will be running. This field expects an IPv4 address (xxx.xxx.xxx.xxx). @@ -231,6 +237,7 @@ def __init__(self, label: str = 'HighThroughputExecutor', provider: ExecutionProvider = LocalProvider(), launch_cmd: Optional[str] = None, + interchange_launch_cmd: Optional[str] = None, address: Optional[str] = None, worker_ports: Optional[Tuple[int, int]] = None, worker_port_range: Optional[Tuple[int, int]] = (54000, 55000), @@ -329,6 +336,10 @@ def __init__(self, launch_cmd = DEFAULT_LAUNCH_CMD self.launch_cmd = launch_cmd + if not interchange_launch_cmd: + interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD + self.interchange_launch_cmd = interchange_launch_cmd + radio_mode = "htex" def _warn_deprecated(self, old: str, new: str): @@ -544,7 +555,7 @@ def _start_local_interchange_process(self) -> None: config_pickle = pickle.dumps(interchange_config) - self.interchange_proc = subprocess.Popen(b"interchange.py", stdin=subprocess.PIPE) + self.interchange_proc = subprocess.Popen(self.interchange_launch_cmd.encode("utf-8"), stdin=subprocess.PIPE) stdin = self.interchange_proc.stdin assert stdin is not None, "Popen should have created an IO object (vs default None) because of PIPE mode" diff --git a/parsl/executors/high_throughput/mpi_executor.py b/parsl/executors/high_throughput/mpi_executor.py index 69071557c8..b8045d38b3 100644 --- a/parsl/executors/high_throughput/mpi_executor.py +++ b/parsl/executors/high_throughput/mpi_executor.py @@ -38,6 +38,7 @@ def __init__(self, label: str = 'MPIExecutor', provider: ExecutionProvider = LocalProvider(), launch_cmd: Optional[str] = None, + interchange_launch_cmd: Optional[str] = None, address: Optional[str] = None, worker_ports: Optional[Tuple[int, int]] = None, worker_port_range: Optional[Tuple[int, int]] = (54000, 55000), @@ -66,6 +67,7 @@ def __init__(self, label=label, provider=provider, launch_cmd=launch_cmd, + interchange_launch_cmd=interchange_launch_cmd, address=address, worker_ports=worker_ports, worker_port_range=worker_port_range, diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py index 2227529f82..2d1aafda85 100644 --- a/parsl/tests/test_htex/test_htex.py +++ b/parsl/tests/test_htex/test_htex.py @@ -136,3 +136,16 @@ def test_max_workers_per_node(): # Ensure max_workers_per_node takes precedence assert htex.max_workers_per_node == htex.max_workers == 1 + + +@pytest.mark.local +def test_htex_launch_cmd(): + htex = HighThroughputExecutor() + assert htex.launch_cmd.startswith("process_worker_pool.py") + assert htex.interchange_launch_cmd == "interchange.py" + + launch_cmd = "custom-launch-cmd" + ix_launch_cmd = "custom-ix-launch-cmd" + htex = HighThroughputExecutor(launch_cmd=launch_cmd, interchange_launch_cmd=ix_launch_cmd) + assert htex.launch_cmd == launch_cmd + assert htex.interchange_launch_cmd == ix_launch_cmd