Source code for rid.superop.blocks

from typing import Dict, List, Optional, Union
from copy import deepcopy
import numpy as np
from dflow import (
    InputParameter,
    OutputParameter,
    Inputs,
    InputArtifact,
    Outputs,
    OutputArtifact,
    Workflow,
    Step,
    Steps,
    upload_artifact,
    argo_range,
    argo_len,
    argo_sequence,
)
from dflow.python import(
    PythonOPTemplate,
    OP,
    OPIO,
    OPIOSign,
    Artifact,
    Slices,
)
from rid.utils import init_executor


[docs]class InitBlock(Steps): def __init__( self, name: str, exploration_op: OP, select_op: OP, label_op: OP, data_op: OP, train_op: OP, train_config: Dict, upload_python_package = None ): self._input_parameters = { "block_tag" : InputParameter(type=str, value=""), "walker_tags": InputParameter(type=List), "model_tags": InputParameter(type=List), "trust_lvl_1" : InputParameter(type=List[float], value=2.0), "trust_lvl_2": InputParameter(type=List[float], value=3.0), "exploration_config" : InputParameter(type=Dict), "cv_config" : InputParameter(type=Dict), "cluster_threshold": InputParameter(type=float, value=1.0), "angular_mask": InputParameter(type=Optional[Union[np.ndarray, List]]), "weights": InputParameter(type=Optional[Union[np.ndarray, List]]), "numb_cluster_upper": InputParameter(type=float), "numb_cluster_lower": InputParameter(type=float), "max_selection": InputParameter(type=int), "dt": InputParameter(type=float, value=0.02), "output_freq": InputParameter(type=float, value=2500), "slice_mode": InputParameter(type=str, value="gmx"), "label_config": InputParameter(type=Dict), "kappas": InputParameter(type=List[float]), "tail": InputParameter(type=float, value=0.9), "train_config": InputParameter(type=Dict) } self._input_artifacts = { "models" : InputArtifact(optional=True), "forcefield" : InputArtifact(optional=True), "topology" : InputArtifact(optional=True), "inputfile": InputArtifact(optional=True), "confs" : InputArtifact(), "index_file": InputArtifact(optional=True), "dp_files": InputArtifact(optional=True), "cv_file": InputArtifact(optional=True) } self._output_parameters = { "cluster_threshold": OutputParameter(type=int) } self._output_artifacts = { "exploration_md_log": OutputArtifact(), "exploration_trajectory": OutputArtifact(), "selection_index": OutputArtifact(), "models": OutputArtifact(), "data": OutputArtifact(), "conf_outs": OutputArtifact() } super().__init__( name=name, inputs=Inputs( parameters=self._input_parameters, artifacts=self._input_artifacts ), outputs=Outputs( parameters=self._output_parameters, artifacts=self._output_artifacts ), ) self = _first_run_block( self, exploration_op, select_op, label_op, data_op, train_op, train_config, upload_python_package = upload_python_package, ) @property def input_parameters(self): return self._input_parameters @property def input_artifacts(self): return self._input_artifacts @property def output_parameters(self): return self._output_parameters @property def output_artifacts(self): return self._output_artifacts @property def keys(self): return self._keys
def _first_run_block( block_steps, exploration_op: OP, select_op: OP, label_op: OP, data_op: OP, train_op: OP, train_config : Dict, upload_python_package : str = None, ): exploration = Step( "Exploration", template=exploration_op, parameters={ "trust_lvl_1" : block_steps.inputs.parameters['trust_lvl_1'], "trust_lvl_2": block_steps.inputs.parameters['trust_lvl_2'], "exploration_config" : block_steps.inputs.parameters['exploration_config'], "cv_config" : block_steps.inputs.parameters['cv_config'], "task_names" : block_steps.inputs.parameters['walker_tags'], "block_tag" : block_steps.inputs.parameters['block_tag'] }, artifacts={ "models" : block_steps.inputs.artifacts['models'], "forcefield" : block_steps.inputs.artifacts['forcefield'], "topology" : block_steps.inputs.artifacts['topology'], "inputfile": block_steps.inputs.artifacts['inputfile'], "confs" : block_steps.inputs.artifacts['confs'], "index_file": block_steps.inputs.artifacts['index_file'], "dp_files": block_steps.inputs.artifacts['dp_files'], "cv_file": block_steps.inputs.artifacts['cv_file'] }, key = '{}-exploration'.format(block_steps.inputs.parameters['block_tag']) ) block_steps.add(exploration) selection = Step( "Selection", template=select_op, parameters={ "trust_lvl_1" : block_steps.inputs.parameters["trust_lvl_1"], "trust_lvl_2": block_steps.inputs.parameters["trust_lvl_2"], "cluster_threshold": block_steps.inputs.parameters["cluster_threshold"], "angular_mask": block_steps.inputs.parameters["angular_mask"], "weights": block_steps.inputs.parameters["weights"], "numb_cluster_upper": block_steps.inputs.parameters["numb_cluster_upper"], "numb_cluster_lower": block_steps.inputs.parameters["numb_cluster_lower"], "max_selection": block_steps.inputs.parameters["max_selection"], "dt": block_steps.inputs.parameters["dt"], "output_freq": block_steps.inputs.parameters["output_freq"], "slice_mode": block_steps.inputs.parameters["slice_mode"], "if_make_threshold": True, "task_names" : block_steps.inputs.parameters['walker_tags'], "block_tag" : block_steps.inputs.parameters['block_tag'], }, artifacts={ "models" : block_steps.inputs.artifacts["models"], "plm_out": exploration.outputs.artifacts["plm_out"], "xtc_traj": exploration.outputs.artifacts["trajectory"], "topology": block_steps.inputs.artifacts["confs"] }, key = '{}-selection'.format(block_steps.inputs.parameters['block_tag']), ) block_steps.add(selection) label = Step( "Label", template=label_op, parameters={ "angular_mask": block_steps.inputs.parameters['angular_mask'], "label_config": block_steps.inputs.parameters['label_config'], "cv_config": block_steps.inputs.parameters['cv_config'], "kappas": block_steps.inputs.parameters['kappas'], "tail": block_steps.inputs.parameters['tail'], "conf_tags" : selection.outputs.parameters['selected_conf_tags'], "block_tag" : block_steps.inputs.parameters['block_tag'], }, artifacts={ "topology": block_steps.inputs.artifacts["topology"], "forcefield" : block_steps.inputs.artifacts['forcefield'], "confs": selection.outputs.artifacts["selected_confs"], "at": selection.outputs.artifacts["selected_cv_init"], "index_file": block_steps.inputs.artifacts['index_file'], "inputfile": block_steps.inputs.artifacts['inputfile'], "dp_files": block_steps.inputs.artifacts['dp_files'], "cv_file": block_steps.inputs.artifacts['cv_file'] }, key = '{}-label'.format(block_steps.inputs.parameters['block_tag']) ) block_steps.add(label) gen_data = Step( 'GenData', template=data_op, parameters={"block_tag" : block_steps.inputs.parameters['block_tag']}, artifacts={ "forces": label.outputs.artifacts["forces"], "centers": selection.outputs.artifacts["selected_cv_init"] }, key = '{}-gen-data'.format(block_steps.inputs.parameters['block_tag']), ) block_steps.add(gen_data) train_config = deepcopy(train_config) train_template_config = train_config.pop('template_config') train_executor = init_executor(train_config.pop('executor')) train = Step( "train", template=PythonOPTemplate( train_op, python_packages = upload_python_package, slices=Slices("{{item}}", input_parameter=["model_tag"], output_artifact=["model"]), **train_template_config, ), parameters={ "model_tag": block_steps.inputs.parameters["model_tags"], "angular_mask": block_steps.inputs.parameters["angular_mask"], "train_config": block_steps.inputs.parameters["train_config"], }, artifacts={ "data": gen_data.outputs.artifacts["data"], }, executor = train_executor, with_param=argo_range(argo_len(block_steps.inputs.parameters["model_tags"])), key = "{}-train".format(block_steps.inputs.parameters["block_tag"])+"-{{item}}", **train_config, ) block_steps.add(train) block_steps.outputs.artifacts["models"]._from = train.outputs.artifacts["model"] block_steps.outputs.artifacts["data"]._from = gen_data.outputs.artifacts["data"] block_steps.outputs.artifacts["conf_outs"]._from = exploration.outputs.artifacts["conf_outs"] block_steps.outputs.artifacts["exploration_trajectory"]._from = exploration.outputs.artifacts["trajectory"] block_steps.outputs.artifacts["exploration_md_log"]._from = exploration.outputs.artifacts["md_log"] block_steps.outputs.artifacts["selection_index"]._from = selection.outputs.artifacts["selected_indices"] block_steps.outputs.parameters["cluster_threshold"].value_from_parameter = selection.outputs.parameters["cluster_threshold"] return block_steps
[docs]class IterBlock(Steps): def __init__( self, name: str, exploration_op: OP, select_op: OP, label_op: OP, data_op: OP, adjust_lvl_op: OP, train_op: OP, adjust_lvl_config: Dict, train_config: Dict, upload_python_package = None ): self._input_parameters = { "block_tag" : InputParameter(type=str, value=""), "walker_tags": InputParameter(type=List), "model_tags": InputParameter(type=List), "trust_lvl_1" : InputParameter(type=List[float]), "trust_lvl_2": InputParameter(type=List[float]), "init_trust_lvl_1" : InputParameter(type=List[float]), "init_trust_lvl_2": InputParameter(type=List[float]), "exploration_config" : InputParameter(type=Dict), "cv_config" : InputParameter(type=Dict), "cluster_threshold": InputParameter(type=float, value=1.0), "angular_mask": InputParameter(type=Optional[Union[np.ndarray, List]]), "weights": InputParameter(type=Optional[Union[np.ndarray, List]]), "max_selection": InputParameter(type=int), "numb_cluster_threshold": InputParameter(type=float, value=30), "dt": InputParameter(type=float, value=0.02), "output_freq": InputParameter(type=float, value=2500), "slice_mode": InputParameter(type=str, value="gmx"), "label_config": InputParameter(type=Dict), "kappas": InputParameter(type=List[float]), "tail": InputParameter(type=float, value=0.9), "train_config": InputParameter(type=Dict), "adjust_amplifier": InputParameter(type=float, value=1.5), "max_level_multiple": InputParameter(type=float, value=8.0), } self._input_artifacts = { "models" : InputArtifact(optional=True), "forcefield" : InputArtifact(optional=True), "topology" : InputArtifact(optional=True), "inputfile": InputArtifact(optional=True), "confs" : InputArtifact(), "data_old": InputArtifact(), "index_file": InputArtifact(optional=True), "dp_files": InputArtifact(optional=True), "cv_file": InputArtifact(optional=True) } self._output_parameters = { "cluster_threshold": OutputParameter(type=int), "adjust_trust_lvl_1": OutputParameter(type=int), "adjust_trust_lvl_2": OutputParameter(type=int), } self._output_artifacts = { "exploration_md_log": OutputArtifact(), "exploration_trajectory": OutputArtifact(), "selection_index": OutputArtifact(), "models": OutputArtifact(), "data": OutputArtifact(), "conf_outs": OutputArtifact() } super().__init__( name=name, inputs=Inputs( parameters=self._input_parameters, artifacts=self._input_artifacts ), outputs=Outputs( parameters=self._output_parameters, artifacts=self._output_artifacts ), ) self = _iter_block( self, exploration_op, select_op, label_op, data_op, adjust_lvl_op, train_op, adjust_lvl_config, train_config, upload_python_package = upload_python_package, ) @property def input_parameters(self): return self._input_parameters @property def input_artifacts(self): return self._input_artifacts @property def output_parameters(self): return self._output_parameters @property def output_artifacts(self): return self._output_artifacts @property def keys(self): return self._keys
def _iter_block( block_steps, exploration_op: OP, select_op: OP, label_op: OP, data_op: OP, adjust_lvl_op: OP, train_op: OP, adjust_lvl_config : Dict, train_config : Dict, upload_python_package : str = None, ): exploration = Step( "Exploration", template=exploration_op, parameters={ "trust_lvl_1" : block_steps.inputs.parameters['trust_lvl_1'], "trust_lvl_2": block_steps.inputs.parameters['trust_lvl_2'], "exploration_config" : block_steps.inputs.parameters['exploration_config'], "cv_config" : block_steps.inputs.parameters['cv_config'], "task_names" : block_steps.inputs.parameters['walker_tags'], "block_tag" : block_steps.inputs.parameters['block_tag'], }, artifacts={ "models" : block_steps.inputs.artifacts['models'], "forcefield" : block_steps.inputs.artifacts['forcefield'], "topology" : block_steps.inputs.artifacts['topology'], "inputfile": block_steps.inputs.artifacts['inputfile'], "confs" : block_steps.inputs.artifacts['confs'], "index_file": block_steps.inputs.artifacts['index_file'], "dp_files": block_steps.inputs.artifacts['dp_files'], "cv_file": block_steps.inputs.artifacts['cv_file'] }, key = '{}-exploration'.format(block_steps.inputs.parameters['block_tag']) ) block_steps.add(exploration) selection = Step( "Selection", template=select_op, parameters={ "trust_lvl_1" : block_steps.inputs.parameters["init_trust_lvl_1"], "trust_lvl_2": block_steps.inputs.parameters["init_trust_lvl_2"], "cluster_threshold": block_steps.inputs.parameters["cluster_threshold"], "angular_mask": block_steps.inputs.parameters["angular_mask"], "weights": block_steps.inputs.parameters["weights"], "max_selection": block_steps.inputs.parameters["max_selection"], "dt": block_steps.inputs.parameters["dt"], "output_freq": block_steps.inputs.parameters["output_freq"], "slice_mode": block_steps.inputs.parameters["slice_mode"], "if_make_threshold": False, "task_names" : block_steps.inputs.parameters['walker_tags'], "block_tag" : block_steps.inputs.parameters['block_tag'], }, artifacts={ "models" : block_steps.inputs.artifacts["models"], "plm_out": exploration.outputs.artifacts["plm_out"], "xtc_traj": exploration.outputs.artifacts["trajectory"], "topology": block_steps.inputs.artifacts["confs"] }, key = '{}-selection'.format(block_steps.inputs.parameters['block_tag']), ) block_steps.add(selection) adjust_lvl_config = deepcopy(adjust_lvl_config) adjust_lvl_template_config = adjust_lvl_config.pop('template_config') adjust_lvl_executor = init_executor(adjust_lvl_config.pop('executor')) adjust_lvl = Step( "adjust-level", template=PythonOPTemplate( adjust_lvl_op, python_packages = upload_python_package, slices=Slices("{{item}}", input_parameter=["trust_lvl_1", "trust_lvl_2", "numb_cluster", "init_trust_lvl_1", "init_trust_lvl_2"] ), **adjust_lvl_template_config, ), parameters={ "trust_lvl_1": block_steps.inputs.parameters["trust_lvl_1"], "trust_lvl_2": block_steps.inputs.parameters["trust_lvl_2"], "init_trust_lvl_1": block_steps.inputs.parameters["init_trust_lvl_1"], "init_trust_lvl_2": block_steps.inputs.parameters["init_trust_lvl_2"], "numb_cluster": selection.outputs.parameters["numb_cluster"], "numb_cluster_threshold": block_steps.inputs.parameters["numb_cluster_threshold"], "adjust_amplifier": block_steps.inputs.parameters["adjust_amplifier"], "max_level_multiple": block_steps.inputs.parameters["max_level_multiple"] }, artifacts={}, with_param=argo_range(argo_len(block_steps.inputs.parameters["trust_lvl_1"])), executor = adjust_lvl_executor, key = '{}-adjust-level'.format(block_steps.inputs.parameters['block_tag'])+"-{{item}}", **adjust_lvl_config, ) block_steps.add(adjust_lvl) label = Step( "Label", template=label_op, parameters={ "angular_mask": block_steps.inputs.parameters['angular_mask'], "label_config": block_steps.inputs.parameters['label_config'], "cv_config": block_steps.inputs.parameters['cv_config'], "kappas": block_steps.inputs.parameters['kappas'], "tail": block_steps.inputs.parameters['tail'], "conf_tags" : selection.outputs.parameters['selected_conf_tags'], "block_tag" : block_steps.inputs.parameters['block_tag'], }, artifacts={ "topology": block_steps.inputs.artifacts["topology"], "forcefield" : block_steps.inputs.artifacts['forcefield'], "confs": selection.outputs.artifacts["selected_confs"], "inputfile": block_steps.inputs.artifacts['inputfile'], "at": selection.outputs.artifacts["selected_cv_init"], "index_file": block_steps.inputs.artifacts['index_file'], "dp_files": block_steps.inputs.artifacts['dp_files'], "cv_file": block_steps.inputs.artifacts['cv_file'] }, key = '{}-label'.format(block_steps.inputs.parameters['block_tag']) ) block_steps.add(label) gen_data = Step( 'GenData', template=data_op, parameters={"block_tag" : block_steps.inputs.parameters['block_tag']}, artifacts={ "forces": label.outputs.artifacts["forces"], "centers": selection.outputs.artifacts["selected_cv_init"], "data_old": block_steps.inputs.artifacts['data_old'] }, key = '{}-gen-data'.format(block_steps.inputs.parameters['block_tag']), ) block_steps.add(gen_data) train_config = deepcopy(train_config) train_template_config = train_config.pop('template_config') train_executor = init_executor(train_config.pop('executor')) train = Step( "train", template=PythonOPTemplate( train_op, python_packages = upload_python_package, slices=Slices("{{item}}", input_parameter=["model_tag"], output_artifact=["model"]), **train_template_config, ), parameters={ "model_tag": block_steps.inputs.parameters["model_tags"], "angular_mask": block_steps.inputs.parameters["angular_mask"], "train_config": block_steps.inputs.parameters["train_config"], }, artifacts={ "data": gen_data.outputs.artifacts["data"], }, executor = train_executor, with_param=argo_range(argo_len(block_steps.inputs.parameters["model_tags"])), key = "{}-train".format(block_steps.inputs.parameters["block_tag"])+"-{{item}}", **train_config, ) block_steps.add(train) block_steps.outputs.artifacts["models"]._from = train.outputs.artifacts["model"] block_steps.outputs.artifacts["data"]._from = gen_data.outputs.artifacts["data"] block_steps.outputs.artifacts["conf_outs"]._from = exploration.outputs.artifacts["conf_outs"] block_steps.outputs.artifacts["exploration_trajectory"]._from = exploration.outputs.artifacts["trajectory"] block_steps.outputs.artifacts["exploration_md_log"]._from = exploration.outputs.artifacts["md_log"] block_steps.outputs.artifacts["selection_index"]._from = selection.outputs.artifacts["selected_indices"] block_steps.outputs.parameters["cluster_threshold"].value_from_parameter = selection.outputs.parameters["cluster_threshold"] block_steps.outputs.parameters["adjust_trust_lvl_1"].value_from_parameter = adjust_lvl.outputs.parameters["adjust_trust_lvl_1"] block_steps.outputs.parameters["adjust_trust_lvl_2"].value_from_parameter = adjust_lvl.outputs.parameters["adjust_trust_lvl_2"] return block_steps