DPGEN2 allows developers to contribute exploration strategies. The exploration strategy defines how the configuration space is explored by molecular simulations in each DPGEN iteration. Note that we are not restricted to molecular dynamics: any molecular simulation is, in principle, allowed, for example Monte Carlo, enhanced sampling, structure optimization, and so on.

An exploration strategy takes the history of exploration as input, and returns to DPGEN the exploration tasks (which we call a task group) and the rule for selecting configurations from the trajectories generated by the tasks (which we call a configuration selector).

One can contribute in three aspects: the stage scheduler, the exploration task groups, and the configuration selector.

Stage scheduler

The stage scheduler takes an exploration report passed from the exploration scheduler as input, and tells the exploration scheduler whether the exploration in the stage has converged. If it has not, the stage scheduler returns a group of exploration tasks and a configuration selector to be used in the next DPGEN iteration.

A detailed explanation of the concepts is found here.

All stage schedulers are derived from the abstract base class StageScheduler. The only interface to be implemented is StageScheduler.plan_next_iteration. One may check the docstring for the explanation of the interface.

from abc import ABC, abstractmethod
from pathlib import Path
from typing import List, Tuple


class StageScheduler(ABC):
    """The scheduler for an exploration stage."""

    @abstractmethod
    def plan_next_iteration(
        self,
        hist_reports: List[ExplorationReport],
        report: ExplorationReport,
        confs: List[Path],
    ) -> Tuple[bool, ExplorationTaskGroup, ConfSelector]:
        """Make the plan for the next iteration of the stage.

        It checks the report of the current and all historical iterations of the stage,
        and tells if the iterations are converged.
        If not converged, it will plan the next iteration for the stage.

        Parameters
        ----------
        hist_reports : List[ExplorationReport]
            The historical exploration reports of the stage. If this is the first iteration of the stage, this list is empty.
        report : ExplorationReport
            The exploration report of this iteration.
        confs : List[Path]
            A list of configurations generated during the exploration. May be used to generate new configurations for the next iteration.

        Returns
        -------
        converged : bool
            Whether the stage has converged.
        task : ExplorationTaskGroup
            An `ExplorationTaskGroup` defining the exploration of the next iteration. Should be `None` if the stage is converged.
        conf_selector : ConfSelector
            The configuration selector for the next iteration. Should be `None` if the stage is converged.
        """

More details on the exploration task group and the configuration selector are given below.

Exploration task groups

DPGEN2 defines a Python class ExplorationTask to manage all the files needed to run an exploration task. It can be used as in the example provided in the docstring.

class ExplorationTask():
    """Define the files needed by an exploration task.

    Examples
    --------
    >>> # this example dumps all files needed by the task.
    >>> files = exploration_task.files()
    >>> for file_name, file_content in files.items():
    ...     with open(file_name, 'w') as fp:
    ...         fp.write(file_content)
    """
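
To make the docstring example concrete, here is a minimal sketch that fills a task with two files and dumps them to a temporary directory. The ExplorationTask class below is a simplified stand-in written for this example (only add_file and files are modelled), and the file names and contents are placeholders.

```python
import tempfile
from pathlib import Path
from typing import Dict


class ExplorationTask:
    """Simplified stand-in for this sketch: maps file names to file contents."""

    def __init__(self):
        self._files: Dict[str, str] = {}

    def add_file(self, fname: str, fcont: str) -> "ExplorationTask":
        self._files[fname] = fcont
        return self

    def files(self) -> Dict[str, str]:
        return self._files


task = ExplorationTask()
task.add_file("in.lammps", "units metal\n")  # placeholder content
task.add_file("conf.lmp", "# configuration placeholder\n")

# dump all files needed by the task into a scratch directory
with tempfile.TemporaryDirectory() as tmp:
    for file_name, file_content in task.files().items():
        (Path(tmp) / file_name).write_text(file_content)
    written = sorted(p.name for p in Path(tmp).iterdir())
```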


A collection of exploration tasks is called an exploration task group. All task groups are derived from the base class ExplorationTaskGroup. The exploration task group can be viewed as a list of ExplorationTasks; one may get the list through the property ExplorationTaskGroup.task_list. One may add a task or another ExplorationTaskGroup to the group with the methods ExplorationTaskGroup.add_task and ExplorationTaskGroup.add_group, respectively.

class ExplorationTaskGroup(Sequence):
    @property
    def task_list(self) -> List[ExplorationTask]:
        """Get the `list` of `ExplorationTask`"""

    def add_task(self, task: ExplorationTask):
        """Add one task to the group."""

    def add_group(
        self,
        group: "ExplorationTaskGroup",
    ):
        """Add another group to the group."""

An example of generating a group of NPT MD simulations may illustrate how to implement the ExplorationTaskGroups.
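
The list-like behaviour described above can be sketched with a small stand-in. The classes below are simplified versions written for this example (the real dpgen2 classes may differ in detail), and the task names are placeholders.

```python
from collections.abc import Sequence
from typing import List


class ExplorationTask:
    """Stand-in task; only a name is stored in this sketch."""

    def __init__(self, name: str):
        self.name = name


class ExplorationTaskGroup(Sequence):
    """Stand-in group that behaves like a read-only list of tasks."""

    def __init__(self):
        self._task_list: List[ExplorationTask] = []

    def __getitem__(self, ii):
        return self._task_list[ii]

    def __len__(self) -> int:
        return len(self._task_list)

    @property
    def task_list(self) -> List[ExplorationTask]:
        return self._task_list

    def add_task(self, task: ExplorationTask) -> "ExplorationTaskGroup":
        self._task_list.append(task)
        return self

    def add_group(self, group: "ExplorationTaskGroup") -> "ExplorationTaskGroup":
        # append every task of the other group, keeping their order
        self._task_list.extend(group.task_list)
        return self


npt = ExplorationTaskGroup()
npt.add_task(ExplorationTask("npt-300K")).add_task(ExplorationTask("npt-600K"))
nvt = ExplorationTaskGroup().add_task(ExplorationTask("nvt-300K"))
npt.add_group(nvt)
names = [task.name for task in npt]  # iteration comes from Sequence
```

Deriving from Sequence means that only indexing and length need to be written by hand; iteration and membership tests follow from them.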

Configuration selector

The configuration selectors are derived from the abstract base class ConfSelector.

class ConfSelector(ABC):
    """Select configurations from trajectory and model deviation files."""

    @abstractmethod
    def select(
        self,
        trajs: List[Path],
        model_devis: List[Path],
        traj_fmt: str = "deepmd/npy",
        type_map: Optional[List[str]] = None,
    ) -> Tuple[List[Path], ExplorationReport]:
        pass

The abstract method to implement is ConfSelector.select. trajs and model_devis are lists of files that record the simulation trajectories and model deviations, respectively. traj_fmt and type_map are parameters that may be needed for loading the trajectories with dpdata.

The method returns a list of Path, each of which can be treated as a dpdata.MultiSystems, and an ExplorationReport.

An example of selecting configurations from LAMMPS trajectories may illustrate how to implement the ConfSelectors.
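
The shape of a selector can be sketched without dpdata. The example below is a deliberately simplified, hypothetical ThresholdConfSelector: it assumes one frame per line in each trajectory file and one deviation value per line in each model deviation file, keeps the frames whose deviation lies between two thresholds, and returns a plain dict in place of an ExplorationReport. The accurate/candidate/failed labels, the thresholds and the file formats are all assumptions made for illustration.

```python
import tempfile
from pathlib import Path
from typing import Dict, List, Tuple


class ThresholdConfSelector:
    """Hypothetical selector: keep frames whose deviation lies in [level_lo, level_hi)."""

    def __init__(self, level_lo: float, level_hi: float, out_dir: Path):
        self.level_lo = level_lo
        self.level_hi = level_hi
        self.out_dir = out_dir

    def select(
        self,
        trajs: List[Path],
        model_devis: List[Path],
    ) -> Tuple[List[Path], Dict[str, int]]:
        report = {"accurate": 0, "candidate": 0, "failed": 0}
        selected: List[str] = []
        for traj, devi in zip(trajs, model_devis):
            # assumption: one frame per line, one deviation value per line
            frames = traj.read_text().splitlines()
            devis = [float(x) for x in devi.read_text().split()]
            for frame, dv in zip(frames, devis):
                if dv < self.level_lo:
                    report["accurate"] += 1
                elif dv < self.level_hi:
                    report["candidate"] += 1
                    selected.append(frame)
                else:
                    report["failed"] += 1
        out = self.out_dir / "selected.txt"
        out.write_text("\n".join(selected) + "\n")
        return [out], report


with tempfile.TemporaryDirectory() as tmp_name:
    tmp = Path(tmp_name)
    (tmp / "traj.txt").write_text("frame0\nframe1\nframe2\n")
    (tmp / "devi.txt").write_text("0.01\n0.20\n0.90\n")
    selector = ThresholdConfSelector(0.05, 0.50, tmp)
    confs, report = selector.select([tmp / "traj.txt"], [tmp / "devi.txt"])
    picked = confs[0].read_text().split()
```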

Implement a new exploration engine for dpgen2

1. Define a new exploration task group

First, we define a class XXXTaskGroup derived from ExplorationTaskGroup, which implements the method make_task. NPTTaskGroup, LmpTemplateTaskGroup and CalyTaskGroup can be referred to as examples. The XXXTaskGroup class records all combinations of parameters required for the exploration tasks. The make_task method converts each combination of parameters to an ExplorationTask object, including the input configuration file used directly in the exploration, and adds the ExplorationTask objects to the task group.

class XXXTaskGroup(ExplorationTaskGroup):
    def __init__(self, **kwargs):
        super().__init__()
        # set parameters

    def make_task(self) -> "XXXTaskGroup":
        """Make the XXX task group.

        Returns
        -------
        task_grp : XXXTaskGroup
            Return a XXX task group.
        """
        # clear all existing tasks
        self.clear()
        # loop over all settings to be used
        for params in params_list:
            task = ExplorationTask()
            file_name, file_content = _make_task_files(params)
            task.add_file(file_name, file_content)
            # add the task to the group
            self.add_task(task)
        return self
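
One way to read "all combinations of parameters" is as a Cartesian product over the parameter lists. The sketch below illustrates that reading with a hypothetical ToyTaskGroup that creates one task per (temperature, pressure) pair. The classes are stand-ins written for this example, and the parameter names, the file name input.txt and its content format are placeholders rather than anything dpgen2 prescribes.

```python
import itertools
from typing import Dict, List


class ExplorationTask:
    """Stand-in task holding file names and contents."""

    def __init__(self):
        self._files: Dict[str, str] = {}

    def add_file(self, fname: str, fcont: str) -> "ExplorationTask":
        self._files[fname] = fcont
        return self

    def files(self) -> Dict[str, str]:
        return self._files


class ToyTaskGroup:
    """Hypothetical group: one task per (temperature, pressure) pair."""

    def __init__(self, temps: List[float], press: List[float], nsteps: int = 1000):
        self.temps = temps
        self.press = press
        self.nsteps = nsteps
        self.task_list: List[ExplorationTask] = []

    def make_task(self) -> "ToyTaskGroup":
        self.task_list = []  # clear all existing tasks
        for temp, pres in itertools.product(self.temps, self.press):
            task = ExplorationTask()
            # the file name and content format are placeholders
            task.add_file("input.txt", f"temp={temp} pres={pres} nsteps={self.nsteps}\n")
            self.task_list.append(task)
        return self


group = ToyTaskGroup(temps=[300.0, 600.0], press=[1.0, 10.0, 100.0]).make_task()
contents = [task.files()["input.txt"] for task in group.task_list]
```

Clearing the list at the start of make_task keeps the method idempotent: calling it twice yields the same six tasks rather than twelve.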

The arguments that initialize an XXXTaskGroup are defined in the JSON input files. dpgen2 uses the dargs package to validate, normalize and document the arguments.

Then we define the exploration arguments schema in the dpgen2 input configuration. In dpgen2/exploration/task/ add three functions: xxx_task_group_args, which uses dargs (documents) to specify the arguments of the tasks in explore.stages[*][*]; xxx_normalize, which normalizes the input values; and make_xxx_task_group_from_config, which converts the input configuration to an XXXTaskGroup object.

def xxx_task_group_args():
    doc_xxx_task_grp = "XXX exploration tasks. dpgen2 will generate the XXX input script"
    return Argument(
        "xxx",
        dict,
        [
            # Argument(...),
        ],
        doc=doc_xxx_task_grp,
    )

def xxx_normalize(config):
    args = xxx_task_group_args()
    config = args.normalize_value(config, trim_pattern="_*")
    args.check_value(config, strict=False)
    return config

def make_xxx_task_group_from_config(config):
    config = xxx_normalize(config)
    tgroup = XXXTaskGroup(**config)
    return tgroup

Add a new explore variant in dpgen2/entrypoint/ that specifies the other arguments; it corresponds to explore in the dpgen2 input configuration.

def xxx_args():
    return [
        # Argument(...),
    ]


def variant_explore():
    # ...
    doc_xxx = "The exploration by XXX"
    return Variant(
        "type",
        [
            # ...
            Argument("xxx", dict, xxx_args(), doc=doc_xxx),
        ],
    )

2. Program a super OP for preparing and running exploration

We then implement an OP (usually a super OP) that contains the main logic of preparing and running the new exploration engine. PrepRunLmp and PrepRunCalypso can be referred to as examples. A PrepRunExploration OP takes block_id, expl_task_grp, explore_config, type_map and models as inputs, and gives trajs and model_devis as outputs. Write the super OP in the framework of dflow (documents).

class PrepRunXXX(Steps):
    def __init__(
        self,
        name: str,
        # ...
        upload_python_packages: Optional[List[os.PathLike]] = None,
    ):
        self._input_parameters = {
            "block_id": InputParameter(type=str, value=""),
            "expl_task_grp": InputParameter(),
            "explore_config": InputParameter(),
            "type_map": InputParameter(),
        }
        self._input_artifacts = {
            "models": InputArtifact(),
        }
        self._output_parameters = {}
        self._output_artifacts = {
            "trajs": OutputArtifact(),
            "model_devis": OutputArtifact(),
        }
        # register the inputs and outputs with the dflow Steps base class
        super().__init__(
            name=name,
            inputs=Inputs(
                parameters=self._input_parameters,
                artifacts=self._input_artifacts,
            ),
            outputs=Outputs(
                parameters=self._output_parameters,
                artifacts=self._output_artifacts,
            ),
        )
        self._keys = ...
        # call private function that implements the Steps
        self = _prep_run_xxx(
            self,
            # ...
            upload_python_packages=upload_python_packages,
        )

    @property
    def input_parameters(self):
        return self._input_parameters

    @property
    def input_artifacts(self):
        return self._input_artifacts

    @property
    def output_parameters(self):
        return self._output_parameters

    @property
    def output_artifacts(self):
        return self._output_artifacts

    @property
    def keys(self):
        return self._keys


def _prep_run_xxx(
    prep_run_xxx_steps: Steps,
    # ...
    upload_python_packages: Optional[List[os.PathLike]] = None,
):
    """The Steps is implemented by this function."""
    block_id = prep_run_xxx_steps.inputs.parameters["block_id"]
    expl_task_grp = prep_run_xxx_steps.inputs.parameters["expl_task_grp"]
    expl_config = prep_run_xxx_steps.inputs.parameters["explore_config"]
    type_map = prep_run_xxx_steps.inputs.parameters["type_map"]
    models = prep_run_xxx_steps.inputs.artifacts["models"]
    # construct the Steps OP
    some_step = Step(
        # ...
    )
    prep_run_xxx_steps.add(some_step)
    # specify the source of outputs
    prep_run_xxx_steps.outputs.artifacts["trajs"]._from = ...  # output artifact of a step
    prep_run_xxx_steps.outputs.artifacts["model_devis"]._from = ...  # output artifact of a step
    return prep_run_xxx_steps

3. Integrate the new exploration engine into dpgen2 workflow

Specify the new OP as the prep_run_explore_op in the function make_concurrent_learning_op in dpgen2/entrypoint/

def make_concurrent_learning_op(
    # ...
):
    # ...
    if explore_style == "lmp":
        ...  # existing branch
    elif "calypso" in explore_style:
        ...  # existing branch
    elif explore_style == "xxx":
        prep_run_explore_op = PrepRunXXX(
            # ...
        )
    else:
        raise RuntimeError(f"unknown explore_style {explore_style}")

Define a function make_xxx_naive_exploration_scheduler that makes the exploration scheduler for the new branch. If the format of the traj file and the model_devi file output by PrepRunXXX is identical to that of LAMMPS, we can reuse the LAMMPS trajectory render.

def make_naive_exploration_scheduler(
    config,
):
    explore_style = config["explore"]["type"]

    if explore_style == "lmp":
        return make_lmp_naive_exploration_scheduler(config)
    elif "calypso" in explore_style:
        return make_calypso_naive_exploration_scheduler(config)
    elif explore_style == "xxx":
        return make_xxx_naive_exploration_scheduler(config)
    else:
        raise KeyError(
            f"Unknown key `{explore_style}`, Only support `lmp`, `calypso`, `calypso:merge`, `calypso:default` and `xxx`."
        )
def make_xxx_naive_exploration_scheduler(config):
    model_devi_jobs = config["explore"]["stages"]
    fp_task_max = config["fp"]["task_max"]
    max_numb_iter = config["explore"]["max_numb_iter"]
    fatal_at_max = config["explore"]["fatal_at_max"]
    convergence = config["explore"]["convergence"]
    output_nopbc = config["explore"]["output_nopbc"]
    scheduler = ExplorationScheduler()
    # report
    conv_style = convergence.pop("type")
    report = conv_styles[conv_style](**convergence)
    # trajectory render, the format of the output trajs is assumed to be lammps/dump
    render = TrajRenderLammps(nopbc=output_nopbc)
    # selector
    selector = ConfSelectorFrames(
        render,
        report,
        fp_task_max,
    )

    for job_ in model_devi_jobs:
        # a stage may be given as a single task group or a list of them
        if not isinstance(job_, list):
            job = [job_]
        else:
            job = job_
        # stage
        stage = ExplorationStage()
        for jj in job:
            jconf = xxx_normalize(jj)
            # make task group
            tgroup = make_xxx_task_group_from_config(jconf)
            # add the tasks to the stage
            tasks = tgroup.make_task()
            stage.add_task_group(tasks)
        # stage_scheduler
        stage_scheduler = ConvergenceCheckStageScheduler(
            stage,
            selector,
            max_numb_iter=max_numb_iter,
            fatal_at_max=fatal_at_max,
        )
        # scheduler
        scheduler.add_stage_scheduler(stage_scheduler)

    return scheduler

Add the keys of all related OPs to the list shown by the dpgen2 showkey command. For example, if the prep-run-xxx super OP consists of the OPs prep-xxx, run-xxx-0000, run-xxx-0001, etc., then you can add code like the following in the file dpgen2/entrypoint/

def get_resubmit_keys(
    wf,
):
    all_step_keys = successful_step_keys(wf)
    step_keys = [
        # ...
        "prep-xxx",
        "run-xxx",
        # ...
    ]
    # ...

def get_superop(key):
    # ...
    elif "prep-xxx" in key:
        return key.replace("prep-xxx", "prep-run-explore")
    elif "run-xxx-" in key:
        return re.sub("run-xxx-[0-9]*", "prep-run-explore", key)
    return None
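
The effect of the two new branches can be checked in isolation. The sketch below keeps only the prep-xxx and run-xxx branches of get_superop and applies them to a few sample keys. The "iter-000000--" prefix is an assumed key format used only for illustration.

```python
import re
from typing import Optional


def get_superop(key: str) -> Optional[str]:
    """Map the key of a sub-OP to the key of its prep-run-explore super OP."""
    if "prep-xxx" in key:
        return key.replace("prep-xxx", "prep-run-explore")
    elif "run-xxx-" in key:
        # strip the numeric task index, e.g. run-xxx-0001 -> prep-run-explore
        return re.sub("run-xxx-[0-9]*", "prep-run-explore", key)
    return None


# the "iter-000000--" prefix is an assumed key format
prep = get_superop("iter-000000--prep-xxx")
run = get_superop("iter-000000--run-xxx-0001")
other = get_superop("iter-000000--something-else")
```

Both sub-OP keys map to the same super OP key, and a key that matches neither pattern maps to None.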

4. Add unit tests (UTs) and run them in debug mode

Write a UT for each single OP. tests/op/, tests/op/ can be referred to as examples.

Write a UT for make_xxx_task_group_from_config in tests/exploration/

Write a UT for the super OP PrepRunXXX, i.e. construct and submit a workflow consisting of PrepRunXXX, and test the outputs of the workflow. tests/ and tests/ can be referred to as examples.

Run the new UTs in debug mode (set the environment variable DFLOW_DEBUG=1).
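
As a starting point for the second kind of test, a UT for make_xxx_task_group_from_config might look like the sketch below. The function defined here is a toy stand-in that returns a list of dicts, and the config keys temps and nsteps are hypothetical; a real test would import the actual function and assert on the files of the generated tasks.

```python
import unittest


def make_xxx_task_group_from_config(config: dict) -> list:
    """Stand-in for this sketch: one 'task' (a dict) per temperature."""
    return [{"temp": t, "nsteps": config.get("nsteps", 1000)} for t in config["temps"]]


class TestMakeXXXTaskGroup(unittest.TestCase):
    def test_number_of_tasks(self):
        tasks = make_xxx_task_group_from_config({"temps": [300, 600]})
        self.assertEqual(len(tasks), 2)

    def test_default_nsteps(self):
        tasks = make_xxx_task_group_from_config({"temps": [300]})
        self.assertEqual(tasks[0]["nsteps"], 1000)


# run the tests programmatically so the sketch also works outside a test runner
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestMakeXXXTaskGroup)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```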

5. Run example workflows, add example configs and PR!

Submit dpgen2 workflows to or your own workflow server. If it works as expected, congratulations! Add an example input configuration to examples and add the example config to the automatic config test in tests/. Finally, submit a pull request (PR). Thank you for your contribution to the DeepModeling Community.