Source code for rid.op.calc_mf

from dflow.python import (
    OP,
    OPIO,
    OPIOSign,
    Artifact,
    Parameter
)

from typing import List, Optional, Union
from pathlib import Path
import numpy as np
from rid.constants import (
        force_out
    )
from rid.utils import load_txt


[docs]class CalcMF(OP): """ Calculate mean force with the results of restrained MD. .. math:: MeanForce = ( Average(CVs) - Initial(CVs) ) * kappa CalcMF will handle periodic CVs by `angular_mask`. To get the mean value of CVs near equilibrium state under restrained MD, only part of outputs of CV values (the last `tail` part, for example, the last 90% CV values) are used. """
[docs] @classmethod def get_input_sign(cls): return OPIOSign( { "task_name": str, "plm_out": Artifact(Path), "kappas": List[float], "at": Artifact(Path), "tail": Parameter(float, default=0.9), "angular_mask": Optional[Union[np.ndarray, List]], } )
[docs] @classmethod def get_output_sign(cls): return OPIOSign( { "forces": Artifact(Path) } )
[docs] @OP.exec_sign_check def execute( self, op_in: OPIO, ) -> OPIO: r"""Execute the OP. Parameters ---------- op_in : dict Input dict with components: - `task_name`: (`str`) Task name, used to make sub-directory for the task. - `plm_out`: (`Artifact(Path)`) Outputs of CV values from restrained MD simulations. These outputs are generated by PLUMED2. - `kappas`: (`List[float]`) Force constants of harmonic restraints. - `at`: (`Artifact(Path)`) Files containing initial CV values, or CV centers. - `tail`: (`float`) Only the last `tail` of CV values will be used to estimate mean force. - `angular_mask`: (`int`) Mask list for periodic CVs. 1 represents periodic, 0 represents non-periodic. length of angular_mask equals to the dimension of CVs. Returns ------- Output dict with components: - `forces`: (`Artifact(Path)`) Files containing mean forces. """ data = load_txt(op_in["plm_out"]) data = data[:, 1:] # removr the first column(time index). centers = load_txt(op_in["at"]) nframes = data.shape[0] angular_boolean = (np.array(op_in["angular_mask"], dtype=int) == 1) init_angle = data[0][angular_boolean] for ii in range(1, nframes): current_angle = data[ii][angular_boolean] angular_diff = current_angle - init_angle current_angle[angular_diff < -np.pi] += 2 * np.pi current_angle[angular_diff >= np.pi] -= 2 * np.pi data[ii][angular_boolean] = current_angle start_f = int(nframes * (1-op_in["tail"])) avgins = np.average(data[start_f:, :], axis=0) diff = avgins - centers angular_diff = diff[angular_boolean] angular_diff[angular_diff < -np.pi] += 2 * np.pi angular_diff[angular_diff > np.pi] -= 2 * np.pi diff[angular_boolean] = angular_diff ff = np.multiply(op_in["kappas"], diff) task_path = Path(op_in["task_name"]) task_path.mkdir(exist_ok=True, parents=True) np.savetxt(task_path.joinpath(force_out), np.reshape(ff, [1, -1]), fmt='%.10e') op_out = OPIO( { "forces": task_path.joinpath(force_out) } ) return op_out