"""Discrete Variable Representation Operator Class"""
from __future__ import annotations
import _pickle
import math
import os
import pickle
from collections import Counter
from itertools import combinations, product
from typing import Callable
import numpy as np
import polars as pl
import scipy.linalg
from discvar.abc import DVRPrimitivesMixin
from loguru import logger
from pytdscf import units
from pytdscf._helper import from_dbkey, to_dbkey
from pytdscf._mpo_cls import to_mpo
try:
from ase.db import connect
from ase.units import Hartree
except ImportError:
from loguru import logger as _logger
logger = _logger.bind(name="main")
logger.warning("Failed to import ase. You cannot use database.")
[docs]
def deepcopy(item):
    """Fast deep copy via a C-pickle round-trip (much faster than copy.deepcopy)."""
    payload = _pickle.dumps(item, -1)
    return _pickle.loads(payload)
# Route this module's log records through the "main" sink used across pytdscf.
logger = logger.bind(name="main")
# Conversion factor applied to ASE dipoles (``row.dipole / debye_in_ase``);
# presumably converts ASE's e*Angstrom dipole units to atomic units — TODO confirm.
debye_in_ase = units.au_in_angstrom / units.au_in_debye
[docs]
class TensorOperator:
    r""" Tensor Operator class

    Attributes:
        shape (Tuple[int]) : Tensor shape
        tensor_orig (np.ndarray) : Original tensor
        tensor_decomposed (np.ndarray) : Decomposed tensor
        tensor_full (np.ndarray) : if ``only_diag==False``, \
            ``tensor_orig`` is zero-filled.
        only_diag (bool) : Only diagonal terms are non-zero. \
            Defaults to ``False``.
        legs (Tuple[int]) : Tensor legs
        name (str) : Tensor Operator name. Defaults to ``None``.

    Tensor Example:

    .. math::
       \begin{array}{c}
       j_0^\prime & & j_1^\prime & & & & j_3^\prime \\
       | & & | & & & & | \\
       W(0) & - & W(1) & - & - & - & W(3) \\
       | & & | & & & & | \\
       j_0 & & j_1 & & & & j_3 \\
       \end{array}\\

    .. math::
       j_0 = 0, 1, .. n-1,
       j_1 = 0, 1, .. m-1,
       j_3 = 0, 1, .. l-1

    ``tensor = np.array([W0, W1, W3])``.
    When ``only_diag=False``, above example is ``shape = (n, n, m, m, l, l)``,\
    ``legs=(0, 0, 1, 1, 3, 3)``, (even index is bra, odd index is ket).
    Otherwise, ``only_diag=True``, ``shape=(n,m,l)``, ``legs=(0, 1, 3)``.

    ``W_1[beta_01,j_1^prime,j_1,beta_13] =``
    :math:`W{\small \beta_{0,1}}_{j_1}^{j_1^\prime}{\small \beta_{1,3}}`
    """

    # Class-level annotations; instances assign these in __init__ / decompose().
    legs: tuple[int, ...]  # DOF index carried by each tensor axis
    tensor_orig: np.ndarray  # dense (possibly diagonal-only) tensor data
    shape: tuple[int, ...]  # shape of tensor_orig
    only_diag: bool  # True -> only diagonal elements are stored
    tensor_decomposed: list[np.ndarray]  # MPO cores produced by decompose()
    tensor_full: np.ndarray  # cached zero-filled dense form (get_tensor_full)
    name: str  # label used by save()/load()
    bond_dimension: list[int]  # possibly truncated MPO bond dimensions
    full_bond_dimension: list[int]  # exact (untruncated) bond dimensions
def __init__(
    self,
    *,
    shape: tuple[int, ...] | None = None,
    tensor: np.ndarray | None = None,
    only_diag: bool = False,
    legs: tuple[int, ...] | None = None,
    name: str | None = None,
    mpo: list[np.ndarray] | None = None,
) -> None:
    """Build a tensor operator from a dense tensor, a bare shape, or MPO cores.

    Args:
        shape: tensor shape (required when neither ``tensor`` nor ``mpo`` given).
        tensor: dense tensor data; its shape overrides ``shape``.
        only_diag: True when only diagonal elements are stored (inferred for mpo).
        legs: DOF index per tensor axis; inferred for ``mpo``/diagonal input.
        name: operator label.
            NOTE(review): indentation was reconstructed from a mangled
            extraction — as reconstructed, giving ``name`` skips the whole
            data-construction branch below; confirm against upstream source.
        mpo: pre-decomposed MPO cores (3-dim core = diagonal, 4-dim = full).

    Raises:
        ValueError: when neither shape/tensor is available, a core has an
            unexpected ndim, or legs cannot be inferred.
    """
    if name is not None:
        self.name = name
    else:
        if mpo is not None:
            assert isinstance(mpo, list)
            # All 3-leg cores means every site is diagonal in the physical index.
            only_diag = all(len(core.shape) == 3 for core in mpo)
            bond_dimension = [1]
            for core in mpo:
                bond_dimension.append(core.shape[-1])
            bond_dimension.append(1)
            self.bond_dimension = bond_dimension
            # Physical dimensions of all cores, flattened in site order.
            shape = tuple([i for _core in mpo for i in _core.shape[1:-1]])
            self.tensor_decomposed = mpo
        if shape is None and tensor is None:
            raise ValueError(
                "You must give argument either shape or tensor"
            )
        if tensor is None:
            assert isinstance(shape, tuple)
            self.shape = shape
            if mpo is None:
                # Zero placeholder, filled element-by-element by callers.
                self.tensor_orig = np.zeros(shape)
        else:
            self.shape = tensor.shape
            self.tensor_orig = tensor
        self.only_diag = only_diag
        if legs is None:
            if isinstance(mpo, list):
                _legs = []
                for i, core in enumerate(mpo):
                    if core.ndim == 3:
                        _legs.append(i)  # diagonal core: one physical leg
                    elif core.ndim == 4:
                        _legs.extend([i, i])  # separate bra and ket legs
                    else:
                        raise ValueError(
                            f"core.ndim must be 3 or 4, but {core.ndim}"
                        )
                self.legs = tuple(_legs)
            else:
                if only_diag:
                    # Diagonal tensors map one leg per axis unambiguously.
                    self.legs = tuple([i for i in range(len(self.shape))])
                else:
                    raise ValueError(
                        "leg is ambiguous. Please give leg argument."
                    )
        else:
            self.legs = legs
        assert len(self.legs) == len(self.shape), (
            f"Tensor shape {self.shape} and legs {self.legs} are different"
        )
@property
def dtype(self) -> np.dtype:
    """dtype of the stored tensor; falls back to the first MPO core."""
    if hasattr(self, "tensor_orig"):
        return self.tensor_orig.dtype
    return self.tensor_decomposed[0].dtype
def __str__(self) -> str:
    """Render whichever of tensor_orig / tensor_decomposed exist."""
    parts = []
    if hasattr(self, "tensor_orig"):
        parts.append(self.tensor_orig.__str__())
    if hasattr(self, "tensor_decomposed"):
        parts.append("\n" + self.tensor_decomposed.__str__())
    return "".join(parts)
def __repr__(self) -> str:
    # repr mirrors str: dump whichever tensors are present.
    return self.__str__()
def __add__(self, tensor_op) -> TensorOperator:
    """Merge two tensor operators: element-wise sum after aligning legs."""
    # Pad each operand with the other's legs so both cover the same DOFs.
    tensor_op1 = self.add_dammy_legs(
        add_legs=tensor_op.legs, add_shape=tensor_op.shape
    )
    tensor_op2 = tensor_op.add_dammy_legs(
        add_legs=self.legs, add_shape=self.shape
    )
    # Mixing a diagonal-only operator with a full one is not supported.
    if tensor_op1.only_diag ^ tensor_op2.only_diag:
        raise NotImplementedError
    else:
        if (
            tensor_op1.shape == tensor_op2.shape
            and tensor_op1.legs == tensor_op2.legs
        ):
            return TensorOperator(
                tensor=tensor_op1.tensor_orig + tensor_op2.tensor_orig,
                only_diag=tensor_op1.only_diag,
                legs=tensor_op1.legs,
            )
        else:
            raise ValueError(
                "Tensor shapes or legs are different."
                + f" shapes : {tensor_op1.shape} vs {tensor_op2.shape}"
                + f" legs : {tensor_op1.legs} vs {tensor_op2.legs}"
            )
[docs]
def add_dammy_legs(
    self, add_legs: tuple[int, ...], add_shape: tuple[int, ...]
) -> TensorOperator:
    r"""Dammy legs addition

    Embed this tensor into a larger tensor that additionally carries
    ``add_legs``; the original data is broadcast over the new axes.

    Args:
        add_legs(Tuple[int]) : additional tensor legs. \
            e.g. add :math:`j_1 ` is ``add_legs=(1,)``.
        add_shape(Tuple[int]) : additional tensor shape. \
            e.g. add :math:`j_1 = 0,1,\ldots,3` is ``add_shape=(4,)``.

    Returns:
        TensorOperator : Filled Tensor

    e.g.
    ``from legs = (0,1)`` to ``legs=(0,1,2)``, where dammy legs is 2.

    NOTE(review): indentation reconstructed from a mangled extraction —
    nesting of the bookkeeping loops below should be confirmed upstream.
    """
    if len(add_legs) != len(add_shape):
        raise ValueError("additional legs and shape is not the same")
    # Union of current and additional legs, sorted by DOF index.
    _after_legs = list(set(self.legs + add_legs))
    _after_legs.sort()
    after_legs = tuple(_after_legs)
    if not self.only_diag:
        # Full operators carry a (bra, ket) leg pair per DOF.
        after_legs = self._repeat_leg(after_legs)
    if tuple(after_legs) == self.legs:
        # Nothing to add: return an independent copy.
        return deepcopy(self)
    add_index: list[int] = []  # positions of axes coming from add_legs
    add_index_not_orig: list[int] = []  # positions that are genuinely new
    orig_index: list[int] = []  # positions of this tensor's own axes
    for ind, leg in enumerate(after_legs):
        if leg in self.legs:
            orig_index.append(ind)
        else:
            add_index_not_orig.append(ind)
        if leg in add_legs:
            add_index.append(ind)
    after_shape: list[int] = [None for _ in range(len(after_legs))]  # type: ignore
    for shape, ind in zip(add_shape, add_index, strict=True):
        after_shape[ind] = shape
    # Original dimensions win where both operators share a leg.
    for shape, ind in zip(self.shape, orig_index, strict=True):
        after_shape[ind] = shape
    to = TensorOperator(
        shape=tuple(after_shape), legs=after_legs, only_diag=self.only_diag
    )
    add_shape_diag = add_shape
    if len(add_legs) % 2 == 0:
        if add_legs[::2] == add_legs[1::2]:
            # add_legs look like (j, j, k, k, ...): keep one dim per DOF.
            add_shape_diag = add_shape[0::2]
    _iter = [range(add_shape_diag[p]) for p in add_index_not_orig]
    # Broadcast the original tensor over every combination of new indices.
    for add_leg in product(*_iter):
        after_leg = [slice(s) for s in after_shape]
        if self.only_diag:
            for leg, ind in zip(add_leg, add_index_not_orig, strict=True):
                after_leg[ind] = leg
        else:
            for leg, ind in zip(add_leg, add_index_not_orig, strict=True):
                """maybe wrong (not debugging)"""
                after_leg[ind] = leg
                after_leg[ind + 1] = leg
        to.tensor_orig[tuple(after_leg)] = deepcopy(self.tensor_orig)
    return to
[docs]
def get_tensor_full(self) -> np.ndarray:
    """Return the dense tensor; a diagonal-only tensor is embedded into a
    zero-filled full (bra, ket) tensor. Result is cached on the instance."""
    if hasattr(self, "tensor_full"):
        return self.tensor_full
    if self.only_diag:
        full = np.zeros(self._repeat_leg(self.shape))
        for idx in product(*map(range, self.shape)):
            full[self._repeat_leg(leg_diag=idx)] += self.tensor_orig[idx]
        self.tensor_full = full
    else:
        self.tensor_full = deepcopy(self.tensor_orig)
    return self.tensor_full
def _repeat_leg(self, leg_diag: tuple[int, ...]) -> tuple[int, ...]:
    """Duplicate each entry: (a, b) -> (a, a, b, b), i.e. diagonal index
    expanded to a (bra, ket) index pair."""
    doubled: tuple[int, ...] = ()
    for value in leg_diag:
        doubled += (value, value)
    return doubled
def _trans_J_to_1d(self, J: tuple[int, ...]) -> int:
    """
    Row-major (C-order) flattening of a multi-index for ``self.shape``.

    Args:
        Tensor-index J (tuple) : (j_1, j_2, ..., j_f)
    Returns:
        int : 1d array index
    """
    digits = np.flip(np.append(1, np.cumprod(self.shape[:0:-1])))
    flat = sum(j * d for j, d in zip(J, digits, strict=True))
    return int(flat)
def _iter_J(self, N: tuple[int, ...] | None = None):
    """Yield every multi-index J = (j_1, ..., j_f) over ranges N
    (defaults to ``self.shape``)."""
    if N is None:
        N = self.shape
    yield from product(*(range(n) for n in N))
def _reshape_row_to_col(self, matrix: np.ndarray, index: int):
    """
    Reshape A(n^k, n^l) -> A(n^{k-1}, n^{l+1}): fold physical mode
    ``index`` out of the columns and into the rows.

    Args:
        matrix (np.ndarray) : Original matrix
        index (int) : mode which move row to column (0-index)
    Returns:
        np.ndarray : Transformed matrix
    """
    nrow_old = matrix.shape[0]
    n_phys = self.shape[index]
    ncol_new = int(np.prod(self.shape[index + 1 :]))
    folded = np.zeros((int(n_phys * nrow_old), ncol_new))
    for j in range(n_phys):
        block = matrix[:, ncol_new * j : ncol_new * (j + 1)]
        folded[nrow_old * j : nrow_old * (j + 1), :] = block
    return folded
def _QRD(self) -> list[np.ndarray]:
    """QR decomposition-based MPO (uses the full, untruncated bond dimensions)."""
    self.tensor_decomposed = []
    if self.only_diag:
        # Flatten the diagonal tensor into a single row vector.
        r = np.zeros((1, np.prod(self.shape)), dtype=np.float64)
        for J in self._iter_J():
            r[0, self._trans_J_to_1d(J)] = self.tensor_orig[J]
        for i in range(len(self.shape)):
            # Fold physical index i into the rows, then orthogonalize.
            r = self._reshape_row_to_col(
                r[: self.full_bond_dimension[i], :], i
            )
            q, r = scipy.linalg.qr(r)
            # Strided row slices of Q appear to form the per-bond-index
            # core fibers (bond_i, j_i, bond_{i+1}) — reconstructed layout.
            self.tensor_decomposed.append(
                np.array(
                    [
                        q[
                            tau :: self.full_bond_dimension[i],
                            : self.full_bond_dimension[i + 1],
                        ]
                        for tau in range(self.full_bond_dimension[i])
                    ],
                    dtype=np.float64,
                )
            )
        # Remaining 1x1 R carries the overall magnitude/sign.
        self.tensor_decomposed[-1] *= r[0, 0]
    else:
        raise NotImplementedError("non-diag case")
    return self.tensor_decomposed
def _SVD(self):
    """Singular Value Decomposition-based MPO decomposition.

    Sweeps left to right: at each site the working matrix is refolded,
    factorized, the core is taken from U, and ``diag(s) @ Vh`` is carried
    to the next site. When ``self.rate`` is set, bond dimensions are
    truncated at the requested cumulative singular-value contribution.

    Returns:
        list[np.ndarray] : MPO cores (also stored in ``tensor_decomposed``)
    """
    self.tensor_decomposed = []
    if not self.only_diag:
        raise NotImplementedError("non-diag case")
    # Flatten the diagonal tensor into a single row vector.
    r = np.zeros((1, np.prod(self.shape)))
    for J in self._iter_J():
        r[0, self._trans_J_to_1d(J)] = self.tensor_orig[J]
    for i in range(len(self.shape)):
        r = self._reshape_row_to_col(r[: self.bond_dimension[i], :], i)
        if (
            1 < self.bond_dimension[i + 1] < min(r.shape)
            and min(r.shape) > 10000
            and not hasattr(self, "rate")
        ):
            logger.warning("matrix is too large to execute full-SVD")
            # BUGFIX: only ``scipy.linalg`` is imported at module level, so
            # ``scipy.sparse`` was unresolved here and this branch raised
            # AttributeError at runtime; import the submodule lazily.
            import scipy.sparse.linalg

            U, s, Vh = scipy.sparse.linalg.svds(
                r, k=self.bond_dimension[i + 1]
            )
        else:
            try:
                U, s, Vh = scipy.linalg.svd(r, full_matrices=False)
            except np.linalg.LinAlgError:
                # gesdd occasionally fails to converge; gesvd is slower
                # but more robust.
                U, s, Vh = scipy.linalg.svd(
                    r, full_matrices=False, lapack_driver="gesvd"
                )
        if hasattr(self, "rate"):
            if self.square_sum:
                # trace(r r^T) equals the sum of squared singular values.
                total_val = np.tensordot(
                    r, r.T, axes=[[1, 0], [0, 1]]
                )
            else:
                total_val = np.sum(s)
            cum_s_val = 0.0
            rank = 0
            # Keep singular values until the contribution rate is reached.
            while (
                cum_s_val / total_val < self.rate
                and rank < self.bond_dimension[i + 1]
            ):
                if self.square_sum:
                    cum_s_val += s[rank] ** 2
                else:
                    cum_s_val += s[rank]
                rank += 1
            self.bond_dimension[i + 1] = rank
        self.tensor_decomposed.append(
            np.array(
                [
                    U[
                        tau :: self.bond_dimension[i],
                        : self.bond_dimension[i + 1],
                    ]
                    for tau in range(self.bond_dimension[i])
                ]
            )
        )
        dim = min(self.bond_dimension[i + 1], Vh.shape[0])
        # Carry the remainder to the next site.
        r = np.diag(s[:dim]) @ Vh[:dim, :]
    # Residual scalar fixes the overall normalization.
    self.tensor_decomposed[-1] *= r[0, 0]
    return self.tensor_decomposed
[docs]
def decompose(
    self,
    bond_dimension: list[int] | int | None = None,
    decompose_type: str = "SVD",
    rate: float | None = None,
    square_sum: bool = True,
    overwrite: bool = False,
) -> list[np.ndarray]:
    r"""MPO Decomposition

    Args:
        bond_dimension (List[int] or int) : MPO bond dimension
        decompose_type (str) : Tensor Train Decomposition \
            Type ``'QRD'`` or ``'SVD'``. Defaults to ``'SVD'``.
        rate (float) : SVD decomposition contribution rate truncation. \
            Defaults to ``None``.
        square_sum (bool) : Whether ``rate`` is based on the sum of \
            squares of the eigenvalues, or simply sum of the eigenvalues. \
            Square sum can calculate easily by trace of matrix, \
            but need high ``rate`` percentage. We recommend
            ``rate=0.99999999999``, if ``square_sum=True``.
        overwrite (bool) : Re-decompose even when a cached decomposition
            exists. Defaults to ``False``.

    Returns:
        List[np.ndarray] : MPO
    """
    # Reuse a cached decomposition unless asked to overwrite.
    if not overwrite and hasattr(self, "tensor_decomposed"):
        return self.tensor_decomposed
    if (not hasattr(self, "bond_dimension")) or bond_dimension is not None:
        if decompose_type.lower() in ["qrd", "qr"]:
            # QR path always uses the full (untruncated) bond dimensions.
            bond_dimension = None
        self.get_bond_dimension(bond_dimension=bond_dimension)
    if rate is not None:
        if 0.0 < rate < 1.0:
            self.rate = rate
            self.square_sum = square_sum
        else:
            raise ValueError(
                f"Contribution rate must be in (0.0, 1.0), but {rate}"
            )
    if len(set(self.legs)) >= 2:
        # Multi-site operator: true tensor-train factorization.
        if decompose_type.lower() in ["qrd", "qr"]:
            return self._QRD()
        elif decompose_type.lower() in ["svd", "sv"]:
            return self._SVD()
        else:
            raise ValueError('decompose_type must be "QRD" or "SVD"')
    else:
        # Single-site operator: wrap as a trivial one-core MPO.
        if self.only_diag:
            dum = np.zeros(
                (1, self.tensor_orig.shape[0], 1),
                dtype=self.tensor_orig.dtype,
            )
            dum[0, :, 0] = deepcopy(self.tensor_orig)
        else:
            dum = np.zeros(
                (1, *self.tensor_orig.shape[0:2], 1),
                dtype=self.tensor_orig.dtype,
            )
            dum[0, :, :, 0] = deepcopy(self.tensor_orig)
        self.tensor_decomposed = [dum]
        return self.tensor_decomposed
[docs]
def get_bond_dimension(self, bond_dimension: list[int] | int | None = None):
    """get MPO bond-dimension

    Sets ``full_bond_dimension`` (exact ranks achievable at each cut) and
    ``bond_dimension`` (clipped to the requested values).

    Args:
        bond_dimension(List[int] | int) : MPO bond dimension
    """
    # Maximum rank from the left: product of physical dims up to each cut.
    from_left = list(np.cumprod([1] + list(self.shape)))
    if not self.only_diag:
        # Full operators store bra and ket dims separately; take every other.
        from_left = from_left[0::2]
    from_right = list(reversed(from_left))
    # Exact rank at each cut is bounded by the smaller side.
    self.full_bond_dimension = [
        min(m1, m2) for m1, m2 in zip(from_left, from_right, strict=True)
    ]
    if bond_dimension is None:
        self.bond_dimension = deepcopy(self.full_bond_dimension)
    elif isinstance(bond_dimension, int):
        self.bond_dimension = [
            min(m, bond_dimension) for m in self.full_bond_dimension
        ]
    elif isinstance(bond_dimension, list):
        if len(bond_dimension) != len(from_left):
            raise ValueError(
                "bond_dimension length is wrong. \
                Note that terminal bond dimension are fixed to 1."
            )
        self.bond_dimension = [
            min(m1, m2)
            for m1, m2 in zip(
                self.full_bond_dimension, bond_dimension, strict=True
            )
        ]
    else:
        raise ValueError(
            f"bond dimension type is wrong {type(bond_dimension)}"
        )
    return self.bond_dimension
[docs]
def get_num_of_elements(self) -> tuple[int, int | None]:
"""Get number of decomposed tensor operator element
Returns:
Tuple[int, int] : Number of tensor elements \
(before, after) decomposition
"""
before = self.tensor_orig.size
if hasattr(self, "bond_dimension"):
after = sum(
[
self.shape[i]
* self.bond_dimension[i]
* self.bond_dimension[i + 1]
for i in range(len(self.shape))
]
)
else:
after = None
return (before, after)
[docs]
def restore_from_decoposed(self) -> np.ndarray:
    """Contract the MPO cores back into the full tensor
    (inverse of ``decompose``; same contraction as ``mps_cls.buid_CICoef``)."""
    contracted = np.array(self.tensor_decomposed[0])
    for core in self.tensor_decomposed[1:]:
        contracted = np.tensordot(contracted, core, axes=[-1, 0])
    # Strip the trivial terminal bond indices (both of dimension 1).
    return contracted[0, ..., 0]
[docs]
def get_frobeinus_norm(self) -> float:
    """Frobenius norm of ``tensor_orig``.
    (Method name keeps its historical misspelling for API compatibility.)"""
    norm_value = np.linalg.norm(self.tensor_orig)
    return float(norm_value)
[docs]
def estimate_error(self, tensor: np.ndarray | None = None):
"""
Estimate Frobenius norm between self and given-tensor \
or renormalized tensor
Args:
tensor (np.ndarray) : given tensor (option)
Returns:
float : Error %
"""
if tensor is None:
tensor = self.restore_from_decoposed()
if tensor.shape == self.shape:
dif = self.tensor_orig - tensor
error_percent = (
1.0 - np.linalg.norm(dif) / np.linalg.norm(self.tensor_orig)
) * 100
return error_percent
else:
raise ValueError(
f"Tensor Shape if different {tensor.shape} VS {self.shape}"
)
def _default_name(self) -> str:
    """Default file stem derived from the legs, e.g. 'legs_0_1_1'."""
    return "legs" + "".join(f"_{leg}" for leg in self.legs)
[docs]
def save(self, name: str | None = None) -> None:
    """
    Pickle this Tensor Operator into the ./tensor_operators directory.

    Args:
        name(str) : file name not including extension
    """
    if name is None:
        name = self._default_name()
    self.name = name
    if not os.path.exists("./tensor_operators"):
        os.makedirs("./tensor_operators")
    path = "./tensor_operators/" + name + ".pkl"
    with open(path, "wb") as f:
        pickle.dump(self, f)
[docs]
@classmethod
def load(cls, name) -> TensorOperator:
    """
    Load Tensor Operator Object from binary
    file in ./tensor_operators directory

    Args:
        name : file stem (no extension) previously passed to ``save``.

    Returns:
        TensorOperator : a fresh instance rebuilt from the pickled one.
    """
    # NOTE: pickle.load can execute arbitrary code — only load files you
    # wrote yourself via save().
    with open("./tensor_operators/" + name + ".pkl", "rb") as f:
        TO = pickle.load(f)
    # Rebuild through the constructor instead of returning the unpickled
    # object directly, so invariants are re-checked.
    return cls(
        shape=TO.shape,
        tensor=TO.tensor_orig,
        only_diag=TO.only_diag,
        legs=TO.legs,
    )
[docs]
class PotentialFunction:
    """Analytical Model Potential from polynomial-based PES

    Args:
        DOFs (List[int]) : Degrees of freedom used for the potential
            function. 1-index.
        polynomial_coef (Dict[Tuple[int], float]) : Polynomial PES coefficient.
        cut_off (float) : potential coefficient cut-off. Defaults to no cut-off.
        div_factorial (bool) : Whether or not coefficients are divided by the
            factorial of each exponent (Taylor convention). Defaults to ``True``.
        dipole (Optional[bool]) : Use dipole moment surface (3D vector).
        efield (Optional[tuple[float, float, float]]) : Electronic field
            (only used together with ``dipole``).
    """

    def __init__(
        self,
        DOFs: list[int],
        polynomial_coef: dict[tuple[int, ...], float],
        cut_off: float = -1.0,
        div_factorial: bool = True,
        dipole: bool = False,
        efield: tuple[float, float, float] = (1.0e-02, 1.0e-02, 1.0e-02),
    ) -> None:
        if polynomial_coef is None:
            raise NotImplementedError
        self.DOFs_0index = [p - 1 for p in DOFs]  # 0-index
        self.DOFs_1index = DOFs  # 1-index
        # Map a 0-indexed DOF to its positional argument slot in __call__.
        self.mode_index = {}
        for i, p in enumerate(self.DOFs_0index):
            self.mode_index[p] = i
        self.cut_off = cut_off
        self.div_factorial = div_factorial
        self.dipole = dipole
        self.efield = efield
        self.polynomial = dict()
        for key, coef in polynomial_coef.items():
            # Keep only terms whose DOFs all belong to the active set.
            if set(key) & set(self.DOFs_1index) == set(key):
                c_key = Counter(key)
                if dipole:
                    # Project the dipole vector onto the field direction.
                    coef = np.inner(coef, efield)
                if self.div_factorial:
                    # Taylor convention: divide by n! per repeated DOF.
                    for order in c_key.values():
                        coef /= math.factorial(order)
                if abs(coef) > self.cut_off:
                    self.polynomial[key] = coef

    def __call__(self, *args) -> float:
        """Evaluate the polynomial at displacements ``args`` (DOF order)."""
        dum = 0.0
        # Constant term is added exactly once here.
        if () in self.polynomial:
            dum += self.polynomial[()]
        for key, coef in self.polynomial.items():
            if key == ():
                # BUGFIX: the scalar key used to be visited by this loop as
                # well, double-counting the constant term.
                continue
            term = coef  # coefficients are scalars; no deep copy needed
            c_key = Counter(key)
            for idof, order in c_key.items():
                term *= pow(args[self.mode_index[idof - 1]], order)
            dum += term
        return dum
[docs]
def construct_nMR_recursive(
    dvr_prims: list[DVRPrimitivesMixin],
    nMR: int = 3,
    ndof: int | None = None,
    func: dict[tuple[int, ...], Callable] | None = None,
    db: str | None = None,
    df: pl.DataFrame | None = None,
    active_dofs: list[int] | None = None,
    site_order: dict[int, int] | None = None,
    zero_indices: list[int] | None = None,
    return_tensor: bool = False,
    include_const_in_mpo: bool = False,
    ref_ene: float | None = None,
    dipole: bool = False,
    efield: tuple[float, float, float] = (1.0, 1.0, 1.0),
    rate: float = 1.0,
    k: int = 200,
    nsweep: int = 4,
) -> list[np.ndarray] | dict[tuple[int, ...], TensorOperator]:
    r"""Construct n-Mode Representation Operator

    n-Mode Representation (nMR) are used for reducing grid points
    for ab initio calculation.
    To avoid duplicate addition of the same coordinates,
    use principle of inclusion and exclusion.
    (It is easy to understand by Venn-diagram)

    Args:
        dvr_prims (List[DVRPrimitivesMixin]) : DVR functions
        nMR (Optional, int) : Mode Representation Number. Defaults to ``3``.
        ndof (Optional, int) : number of mode including in database.
        func (Optional, Dict[Tuple[int], Callable]) : E.g. Potential energy function
        db (Optional, str) : Electronic structure calculation database path, \
            such as 'foo/hoge.db'
        df (Optional, pl.DataFrame) : Electronic structure calculation dataframe.
        active_dofs (Optional, List[int]) : Active modes Defaults to ALL.
        site_order (Optional, Dict[int, int]) : MPO site order. \
            Defaults to DOF index order.
        zero_indices (Optional, List[int]) : nMR criteria coordinate grid.
        return_tensor (bool) : Return before decomposition. Defaults to False.
        include_const_in_mpo (bool) : Include scalar constant value (such as
            reference energy) in MPO. Defaults to False.
        ref_ene (float) : Opt coord energy in a.u., if you need \
            subtract from function or database energy. Defaults to ``None``
            (taken from the reference-geometry entry of the database/dataframe).
        dipole (bool) : Get dipole moment from database. \
            Defaults to `False`
        efield (List[float]) : \
            Electronic field for inner products with dipole moment.\
            Defaults to [1.0e-02 , 1.0e-02, 1.0e-02]
        rate (float) : SVD contribution rate. Defaults to 1.0
        k (int) : SVD bond dimension. Defaults to 200
        nsweep (int) : number of SVD-compression sweeps. Defaults to 4

    Returns:
        List[np.ndarray] : nMR MPO

    Example:

    .. math::
       V &:= \sum_{\boldsymbol{x}\in \mathrm{coord.}} V(x_\alpha,x_\beta) \\
       &= V_0 + V_{\mathrm{1MR}} + V_{\mathrm{2MR}}\\

    where,

    .. math::
       V_0 &= V(0,0)\\
       V_{\mathrm{1MR}} &= \sum_{x_\alpha} \left(V(x_\alpha,0) - V_0\right)
       + \sum_{x_\beta} \left(V(0, x_\beta) - V_0\right)\\
       V_{\mathrm{2MR}} &= \sum_{x_\alpha}\sum_{x_\beta} \
       \left(V(x_\alpha,x_\beta) - V(x_\alpha,0)- V(0, x_\beta) + V_0\right)

    (inclusion-exclusion: the doubly subtracted reference is added back once)

    NOTE(review): indentation was reconstructed from a mangled extraction;
    branch nesting below should be confirmed against the upstream source.
    """
    # --- normalize / validate the mode bookkeeping arguments ---
    if ndof is None:
        if zero_indices is not None:
            ndof = len(zero_indices)
        else:
            ndof = len(dvr_prims)
    elif ndof < len(dvr_prims):
        raise TypeError("ndof must be equal to or larger than dvr_prims length")
    if active_dofs is None:
        active_dofs = [idof for idof in range(ndof)]
    elif len(active_dofs) != len(dvr_prims):
        raise TypeError("active dofs length must be equal to dvr_prims length")
    elif max(active_dofs) > ndof - 1:
        raise TypeError("active_dofs `f` must be in 0 <= f < ndof")
    if site_order is None:
        site_order = {}
        for isite, dof in enumerate(active_dofs):
            site_order[dof] = isite
    elif len(site_order) != ndof:
        raise TypeError
    elif min(site_order.values()) != 0 or max(site_order.values()) != ndof - 1:
        raise TypeError
    # NOTE(review): indexing dvr_prims by site_order[p] while iterating
    # active_dofs looks like it may invert the intended mapping — confirm.
    dvr_prims_site_order = [dvr_prims[site_order[p]] for p in active_dofs]
    ngrids = [dvr_prims_site_order[p].ngrid for p in range(ndof)]
    nMR_operators: dict[tuple[int, ...], TensorOperator | float] = dict()
    if func is None and db is not None and df is None:
        # --- read energies / dipoles from an ASE database ---
        if zero_indices is None:
            # Locate the grid index closest to zero for every DOF.
            zero_indices = [None for _ in range(ndof)]  # type: ignore
            for i, prim in enumerate(dvr_prims):
                grids = prim.get_grids()
                for j, grid in enumerate(grids):
                    if abs(grid) < 1.0e-10:
                        zero_indices[i] = j
                        break
        if None in zero_indices:
            raise ValueError(
                "No zero point grid in DVR grids."
                + "You cannot use n-Mode Representation approx."
                + "in this DVR primitives"
            )
        with connect(db) as _db:
            logger.info("connected database")
            if dipole:
                # Reference permanent dipole projected on the field.
                dipole_ref = np.inner(
                    next(_db.select(grids=to_dbkey(tuple(zero_indices)))).dipole
                    / debye_in_ase,
                    efield,
                )
                if include_const_in_mpo:
                    nMR_operators[()] = dipole_ref
                else:
                    nMR_operators[()] = 0.0
                logger.info(f"reference permanent dipole moment: {dipole_ref}")
            else:
                if ref_ene is None:
                    # Use the reference-geometry energy as the zero of energy.
                    ref_ene = (
                        next(
                            _db.select(grids=to_dbkey(tuple(zero_indices)))
                        ).energy
                        / Hartree
                    )
                    nMR_operators[()] = 0.0
                else:
                    nMR_operators[()] = (
                        next(
                            _db.select(grids=to_dbkey(tuple(zero_indices)))
                        ).energy
                        / Hartree
                        - ref_ene
                    )
                logger.info(
                    f"reference energy: {ref_ene} [a.u.], scalar term: {nMR_operators[()]} [a.u.]"
                )
            for iMR in range(1, nMR + 1):
                logger.info(f"read database: {iMR}-mode representation")
                for mode_pair in combinations(active_dofs, iMR):
                    # Map DOF indices to MPO sites and sort by site order.
                    _site_pair = np.array([site_order[p] for p in mode_pair])
                    arg_sort = np.argsort(_site_pair)
                    site_pair: tuple[int, ...] = tuple(_site_pair[arg_sort])
                    shape = tuple(
                        [dvr_prims_site_order[p].ngrid for p in site_pair]
                    )
                    op = TensorOperator(
                        legs=site_pair, shape=shape, only_diag=True
                    )
                    for row in _db.select(dofs=to_dbkey(mode_pair)):
                        full_index = from_dbkey(row.key_value_pairs["grids"])
                        tensor_index: tuple[int, ...] = tuple(
                            np.array([full_index[p] for p in mode_pair])[
                                arg_sort
                            ]
                        )
                        if dipole:
                            op.tensor_orig[tensor_index] = np.inner(
                                row.dipole / debye_in_ase, efield
                            )
                            if not include_const_in_mpo:
                                op.tensor_orig[tensor_index] -= dipole_ref
                        else:
                            op.tensor_orig[tensor_index] = (
                                row.energy / Hartree - ref_ene
                            )
                    nMR_operators[site_pair] = op
        """separation"""
        logger.info("START: separate nMR tensor operators")
        nMR_operators = _separate_recursive(nMR, ngrids, nMR_operators)
    elif func is None and db is None and df is not None:
        # --- read energies / dipoles from a polars DataFrame ---
        if dipole:
            dipole_ref = np.inner(
                df.filter(pl.col("distance") == 0)["dipole"][0], efield
            )
            if include_const_in_mpo:
                nMR_operators[()] = dipole_ref
            else:
                nMR_operators[()] = 0.0
            logger.info(f"reference permanent dipole moment: {dipole_ref}")
        else:
            if ref_ene is None:
                if "energies" in df.schema:
                    ref_ene = df.filter(pl.col("distance") == 0)["energies"][0]
                else:
                    ref_ene = df.filter(pl.col("distance") == 0)["energy"][0]
                nMR_operators[()] = 0.0
            else:
                if "energies" in df.schema:
                    nMR_operators[()] = (
                        df.filter(pl.col("distance") == 0)["energies"][0]
                        - ref_ene
                    )
                else:
                    nMR_operators[()] = (
                        df.filter(pl.col("distance") == 0)["energy"][0]
                        - ref_ene
                    )
            logger.info(
                f"reference energy: {ref_ene} [a.u.], scalar term: {nMR_operators[()]} [a.u.]"
            )
        for iMR in range(1, nMR + 1):
            for mode_pair in combinations(active_dofs, iMR):
                _site_pair = np.array([site_order[p] for p in mode_pair])
                arg_sort = np.argsort(_site_pair)
                site_pair = tuple(_site_pair[arg_sort])
                shape = tuple(
                    [dvr_prims_site_order[p].ngrid for p in site_pair]
                )
                op = TensorOperator(legs=site_pair, shape=shape, only_diag=True)
                for row in df.filter(
                    pl.col("dofs_db") == to_dbkey(mode_pair)
                ).iter_rows(named=True):
                    full_index = tuple(row["grids"])
                    tensor_index = tuple(
                        np.array([full_index[p] for p in mode_pair])[arg_sort]
                    )
                    if dipole:
                        op.tensor_orig[tensor_index] = np.inner(
                            row["dipole"], efield
                        )
                        if not include_const_in_mpo:
                            op.tensor_orig[tensor_index] -= dipole_ref
                    else:
                        if "energy" in row:
                            op.tensor_orig[tensor_index] = (
                                row["energy"] - ref_ene
                            )
                        else:
                            op.tensor_orig[tensor_index] = (
                                row["energies"][0] - ref_ene
                            )
                nMR_operators[site_pair] = op
        """separation"""
        logger.info("START: separate nMR tensor operators")
        nMR_operators = _separate_recursive(nMR, ngrids, nMR_operators)
    elif func is not None and db is None and df is None:
        # --- evaluate an analytical function dictionary on the grids ---
        if tuple() in func:
            nMR_operators[()] = func[()]()
        else:
            nMR_operators[()] = 0.0
        # NOTE(review): this branch never calls _separate_recursive — the
        # supplied functions are presumably already inclusion-exclusion
        # separated; confirm with callers.
        for iMR in range(1, nMR + 1):
            for mode_pair in combinations(active_dofs, iMR):
                if mode_pair not in func:
                    continue
                site_pair = tuple(site_order[p] for p in mode_pair)
                grids = [enumerate(dvr_prims[p].get_grids()) for p in mode_pair]
                shape = tuple([dvr_prims[p].ngrid for p in mode_pair])
                op = TensorOperator(legs=mode_pair, shape=shape, only_diag=True)
                for q_pair in product(*grids):
                    arg_pes = [q[1] for q in q_pair]
                    tensor_index = tuple([q[0] for q in q_pair])
                    op.tensor_orig[tensor_index] = func[mode_pair](*arg_pes)
                nMR_operators[mode_pair] = op
    else:
        raise ValueError(
            "Required either Callable Function or Calculated Data Base"
        )
    """merge"""
    # Sum lower-order clusters into their largest superset operators.
    scalar_term, nMR_operators_merged = _merge_nMR_operator_subspace(
        nMR_operators
    )
    if return_tensor:
        return nMR_operators_merged
    if not include_const_in_mpo:
        logger.info(f"scalar term {scalar_term} is excluded from MPO.")
        scalar_term = 0.0
    assert isinstance(scalar_term, float)
    mpo = to_mpo(
        nMR_operators=nMR_operators_merged,  # type: ignore
        ngrids=ngrids,
        scalar_term=scalar_term,
        rate=rate,
        k=k,
        nsweep=nsweep,
    )
    return mpo
[docs]
def tensor_dict_to_tensor_op(
    tensor_dict: dict[tuple[int, ...], np.ndarray],
) -> dict[tuple[int, ...], TensorOperator]:
    """Wrap each tensor as a diagonal TensorOperator, skipping the scalar key.

    Args:
        tensor_dict (Dict[Tuple[int, ...], np.ndarray]) : Tensor dictionary

    Returns:
        Dict[Tuple[int, ...], TensorOperator] : TensorOperator dictionary
    """
    return {
        dof_pair: TensorOperator(legs=dof_pair, tensor=arr, only_diag=True)
        for dof_pair, arr in tensor_dict.items()
        if dof_pair != ()
    }
[docs]
def tensor_dict_to_mpo(
    tensor_dict: dict[tuple[int, ...], np.ndarray],
    rate: float = 1.0,
    nsweep: int = 4,
) -> list[np.ndarray]:
    """Convert a {dof-tuple: tensor} dictionary into an MPO.

    Args:
        tensor_dict (Dict[Tuple[int, ...], np.ndarray]) : Tensor dictionary;
            the scalar entry ``tensor_dict[()]`` is NOT folded into the MPO.
        rate (float) : SVD contribution rate in (0, 1]. Values below 1.0
            truncate the MPO bond dimension; typically ``0.999999999``
            is enough.
        nsweep (int) : Number of sweeps in SVD compression. Defaults to 4.

    Returns:
        List[np.ndarray] : MPO
    """
    if not 0.0 < rate <= 1.0:
        raise ValueError("rate must be 0.0 < rate <= 1.0")
    # Infer per-DOF grid sizes from the 1-mode entries (0,), (1,), ...
    ngrids: list[int] = []
    dof = 0
    while (dof,) in tensor_dict:
        ngrids.append(tensor_dict[(dof,)].shape[0])
        dof += 1
    operators = tensor_dict_to_tensor_op(tensor_dict)
    return to_mpo(
        nMR_operators=operators,  # type: ignore
        ngrids=ngrids,
        scalar_term=0.0,
        rate=rate,
        nsweep=nsweep,
    )
def _separate_recursive(
    nMR: int,
    ngrids: list[int],
    nMR_operators: dict[tuple[int, ...], float | TensorOperator],
) -> dict[tuple[int, ...], float | TensorOperator]:
    """Apply inclusion-exclusion in place: subtract the scalar reference and
    every lower-order (sub-cluster) contribution from each iMR tensor.

    Args:
        nMR: highest mode-representation order present.
        ngrids: number of grid points per DOF (in site order).
        nMR_operators: scalar term under key ``()`` plus diagonal
            TensorOperators keyed by sorted site tuples. Mutated in place.

    Returns:
        The same dictionary, with separated (referenced) tensors.
    """
    ndof = len(ngrids)
    for iMR in range(1, nMR + 1):
        for mode_pair in combinations(range(ndof), iMR):
            # BUGFIX: this membership guard used to sit inside the grid loop,
            # AFTER the dictionary lookup below — a missing mode_pair raised
            # KeyError before it could ever be skipped.
            if mode_pair not in nMR_operators:
                continue
            grid_ranges = tuple([range(ngrids[p]) for p in mode_pair])
            nMR_target = nMR_operators[mode_pair]
            assert isinstance(nMR_target, TensorOperator)
            for q_indices in product(*grid_ranges):
                # Remove the scalar reference ...
                nMR_target.tensor_orig[q_indices] -= nMR_operators[()]
                # ... and every already-separated sub-cluster contribution.
                for jMR in range(1, iMR):
                    for mode_pair_sub, q_indices_sub in zip(
                        combinations(mode_pair, jMR),
                        combinations(q_indices, jMR),
                        strict=True,
                    ):
                        nMR_sub = nMR_operators[mode_pair_sub]
                        assert isinstance(nMR_sub, TensorOperator)
                        nMR_target.tensor_orig[q_indices] -= (
                            nMR_sub.tensor_orig[q_indices_sub]
                        )
    return nMR_operators
[docs]
def construct_fulldimensional(
    dvr_prims: list[DVRPrimitivesMixin],
    func: Callable | None = None,
    db: str | None = None,
    ref_ene: float = 0.0,
    dipole: bool = False,
    efield: tuple[float, float, float] = (1.0e-02, 1.0e-02, 1.0e-02),
) -> dict[tuple[int, ...], TensorOperator]:
    """Build one diagonal operator covering every DOF from a grid-based PES.

    Args:
        dvr_prims (List[DVRPrimitivesMixin]) : DVR functions, sorted in
            database order.
        func (Optional, Callable) : Potential energy function.
        db (Optional, str) : Electronic structure calculation database path,
            such as 'foo/hoge.db'. Exactly one of ``func``/``db`` must be set.
        ref_ene (Optional, float) : Opt-coordinate energy in a.u. subtracted
            from function or database energies. Defaults to ``0.0``.
        dipole (Optional, bool) : Read the dipole moment from the database
            instead of the energy. Defaults to ``False``.
        efield (Optional, List[float]) : Electronic field for the inner
            product with the dipole moment.

    Returns:
        Dict[Tuple[int], TensorOperator] : full-dimensional tensor operator
    """
    ndof = len(dvr_prims)
    grid_iters = [enumerate(dvr_prims[p].get_grids()) for p in range(ndof)]
    shape = tuple([dvr_prims[p].ngrid for p in range(ndof)])
    dofs = tuple([idof for idof in range(ndof)])
    op = TensorOperator(shape=shape, only_diag=True)
    if db is None:
        # Analytical potential: evaluate on the full grid product.
        assert func is not None
        for q_pair in product(*grid_iters):
            coords = tuple([q[1] for q in q_pair])
            q_indices = tuple([q[0] for q in q_pair])
            op.tensor_orig[q_indices] = func(*coords) - ref_ene
    elif func is None:
        # Database: every row carries its grid indices as a key.
        with connect(db) as _db:
            for row in _db.select():
                q_indices = from_dbkey(row.key_value_pairs["grids"])
                if dipole:
                    op.tensor_orig[q_indices] = np.inner(
                        row.dipole / debye_in_ase, efield
                    )
                else:
                    op.tensor_orig[q_indices] = row.energy / Hartree - ref_ene
    else:
        raise TypeError
    return {dofs: op}
[docs]
def construct_kinetic_operator(
    dvr_prims: list[DVRPrimitivesMixin],
    coefs: list[float] | None = None,
    forms: str = "mpo",
) -> dict[tuple[tuple[int, int], ...], TensorOperator]:
    r"""
    .. note::
       The off-diagonal terms of the kinetic operator are not zero in DVR basis.

    Kinetic energy operator (KEO) in linear coordinate is represented by
    following matrix product operator (MPO):

    .. math::
       \begin{pmatrix}
       -\frac{\hat{P}_1^2}{2} & 1
       \end{pmatrix}
       \begin{pmatrix}
       1 & 0 \\
       -\frac{\hat{P}_2^2}{2} & 1
       \end{pmatrix}
       \begin{pmatrix}
       1 & 0 \\
       -\frac{\hat{P}_3^2}{2} & 1
       \end{pmatrix}
       \begin{pmatrix}
       1 \\
       -\frac{\hat{P}_4^2}{2}
       \end{pmatrix}

    Args:
        dvr_prims(List[DVRPrimitivesMixin]) : DVR functions
        coefs (List[float]) : Coefficients of kinetic operator. \
            Defaults to ``[1.0, 1.0, ...]``, i.e., :math:`\sum_i -\frac{1}{2}\frac{d^2}{dQ_i^2}` is given. \
            For example, \
            when one uses dimensionless coordinate and rotational coordinate, \
            the kinetic operator is given by \
            :math:`\hat{T} = -\frac{\omega}{2} \frac{d^2}{dx^2} - \frac{1}{2I} \frac{d^2}{d\theta^2}`,\
            then ``coefs`` should be given by :math:`[\omega, 1/I]`.
        forms (str) : Either 'sop' or 'mpo'. Defaults to 'mpo'.

    Returns:
        Dict[Tuple[Tuple[int, int], ...], TensorOperator] : KEO keyed by
            its (bra, ket) leg pairs.
    """
    ndof = len(dvr_prims)
    kinetic_operators = {}
    if coefs is None:
        # Unit coefficient per DOF: plain -1/2 d^2/dQ^2 for every mode.
        coefs = [1.0 for _ in range(ndof)]
    if forms.lower() == "mpo":
        # One (bra, ket) leg pair per DOF since KEO is off-diagonal in DVR.
        operator_key = tuple([(idof, idof) for idof in range(ndof)])
        mpo = []
        for i, (dvr_prim, coef) in enumerate(
            zip(dvr_prims, coefs, strict=True)
        ):
            if len(dvr_prims) == 1:
                # 1-dimensional case
                matrix = np.zeros(
                    (1, dvr_prim.ngrid, dvr_prim.ngrid, 1), dtype=np.complex128
                )
                matrix[0, :, :, 0] = (
                    -1 / 2 * dvr_prim.get_2nd_derivative_matrix_dvr() * coef
                )
            else:
                if i == 0:
                    # [[-1/2 * d^2/dQ^2, 1]]
                    matrix = np.zeros(
                        (1, dvr_prim.ngrid, dvr_prim.ngrid, 2),
                        dtype=np.complex128,
                    )
                    matrix[0, :, :, 0] = (
                        -1 / 2 * dvr_prim.get_2nd_derivative_matrix_dvr() * coef
                    )
                    matrix[0, :, :, 1] = np.eye(dvr_prim.ngrid)
                elif i == ndof - 1:
                    # [[1               ],
                    #  [-1/2 * d^2/dQ^2]]
                    matrix = np.zeros(
                        (2, dvr_prim.ngrid, dvr_prim.ngrid, 1),
                        dtype=np.complex128,
                    )
                    matrix[0, :, :, 0] = np.eye(dvr_prim.ngrid)
                    matrix[1, :, :, 0] = (
                        -1 / 2 * dvr_prim.get_2nd_derivative_matrix_dvr() * coef
                    )
                else:
                    # [[1,               0],
                    #  [-1/2 * d^2/dQ^2, 1]]
                    matrix = np.zeros(
                        (2, dvr_prim.ngrid, dvr_prim.ngrid, 2),
                        dtype=np.complex128,
                    )
                    matrix[0, :, :, 0] = np.eye(dvr_prim.ngrid)
                    matrix[1, :, :, 0] = (
                        -1 / 2 * dvr_prim.get_2nd_derivative_matrix_dvr() * coef
                    )
                    matrix[1, :, :, 1] = np.eye(dvr_prim.ngrid)
            mpo.append(matrix)
        kinetic_operators[operator_key] = TensorOperator(mpo=mpo)
    elif forms.lower() == "sop":
        # Sum-of-products form: one full (non-diagonal) operator per DOF.
        for idof, (dvr_prim, coef) in enumerate(
            zip(dvr_prims, coefs, strict=True)
        ):
            kinetic_operator = TensorOperator(
                tensor=-1 / 2 * dvr_prim.get_2nd_derivative_matrix_dvr() * coef,
                only_diag=False,
                legs=(idof, idof),
            )
            kinetic_operators[((idof, idof),)] = kinetic_operator
    else:
        raise ValueError("forms must be 'sop' or 'mpo'")
    return kinetic_operators
def _merge_nMR_operator_subspace(
nMR_operators: dict[tuple[int, ...], float | TensorOperator],
only_diag: bool = True,
) -> tuple[float | None, dict[tuple[int, ...], TensorOperator]]:
"""
Merge nMR operator which has same legs, \
such as V_123 += V_1 + V_2 + V_3 + V_12 + V_13 + V_23
Args:
nMR_operators (Dict[Tuple[int], TensorOperator]) : \
nMR operator dictionary
only_diag (bool) : Defaults to ``True``
Returns:
Tuple[float or complex, Dict[Tuple[int], TensorOperator]] : \
(scalar term, summed up nMR operator)
"""
scalar_term = None
nMR_items = list(nMR_operators.items())
nMR_items.sort(key=lambda x: -len(x[0]))
parent = dict()
sum_op_dict = dict()
different_only_diag_found = False
for mode_pair, operator in nMR_items:
if mode_pair == ():
assert isinstance(operator, float)
scalar_term = operator
continue
else:
assert isinstance(operator, TensorOperator)
if operator.only_diag == only_diag:
my_parent = mode_pair
for p_key in parent.keys():
if set(mode_pair) < set(p_key):
my_parent = parent[p_key]
if my_parent == mode_pair:
sum_op_dict[mode_pair] = operator
else:
sum_op_dict[my_parent] += operator
parent[mode_pair] = my_parent
else:
different_only_diag_found = True
if different_only_diag_found:
logger.error("different `only_diag` found.")
return (scalar_term, sum_op_dict)
[docs]
def database_to_dataframe(
    db: str,
    reference_id: int | None = None,
    reference_grids: tuple[int, ...] | None = None,
) -> pl.DataFrame:
    """ASE Database to Polars DataFrame
    DataFrame takes more memory than Database, \
    but it is faster than Database. The frame is also persisted next to \
    the database as ``db.replace(".db", ".parquet")``.
    Args:
        db (str) : ASE Database path such as "foo/hoge.db". Extension must be ".db"
        reference_id (Optional, int) : Database id of reference geometry
        reference_grids (Optional, Tuple[int, ...]) : Grid indices of reference \
            geometry. Either this or ``reference_id`` must be given.
    Returns:
        pl.DataFrame : Polars DataFrame
        - Dipole is saved in Debye.
        - Energy is saved in Hartree.
    """
    schema = [
        "id",
        "grids",
        "grids_db",
        "dofs",
        "dofs_db",
        "nMR",
        "energy",
        "dipole",
        "distance",
    ]
    if reference_grids is None and reference_id is None:
        raise ValueError(
            "Either `reference_id` or `reference_grids` must be specified."
        )
    data = []
    with connect(db) as _db:
        # Resolve whichever of (reference_id, reference_grids) was omitted
        # from the one that was provided.
        if reference_grids is None:
            reference_grids = from_dbkey(
                _db.get(id=reference_id).key_value_pairs["grids"]
            )
        if reference_id is None:
            if isinstance(reference_grids, str):
                reference_grids = from_dbkey(reference_grids)
            reference_id = next(_db.select(grids=to_dbkey(reference_grids))).id
        reference_atoms = _db.get(id=reference_id).toatoms()
        # ASE stores energies in eV; convert to Hartree.
        reference_energy = _db.get(id=reference_id).energy / Hartree
        logger.info(
            f"reference_id: {reference_id}"
            + f" reference_grids: {reference_grids}"
            + f" reference_atoms: {reference_atoms}"
            + f" reference_energy: {reference_energy}"
        )
        for row in _db.select():
            grids = from_dbkey(row.grids)
            dofs = from_dbkey(row.dofs)
            nMR = _get_mode_representation(grids, reference_grids)
            _data_row = [row.id, grids, row.grids, dofs, row.dofs, nMR]
            # Energy and dipole may be missing on some rows; best-effort
            # extraction stores None instead of aborting the whole export.
            try:
                _data_row.append(row.energy / Hartree)
            except Exception:
                _data_row.append(None)
            try:
                _data_row.append((row.dipole / debye_in_ase).tolist())
            except Exception:
                _data_row.append(None)
            _data_row.append(_get_manhattan_distance(grids, reference_grids))
            data.append(_data_row)
    df = pl.DataFrame(data, schema, orient="row")
    # Polars does not support pickle, so persist as parquet instead.
    # Reload later with: df = pl.read_parquet(db.replace(".db", ".parquet"))
    df.write_parquet(db.replace(".db", ".parquet"))
    logger.info(f"DataFrame is saved as {db.replace('.db', '.parquet')}")
    return df
def _get_grids_df(df, grids, dofs):
    """Keep only rows whose "grids" list matches ``grids`` on every DOF in ``dofs``.

    Filters the list-typed column element-wise via ``apply`` one DOF at a
    time, since a native polars predicate for list[int64] filtering was
    not obvious.
    """
    for axis in dofs:
        wanted = grids[axis]
        predicate = pl.col("grids").apply(lambda x, i=axis, v=wanted: x[i] == v)
        df = df.filter(predicate)
    return df
def _get_manhattan_distance(array1, array2) -> int:
"""Get Manhattan distance between two arrays"""
return int(np.sum(np.abs(np.array(array1) - np.array(array2))))
def _get_mode_representation(array, ref_array) -> int:
"""Get number ofmode representation
Args:
array (_ArrayLike) : Array to be compared
ref_array (_ArrayLike) : Reference array
Returns:
int : Number of mode representation
If you give (1,2,3) and (2,2,2) as array and ref_array, \
this function returns 2 because (1,2,3) and (2,2,2) have two different modes.
"""
return int(np.sum(np.array(array) != np.array(ref_array)))