from dflow.python import (
OP,
OPIO,
OPIOSign,
Artifact,
Parameter
)
from typing import List, Optional, Union
from pathlib import Path
import numpy as np
from rid.constants import (
force_out
)
from rid.utils import load_txt
[docs]class CalcMF(OP):
"""
Calculate mean force with the results of restrained MD.
.. math::
MeanForce = ( Average(CVs) - Initial(CVs) ) * kappa
CalcMF will handle periodic CVs by `angular_mask`.
To get the mean value of CVs near equilibrium state under restrained MD, only part of outputs of CV values
(the last `tail` part, for example, the last 90% CV values) are used.
"""
[docs] @classmethod
def get_output_sign(cls):
return OPIOSign(
{
"forces": Artifact(Path)
}
)
[docs] @OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
r"""Execute the OP.
Parameters
----------
op_in : dict
Input dict with components:
- `task_name`: (`str`) Task name, used to make sub-directory for the task.
- `plm_out`: (`Artifact(Path)`) Outputs of CV values from restrained MD simulations.
These outputs are generated by PLUMED2.
- `kappas`: (`List[float]`) Force constants of harmonic restraints.
- `at`: (`Artifact(Path)`) Files containing initial CV values, or CV centers.
- `tail`: (`float`) Only the last `tail` of CV values will be used to estimate mean force.
- `angular_mask`: (`int`) Mask list for periodic CVs. 1 represents periodic, 0 represents non-periodic.
length of angular_mask equals to the dimension of CVs.
Returns
-------
Output dict with components:
- `forces`: (`Artifact(Path)`) Files containing mean forces.
"""
data = load_txt(op_in["plm_out"])
data = data[:, 1:] # removr the first column(time index).
centers = load_txt(op_in["at"])
nframes = data.shape[0]
angular_boolean = (np.array(op_in["angular_mask"], dtype=int) == 1)
init_angle = data[0][angular_boolean]
for ii in range(1, nframes):
current_angle = data[ii][angular_boolean]
angular_diff = current_angle - init_angle
current_angle[angular_diff < -np.pi] += 2 * np.pi
current_angle[angular_diff >= np.pi] -= 2 * np.pi
data[ii][angular_boolean] = current_angle
start_f = int(nframes * (1-op_in["tail"]))
avgins = np.average(data[start_f:, :], axis=0)
diff = avgins - centers
angular_diff = diff[angular_boolean]
angular_diff[angular_diff < -np.pi] += 2 * np.pi
angular_diff[angular_diff > np.pi] -= 2 * np.pi
diff[angular_boolean] = angular_diff
ff = np.multiply(op_in["kappas"], diff)
task_path = Path(op_in["task_name"])
task_path.mkdir(exist_ok=True, parents=True)
np.savetxt(task_path.joinpath(force_out), np.reshape(ff, [1, -1]), fmt='%.10e')
op_out = OPIO(
{
"forces": task_path.joinpath(force_out)
}
)
return op_out