Skip to content

Commit 067c60f

Browse files
Sort keys and fold resubmit steps by virtue of dflow (#269)
<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- **New Features**
  - Enhanced workflow management with a new approach for handling workflow steps and resubmission keys.
  - Streamlined error handling to improve workflow integrity during resubmission.
- **Bug Fixes**
  - Improved clarity and efficiency in the workflow management system.
- **Tests**
  - Updated tests to align with new workflow logic, replacing old key folding tests with resubmission key validation.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: zjgemi <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 35e0b97 commit 067c60f

File tree

3 files changed

+135
-239
lines changed

3 files changed

+135
-239
lines changed

dpgen2/entrypoint/submit.py

Lines changed: 40 additions & 133 deletions
Original file line number | Diff line number | Diff line change
@@ -808,136 +808,43 @@ def print_list_steps(
808808
return "\n".join(ret)
809809

810810

811-
def successful_step_keys(wf):
812-
all_step_keys = []
813-
steps = wf.query_step()
814-
# For reused steps whose startedAt are identical, sort them by key
815-
steps.sort(key=lambda x: "%s-%s" % (x.startedAt, x.key))
816-
for step in steps:
817-
if step.key is not None and step.phase == "Succeeded":
818-
all_step_keys.append(step.key)
819-
return all_step_keys
820-
821-
822-
def get_superop(key):
823-
if "prep-train" in key:
824-
return key.replace("prep-train", "prep-run-train")
825-
elif "run-train-" in key:
826-
return re.sub("run-train-[0-9]*", "prep-run-train", key)
827-
elif "prep-lmp" in key:
828-
return key.replace("prep-lmp", "prep-run-explore")
829-
elif "run-lmp-" in key:
830-
return re.sub("run-lmp-[0-9]*", "prep-run-explore", key)
831-
elif "prep-fp" in key:
832-
return key.replace("prep-fp", "prep-run-fp")
833-
elif "run-fp-" in key:
834-
return re.sub("run-fp-[0-9]*", "prep-run-fp", key)
835-
elif "prep-caly-input" in key:
836-
return key.replace("prep-caly-input", "prep-run-explore")
837-
elif "collect-run-calypso-" in key:
838-
return re.sub("collect-run-calypso-[0-9]*-[0-9]*", "prep-run-explore", key)
839-
elif "prep-dp-optim-" in key:
840-
return re.sub("prep-dp-optim-[0-9]*-[0-9]*", "prep-run-explore", key)
841-
elif "run-dp-optim-" in key:
842-
return re.sub("run-dp-optim-[0-9]*-[0-9]*-[0-9]*", "prep-run-explore", key)
843-
elif "prep-caly-model-devi" in key:
844-
return key.replace("prep-caly-model-devi", "prep-run-explore")
845-
elif "run-caly-model-devi" in key:
846-
return re.sub("run-caly-model-devi-[0-9]*", "prep-run-explore", key)
847-
elif "caly-evo-step" in key:
848-
return re.sub("caly-evo-step-[0-9]*", "prep-run-explore", key)
849-
elif "diffcsp-gen-" in key:
850-
return re.sub("diffcsp-gen-[0-9]*", "prep-run-explore", key)
851-
elif "prep-relax" in key:
852-
return re.sub("prep-relax", "prep-run-explore", key)
853-
elif "run-relax-" in key:
854-
return re.sub("run-relax-[0-9]*", "prep-run-explore", key)
855-
return None
856-
857-
858-
def fold_keys(all_step_keys):
859-
folded_keys = {}
860-
for key in all_step_keys:
861-
is_superop = False
862-
for superop in ["prep-run-train", "prep-run-explore", "prep-run-fp"]:
863-
if superop in key:
864-
if key not in folded_keys:
865-
folded_keys[key] = []
866-
is_superop = True
867-
break
868-
if is_superop:
869-
continue
870-
superop = get_superop(key)
871-
# if its super OP is succeeded, fold it into its super OP
872-
if superop is not None and superop in all_step_keys:
873-
if superop not in folded_keys:
874-
folded_keys[superop] = []
875-
folded_keys[superop].append(key)
876-
else:
877-
folded_keys[key] = [key]
878-
for k, v in folded_keys.items():
879-
if v == []:
880-
folded_keys[k] = [k]
881-
return folded_keys
882-
883-
884811
def get_resubmit_keys(
885812
wf,
886813
):
887-
all_step_keys = successful_step_keys(wf)
888-
step_keys = [
889-
"prep-run-train",
890-
"prep-train",
891-
"run-train",
892-
"prep-caly-input",
893-
"prep-caly-model-devi",
894-
"run-caly-model-devi",
895-
"prep-run-explore",
896-
"prep-lmp",
897-
"run-lmp",
898-
"diffcsp-gen",
899-
"prep-relax",
900-
"run-relax",
814+
wf_info = wf.query()
815+
all_steps = [
816+
step
817+
for step in wf_info.get_step(sort_by_generation=True)
818+
if step.key is not None
819+
]
820+
super_keys = ["prep-run-train", "prep-run-explore", "prep-run-fp"]
821+
other_keys = [
901822
"select-confs",
902-
"prep-run-fp",
903-
"prep-fp",
904-
"run-fp",
905823
"collect-data",
906824
"scheduler",
907825
"id",
908826
]
909-
if (
910-
len(
911-
matched_step_key(
912-
all_step_keys,
913-
[
914-
"collect-run-calypso",
915-
"prep-dp-optim",
916-
"run-dp-optim",
917-
],
918-
)
919-
)
920-
> 0
921-
):
922-
# calypso default mode
923-
step_keys += [
924-
"collect-run-calypso",
925-
"prep-dp-optim",
926-
"run-dp-optim",
927-
]
928-
else:
929-
# calypso merge mode
930-
step_keys.append("caly-evo-step")
931827

932-
all_step_keys = matched_step_key(
933-
all_step_keys,
934-
step_keys,
935-
)
936-
all_step_keys = sort_slice_ops(
937-
all_step_keys,
938-
["run-train", "run-lmp", "run-fp", "diffcsp-gen", "run-relax"],
939-
)
940-
folded_keys = fold_keys(all_step_keys)
828+
folded_keys = {}
829+
for step in all_steps:
830+
if len(matched_step_key([step.key], super_keys)) > 0:
831+
sub_steps = wf_info.get_step(parent_id=step.id, sort_by_generation=True)
832+
sub_keys = [
833+
step.key
834+
for step in sub_steps
835+
if step.key is not None and step.phase == "Succeeded"
836+
]
837+
sub_keys = sort_slice_ops(
838+
sub_keys,
839+
["run-train", "run-lmp", "run-fp", "diffcsp-gen", "run-relax"],
840+
)
841+
if step.phase == "Succeeded":
842+
folded_keys[step.key] = sub_keys
843+
else:
844+
for key in sub_keys:
845+
folded_keys[key] = [key]
846+
elif len(matched_step_key([step.key], other_keys)) > 0:
847+
folded_keys[step.key] = [step.key]
941848
return folded_keys
942849

943850

@@ -955,7 +862,12 @@ def resubmit_concurrent_learning(
955862

956863
old_wf = Workflow(id=wfid)
957864
folded_keys = get_resubmit_keys(old_wf)
958-
all_step_keys = sum(folded_keys.values(), [])
865+
all_step_keys = []
866+
super_keys = {}
867+
for super_key, keys in folded_keys.items():
868+
all_step_keys += keys
869+
for key in keys:
870+
super_keys[key] = super_key
959871

960872
if list_steps:
961873
prt_str = print_keys_in_nice_format(
@@ -971,21 +883,16 @@ def resubmit_concurrent_learning(
971883
if fold:
972884
reused_folded_keys = {}
973885
for key in reused_keys:
974-
superop = get_superop(key)
975-
if superop is not None:
976-
if superop not in reused_folded_keys:
977-
reused_folded_keys[superop] = []
978-
reused_folded_keys[superop].append(key)
979-
else:
980-
reused_folded_keys[key] = [key]
886+
super_key = super_keys[key]
887+
if super_key not in reused_folded_keys:
888+
reused_folded_keys[super_key] = []
889+
reused_folded_keys[super_key].append(key)
981890
for k, v in reused_folded_keys.items():
982891
# reuse the super OP iif all steps within it are reused
983-
if v != [k] and k in folded_keys and set(v) == set(folded_keys[k]):
892+
if set(v) == set(folded_keys[k]):
984893
reused_folded_keys[k] = [k]
985894
reused_keys = sum(reused_folded_keys.values(), [])
986-
reuse_step = old_wf.query_step(key=reused_keys)
987-
# For reused steps whose startedAt are identical, sort them by key
988-
reuse_step.sort(key=lambda x: "%s-%s" % (x.startedAt, x.key))
895+
reuse_step = old_wf.query_step(key=reused_keys, sort_by_generation=True)
989896

990897
wf = submit_concurrent_learning(
991898
wf_config,

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@ classifiers = [
1717
dependencies = [
1818
'numpy',
1919
'dpdata>=0.2.20',
20-
'pydflow>=1.8.95',
20+
'pydflow>=1.8.97',
2121
'dargs>=0.3.1',
2222
'scipy',
2323
'lbg',

0 commit comments

Comments
 (0)