"""Property handling module"""
import math
import os
import netCDF4 as nc
import numpy as np
from loguru import logger as _logger
import pytdscf._helper as helper
from pytdscf import units
from pytdscf._const_cls import const
from pytdscf._mps_cls import MPSCoef
from pytdscf._mps_parallel import MPSCoefParallel
from pytdscf.model_cls import Model
from pytdscf.wavefunction import WFunc
logger = _logger.bind(name="main")
[docs]
class Properties:
"""Structure class of calculated property
Attributes:
wf (WFunc): wave function
model (Model): model
time (float): time in atomic units
t2_trick (bool): whether to use so-called T/2 trick
autocorr (complex): auto-correlation function
energy (float): energy
pops (List[float]): populations of each state
expectations (List[float]): observables of given operator
"""
def __init__(
self,
wf: WFunc,
model: Model,
time=0.0,
t2_trick=True,
wf_init=None,
reduced_density=None,
):
self.wf = wf
self.model = model
self.time = time
self.nstep = 0
self.nc_row = 0
self.t2_trick = t2_trick
self.autocorr: complex | None = None
self.energy: float | None = None
self.norm: float | None = None
self.auto_logger = _logger.bind(name="autocorr")
self.pop_logger = _logger.bind(name="populations")
self.exp_logger = _logger.bind(name="expectations")
self.bonddim_logger = _logger.bind(name="bonddim")
self.pops = None
self.expectations = {}
self.wf_zero = wf_init
self.nc_file: str | None = None
assert t2_trick or (wf_init is None)
if reduced_density is not None:
self.rd_step = reduced_density[1]
self.remain_legs: list[tuple[int, ...]] | None = []
self.rd_keys = []
for key in reduced_density[0]:
rd_points = sorted(key, reverse=True)
_remain_legs = [0 for isite in range(rd_points[0] + 1)]
isite = 0
while rd_points:
if isite == rd_points[-1]:
_remain_legs[isite] += 1
rd_points.pop()
else:
isite += 1
assert all([0 <= leg <= 2 for leg in _remain_legs])
self.remain_legs.append(tuple(_remain_legs))
self.rd_keys.append(key)
if self.nc_file is None:
self.nc_file = self._create_nc_file(reduced_density)
else:
self.rd_step = None
self.remain_legs = None
[docs]
def get_properties(
self,
*,
autocorr=True,
energy=True,
norm=True,
populations=True,
observables=True,
bonddim=True,
autocorr_per_step=1,
energy_per_step=1,
norm_per_step=1,
populations_per_step=1,
observables_per_step=1,
bonddim_per_step=1,
):
if autocorr and self.nstep % autocorr_per_step == 0:
self._get_autocorr()
if energy and self.nstep % energy_per_step == 0:
self._get_energy()
if norm and self.nstep % norm_per_step == 0:
self._get_norm()
if populations and self.nstep % populations_per_step == 0:
self._get_pops()
if observables and self.nstep % observables_per_step == 0:
self._get_observables()
if self.remain_legs is not None:
if self.nstep % self.rd_step == 0:
self._export_reduced_density()
if bonddim and self.nstep % bonddim_per_step == 0 and const.adaptive:
self._get_bonddim()
def _export_reduced_density(self):
# Get reduced densities using all ranks
all_densities = []
assert isinstance(self.remain_legs, list)
if const.mpi_size == 1:
for remain_leg in self.remain_legs:
densities = self.wf.get_reduced_densities(remain_leg)
all_densities.append(densities)
else:
for base_tag, rd_key in enumerate(self.rd_keys):
assert isinstance(self.wf.ci_coef, MPSCoefParallel)
densities = self.wf.ci_coef.get_reduced_densities(
base_tag, rd_key
)
all_densities.append(densities)
# Only rank 0 writes to file
if const.mpi_rank == 0:
assert isinstance(self.nc_file, str)
complex128 = np.dtype([("real", np.float64), ("imag", np.float64)])
with nc.Dataset(self.nc_file, "a") as f:
# Maybe we should keep files open while the simulation is running.
f.variables["time"][self.nc_row] = self.time * units.au_in_fs
for densities, key in zip(
all_densities, self.rd_keys, strict=True
):
for istate in range(self.model.nstate):
data = np.empty(densities[istate].shape, complex128)
data["real"] = densities[istate].real
data["imag"] = densities[istate].imag
f.variables[f"rho_{key}_{istate}"][self.nc_row] = data
self.nc_row += 1
@helper.rank0_only
def _create_nc_file(
self, reduced_density: tuple[list[tuple[int, ...]], int]
) -> str:
jobname = const.jobname
path_to_nc = f"{jobname}/reduced_density.nc"
if os.path.exists(path_to_nc):
os.remove(path_to_nc)
with nc.Dataset(
f"{jobname}/reduced_density.nc", "w", format="NETCDF4"
) as f:
f.createDimension("step", None)
f.createDimension("state", self.model.nstate)
complex128 = np.dtype([("real", np.float64), ("imag", np.float64)])
complex128_t = f.createCompoundType(complex128, "complex128")
modes = set()
for key in reduced_density[0]:
# key must be ascending order.
assert key == tuple(sorted(key)), (
f"Reduced density key {key} must be ascending order"
)
modes = modes.union(set(key))
for idof in key:
if f"Q{idof}" not in f.dimensions:
if const.space == "hilbert":
f.createDimension(
f"Q{idof}",
self.model.basinfo.get_ngrid(0, idof),
)
elif const.space == "liouville":
f.createDimension(
f"Q{idof}",
math.isqrt(
self.model.basinfo.get_ngrid(0, idof)
),
)
else:
raise ValueError(f"Invalid space: {const.space}")
f.createVariable("time", "f8", ("step",))
for key in reduced_density[0]:
if len(key) > 3:
logger.warning(
f"key {key} is too large to be saved in netCDF4"
)
for istate in range(self.model.nstate):
dimensions = ("step",) + tuple(f"Q{idof}" for idof in key)
f.createVariable(
f"rho_{key}_{istate}", complex128_t, dimensions
)
return path_to_nc
def _get_autocorr(self):
if self.t2_trick:
if (
const.standard_method
and isinstance(self.wf.ci_coef, MPSCoef)
and const.mpi_size == 1
):
self.autocorr = self.wf._ints_wf_ovlp_mpssm(
self.wf.ci_coef, conj=False
)
else:
self.autocorr = self.wf.autocorr()
else:
if const.mpi_size > 1:
raise NotImplementedError(
"MPI is not implemented in the non-T/2 trick"
)
if const.doDVR:
if const.standard_method:
self.autocorr = self.wf_zero._ints_wf_ovlp_mpo(
self.wf.ci_coef
)
else:
raise NotImplementedError(
"DVR-MCTDH with MPO is not implemented"
)
else:
self.autocorr = self.wf_zero._ints_wf_ovlp_sop(
self.wf.ci_coef, self.wf.spf_coef, self.model.hamiltonian
)
def _get_energy(self):
self.energy = self.wf.expectation(self.model.hamiltonian)
def _get_norm(self):
self.norm = self.wf.norm()
def _get_pops(self):
self.pops = self.wf.pop_states()
def _get_observables(self):
for obs_key, matOp in self.model.observables.items():
self.expectations[obs_key] = self.wf.expectation(matOp)
def _get_bonddim(self):
self.bonddim = self.wf.bonddim()
[docs]
def export_properties(
self,
*,
autocorr_per_step=1,
populations_per_step=1,
observables_per_step=1,
bonddim_per_step=1,
):
if self.nstep % autocorr_per_step == 0:
self._export_autocorr()
if self.nstep % populations_per_step == 0:
self._export_populations()
if self.nstep % observables_per_step == 0:
self._export_expectations()
if const.adaptive and self.nstep % bonddim_per_step == 0:
self._export_bonddim()
self._export_properties()
@helper.rank0_only
def _export_autocorr(self):
if self.autocorr is None:
return
if self.time == 0.0:
self.auto_logger.debug("# time [fs]\t auto-correlation")
if self.t2_trick:
time_fs = self.time * units.au_in_fs * 2
else:
time_fs = self.time * units.au_in_fs
self.auto_logger.debug(
f"{time_fs:6.9f}\t"
+ f"{self.autocorr.real: 6.9f}{self.autocorr.imag:+6.9f}j"
)
@helper.rank0_only
def _export_populations(self):
if self.pops is None:
return
if self.time == 0.0:
self.pop_logger.debug(
"# time [fs]\t"
+ "\t".join(
[
f"pop_{i}" + " " * (11 - len(f"pop_{i}"))
for i in range(len(self.pops))
]
)
)
pop_msg = f"{self.time * units.au_in_fs:6.9f}\t"
for pop in self.pops:
pop_msg += f"{pop:6.9f}\t"
pop_msg.rstrip("\t")
self.pop_logger.debug(pop_msg)
@helper.rank0_only
def _export_expectations(self):
if self.expectations == {}:
return
if self.time == 0.0:
self.exp_logger.debug(
"# time [fs]\t"
+ "\t".join(
[
f"{obs_key}" + " " * (11 - len(f"{obs_key}"))
for obs_key in self.expectations.keys()
]
)
)
exp_msg = f"{self.time * units.au_in_fs:6.9f}\t"
for exp in self.expectations.values():
exp_msg += f"{exp:6.9f}\t"
exp_msg.rstrip("\t")
self.exp_logger.debug(exp_msg)
@helper.rank0_only
def _export_bonddim(self):
if self.bonddim is None:
return
if self.time == 0.0:
self.bonddim_logger.debug(
"# time [fs]\t"
+ "\t".join(f"{i}" for i in range(len(self.bonddim)))
)
bonddim_msg = f"{self.time * units.au_in_fs:6.9f}\t"
for bonddim in self.bonddim:
bonddim_msg += f"{bonddim}\t"
bonddim_msg.rstrip("\t")
self.bonddim_logger.debug(bonddim_msg)
@helper.rank0_only
def _export_properties(self):
time_fs = self.time * units.au_in_fs
norm = self.norm
pop_states = self.pops
energy = self.energy
autocorr = self.autocorr
if norm is not None:
threshold = (
1.0e-02 if const.adaptive or const.mpi_size > 1 else 1e-06
)
if abs(norm - 1.0) > threshold:
logger.warning(
f"Wave Function norm is not 1.0, but {norm} when {time_fs} fs"
)
message = ""
if const.verbose > 1 and autocorr is not None:
message += (
f"| autocorr: {autocorr.real: 6.4f}{autocorr.imag:+6.4f}i"
)
if const.verbose > 1:
if pop_states is not None:
message += "| pop" + (" {:6.4f} " * len(pop_states[:3])).format(
*pop_states[:3]
)
if energy is not None:
message += f"| ene[eV]: {energy * units.au_in_eV:10.7f} "
message += (
f"| time[fs]: {time_fs:8.3f} "
+ f"| elapsed[sec]:{helper._ElpTime.steps:9.2f} "
)
if const.verbose == 4:
mflops = (
(
helper._NFlops.ci_expo
+ helper._NFlops.ci_renm
+ helper._NFlops.ci_mfop
)
/ pow(10, 6)
/ max(0.01, helper._ElpTime.zgemm)
)
if not const.standard_method:
message += (
f"| spf:{helper._ElpTime.spf:5.1f} "
+ f"| mfop:{helper._ElpTime.mfop:5.1f} "
)
message += (
f"| ci:{helper._ElpTime.ci:5.1f} "
+ f" (ci_exp:{helper._ElpTime.ci_exp:5.1f}"
+ f"|ci_rnm:{helper._ElpTime.ci_rnm:5.1f}"
+ f"|ci_etc:{helper._ElpTime.ci_etc:5.1f} d) "
+ f"|{mflops:5.0f} MFLOPS "
+ f"({helper._ElpTime.zgemm:5.1f} s) "
)
logger.debug(message)
[docs]
def update(self, delta_t):
"""update time"""
self.time += delta_t
self.nstep += 1
if const.doTDHamil:
raise NotImplementedError
self.model.hamiltonian = self.model.build_td_hamiltonian(
time_fs=self.time * units.au_in_fs
)