Source code for rid.flow.loop

from dflow import (
    InputParameter,
    Inputs,
    InputArtifact,
    Outputs,
    OutputArtifact,
    Step,
    Steps,
    if_expression,
)
from dflow.python import(
    PythonOPTemplate,
    OP
)
from typing import List, Optional, Dict, Union
import numpy as np
from rid.utils import init_executor
from rid.op.prep_rid import PrepRiD
from rid.op.recorder import Recorder
from copy import deepcopy


[docs]class ReinforcedDynamicsLoop(Steps): def __init__( self, name : str, block_op : Steps, step_config : dict, upload_python_package : str = None, ): self._input_parameters={ "numb_iters" : InputParameter(type=int), "last_iteration" : InputParameter(type=int), "block_tag" : InputParameter(type=str, value=""), "walker_tags": InputParameter(type=List), "model_tags": InputParameter(type=List), "trust_lvl_1" : InputParameter(type=List[float]), "trust_lvl_2": InputParameter(type=List[float]), "init_trust_lvl_1" : InputParameter(type=List[float]), "init_trust_lvl_2": InputParameter(type=List[float]), "exploration_config" : InputParameter(type=Dict), "cv_config" : InputParameter(type=Dict), "cluster_threshold": InputParameter(type=float, value=1.0), "angular_mask": InputParameter(type=Optional[Union[np.ndarray, List]]), "weights": InputParameter(type=Optional[Union[np.ndarray, List]]), "max_selection": InputParameter(type=int), "numb_cluster_threshold": InputParameter(type=float, value=30), "dt": InputParameter(type=float, value=0.02), "output_freq": InputParameter(type=float, value=2500), "slice_mode": InputParameter(type=str, value="gmx"), "label_config": InputParameter(type=Dict), "kappas": InputParameter(type=List[float]), "tail": InputParameter(type=float, value=0.9), "train_config": InputParameter(type=Dict), "adjust_amplifier": InputParameter(type=float, value=1.5), "max_level_multiple": InputParameter(type=float, value=8.0), } self._input_artifacts={ "models" : InputArtifact(optional=True), "forcefield" : InputArtifact(optional=True), "topology" : InputArtifact(optional=True), "inputfile": InputArtifact(optional=True), "confs" : InputArtifact(), "data_old": InputArtifact(), "index_file": InputArtifact(optional=True), "dp_files": InputArtifact(optional=True), "cv_file": InputArtifact(optional=True) } self._output_parameters={ } self._output_artifacts={ "exploration_trajectory": OutputArtifact(), "models": OutputArtifact(), "data": OutputArtifact(), "conf_outs": OutputArtifact() } super().__init__( name = name, inputs = Inputs( parameters=self._input_parameters, artifacts=self._input_artifacts, ), outputs=Outputs( parameters=self._output_parameters, artifacts=self._output_artifacts, ), ) _step_keys = ['block', 'recorder'] step_keys = {} for ii in _step_keys: step_keys[ii] = '-'.join(["%s"%self.inputs.parameters["block_tag"], ii]) self = _loop( self, step_keys, name, block_op, step_config = step_config, upload_python_package = upload_python_package, ) @property def input_parameters(self): return self._input_parameters @property def input_artifacts(self): return self._input_artifacts @property def output_parameters(self): return self._output_parameters @property def output_artifacts(self): return self._output_artifacts @property def keys(self): return self._keys
def _loop ( steps, step_keys, name : str, block_op : OP, step_config : dict, upload_python_package : str = None, ): step_config = deepcopy(step_config) step_template_config = step_config.pop('template_config') step_executor = init_executor(step_config.pop('executor')) recorder_step = Step( name = name + '-recorder', template=PythonOPTemplate( Recorder, python_packages = upload_python_package, **step_template_config, ), parameters={ "iteration": steps.inputs.parameters['last_iteration'], }, artifacts={}, key = step_keys['recorder'], executor = step_executor, **step_config, ) steps.add(recorder_step) block_step = Step( name = name + '-block', template = block_op, parameters={ "block_tag": recorder_step.outputs.parameters["block_tag"], "walker_tags": steps.inputs.parameters["walker_tags"], "model_tags": steps.inputs.parameters["model_tags"], "exploration_config": steps.inputs.parameters["exploration_config"], "cv_config": steps.inputs.parameters["cv_config"], "trust_lvl_1" : steps.inputs.parameters["trust_lvl_1"], "trust_lvl_2": steps.inputs.parameters["trust_lvl_2"], "init_trust_lvl_1": steps.inputs.parameters["init_trust_lvl_1"], "init_trust_lvl_2": steps.inputs.parameters["init_trust_lvl_2"], "cluster_threshold": steps.inputs.parameters["cluster_threshold"], "angular_mask": steps.inputs.parameters["angular_mask"], "weights": steps.inputs.parameters["weights"], "max_selection": steps.inputs.parameters["max_selection"], "numb_cluster_threshold": steps.inputs.parameters["numb_cluster_threshold"], "dt": steps.inputs.parameters["dt"], "output_freq": steps.inputs.parameters["output_freq"], "slice_mode": steps.inputs.parameters["slice_mode"], "label_config": steps.inputs.parameters["label_config"], "kappas": steps.inputs.parameters["kappas"], "train_config": steps.inputs.parameters["train_config"] }, artifacts={ "models": steps.inputs.artifacts["models"], "forcefield" : steps.inputs.artifacts['forcefield'], "topology": steps.inputs.artifacts["topology"], "inputfile": steps.inputs.artifacts["inputfile"], "confs": steps.inputs.artifacts["confs"], "data_old": steps.inputs.artifacts["data_old"], "index_file": steps.inputs.artifacts["index_file"], "dp_files": steps.inputs.artifacts["dp_files"], "cv_file": steps.inputs.artifacts["cv_file"] }, key = step_keys['block'], ) steps.add(block_step) next_step = Step( name = name+'-next', template = steps, parameters={ "numb_iters": steps.inputs.parameters["numb_iters"], "last_iteration": recorder_step.outputs.parameters["next_iteration"], "block_tag": recorder_step.outputs.parameters["block_tag"], "walker_tags": steps.inputs.parameters["walker_tags"], "model_tags": steps.inputs.parameters["model_tags"], "exploration_config": steps.inputs.parameters["exploration_config"], "cv_config": steps.inputs.parameters["cv_config"], "trust_lvl_1": block_step.outputs.parameters["adjust_trust_lvl_1"], "trust_lvl_2": block_step.outputs.parameters["adjust_trust_lvl_2"], "init_trust_lvl_1": steps.inputs.parameters["init_trust_lvl_1"], "init_trust_lvl_2": steps.inputs.parameters["init_trust_lvl_2"], "cluster_threshold": block_step.outputs.parameters["cluster_threshold"], "angular_mask": steps.inputs.parameters["angular_mask"], "weights": steps.inputs.parameters["weights"], "max_selection": steps.inputs.parameters["max_selection"], "numb_cluster_threshold": steps.inputs.parameters["numb_cluster_threshold"], "dt": steps.inputs.parameters["dt"], "output_freq": steps.inputs.parameters["output_freq"], "slice_mode": steps.inputs.parameters["slice_mode"], "label_config": steps.inputs.parameters["label_config"], "kappas": steps.inputs.parameters["kappas"], "train_config": steps.inputs.parameters["train_config"] }, artifacts={ "models": block_step.outputs.artifacts["models"], "forcefield" : steps.inputs.artifacts['forcefield'], "topology": steps.inputs.artifacts["topology"], "inputfile": steps.inputs.artifacts["inputfile"], "confs": block_step.outputs.artifacts["conf_outs"], "data_old": block_step.outputs.artifacts["data"], "index_file": steps.inputs.artifacts["index_file"], "dp_files": steps.inputs.artifacts["dp_files"], "cv_file": steps.inputs.artifacts["cv_file"] }, when = "%s < %s" % (recorder_step.outputs.parameters['next_iteration'], steps.inputs.parameters["numb_iters"]), ) steps.add(next_step) steps.outputs.artifacts['exploration_trajectory'].from_expression = \ if_expression( _if = (recorder_step.outputs.parameters['next_iteration'] >= steps.inputs.parameters["numb_iters"]), _then = block_step.outputs.artifacts['exploration_trajectory'], _else = next_step.outputs.artifacts['exploration_trajectory'], ) steps.outputs.artifacts['conf_outs'].from_expression = \ if_expression( _if = (recorder_step.outputs.parameters['next_iteration'] >= steps.inputs.parameters["numb_iters"]), _then = block_step.outputs.artifacts['conf_outs'], _else = next_step.outputs.artifacts['conf_outs'], ) steps.outputs.artifacts['models'].from_expression = \ if_expression( _if = (recorder_step.outputs.parameters['next_iteration'] >= steps.inputs.parameters["numb_iters"]), _then = block_step.outputs.artifacts['models'], _else = next_step.outputs.artifacts['models'], ) steps.outputs.artifacts['data'].from_expression = \ if_expression( _if = (recorder_step.outputs.parameters['next_iteration'] >= steps.inputs.parameters["numb_iters"]), _then = block_step.outputs.artifacts['data'], _else = next_step.outputs.artifacts['data'], ) return steps
[docs]class ReinforcedDynamics(Steps): def __init__( self, name : str, init_block_op : Steps, block_op : Steps, step_config : dict, upload_python_package : str = None, ): self._input_parameters={} self._input_artifacts={ "models": InputArtifact(optional=True), "forcefield": InputArtifact(optional=True), "topology": InputArtifact(optional=True), "inputfile": InputArtifact(optional=True), "confs": InputArtifact(), "rid_config": InputArtifact(), "index_file": InputArtifact(optional=True), "dp_files": InputArtifact(optional=True), "cv_file": InputArtifact(optional=True) } self._output_parameters={ } self._output_artifacts={ "exploration_trajectory": OutputArtifact(), "models": OutputArtifact(), "data": OutputArtifact(), "conf_outs": OutputArtifact() } super().__init__( name = name, inputs = Inputs( parameters=self._input_parameters, artifacts=self._input_artifacts, ), outputs=Outputs( parameters=self._output_parameters, artifacts=self._output_artifacts, ), ) _init_keys = ['recorder', 'block'] step_keys = {} for ii in _init_keys: step_keys[ii] = '--'.join(['init', ii]) self = _rid( self, step_keys, name, init_block_op, block_op, step_config = step_config, upload_python_package = upload_python_package, ) @property def input_parameters(self): return self._input_parameters @property def input_artifacts(self): return self._input_artifacts @property def output_parameters(self): return self._output_parameters @property def output_artifacts(self): return self._output_artifacts @property def init_keys(self): return self._init_keys @property def loop_keys(self): return [self.loop_key] + self.loop.keys
def _rid( steps, step_keys, name, init_block_op, block_op, step_config : dict, upload_python_package : Optional[str] = None ): _step_config = deepcopy(step_config) step_template_config = _step_config.pop('template_config') step_executor = init_executor(_step_config.pop('executor')) prep_rid = Step( name = 'prepare-rid', template=PythonOPTemplate( PrepRiD, python_packages = upload_python_package, **step_template_config, ), parameters={}, artifacts={ "confs": steps.inputs.artifacts['confs'], "rid_config" : steps.inputs.artifacts['rid_config'], }, key = 'prepare-rid', executor = step_executor, **_step_config, ) steps.add(prep_rid) recorder_step = Step( name = name + '-recorder', template=PythonOPTemplate( Recorder, python_packages = upload_python_package, **step_template_config, ), parameters={ "iteration": None, }, artifacts={}, key = "init-recorder", executor = step_executor, **_step_config, ) steps.add(recorder_step) init_block = Step( name = name + '-block', template = init_block_op, parameters={ "block_tag": recorder_step.outputs.parameters["block_tag"], "walker_tags": prep_rid.outputs.parameters["walker_tags"], "model_tags": prep_rid.outputs.parameters["model_tags"], "exploration_config": prep_rid.outputs.parameters["exploration_config"], "cv_config": prep_rid.outputs.parameters["cv_config"], "trust_lvl_1" : prep_rid.outputs.parameters["trust_lvl_1"], "trust_lvl_2": prep_rid.outputs.parameters["trust_lvl_2"], "numb_cluster_upper": prep_rid.outputs.parameters['numb_cluster_upper'], "numb_cluster_lower": prep_rid.outputs.parameters['numb_cluster_lower'], "cluster_threshold": prep_rid.outputs.parameters["cluster_threshold"], "angular_mask": prep_rid.outputs.parameters["angular_mask"], "weights": prep_rid.outputs.parameters["weights"], "max_selection": prep_rid.outputs.parameters["max_selection"], "dt": prep_rid.outputs.parameters["dt"], "output_freq": prep_rid.outputs.parameters["output_freq"], "slice_mode": prep_rid.outputs.parameters["slice_mode"], "label_config": prep_rid.outputs.parameters["label_config"], "kappas": prep_rid.outputs.parameters["kappas"], "train_config": prep_rid.outputs.parameters["train_config"] }, artifacts={ "models": steps.inputs.artifacts["models"], "forcefield" : steps.inputs.artifacts['forcefield'], "topology": steps.inputs.artifacts["topology"], "inputfile": steps.inputs.artifacts["inputfile"], "confs": prep_rid.outputs.artifacts["confs"], "index_file": steps.inputs.artifacts["index_file"], "dp_files": steps.inputs.artifacts["dp_files"], "cv_file": steps.inputs.artifacts["cv_file"] }, key = "%s-init-block"%recorder_step.outputs.parameters["block_tag"] ) steps.add(init_block) loop_step = Step( name = name + '-loop', template = ReinforcedDynamicsLoop( name = "RiD-Loop", block_op = block_op, step_config = step_config, upload_python_package = upload_python_package ), parameters={ "numb_iters": prep_rid.outputs.parameters["numb_iters"], "last_iteration": recorder_step.outputs.parameters["next_iteration"], "block_tag": recorder_step.outputs.parameters["block_tag"], "walker_tags": prep_rid.outputs.parameters["walker_tags"], "model_tags": prep_rid.outputs.parameters["model_tags"], "exploration_config": prep_rid.outputs.parameters["exploration_config"], "cv_config": prep_rid.outputs.parameters["cv_config"], "trust_lvl_1" : prep_rid.outputs.parameters["trust_lvl_1"], "trust_lvl_2": prep_rid.outputs.parameters["trust_lvl_2"], "init_trust_lvl_1": prep_rid.outputs.parameters["trust_lvl_1"], "init_trust_lvl_2": prep_rid.outputs.parameters["trust_lvl_2"], "cluster_threshold": init_block.outputs.parameters["cluster_threshold"], "angular_mask": prep_rid.outputs.parameters["angular_mask"], "weights": prep_rid.outputs.parameters["weights"], "max_selection": prep_rid.outputs.parameters["max_selection"], "numb_cluster_threshold": prep_rid.outputs.parameters["numb_cluster_threshold"], "dt": prep_rid.outputs.parameters["dt"], "output_freq": prep_rid.outputs.parameters["output_freq"], "slice_mode": prep_rid.outputs.parameters["slice_mode"], "label_config": prep_rid.outputs.parameters["label_config"], "kappas": prep_rid.outputs.parameters["kappas"], "train_config": prep_rid.outputs.parameters["train_config"] }, artifacts={ "models": init_block.outputs.artifacts["models"], "forcefield" : steps.inputs.artifacts['forcefield'], "topology": steps.inputs.artifacts["topology"], "inputfile": steps.inputs.artifacts["inputfile"], "confs": init_block.outputs.artifacts["conf_outs"], "data_old": init_block.outputs.artifacts["data"], "index_file": steps.inputs.artifacts["index_file"], "dp_files": steps.inputs.artifacts["dp_files"], "cv_file": steps.inputs.artifacts["cv_file"] }, when = "%s < %s" % (recorder_step.outputs.parameters['next_iteration'], prep_rid.outputs.parameters["numb_iters"]), key = "rid-loop", ) steps.add(loop_step) steps.outputs.artifacts['exploration_trajectory'].from_expression = \ if_expression( _if = (recorder_step.outputs.parameters['next_iteration'] >= prep_rid.outputs.parameters["numb_iters"]), _then = init_block.outputs.artifacts['exploration_trajectory'], _else = loop_step.outputs.artifacts['exploration_trajectory'], ) steps.outputs.artifacts['conf_outs'].from_expression = \ if_expression( _if = (recorder_step.outputs.parameters['next_iteration'] >= prep_rid.outputs.parameters["numb_iters"]), _then = init_block.outputs.artifacts['conf_outs'], _else = loop_step.outputs.artifacts['conf_outs'], ) steps.outputs.artifacts['models'].from_expression = \ if_expression( _if = (recorder_step.outputs.parameters['next_iteration'] >= prep_rid.outputs.parameters["numb_iters"]), _then = init_block.outputs.artifacts['models'], _else = loop_step.outputs.artifacts['models'], ) steps.outputs.artifacts['data'].from_expression = \ if_expression( _if = (recorder_step.outputs.parameters['next_iteration'] >= prep_rid.outputs.parameters["numb_iters"]), _then = init_block.outputs.artifacts['data'], _else = loop_step.outputs.artifacts['data'], ) return steps