Expanded grand canonical ensemble transition-matrix Monte Carlo with RPMΒΆ

In this example, the RPM model is simulated with expanded grand canonical ensembles. See https://doi.org/10.1063/1.5123683

[4]:
import unittest
import feasst as fst

def rpm_egce(proc='', macro_min=0, macro_max=4):
    monte_carlo = fst.MonteCarlo()
    monte_carlo.set(fst.rpm(fst.args({
        "cubic_box_length": "12",
        "cutoff": "4.891304347826090",
        "alphaL": "6.87098396396261"})))
    temperature = 0.047899460618081
    beta_mu = -13.94
    monte_carlo.set(fst.MakeThermoParams(fst.args({"beta": str(1/temperature),
                  "chemical_potential0": str(beta_mu*temperature),
                  "chemical_potential1": str(beta_mu*temperature)})))
    criteria = fst.MakeFlatHistogram(
        fst.MakeMacrostateNumParticles(
            fst.Histogram(fst.args({"width": "1", "max": str(macro_max), "min": str(macro_min)}))),
        fst.MakeTransitionMatrix(fst.args({"min_sweeps": "10"})),
        fst.MakeAEqualB(fst.args({"extra_A": "1"})))
    monte_carlo.set(criteria)
    monte_carlo.add(fst.MakeTrialTranslate(fst.args({"weight": "0.25", "tunable_param": "0.1"})))
    monte_carlo.add(fst.MakeTrialTransfer(fst.args({"weight": "1", "particle_type": "0"})))
    monte_carlo.add(fst.MakeTrialTransfer(fst.args({"weight": "1", "particle_type": "1"})))
    steps_per = str(int(1e5))
    monte_carlo.add(fst.MakeCriteriaUpdater(fst.args({"steps_per": steps_per})))
    monte_carlo.add(fst.MakeCriteriaWriter(fst.args({"steps_per": steps_per, "file_name": "crit" + proc + ".txt"})))
    monte_carlo.add(fst.MakeLogAndMovie(fst.args({"file_name": "rpm" + proc, "steps_per": steps_per})))
    monte_carlo.add(fst.MakeCheckEnergyAndTune(fst.args({"steps_per": steps_per})))
    energy = fst.MakeEnergy(fst.args({"file_name": "rpm_fh_energy" + proc,
                                      "steps_per_update": "1",
                                      "steps_per_write": steps_per,
                                      "multistate": "true"}))
    monte_carlo.add(energy)
    return monte_carlo

class TestEGCERPM(unittest.TestCase):
    """Test flat histogram grand canonical ensemble Monte Carlo simulations"""
    def test_serial_4max(self):
        """Compare the free energies and potential energies with the previously
        published values: https://doi.org/10.1063/1.5123683
        """
        monte_carlo=rpm_egce(macro_min=0, macro_max=4)
        monte_carlo.run_until_complete()

        lnpi_previous = [
            [-1.2994315780357, 0.05],
            [-1.08646312498868, 0.05],
            [-0.941850889679828, 0.05]]
        energy_previous = [
            [0, 1e-14],
            [-0.115474, 1e-6],
            [-0.939408, 0.02],
            [-1.32485, 0.03],
            [-2.02625, 0.04]]

        # reduce the lnpi by skipping every other macrostate (when A = B + 1)
        fh = fst.FlatHistogram(monte_carlo.criteria())
        lnpi5 = fh.bias().ln_prob()
        lnpi3 = lnpi5.reduce(2)

        for macro in range(lnpi3.size()):
            self.assertAlmostEqual(
                lnpi_previous[macro][0],
                lnpi3.value(macro),
                delta=5.*lnpi_previous[macro][1])

        for macro in range(fh.num_states()):
            energy_analyzer = monte_carlo.analyze(monte_carlo.num_analyzers() - 1)
            energy_accumulator = energy_analyzer.analyze(macro).accumulator()
            stdev = (energy_previous[macro][1]**2 + energy_accumulator.block_stdev()**2)**(1./2.)
            self.assertAlmostEqual(
                energy_previous[macro][0],
                energy_accumulator.average(),
                delta=10*stdev)

    def test_parallel(self):
        windows=fst.WindowExponential(fst.args({
            "alpha": "2",
            "num": "4",
            "maximum": "650",
            "extra_overlap": "2"})).boundaries()
        #windows=[[0,3],[1,16],[14,25]]
        print(windows)

        clones = fst.Clones()
        for proc, win in enumerate(windows):
            clones.add(rpm_egce(macro_min=win[0], macro_max=win[1], proc=str(proc)))
        clones.initialize_and_run_until_complete()
        print(clones.ln_prob().values())
[5]:
%%time

def suite():
    suite = unittest.TestSuite()
    suite.addTest(TestEGCERPM('test_serial_4max'))
    # suite.addTest(TestEGCERPM('test_parallel'))
    return suite

runner = unittest.TextTestRunner()
runner.run(suite())
#unittest.main(argv=[''], verbosity=2, exit=False)
.
CPU times: user 24.4 s, sys: 51.5 ms, total: 24.4 s
Wall time: 24.3 s

----------------------------------------------------------------------
Ran 1 test in 24.298s

OK
[5]:
<unittest.runner.TextTestResult run=1 errors=0 failures=0>

Did this tutorial work as expected? Did you find any inconsistencies or have any comments? Please contact us. Any feedback is appreciated!