Skip to content

Commit f06ce31

Browse files
committed
vertical (test)
1 parent aa2a95b commit f06ce31

File tree

2 files changed

+171
-5
lines changed

2 files changed

+171
-5
lines changed

autosubmit/platforms/wrappers/wrapper_builder.py

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,50 @@ def build_srun_launcher(self, jobs_list, footer=True):
842842
return srun_launcher
843843

844844

845+
class SrunVerticalWrapperBuilder(SrunWrapperBuilder):
846+
def build_imports(self):
847+
scripts_bash = "("
848+
for script in self.job_scripts:
849+
scripts_bash += f"\"{script}\" "
850+
scripts_bash += ")"
851+
return textwrap.dedent(f"""
852+
# Defining scripts to be run
853+
declare -a scripts_list={scripts_bash}
854+
declare -a job_mask={self.get_mask_general(1)}
855+
""").format('\n'.ljust(13))
856+
857+
def build_srun_launcher(self, jobs_list, footer=True):
858+
srun_launcher = textwrap.dedent(f"""
859+
i=0
860+
suffix=".cmd"
861+
for template in "${{{jobs_list}[@]}}"; do
862+
jobname=${{template%"$suffix"}}
863+
out="${{template}}.out.0"
864+
err="${{template}}.err.0"
865+
srun --ntasks=1 --cpu-bind=verbose,mask_cpu:${{job_mask[0]}} --distribution=block:block $template > $out 2> $err &
866+
((i=i+1))
867+
wait
868+
done
869+
""")
870+
if footer:
871+
srun_launcher += self._indent(textwrap.dedent(f"""
872+
for template in "${{{jobs_list}[@]}}"; do
873+
suffix_completed=".COMPLETED"
874+
completed_filename=${{template%"$suffix"}}
875+
completed_filename="$completed_filename"_COMPLETED
876+
completed_path=${{PWD}}/$completed_filename
877+
if [ -f "$completed_path" ]; then
878+
echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has been COMPLETED"
879+
else
880+
echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has been FAILED"
881+
touch {self.get_random_alphanumeric_string(5, 5)}_FAILED
882+
touch WRAPPER_FAILED
883+
fi
884+
done
885+
"""), 0)
886+
return srun_launcher
887+
888+
845889
class SrunVerticalHorizontalWrapperBuilder(SrunWrapperBuilder):
846890

847891
def build_imports(self):
@@ -947,3 +991,110 @@ def build_srun_launcher(self, jobs_list, footer=True):
947991
wait
948992
"""
949993
return srun_launcher
994+
995+
996+
class SrunHorizontalVerticalWrapperbuilder(SrunWrapperBuilder):
997+
998+
def build_imports(self):
999+
scripts_bash = textwrap.dedent("""
1000+
# Defining scripts to be run""")
1001+
list_index = 0
1002+
scripts_array_vars = "( "
1003+
scripts_array_index = "( "
1004+
for scripts in self.job_scripts:
1005+
built_array = "("
1006+
for script in scripts:
1007+
built_array += str("\"" + script + "\"") + " "
1008+
built_array += ")"
1009+
scripts_bash += textwrap.dedent("""
1010+
declare -a scripts_{0}={1}
1011+
""").format(str(list_index), str(built_array), '\n'.ljust(13))
1012+
scripts_array_vars += "\"scripts_{0}\" ".format(list_index)
1013+
scripts_array_index += "\"0\" ".format(list_index)
1014+
list_index += 1
1015+
scripts_array_vars += ")"
1016+
scripts_array_index += ")"
1017+
scripts_bash += textwrap.dedent("""
1018+
declare -a scripts_list={0}
1019+
declare -a scripts_index={1}
1020+
""").format(str(scripts_array_vars), str(scripts_array_index), '\n'.ljust(13))
1021+
mask_array = self.get_mask_general(len(self.job_scripts))
1022+
scripts_bash += textwrap.dedent("""
1023+
declare -a job_mask_array={0}
1024+
""").format(mask_array, '\n'.ljust(13))
1025+
return scripts_bash
1026+
1027+
def build_srun_launcher(self, jobs_list, footer=True):
1028+
srun_launcher = f"""
1029+
suffix=".cmd"
1030+
suffix_completed=".COMPLETED"
1031+
aux_scripts=("${{{jobs_list}[@]}}")
1032+
prev_script="empty"
1033+
as_index=0
1034+
horizontal_size=${{#scripts_index[@]}}
1035+
scripts_size=${{#scripts_0[@]}}
1036+
job_failed=0
1037+
while [ "${{#aux_scripts[@]}}" -gt 0 ]; do
1038+
i_list=0
1039+
for script_list in "${{{jobs_list}[@]}}"; do
1040+
declare -i job_index=${{scripts_index[$i_list]}}
1041+
declare -n scripts=$script_list
1042+
1043+
declare -n prev_horizontal_scripts=$prev_script
1044+
if [ $job_index -ne -1 ]; then
1045+
for horizontal_job in "${{scripts[@]:$job_index}}"; do
1046+
template=$horizontal_job
1047+
jobname=${{template%"$suffix"}}
1048+
as_index=0
1049+
multiplication_result=$(($i_list*$scripts_size))
1050+
as_index=$((multiplication_result+$job_index))
1051+
out="${{template}}.out"
1052+
err="${{template}}.err"
1053+
if [ $job_index -eq 0 ]; then
1054+
prev_template=$template
1055+
else
1056+
prev_template=${{scripts[((job_index-1))]}}
1057+
fi
1058+
completed_filename=${{prev_template%"$suffix"}}
1059+
completed_filename="$completed_filename"_COMPLETED
1060+
completed_path=${{PWD}}/$completed_filename
1061+
if [ ! $job_index -eq 0 ] && ( ! lsof "$err" > /dev/null 2>&1 && ! lsof "$out" > /dev/null 2>&1 ) && [ ! -f "$completed_path" ]; then
1062+
job_failed=1
1063+
break
1064+
fi
1065+
if [ $job_index -eq 0 ] || [ -f "$completed_path" ]; then # If first horizontal wrapper or last wrapper is completed
1066+
#srun -N1 --ntasks=1 --cpus-per-task=1 --cpu-bind=verbose,mask_cpu:${{job_mask_array[$job_index]}} --distribution=block:block $template > $out 2> $err &
1067+
#srun --ntasks=1 --cpu-bind=verbose,mask_cpu:$cpu_mask --distribution=block:block $template > $out 2> $err &
1068+
srun --ntasks=1 --cpu-bind=verbose,mask_cpu:${{job_mask_array[$job_index]}} --distribution=block:block $template > $out 2> $err &
1069+
job_index=$(($job_index+1))
1070+
else
1071+
break
1072+
fi
1073+
done
1074+
if [ "$job_failed" ]; then
1075+
break
1076+
fi
1077+
if [ $job_index -ge "${{#scripts[@]}}" ]; then
1078+
unset aux_scripts[$i_list]
1079+
job_index=-1
1080+
fi
1081+
fi
1082+
if [ "$job_failed" ]; then
1083+
break
1084+
fi
1085+
prev_script=("${{script_list[@]}}")
1086+
scripts_index[$i_list]=$job_index
1087+
i_list=$((i_list+1)) # check next list ( needed for save list index )
1088+
done
1089+
if [ "$job_failed" ]; then
1090+
echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has been FAILED"
1091+
touch {self.get_random_alphanumeric_string(5, 5)}_FAILED
1092+
touch WRAPPER_FAILED
1093+
break
1094+
else
1095+
echo "`date '+%d/%m/%Y_%H:%M:%S'` $template has COMPLETE"
1096+
fi
1097+
done
1098+
wait
1099+
"""
1100+
return srun_launcher

autosubmit/platforms/wrappers/wrapper_factory.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919

2020
from autosubmit.platforms.wrappers.wrapper_builder import WrapperDirector, PythonVerticalWrapperBuilder, \
2121
PythonHorizontalWrapperBuilder, PythonHorizontalVerticalWrapperBuilder, PythonVerticalHorizontalWrapperBuilder, \
22-
BashHorizontalWrapperBuilder, BashVerticalWrapperBuilder, SrunHorizontalWrapperBuilder,SrunVerticalHorizontalWrapperBuilder
22+
BashHorizontalWrapperBuilder, BashVerticalWrapperBuilder, SrunHorizontalWrapperBuilder, \
23+
SrunVerticalHorizontalWrapperBuilder, SrunVerticalWrapperBuilder, SrunHorizontalVerticalWrapperbuilder
2324
import re
2425

2526
class WrapperFactory(object):
@@ -148,7 +149,10 @@ def threads_directive(self, threads):
148149
class LocalWrapperFactory(WrapperFactory):
149150

150151
def vertical_wrapper(self, **kwargs):
151-
return PythonVerticalWrapperBuilder(**kwargs)
152+
if kwargs["method"] == "srun":
153+
return SrunVerticalWrapperBuilder(**kwargs)
154+
else:
155+
return PythonVerticalWrapperBuilder(**kwargs)
152156

153157
def horizontal_wrapper(self, **kwargs):
154158

@@ -158,7 +162,10 @@ def horizontal_wrapper(self, **kwargs):
158162
return PythonHorizontalWrapperBuilder(**kwargs)
159163

160164
def hybrid_wrapper_horizontal_vertical(self, **kwargs):
161-
return PythonHorizontalVerticalWrapperBuilder(**kwargs)
165+
if kwargs["method"] == 'srun':
166+
return SrunHorizontalVerticalWrapperbuilder(**kwargs)
167+
else:
168+
return PythonHorizontalVerticalWrapperBuilder(**kwargs)
162169

163170
def hybrid_wrapper_vertical_horizontal(self, **kwargs):
164171
if kwargs["method"] == 'srun':
@@ -250,8 +257,12 @@ def header_directives(self, **kwargs):
250257

251258
class SlurmWrapperFactory(WrapperFactory):
252259

260+
253261
def vertical_wrapper(self, **kwargs):
254-
return PythonVerticalWrapperBuilder(**kwargs)
262+
if kwargs["method"] == "srun":
263+
return SrunVerticalWrapperBuilder(**kwargs)
264+
else:
265+
return PythonVerticalWrapperBuilder(**kwargs)
255266

256267
def horizontal_wrapper(self, **kwargs):
257268

@@ -261,14 +272,18 @@ def horizontal_wrapper(self, **kwargs):
261272
return PythonHorizontalWrapperBuilder(**kwargs)
262273

263274
def hybrid_wrapper_horizontal_vertical(self, **kwargs):
264-
return PythonHorizontalVerticalWrapperBuilder(**kwargs)
275+
if kwargs["method"] == 'srun':
276+
return SrunHorizontalVerticalWrapperbuilder(**kwargs)
277+
else:
278+
return PythonHorizontalVerticalWrapperBuilder(**kwargs)
265279

266280
def hybrid_wrapper_vertical_horizontal(self, **kwargs):
267281
if kwargs["method"] == 'srun':
268282
return SrunVerticalHorizontalWrapperBuilder(**kwargs)
269283
else:
270284
return PythonVerticalHorizontalWrapperBuilder(**kwargs)
271285

286+
272287
def header_directives(self, **kwargs):
273288
return self.platform.wrapper_header(**kwargs)
274289

0 commit comments

Comments
 (0)