"""
This module analyzes and manipulates macrostate probability distributions generated by FEASST
flat-histogram simulations.
See more information on FEASST at https://doi.org/10.18434/M3S095
"""
import copy
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy.optimize import minimize_scalar
from scipy.optimize import minimize

class MacrostateDistribution:
    """
    Contains macrostates, the natural logarithm of the macrostate probability and other
    per-macrostate quantities generated by a FEASST flat-histogram simulation.
    This may include canonical ensemble averages and blocks.
    """
    def __init__(self, file_name, macrostate_header='state', ln_prob_header='ln_prob',
                 comment="#"):
        """
        Constructs all the necessary attributes for a MacrostateDistribution.

        :param str file_name:
            The name of the csv file which stores per-macrostate quantities.
            If None, do not read a file, and instead use set_dataframe later.
        :param str macrostate_header:
            In the grand canonical ensemble, the macrostates are the number of particles.
            This is the header for that column in a pandas data frame.
        :param str ln_prob_header:
            The natural logarithm of the probability of observing each macrostate.
        :param str comment:
            Lines beginning with this character are treated as comments when reading the csv.

        >>> from pyfeasst import macrostate_distribution
        >>> import pandas as pd
        >>> lnpi = macrostate_distribution.MacrostateDistribution(file_name='../../tests/lnpi.csv')
        """
        self._macrostate_header = macrostate_header
        self._ln_prob_header = ln_prob_header
        if file_name is not None:
            self.set_dataframe(pd.read_csv(file_name, comment=comment))
        self.set_minimum_smoothing()
    def set_dataframe(self, dataframe, normalize=True):
        """
        Set the dataframe.

        :param bool normalize: normalize the ln_prob after setting the dataframe.
        """
        self._dataframe = dataframe.copy()
        if normalize:
            self.normalize()
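
    # Illustrative sketch: set_dataframe also supports building a distribution in memory
    # rather than from a csv file. The values below are hypothetical and chosen only to
    # match the default 'state' and 'ln_prob' headers:
    #
    #     dist = MacrostateDistribution(file_name=None)
    #     dist.set_dataframe(pd.DataFrame({'state': [0, 1, 2],
    #                                      'ln_prob': [-2.0, -1.0, -0.5]}))
    #     # normalize=True (the default) rescales ln_prob so that sum(exp(ln_prob)) == 1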
    def concat_dataframe(self, dataframe, add_prefix):
        """Concatenate dataframe (e.g., for average energy per macrostate)."""
        df2 = dataframe.add_prefix(add_prefix)
        self.set_dataframe(pd.concat([self._dataframe, df2], axis=1))
    def dataframe(self):
        """Return the dataframe."""
        return self._dataframe
    def set_macrostates(self, macrostates):
        """Set the macrostates in the dataframe."""
        self._dataframe[self._macrostate_header] = macrostates
    def macrostates(self):
        """Return the macrostates."""
        return self._dataframe[self._macrostate_header]
    def set_ln_prob(self, ln_prob):
        """Set the natural logarithm of the probability."""
        self._dataframe[self._ln_prob_header] = ln_prob
    def ln_prob(self):
        """Return the natural logarithm of the probability."""
        return self._dataframe[self._ln_prob_header]
    def set_minimum_smoothing(self, minimum_smoothing=10):
        """
        Set the smoothing used when searching for a minimum: the number of neighboring
        macrostates, in both directions, that the minimum must be smaller than.
        Increase this if the data is noisy or if there are undulations in the ln_prob.
        """
        self._minimum_smoothing = minimum_smoothing
    def minimum_smoothing(self):
        """Return the minimum smoothing."""
        return self._minimum_smoothing
    def normalize(self, *args):
        """
        Set the sum of the probabilities of a series to one.
        If no arguments are provided, use the internal ln_prob.
        If ln_prob is provided as a series, return the normalized series.
        """
        if len(args) == 0:
            self._dataframe[self._ln_prob_header] = self.normalize(self.ln_prob())
            lnpi = self.ln_prob()
        elif len(args) == 1:
            lnpi = args[0]
            mx = np.max(lnpi)  # avoid exp overflow by shifting by mx
            lnpi -= np.log(sum(np.exp(args[0] - mx))) + mx
        else:
            assert False  # unrecognized number of arguments
        return lnpi
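
    # Note on normalize: it applies the standard log-sum-exp shift so that
    # sum(exp(ln_prob)) == 1 without overflowing exp. A minimal NumPy sketch of the same
    # operation, assuming lnpi is a 1D array of unnormalized log-probabilities:
    #
    #     mx = np.max(lnpi)
    #     lnpi = lnpi - (np.log(np.exp(lnpi - mx).sum()) + mx)
    #     assert abs(np.exp(lnpi).sum() - 1) < 1e-12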
    def average_macrostate(self):
        """
        Return the average macrostate.

        >>> from pyfeasst import macrostate_distribution
        >>> lnpi = macrostate_distribution.MacrostateDistribution(file_name='../../tests/lnpi.csv')
        >>> round(float(lnpi.average_macrostate()), 8)
        437.68207872
        """
        return (np.exp(self.ln_prob()) * self.macrostates()).sum()
    def ensemble_average(self, header):
        """
        Return the grand canonical ensemble average of the per-macrostate data in the given
        column header (for example, canonical ensemble averages such as the energy).

        >>> import pandas as pd
        >>> from pyfeasst import macrostate_distribution
        >>> lnpi = macrostate_distribution.MacrostateDistribution(file_name='../../tests/lnpi.csv')
        >>> round(float(lnpi.ensemble_average(header='energy')), 8)
        -2701.39300435
        """
        return (np.exp(self.ln_prob()) * self._dataframe[header]).sum()
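
    # Note on the two averages above: both weight a per-macrostate column x(N) by the
    # normalized macrostate probability,
    #
    #     <x> = sum_N exp(ln_prob(N)) * x(N),
    #
    # which assumes ln_prob has already been normalized so the weights sum to one.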
    def plot(self, show=False, label=None, fontsize=16):
        """Create a matplotlib.pyplot plot of the ln_prob."""
        plt.plot(self.macrostates(), self.ln_prob(), label=label)
        plt.xlabel('macrostate', fontsize=fontsize)
        plt.ylabel(r'$\ln\Pi$', fontsize=fontsize)
        if show:
            plt.show()
    def minimums(self):
        """
        Return the macrostates at the local minimums of the ln_prob.

        >>> from pyfeasst import macrostate_distribution
        >>> lnpi = macrostate_distribution.MacrostateDistribution(file_name='../../tests/lnpi.csv')
        >>> mins = lnpi.minimums()
        >>> len(mins)
        1
        >>> int(mins.values[0])
        150
        """
        lnp = self.ln_prob()
        mns = lnp[(lnp.shift(1) > lnp) & (lnp.shift(-1) > lnp)]
        for shift in range(2, self._minimum_smoothing+1):
            mns = lnp[mns.notna() &
                      (lnp.shift(shift) > lnp) &
                      (lnp.shift(-shift) > lnp)]
        return self.macrostates()[mns.notna().index]
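
    # Note on the minimum search above: a state is reported as a minimum only if its
    # ln_prob is below that of every neighbor within minimum_smoothing states on each
    # side, so shallow dips from statistical noise are ignored. For noisy data, raise
    # the threshold before searching, e.g.:
    #
    #     lnpi.set_minimum_smoothing(50)  # value used in the splice() doctest below
    #     mins = lnpi.minimums()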
    def split(self, minimum=-1):
        """
        Return two MacrostateDistributions by splitting at a minimum in the ln_prob:
        the lower distribution contains the macrostates below the minimum and the upper
        distribution contains the macrostates from the minimum upward.

        :param int minimum: If -1 (the default), split at the last minimum found by minimums().
            Otherwise, split at the supplied minimum.

        >>> from pyfeasst import macrostate_distribution
        >>> lnpi = macrostate_distribution.MacrostateDistribution(file_name='../../tests/lnpi.csv')
        >>> vapor, liquid = lnpi.split()
        >>> round(float(vapor.average_macrostate()), 8)
        1.4212647
        >>> round(float(vapor.ensemble_average('energy')), 8)
        -0.04911667
        >>> round(float(liquid.average_macrostate()), 8)
        437.68207872
        >>> round(float(liquid.ensemble_average('energy')), 8)
        -2701.39300435
        """
        if minimum == -1:
            minimum = self.minimums().values[-1]
        lower = copy.deepcopy(self)
        upper = copy.deepcopy(self)
        lower.set_dataframe(self._dataframe[:minimum].copy())
        lower.normalize()
        upper.set_dataframe(self._dataframe[minimum:].copy())
        upper.normalize()
        return [lower, upper]
    def reweight(self, delta_beta_mu, inplace=False):
        """
        Reweight the ln_prob by delta_beta_mu.
        If inplace, replace the current ln_prob with the reweighted one.

        >>> from pyfeasst import macrostate_distribution
        >>> lnpi = macrostate_distribution.MacrostateDistribution(file_name='../../tests/lnpi.csv')
        >>> reweight = lnpi.reweight(-1)
        >>> round(float(reweight.dataframe()['ln_prob'][0]), 8)
        -0.50108687
        >>> reweight = lnpi.reweight(1.5)
        >>> round(float(reweight.dataframe()['ln_prob'][0]), 8)
        -811.74010444
        """
        # avoid negative log by subtracting min
        lnpi_rw = self.ln_prob() + self.macrostates()*delta_beta_mu - self.ln_prob().min()
        if lnpi_rw.max() > 0:
            lnpi_rw -= lnpi_rw.max()
        lnpi_rw = self.normalize(lnpi_rw)
        if inplace:
            self.set_ln_prob(lnpi_rw)
        self_rw = copy.deepcopy(self)
        self_rw.set_ln_prob(lnpi_rw)
        return self_rw
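
    # Note on reweighting: in the grand canonical ensemble, ln(Pi(N)) is linear in
    # beta*mu at fixed N, so a measured distribution can be shifted to a new chemical
    # potential without rerunning the simulation:
    #
    #     ln_prob_new(N) = ln_prob(N) + N * delta_beta_mu + constant,
    #
    # where the constant is fixed by renormalization. reweight() applies this shift
    # (plus shifts by the minimum and maximum for numerical safety) and then normalizes.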
    def _macrostate_objective(self, target_macrostate, delta_beta_mu):
        """
        Return an objective function to minimize in order to find the delta_beta_mu that,
        upon reweighting, results in an average macrostate equal to target_macrostate.

        >>> from pyfeasst import macrostate_distribution
        >>> lnpi = macrostate_distribution.MacrostateDistribution(file_name='../../tests/lnpi.csv')
        >>> round(float(lnpi._macrostate_objective(target_macrostate=10, delta_beta_mu=-1)), 8)
        90.1413155
        """
        lnpi_rw = self.reweight(delta_beta_mu)
        return (target_macrostate - lnpi_rw.average_macrostate())**2
    def reweight_to_macrostate(self, target_macrostate, delta_beta_mu_guess=0, tol=1e-6):
        """
        Reweight ln_prob and return the delta_beta_mu for an average target_macrostate.

        :param float delta_beta_mu_guess: Provide a guess for delta_beta_mu which brings the
            average macrostate closer to the target.
        :param float tol: Tolerance of the minimization algorithm.

        >>> from pyfeasst import macrostate_distribution
        >>> lnpi = macrostate_distribution.MacrostateDistribution(file_name='../../tests/lnpi.csv')
        >>> round(float(lnpi.reweight_to_macrostate(target_macrostate=10)), 8)
        -0.32218376
        >>> round(float(lnpi.average_macrostate()), 5)
        10.0
        """
        res = minimize(lambda delta_beta_mu: self._macrostate_objective(
            target_macrostate=target_macrostate,
            delta_beta_mu=delta_beta_mu[0]), delta_beta_mu_guess, tol=tol)
        delta_beta_mu_final = res.x[0]
        self.reweight(delta_beta_mu_final, inplace=True)
        return delta_beta_mu_final
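
    # Note on reweight_to_macrostate: the target is reached by minimizing the scalar
    # objective
    #
    #     f(delta_beta_mu) = (target_macrostate - <N>_reweighted)**2
    #
    # with scipy.optimize.minimize, after which the optimal reweighting is applied in place.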
    def _equilibrium_objective(self, delta_beta_mu):
        """
        Return an objective function to minimize in order to find equilibrium.

        >>> from pyfeasst import macrostate_distribution
        >>> lnpi = macrostate_distribution.MacrostateDistribution(file_name='../../tests/lnpi.csv')
        >>> round(float(lnpi._equilibrium_objective(-1)), 8)
        3.81828183
        """
        #print('delta_beta_mu', delta_beta_mu)
        lnpi_rw = self.reweight(delta_beta_mu)
        mins = lnpi_rw.minimums()
        #print('num mins', len(mins))
        if len(mins) == 0:
            return_val = 2.+(lnpi_rw.ln_prob()[0] - lnpi_rw.ln_prob().values[-1])**2
        elif len(mins) == 1:
            mns = mins.values[0]
            p_lo = np.exp(lnpi_rw.ln_prob()[:mns]).sum()
            p_hi = np.exp(lnpi_rw.ln_prob()[mns+1:]).sum()
            # if one side is greatly favored over the other, numerical precision
            # becomes an issue. Instead, nudge toward the conjugate that alters this disparity
            if abs(p_lo - 1) < 1e-6:
                return_val = 1.1+np.exp(-delta_beta_mu)
            elif abs(p_hi - 1) < 1e-6:
                return_val = 1.1+np.exp(delta_beta_mu)
            else:
                return_val = abs(p_lo-p_hi)
        else:
            #lnpi_rw.plot()
            #plt.show()
            assert False  # more than one minimum
        return return_val
    def equilibrium(self, delta_beta_mu_guess=-0.1, tol=1e-6):
        """
        Reweight ln_prob to equilibrium and return the delta_beta_mu of the equilibrium condition.

        :param float delta_beta_mu_guess: Provide a guess for delta_beta_mu which brings the
            ln_prob closer to equilibrium.
        :param float tol: Tolerance of the minimization algorithm.

        >>> from pyfeasst import macrostate_distribution
        >>> lnpi = macrostate_distribution.MacrostateDistribution(file_name='../../tests/lnpi.csv')
        >>> round(float(lnpi.equilibrium(delta_beta_mu_guess=-1, tol=1e-4)), 8)
        -0.31323799
        >>> vapor, liquid = lnpi.split()
        >>> round(float(vapor.average_macrostate()/8**3), 8)
        0.00200035
        >>> round(float(liquid.average_macrostate()/8**3), 8)
        0.84304803
        """
        res = minimize_scalar(self._equilibrium_objective,
                              tol=tol,
                              bracket=(0, delta_beta_mu_guess))
        delta_beta_mu_equilibrium = res["x"]
        self.reweight(delta_beta_mu_equilibrium, inplace=True)
        return delta_beta_mu_equilibrium
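
# Illustrative sketch of a vapor-liquid coexistence workflow built from the methods
# above. The file name 'lnpi.csv' and the cubic box length are placeholders:
#
#     lnpi = MacrostateDistribution(file_name='lnpi.csv')
#     delta_beta_mu = lnpi.equilibrium(delta_beta_mu_guess=-0.1)  # equal-probability phases
#     vapor, liquid = lnpi.split()
#     box_length = 8
#     rho_vapor = vapor.average_macrostate() / box_length**3
#     rho_liquid = liquid.average_macrostate() / box_length**3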

def splice(windows, extra_overlap=0):
    """
    Return a MacrostateDistribution that is spliced together from multiple
    MacrostateDistributions.
    Each MacrostateDistribution must overlap with its neighbor by at least one macrostate.
    If macrostates overlap by more than one (extra_overlap), simply drop the extra
    macrostates in the upper MacrostateDistribution.

    >>> from pyfeasst import macrostate_distribution
    >>> lnpis = list()
    >>> for i in range(4):
    ...     lnpis.append(macrostate_distribution.MacrostateDistribution(
    ...         file_name='../../tests/lnpin'+str(i)+'.csv'))
    >>> lnpi = macrostate_distribution.splice(lnpis)
    >>> lnpi.set_minimum_smoothing(50)
    >>> round(float(lnpi.average_macrostate()), 8)
    596.29289902
    >>> len(lnpi.minimums())
    0
    >>> round(float(lnpi.equilibrium(delta_beta_mu_guess=-1.5)), 8)
    -1.2768126
    >>> vapor, liquid = lnpi.split()
    >>> round(float(vapor.average_macrostate()), 8)
    0.03172081
    >>> round(float(liquid.average_macrostate()), 8)
    523.35530815
    """
    macros = list()
    max_minimum_smoothing = 0
    for index, lnpi in enumerate(windows):
        if max_minimum_smoothing < lnpi.minimum_smoothing():
            max_minimum_smoothing = lnpi.minimum_smoothing()
        if index != 0:
            prev_last_index = len(windows[index-1].ln_prob()) - 1
            shift = windows[index-1].ln_prob().values[prev_last_index-extra_overlap] - \
                    lnpi.ln_prob().values[0]
            lnpi.set_ln_prob(lnpi.ln_prob() + shift)
            for _ in range(1 + extra_overlap):
                lnpi.set_dataframe(lnpi.dataframe().iloc[1:, :], normalize=False)
            lnpi.set_macrostates(lnpi.macrostates() + windows[index-1].macrostates().values[-1])
        macros.append(lnpi.dataframe())
    macros_concat = pd.concat(macros)
    macros_concat.reset_index(inplace=True)
    combined = copy.deepcopy(windows[0])
    combined.set_dataframe(macros_concat.copy())
    combined.set_minimum_smoothing(max_minimum_smoothing)
    return combined
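
# Note on splice: neighboring windows are joined by shifting the upper window's ln_prob
# so that it matches the lower window at the overlapping macrostate,
#
#     shift = ln_prob_lower[last - extra_overlap] - ln_prob_upper[0]
#     ln_prob_upper += shift
#
# after which the duplicated row(s) are dropped. Only relative ln_prob values matter
# within a window, so this shift plus the final normalization in set_dataframe yields
# one continuous distribution.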

def splice_files(prefix, suffix, ln_prob_header=None, extra_overlap=0):
    """
    As splice() above, return a MacrostateDistribution that is spliced together,
    but provide the prefix and suffix of the file names and search for the files.

    >>> from pyfeasst import macrostate_distribution
    >>> lnpi = macrostate_distribution.splice_files(prefix='../../tests/lj_lnpin', suffix='.txt')
    >>> round(float(lnpi.equilibrium()), 8)
    -0.31402411
    >>> vapor, liquid = lnpi.split()
    >>> round(float(vapor.average_macrostate()/8**3), 8)
    0.00199501
    >>> round(float(liquid.average_macrostate()/8**3), 8)
    0.84318346
    """
    lnpis = list()
    for filename in sorted(Path('.').glob(prefix+'*'+suffix)):
        if ln_prob_header is None:
            lnpis.append(MacrostateDistribution(file_name=filename))
        else:
            lnpis.append(MacrostateDistribution(file_name=filename,
                                                ln_prob_header=ln_prob_header))
    return splice(lnpis, extra_overlap)

def read_appended(file_name, num_states):
    """
    Read an appended ln_prob file and return a list of [MacrostateDistribution, parameters]
    pairs, where the parameters are parsed from the required comment line that begins
    each block.

    >>> from pyfeasst import macrostate_distribution
    >>> dists = macrostate_distribution.read_appended(
    ...     '../../tests/lj_lnpi_appended.txt', num_states=2)
    >>> dists[10][1]['cpu_hours']
    0.117464
    >>> round(float(dists[10][0].ln_prob()[0]), 8)
    -4.68014742
    """
    with open(file_name, 'r') as file1:
        lines = file1.readlines()
    num = int(len(lines)/(num_states+2))
    dists = list()
    for i in range(num):
        exec('iprm={' + lines[i*(num_states+2)][1:] + '}', globals())
        dist = MacrostateDistribution(file_name=None)
        dist.set_dataframe(pd.read_csv(file_name, comment='#', skiprows=i*(num_states+2),
                                       nrows=num_states))
        dists.append([dist, iprm])
    return dists
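
# Illustrative sketch of the file layout expected by read_appended, with num_states=2.
# Each block spans num_states + 2 lines; the keys and values below are hypothetical:
#
#     # 'cpu_hours':0.1,'trials':1000,
#     state,ln_prob
#     0,-4.6
#     1,-0.9
#
# The leading comment line (minus the '#') is wrapped in braces and exec'd into a dict
# of parameters, so it must be written as comma-separated key:value pairs.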

def splice_collection_matrix(prefix, suffix, use_soft=False):
    """
    Splice collection-matrix files and return a MacrostateDistribution with ln_prob computed
    from the P_up and P_down matrix elements.

    :param bool use_soft: use soft_min and soft_max to trim each file before concatenation.

    >>> from pyfeasst import macrostate_distribution
    >>> lnp = macrostate_distribution.splice_collection_matrix(prefix="../../tests/lj_crit",
    ...                                                        suffix=".txt")
    >>> round(float(lnp.ln_prob()[0]), 8)
    -12.27534328
    """
    lnpis = list()
    for filename in sorted(Path('.').glob(prefix+'*'+suffix)):
        frame = pd.read_csv(filename, comment="#")
        if use_soft:
            with open(filename, 'r') as file1:
                lines = file1.readlines()
            exec('iprm={' + lines[0][1:] + '}', globals())
            frame = frame[iprm['soft_min']:iprm['soft_max']+1]
        lnpis.append(frame)
    lnp = MacrostateDistribution(file_name=None)
    df = pd.concat(lnpis)
    df['ln_prob'] = df['ln_prob'].astype(float)
    df.sort_values(by=['state'], inplace=True)
    df.reset_index(inplace=True)
    df['delta_ln_prob'] = np.log(df['P_up'].shift(1)/df['P_down'])
    df['delta_ln_prob'] = df['delta_ln_prob'].fillna(0.0)
    for index, _ in enumerate(df['delta_ln_prob']):
        if index == 0:
            df.loc[index, 'ln_prob'] = 0.0
        else:
            df.loc[index, 'ln_prob'] = df['delta_ln_prob'].values[index] + \
                                       df['ln_prob'].values[index-1]
    df.loc[0, 'delta_ln_prob'] = float('nan')
    lnp.set_dataframe(df)
    return lnp
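
# Note on splice_collection_matrix: the ln_prob is reconstructed from the transition
# probabilities of the collection matrix via the detailed-balance relation between
# neighboring macrostates,
#
#     ln_prob(N) - ln_prob(N-1) = ln( P_up(N-1) / P_down(N) ),
#
# accumulated from state 0 (pinned at ln_prob = 0) and then normalized by set_dataframe
# at the end.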

if __name__ == "__main__":
    import doctest
    doctest.testmod()