Skip to content

Commit b007d6d

Browse files
authored
Fix a bug and add support for N masters, N workers with load balancing in RadicalPilotExecutor (#3060)
* Fix issue Radical executor hangs rather than reports error if MPI is not installed #3070, which is not terminating due to internal component failure. * Add support to N masters and N workers with task load balancing across the masters.
1 parent 2828ac5 commit b007d6d

File tree

3 files changed

+119
-113
lines changed

3 files changed

+119
-113
lines changed

parsl/executors/radical/executor.py

+105-65
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import time
1010
from concurrent.futures import Future
1111
from functools import partial
12-
from pathlib import Path, PosixPath
12+
from pathlib import PosixPath
1313
from typing import Dict, Optional
1414

1515
import requests
@@ -24,7 +24,7 @@
2424
from parsl.serialize.errors import DeserializationError, SerializationError
2525
from parsl.utils import RepresentationMixin
2626

27-
from .rpex_resources import ResourceConfig
27+
from .rpex_resources import CLIENT, MPI, ResourceConfig
2828

2929
try:
3030
import radical.pilot as rp
@@ -59,7 +59,7 @@ class RadicalPilotExecutor(ParslExecutor, RepresentationMixin):
5959
``rp.PilotManager`` and ``rp.TaskManager``.
6060
2. "translate": Unwrap, identify, and parse Parsl ``apps`` into ``rp.TaskDescription``.
6161
3. "submit": Submit Parsl apps to ``rp.TaskManager``.
62-
4. "shut_down": Shut down the RADICAL-Pilot runtime and all associated components.
62+
4. "shutdown": Shut down the RADICAL-Pilot runtime and all associated components.
6363
6464
Here is a diagram
6565
@@ -138,19 +138,26 @@ def __init__(self,
138138
self.future_tasks: Dict[str, Future] = {}
139139

140140
if rpex_cfg:
141-
self.rpex_cfg = rpex_cfg
141+
self.rpex_cfg = rpex_cfg.get_config()
142142
elif not rpex_cfg and 'local' in resource:
143-
self.rpex_cfg = ResourceConfig()
143+
self.rpex_cfg = ResourceConfig().get_config()
144144
else:
145-
raise ValueError('Resource config file must be '
146-
'specified for a non-local execution')
145+
raise ValueError('Resource config must be '
146+
'specified for a non-local resources')
147147

148148
def task_state_cb(self, task, state):
149149
"""
150150
Update the state of Parsl Future apps
151151
Based on RP task state callbacks.
152152
"""
153-
if not task.uid.startswith('master'):
153+
# check the Master/Worker state
154+
if task.mode in [rp.RAPTOR_MASTER, rp.RAPTOR_WORKER]:
155+
if state == rp.FAILED:
156+
exception = RuntimeError(f'{task.uid} failed with internal error: {task.stderr}')
157+
self._fail_all_tasks(exception)
158+
159+
# check all other tasks state
160+
else:
154161
parsl_task = self.future_tasks[task.uid]
155162

156163
if state == rp.DONE:
@@ -186,6 +193,23 @@ def task_state_cb(self, task, state):
186193
else:
187194
parsl_task.set_exception('Task failed for an unknown reason')
188195

196+
def _fail_all_tasks(self, exception):
197+
"""
198+
Fail all outstanding tasks with the given exception.
199+
200+
This method iterates through all outstanding tasks in the
201+
`_future_tasks` dictionary, which have not yet completed,
202+
and sets the provided exception as their result, indicating
203+
a failure.
204+
205+
Parameters:
206+
- exception: The exception to be set as the result for all
207+
outstanding tasks.
208+
"""
209+
for fut_task in self.future_tasks.values():
210+
if not fut_task.done():
211+
fut_task.set_exception(exception)
212+
189213
def start(self):
190214
"""Create the Pilot component and pass it.
191215
"""
@@ -202,63 +226,62 @@ def start(self):
202226
'resource': self.resource}
203227

204228
if not self.resource or 'local' in self.resource:
205-
# move the agent sandbox to the working dir mainly
206-
# for debugging purposes. This will allow parsl
207-
# to include the agent sandbox with the ci artifacts.
208-
if os.environ.get("LOCAL_SANDBOX"):
209-
pd_init['sandbox'] = self.run_dir
210-
os.environ["RADICAL_LOG_LVL"] = "DEBUG"
211-
212-
logger.info("RPEX will be running in the local mode")
229+
os.environ["RADICAL_LOG_LVL"] = "DEBUG"
230+
logger.info("RPEX will be running in local mode")
213231

214232
pd = rp.PilotDescription(pd_init)
215233
pd.verify()
216234

217-
self.rpex_cfg = self.rpex_cfg._get_cfg_file(path=self.run_dir)
218-
cfg = ru.Config(cfg=ru.read_json(self.rpex_cfg))
235+
# start RP's main components TMGR, PMGR and Pilot
236+
self.tmgr = rp.TaskManager(session=self.session)
237+
self.pmgr = rp.PilotManager(session=self.session)
238+
self.pilot = self.pmgr.submit_pilots(pd)
219239

220-
self.master = cfg.master_descr
221-
self.n_masters = cfg.n_masters
240+
if not self.pilot.description.get('cores') or not self.pilot.description.get('nodes'):
241+
logger.warning('no "cores/nodes" per pilot were set, using default resources')
242+
243+
self.tmgr.add_pilots(self.pilot)
244+
self.tmgr.register_callback(self.task_state_cb)
222245

223-
tds = list()
224-
master_path = '{0}/rpex_master.py'.format(PWD)
225246
worker_path = '{0}/rpex_worker.py'.format(PWD)
226247

227-
for i in range(self.n_masters):
228-
td = rp.TaskDescription(self.master)
229-
td.mode = rp.RAPTOR_MASTER
230-
td.uid = ru.generate_id('master.%(item_counter)06d', ru.ID_CUSTOM,
248+
self.masters = []
249+
250+
logger.info(f'Starting {self.rpex_cfg.n_masters} masters and {self.rpex_cfg.n_workers} workers for each master')
251+
252+
# create N masters
253+
for _ in range(self.rpex_cfg.n_masters):
254+
md = rp.TaskDescription(self.rpex_cfg.master_descr)
255+
md.uid = ru.generate_id('rpex.master.%(item_counter)06d', ru.ID_CUSTOM,
231256
ns=self.session.uid)
232-
td.ranks = 1
233-
td.cores_per_rank = 1
234-
td.arguments = [self.rpex_cfg, i]
235-
td.input_staging = self._stage_files([File(master_path),
236-
File(worker_path),
237-
File(self.rpex_cfg)], mode='in')
238-
tds.append(td)
239257

240-
self.pmgr = rp.PilotManager(session=self.session)
241-
self.tmgr = rp.TaskManager(session=self.session)
258+
# submit the master to the TMGR
259+
master = self.tmgr.submit_raptors(md)[0]
260+
self.masters.append(master)
242261

243-
# submit pilot(s)
244-
pilot = self.pmgr.submit_pilots(pd)
245-
if not pilot.description.get('cores'):
246-
logger.warning('no "cores" per pilot was set, using default resources {0}'.format(pilot.resources))
262+
workers = []
263+
# create N workers for each master and submit them to the TMGR
264+
for _ in range(self.rpex_cfg.n_workers):
265+
wd = rp.TaskDescription(self.rpex_cfg.worker_descr)
266+
wd.uid = ru.generate_id('rpex.worker.%(item_counter)06d', ru.ID_CUSTOM,
267+
ns=self.session.uid)
268+
wd.raptor_id = master.uid
269+
wd.input_staging = self._stage_files([File(worker_path)], mode='in')
270+
workers.append(wd)
247271

248-
self.tmgr.submit_tasks(tds)
272+
self.tmgr.submit_workers(workers)
273+
274+
self.select_master = self._cyclic_master_selector()
249275

250276
# prepare or use the current env for the agent/pilot side environment
251-
if cfg.pilot_env_mode != 'client':
252-
logger.info("creating {0} environment for the executor".format(cfg.pilot_env.name))
253-
pilot.prepare_env(env_name=cfg.pilot_env.name,
254-
env_spec=cfg.pilot_env.as_dict())
277+
if self.rpex_cfg.pilot_env_mode != CLIENT:
278+
logger.info("creating {0} environment for the executor".format(self.rpex_cfg.pilot_env.name))
279+
self.pilot.prepare_env(env_name=self.rpex_cfg.pilot_env.name,
280+
env_spec=self.rpex_cfg.pilot_env.as_dict())
255281
else:
256282
client_env = sys.prefix
257283
logger.info("reusing ({0}) environment for the executor".format(client_env))
258284

259-
self.tmgr.add_pilots(pilot)
260-
self.tmgr.register_callback(self.task_state_cb)
261-
262285
# create a bulking thread to run the actual task submission
263286
# to RP in bulks
264287
if self.bulk_mode:
@@ -272,8 +295,21 @@ def start(self):
272295
self._bulk_thread.daemon = True
273296
self._bulk_thread.start()
274297

298+
logger.info('bulk mode is on, submitting tasks in bulks')
299+
275300
return True
276301

302+
def _cyclic_master_selector(self):
303+
"""
304+
Balance tasks submission across N masters and N workers
305+
"""
306+
current_master = 0
307+
masters_uids = [m.uid for m in self.masters]
308+
309+
while True:
310+
yield masters_uids[current_master]
311+
current_master = (current_master + 1) % len(self.masters)
312+
277313
def unwrap(self, func, args):
278314
"""
279315
Unwrap a Parsl app and its args for further processing.
@@ -364,22 +400,25 @@ def task_translate(self, tid, func, parsl_resource_specification, args, kwargs):
364400

365401
# This is the default mode where the bash_app will be executed as
366402
# as a single core process by RP. For cores > 1 the user must use
367-
# above or use MPI functions if their code is Python.
403+
# task.mode=rp.TASK_EXECUTABLE (above) or use MPI functions if their
404+
# code is Python.
368405
else:
369406
task.mode = rp.TASK_PROC
370-
task.raptor_id = 'master.%06d' % (tid % self.n_masters)
407+
task.raptor_id = next(self.select_master)
371408
task.executable = self._pack_and_apply_message(func, args, kwargs)
372409

373410
elif PYTHON in task_type or not task_type:
374411
task.mode = rp.TASK_FUNCTION
375-
task.raptor_id = 'master.%06d' % (tid % self.n_masters)
412+
task.raptor_id = next(self.select_master)
376413
if kwargs.get('walltime'):
377414
func = timeout(func, kwargs['walltime'])
378415

379-
# we process MPI function differently
380-
if 'comm' in kwargs:
416+
# Check how to serialize the function object
417+
if MPI in self.rpex_cfg.worker_type.lower():
418+
task.use_mpi = True
381419
task.function = rp.PythonTask(func, *args, **kwargs)
382420
else:
421+
task.use_mpi = False
383422
task.function = self._pack_and_apply_message(func, args, kwargs)
384423

385424
task.input_staging = self._stage_files(kwargs.get("inputs", []),
@@ -394,7 +433,7 @@ def task_translate(self, tid, func, parsl_resource_specification, args, kwargs):
394433
try:
395434
task.verify()
396435
except ru.typeddict.TDKeyError as e:
397-
raise Exception(f'{e}. Please check Radical.Pilot TaskDescription documentation')
436+
raise Exception(f'{e}. Please check: https://radicalpilot.readthedocs.io/en/stable/ documentation')
398437

399438
return task
400439

@@ -413,14 +452,20 @@ def _pack_and_apply_message(self, func, args, kwargs):
413452

414453
def _unpack_and_set_parsl_exception(self, parsl_task, exception):
415454
try:
416-
s = rp.utils.deserialize_bson(exception)
455+
try:
456+
s = rp.utils.deserialize_bson(exception)
457+
except Exception:
458+
s = exception
459+
417460
if isinstance(s, RemoteExceptionWrapper):
418461
try:
419462
s.reraise()
420463
except Exception as e:
421464
parsl_task.set_exception(e)
422465
elif isinstance(s, Exception):
423466
parsl_task.set_exception(s)
467+
elif isinstance(s, str):
468+
parsl_task.set_exception(eval(s))
424469
else:
425470
raise ValueError("Unknown exception-like type received: {}".format(type(s)))
426471
except Exception as e:
@@ -440,16 +485,10 @@ def _set_stdout_stderr(self, task, kwargs):
440485
elif isinstance(k_val, PosixPath):
441486
k_val = k_val.__str__()
442487

443-
# if the stderr/out has no path
444-
# then we consider it local and
445-
# we just set the path to the cwd
446-
if '/' not in k_val:
447-
k_val = CWD + '/' + k_val
448-
449-
# finally set the stderr/out to
450-
# the desired name by the user
488+
# set the stderr/out to the desired
489+
# name by the user
451490
setattr(task, k, k_val)
452-
task.sandbox = Path(k_val).parent.__str__()
491+
task.sandbox = CWD
453492

454493
def _stage_files(self, files, mode):
455494
"""
@@ -477,7 +516,7 @@ def _stage_files(self, files, mode):
477516
# this indicates that the user
478517
# did not provided a specific
479518
# output file and RP will stage out
480-
# the task.output from pilot://task_folder
519+
# the task.stdout from pilot://task_folder
481520
# to the CWD or file.url
482521
if '/' not in file.url:
483522
f = {'source': file.filename,
@@ -548,7 +587,8 @@ def submit(self, func, resource_specification, *args, **kwargs):
548587

549588
def shutdown(self, hub=True, targets='all', block=False):
550589
"""Shutdown the executor, including all RADICAL-Pilot components."""
551-
logger.info("RadicalPilotExecutor shutdown")
590+
logger.info("RadicalPilotExecutor is terminating...")
552591
self.session.close(download=True)
592+
logger.info("RadicalPilotExecutor is terminated.")
553593

554594
return True

parsl/executors/radical/rpex_master.py

-41
This file was deleted.

0 commit comments

Comments
 (0)