# Source code for fstio

"""
This module provides some utility input / output functions for use with the FEASST simulation program.
"""

import os
import time
import sys
import random
import json
#import socket
import subprocess
from multiprocessing import Pool
import multiprocessing
from itertools import repeat
import numpy as np
from pathlib import Path

def vector3d_to_list(vec):
    """
    Copy a three-level nested sequence (e.g., a swig stl vector) into nested python lists.

    The three outer levels are rebuilt as new lists; the innermost elements
    are stored as they are.

    >>> from pyfeasst import fstio
    >>> fstio.vector3d_to_list([[[[0]]]])
    [[[[0]]]]
    """
    return [[list(row) for row in plane] for plane in vec]
def read_checkpoint(filename):
    """
    Return the single line stored in a checkpoint file as a string.

    An AssertionError is raised if the file does not hold exactly one line.

    >>> from pyfeasst import fstio
    >>> table = fstio.read_checkpoint('../../tests/tutorial_0_table.txt')
    >>> table[:13]
    '6867 11 11 11'
    """
    with open(filename, "r") as handle:
        contents = list(handle)
    # checkpoint files should have only one line
    assert len(contents) == 1
    return contents[0]
def all_sims_complete(filename, num_sims):
    """
    Read filename and see if all sim ID's from [0, num_sims-1] are present
    (e.g., complete).
    If no file is present, also consider the simulation incomplete.
    An existing but empty file is likewise incomplete (unless num_sims is 0).

    IDs are matched as whole whitespace-separated tokens, so a finished
    simulation 10 does not make simulation 1 look complete.

    :param str filename: file holding the finished simulation IDs.
    :param int num_sims: total number of simulations expected.
    :return: True only if every ID in [0, num_sims-1] appears in the file.

    >>> from pyfeasst import fstio
    >>> all_sims_complete('../../tests/lj_sim_ids.txt', 8)
    True
    >>> all_sims_complete('../../tests/lj_sim_ids2.txt', 8)
    False
    >>> all_sims_complete('../../tests/not_a_file.txt', 8)
    False
    """
    if not os.path.isfile(filename):
        return False
    with open(filename, 'r') as file1:
        # a set of exact tokens avoids substring false positives ('1' in '10')
        finished = set(file1.read().split())
    return all(str(sim) in finished for sim in range(num_sims))
def slurm_single_node(params):
    """
    Write slurm script to fill one node.

    :param dict params:
        Must have the following keys:
        procs_per_node: number of processors per node,
        procs_per_sim: number of processors per simulation,
        minutes: maximum number of minutes for job in queue,
        prefix: prefix for all output file names,
        script: script file name,
        sim_id_file: filename to write simulation id's for later checking of status,
        node: node index.
        Optional keys:
        max_restarts: maximum number of restarts (default: '0'),
        queue_flags: extra text placed on its own #SBATCH line (default: empty),
        scratch: location of local scratch space on HPC node (disabled by default),
        scratch_hours_per_sync: hours between synching scratch to destination (default: 5).

    This function also adds the key 'queue_command' to the params dictionary,
    which is assumed to output the job id, as well as 'command_to_queue_id',
    'scratch_slurm_preamble' and 'scratch_slurm_postamble'.
    The script is written to ``<prefix>_slurm.txt``.
    """
    params.setdefault('max_restarts', '0')
    params['queue_command'] = "sbatch --array=0-" + str(params['max_restarts']) + "%1 " + params['prefix'] + "_slurm.txt"
    params['command_to_queue_id'] = " | tail -1 | awk '{print $4}'"
    # 'scratch' is optional: a missing key means the same as None (disabled)
    if params.get('scratch') is None:
        params['scratch_slurm_preamble'] = ''
        params['scratch_slurm_postamble'] = ''
    else:
        if 'scratch_hours_per_sync' not in params:
            params['scratch_seconds_per_sync'] = 5*60*60
        else:
            params['scratch_seconds_per_sync'] = 60*60*params['scratch_hours_per_sync']
        params['scratch_slurm_preamble'] = """original_dir=$PWD; echo $original_dir
scratch={scratch}/$LOGNAME/${{SLURM_ARRAY_JOB_ID}}_${{SLURM_ARRAY_TASK_ID}}/; mkdir -p $scratch; cd $scratch; echo "scratch:$scratch"
rsync -au $original_dir/* .
echo "while [ 1 -le 2 ]; do sleep {scratch_seconds_per_sync}; rsync -au * $original_dir/; done" > {prefix}_sync.sh
chmod a+x {prefix}_sync.sh
./{prefix}_sync.sh &
ls""".format(**params)
        params['scratch_slurm_postamble'] = 'rsync -au . $original_dir/'
    params.setdefault('queue_flags', '')
    # NOTE(review): when scratch is enabled, `$?` below reflects the final
    # rsync rather than the python run -- confirm that this is intended.
    with open(params['prefix'] + '_slurm.txt', 'w', encoding='utf-8') as myfile:
        myfile.write("""#!/bin/bash
#SBATCH -n {procs_per_node}
#SBATCH -N 1
#SBATCH -t {minutes}:00
#SBATCH -o {prefix}_slurm_%A_%a.txt
#SBATCH -e {prefix}_slurm_%A_%a.txt
#SBATCH {queue_flags}
echo "Running ID ${{SLURM_ARRAY_JOB_ID}}_${{SLURM_ARRAY_TASK_ID}} on $(hostname) at $(date) in $PWD"
cd $PWD
{scratch_slurm_preamble}
export OMP_NUM_THREADS={procs_per_sim}
python {script} --run_type 0 --node {node} --queue_id $SLURM_ARRAY_JOB_ID --queue_task $SLURM_ARRAY_TASK_ID
{scratch_slurm_postamble}
if [ $? == 0 ] && [ ! -f {sim_id_file} ]; then
  echo "Job is done"
  scancel $SLURM_ARRAY_JOB_ID
else
  echo "Job is terminating, to be restarted again"
fi
echo "Time is $(date)"
""".format(**params))
def server(params, args, sim_node_dependent_params, write_feasst_script):
    """
    Run FEASST for simulation params['sim'] in server mode or from a script.

    Fills in the default 'port' (54321) and 'buffer_size' (1000) if missing
    and sets 'server_port' to port + sim.
    If write_feasst_script is None, a "Server" line is piped into
    ``<feasst_install>bin/fst`` with output sent to ``<prefix><sim>_run.log``;
    otherwise the script is written and run via seed_param_write_run.

    :param dict params: simulation parameters; must contain 'sim' and 'prefix'.
    :param namespace args: must provide feasst_install.
    :param function sim_node_dependent_params: optional hook to adjust params.
    :param function write_feasst_script: script writer, or None for server mode.
    :return: exit code of the shell command.
    """
    params.setdefault('port', 54321)
    params.setdefault('buffer_size', 1000)
    params['server_port'] = params['port'] + params['sim']
    if write_feasst_script is None:
        return subprocess.call(
            """echo "Server port {server_port} buffer_size {buffer_size}" |""".format(**params) +
            args.feasst_install+'bin/fst > '+params['prefix']+str(params['sim'])+'_run.log',
            shell=True, executable='/bin/bash')
    return seed_param_write_run(params['sim'], params, args,
                                sim_node_dependent_params, write_feasst_script)


def reseed(params):
    """Replace a 'seed' of -1 in params by a random integer in [0, 1e9)."""
    if params.get('seed') == -1:
        params['seed'] = random.randrange(int(1e9))


def seed_param_write_run(sim, params, args, sim_node_dependent_params, write_feasst_script):
    """
    Reseed, update the parameters, write the FEASST script and run it.

    The script is written to ``<prefix><sim>_run.txt`` and fed to
    ``<feasst_install>bin/fst`` with output sent to ``<prefix><sim>_run.log``.

    :param int sim: simulation index used in the file names.
    :param dict params: simulation parameters; must contain 'prefix'.
    :param namespace args: must provide feasst_install.
    :param function sim_node_dependent_params: optional hook called with params.
    :param function write_feasst_script: called as (params, script_file=...).
    :return: exit code of the shell command.
    """
    reseed(params)
    if sim_node_dependent_params:
        sim_node_dependent_params(params)
    file_name = params['prefix']+str(sim)+'_run'
    write_feasst_script(params, script_file=file_name+'.txt')
    return subprocess.call(
        args.feasst_install+'bin/fst < '+file_name+'.txt > '+file_name+'.log',
        shell=True, executable='/bin/bash')
def run_single(sim, params, args, sim_node_dependent_params, write_feasst_script, post_process):
    """
    Run a single simulation and return the exit code of the FEASST command.

    On success the simulation id is appended to the sim id file.
    If all simulations are then complete, the sim id file is removed and
    post_process (if given) is called with the params.
    """
    if args.queue_task != 0:
        # a nonzero queue task is a restart of the job: resume from checkpoint
        syscode = subprocess.call(
            args.feasst_install+'bin/rst '+params['prefix']+str(sim)+'_checkpoint.fst',
            shell=True, executable='/bin/bash')
    else:
        params['sim'] = sim
        syscode = seed_param_write_run(sim, params, args, sim_node_dependent_params,
                                       write_feasst_script)
    if syscode != 0:
        return syscode
    # the simulation finished with no errors: record its id
    with open(params['sim_id_file'], 'a', encoding='utf-8') as id_file:
        id_file.write(str(sim)+'\n')
    if not all_sims_complete(params['sim_id_file'], params['num_sims']):
        return syscode
    # removing the sim id file ensures the post process or test happens once
    os.remove(params['sim_id_file'])
    if post_process is not None:
        post_process(params)
    return syscode
def split(a, n):
    """
    Return a generator over n consecutive slices of a with nearly equal lengths.

    The first len(a) % n slices hold one more element than the others.
    From https://stackoverflow.com/questions/2130016/splitting-a-list-into-n-parts-of-approximately-equal-length

    >>> from pyfeasst import fstio
    >>> list(fstio.split(range(11), 3))
    [range(0, 4), range(4, 8), range(8, 11)]
    """
    base, extra = divmod(len(a), n)

    def _chunks():
        start = 0
        for index in range(n):
            stop = start + base + (1 if index < extra else 0)
            yield a[start:stop]
            start = stop

    return _chunks()
def read_write_params(args, params):
    """
    Write the parameters to file, or read them back when run from a queue.

    If args.queue_id is -1 (not run from a queue), params are written to
    ``<prefix>_params.json`` and returned unchanged.
    If run from a queue and args.queue_task is 0, the params are replaced by
    the contents of ``<prefix>_params<queue_id>.json``.
    For any other queue task, params are returned as given.
    """
    if args.queue_id == -1:
        with open(params['prefix']+'_params.json', 'w') as out_file:
            out_file.write(json.dumps(params, indent=2))
        return params
    if args.queue_task != 0:
        # restarting from a checkpoint: the param file is not read
        return params
    with open(params['prefix']+'_params'+str(args.queue_id)+'.json', 'r') as in_file:
        return json.load(in_file)
def run_simulations(params, queue_function, args, write_feasst_script=None, client=None,
                    sim_node_dependent_params=None, post_process=None):
    """
    Run a simulation either locally in the shell or queue on HPC nodes

    The sim id file is always cleared first. Then, depending on args.run_type:
    0 runs the simulations of this node directly, 1 submits one queue job per
    node, and 2 only calls post_process. Any other value raises AssertionError.

    :param dict params:
        Must have the following keys:
        sim_id_file: filename to write simulation id's for later checking of status,
        prefix: prefix for all output file names,
        max_restarts: maximum number of restarts,
        procs_per_node: number of processors per node,
        procs_per_sim: number of processors per sim,
        num_nodes: number of nodes,
        node: node index.
    :param function sim_node_dependent_params:
        The name of the function that assigns parameters based on the sim and node.
        The only argument is the parameters.
    :param function write_feasst_script:
        The name of the function to write the feasst text interface file,
        which has the first argument as the parameters and the second argument
        as the filename.
    :param function client:
        If given, run in server-client mode: one server process and one client
        process (called with the params) are started per simulation.
        If write_feasst_script is None, the server runs with default port
        54321 and default buffer_size 1000.
    :param function post_process:
        The name of the function to post process all simulations once complete,
        and has the only argument as the params.
    :param function queue_function:
        The name of the function to queue one node and has the only argument as the params.
    :param namespace args:
        Arguments from argparse.
    """
    # clear file, then append sim id when complete
    with open(params['sim_id_file'], 'w'):
        pass
    if args.run_type == 0:  # run directly
        params = read_write_params(args, params)
        # the simulations assigned to this node
        sims = list(split(range(params['num_sims']), params['num_nodes']))[params['node']]
        if client is None:
            # text interface: one pool worker per simulation
            with Pool(params['procs_per_node']) as pool:
                codes = pool.starmap(run_single,
                                     zip(sims, repeat(params), repeat(args),
                                         repeat(sim_node_dependent_params),
                                         repeat(write_feasst_script),
                                         repeat(post_process)))
            if np.count_nonzero(codes) > 0:
                sys.exit(1)
        else:
            # server client mode
            seed = params.get('seed', 0)
            server_procs = []
            client_procs = []
            for sim in sims:
                params['sim'] = sim
                proc = multiprocessing.Process(
                    target=server,
                    args=(params, args, sim_node_dependent_params, write_feasst_script))
                proc.start()
                server_procs.append(proc)
            time.sleep(1)  # wait for servers to start before connecting client
            for sim in sims:
                params['sim'] = sim
                if seed == -1:
                    params['seed'] = random.randrange(int(1e9))
                proc = multiprocessing.Process(target=client, args=(params,))
                proc.start()
                client_procs.append(proc)
            for proc in client_procs:
                proc.join()
            for proc in server_procs:
                proc.terminate()
            for proc in server_procs:
                proc.join()  # reap the terminated servers
            if post_process is not None:
                post_process(params)
    elif args.run_type == 1:  # queue
        queue_id_file = params['prefix'] + '_queue_ids.txt'
        with open(queue_id_file, 'w'):
            pass  # empty file contents
        for node in range(params['num_nodes']):
            params['node'] = node
            queue_function(params)
            subprocess.check_call(
                params['queue_command'] + params['command_to_queue_id'] + " >> " + queue_id_file,
                shell=True, executable='/bin/bash')
            # the id of the job just submitted is the last line of the file
            with open(queue_id_file, 'r') as file1:
                queue_id = file1.read().splitlines()[-1]
            # save the params of this node to be read back by the queued job
            with open(params['prefix']+'_params'+queue_id+'.json', 'w') as file1:
                file1.write(json.dumps(params, indent=2))
    elif args.run_type == 2:  # post process
        post_process(params)
    else:
        # raised explicitly so that the check also holds with python -O
        raise AssertionError('unrecognized run_type')
def combine_tables_two_rigid_body(prefix, suffix, num_procs, num_header_lines=6):
    """
    Combine table files generated in parallel by TableTwoRigidBody3D.

    The files ``<prefix><proc><suffix>`` for proc in [0, num_procs-1] are
    merged into ``<prefix><suffix>``. The header is copied from the first
    file. The remaining lines are interleaved round-robin (one line from each
    file in turn) and the merge stops as soon as a file has no line left for
    its turn.

    :param str prefix: beginning of the file names.
    :param str suffix: end of the file names.
    :param int num_procs: number of files to combine.
    :param int num_header_lines: number of header lines in each file.
    """
    tables = []
    for proc in range(num_procs):
        with open(prefix + str(proc) + suffix, 'r') as file1:
            tables.append(file1.readlines())
    with open(prefix + suffix, 'w') as file1:
        # write header
        file1.writelines(tables[0][:num_header_lines])
        row = num_header_lines
        while True:
            for table in tables:
                if row >= len(table):
                    return
                file1.write(table[row])
            row += 1
#def connect_to_port(params, max_attempts=5): # """Attempt to connect to port for server-client interface.""" # port = params['port']+params['sim'] # sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # connected = False # attempts = 0 # max_attempts = 5 # while not connected: # try: # sock.connect(("localhost", port)) # connected = True # except: # time.sleep(1) # print('sim', params['sim'], 'failed to connect to port', port) # attempts += 1 # assert attempts < max_attempts, "Could not connect to port." # return sock if __name__ == "__main__": import doctest doctest.testmod()