Skip to content

Commit 4695c00

Browse files
authored
Support custom interchange launch command (#3514)
The command used to launch the HTEX interchange process can now be customized.
1 parent b007d6d commit 4695c00

File tree

3 files changed

+27
-1
lines changed

3 files changed

+27
-1
lines changed

parsl/executors/high_throughput/executor.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
"--mpi-launcher={mpi_launcher} "
5757
"--available-accelerators {accelerators}")
5858

59+
DEFAULT_INTERCHANGE_LAUNCH_CMD = "interchange.py"
60+
5961
GENERAL_HTEX_PARAM_DOCS = """provider : :class:`~parsl.providers.base.ExecutionProvider`
6062
Provider to access computation resources. Can be one of :class:`~parsl.providers.aws.aws.EC2Provider`,
6163
:class:`~parsl.providers.cobalt.cobalt.Cobalt`,
@@ -76,6 +78,10 @@
7678
cores_per_worker, nodes_per_block, heartbeat_period ,heartbeat_threshold, logdir). For example:
7779
launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}"
7880
81+
interchange_launch_cmd : str
82+
Custom command line string to launch the interchange process from the executor. If undefined,
83+
the executor will use the default "interchange.py" command.
84+
7985
address : string
8086
An address to connect to the main Parsl process which is reachable from the network in which
8187
workers will be running. This field expects an IPv4 address (xxx.xxx.xxx.xxx).
@@ -231,6 +237,7 @@ def __init__(self,
231237
label: str = 'HighThroughputExecutor',
232238
provider: ExecutionProvider = LocalProvider(),
233239
launch_cmd: Optional[str] = None,
240+
interchange_launch_cmd: Optional[str] = None,
234241
address: Optional[str] = None,
235242
worker_ports: Optional[Tuple[int, int]] = None,
236243
worker_port_range: Optional[Tuple[int, int]] = (54000, 55000),
@@ -329,6 +336,10 @@ def __init__(self,
329336
launch_cmd = DEFAULT_LAUNCH_CMD
330337
self.launch_cmd = launch_cmd
331338

339+
if not interchange_launch_cmd:
340+
interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD
341+
self.interchange_launch_cmd = interchange_launch_cmd
342+
332343
radio_mode = "htex"
333344

334345
def _warn_deprecated(self, old: str, new: str):
@@ -544,7 +555,7 @@ def _start_local_interchange_process(self) -> None:
544555

545556
config_pickle = pickle.dumps(interchange_config)
546557

547-
self.interchange_proc = subprocess.Popen(b"interchange.py", stdin=subprocess.PIPE)
558+
self.interchange_proc = subprocess.Popen(self.interchange_launch_cmd.encode("utf-8"), stdin=subprocess.PIPE)
548559
stdin = self.interchange_proc.stdin
549560
assert stdin is not None, "Popen should have created an IO object (vs default None) because of PIPE mode"
550561

parsl/executors/high_throughput/mpi_executor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def __init__(self,
3838
label: str = 'MPIExecutor',
3939
provider: ExecutionProvider = LocalProvider(),
4040
launch_cmd: Optional[str] = None,
41+
interchange_launch_cmd: Optional[str] = None,
4142
address: Optional[str] = None,
4243
worker_ports: Optional[Tuple[int, int]] = None,
4344
worker_port_range: Optional[Tuple[int, int]] = (54000, 55000),
@@ -66,6 +67,7 @@ def __init__(self,
6667
label=label,
6768
provider=provider,
6869
launch_cmd=launch_cmd,
70+
interchange_launch_cmd=interchange_launch_cmd,
6971
address=address,
7072
worker_ports=worker_ports,
7173
worker_port_range=worker_port_range,

parsl/tests/test_htex/test_htex.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,16 @@ def test_max_workers_per_node():
136136

137137
# Ensure max_workers_per_node takes precedence
138138
assert htex.max_workers_per_node == htex.max_workers == 1
139+
140+
141+
@pytest.mark.local
142+
def test_htex_launch_cmd():
143+
htex = HighThroughputExecutor()
144+
assert htex.launch_cmd.startswith("process_worker_pool.py")
145+
assert htex.interchange_launch_cmd == "interchange.py"
146+
147+
launch_cmd = "custom-launch-cmd"
148+
ix_launch_cmd = "custom-ix-launch-cmd"
149+
htex = HighThroughputExecutor(launch_cmd=launch_cmd, interchange_launch_cmd=ix_launch_cmd)
150+
assert htex.launch_cmd == launch_cmd
151+
assert htex.interchange_launch_cmd == ix_launch_cmd

0 commit comments

Comments
 (0)