Source code for microcalorimetry.measurements.rfsweep._main

"""Parse, View, and Run functions for a microcalorimeter experiment."""

from rminstr.utilities.path import new_dir
from microcalorimetry.math import rfpower
from rmellipse.uobjects import RMEMeas
from rmellipse.propagators import RMEProp
from rminstr.data_structures import ExistingRecord, ExptParameters
from pathlib import Path
from os.path import join, dirname, basename
import microcalorimetry.measurements.rfsweep._parser as microparser
import microcalorimetry.measurements.rfsweep._runner as microrunner
import microcalorimetry.configs as configs
import microcalorimetry._helpers._intf_tools as clitools
from microcalorimetry._tkquick.dtypes import Folder
import matplotlib.pyplot as plt
import os
import numpy as np
import pandas as pd
import json
import click
import time
from itertools import cycle
from decimal import Decimal
from fnmatch import fnmatch
from microcalorimetry.math.numbers import mean_unique_values
from matplotlib.cm import ScalarMappable

__all__ = [
    'run',
    'parse',
    'generate_settled_runlist',
    'runlist_from_loss',
    'reduce_initial_power',
    'reorder_runlist',
    'review_runlist',
]


def run_gui(
    output_dir: Folder,
    configs: list[Path],
    settings: Path,
    sensor_master_list: Path,
    name: str = 'rfsweep',
    no_confirm: bool = False,
    dry_run: bool = False,
):
    """
    RF Sweep GUI runner.

    Parameters
    ----------
    output_dir : Folder
        Directory to output data to.
    configs : list[Path]
        List of configuration files use to setup the measurement.
    settings : list[Path]
        RFSweep run list.
    sensor_master_list : Path
        Master list of sensor information for sanity checking, should conform
        to RFSensorMasterList spec.
    name : str, optional
        Measurement name. The default is 'rfsweep'.
    no_confirm : bool, optional
        Skip confirming instrument ID strings. The default is False.
    dry_run : bool, optional
        Try to load the settings, and do some validation. The default is False.

    Returns
    -------
    None.

    """

    commands = [
        'rfsweep',
        'run',
        f'"{Path(output_dir).resolve()}"',
    ]

    for c in configs:
        commands += ['--configs', f'"{Path(c).resolve()}"']

    for s in settings:
        commands += ['--settings', f'"{Path(s).resolve()}"']

    commands += ['--sensor-master-list', f'"{Path(sensor_master_list).resolve()}"']
    commands += ['--name', name]

    if no_confirm:
        commands.append('--no-confirm')
    if dry_run:
        commands.append('--dry-run')

    clitools.ucal_cli(commands)


@click.command(name='run')
@click.argument('output_dir', type=Path)
@click.option('--configs', '-c', type=Path, required=True, multiple=True)
@click.option('--settings', '-s', type=Path, required=True, multiple=True)
@click.option('--sensor-master-list', '-m', type=Path, required=True)
@click.option('--name', '-n', type=str)
@click.option('--no-confirm', is_flag=True, default=False)
@click.option('--dry-run', is_flag=True, default=False)
def _run_cli(*args, **kwargs):
    """
    Interface for the RF sweep run function.

    """
    # click inputs tuples, i need lists here.
    kwargs['configs'] = list(kwargs['configs'])
    kwargs['settings'] = list(kwargs['settings'])
    print(args)
    print(kwargs)
    return run(*args, **kwargs)


[docs] def run( output_dir: str, configs: list[Path], settings: list[Path], sensor_master_list: configs.RFSensorMasterList, name: str = 'rf_sweep', no_confirm: bool = False, dry_run: bool = False, validate: bool = True, ): """ Run a microcalorimetry RF Sweep experiment. Parameters ---------- output_directory : Path Directory to output from. The default is None. configs : list[Path], optional Config files for experiment settings / instruments. The default is None. settings : list[Path], optional Run settings files, looped over for experiment. The default is None. Each index i of a configs file corresponds to a index i of the configs file. sensor_master_list : configs.RFSensorMasterList, Master list of sensors and resitance/sensitivity values to use for sanity checking config files. name : str, optional Name of measurement. The default is None. no_confirm : bool, optional If True, skips any confirmations that may be asked when starting the run. dry_run : bool, optional If true, will try to load the configurations without actually running anything to do a dry-check - can be used to validate some basic type validation of the configuration. validate : bool, optional If true, attempts to validate configuration files. The default is True. """ priority = [0] * len(configs) if isinstance(settings, str) or isinstance(settings, Path): settings = [settings] if len(settings) > 1: raise NotImplementedError('Only on runlist can be supplied at a time.') for setting in settings: if not dry_run: output_dir = new_dir(output_dir, name) + r'//' with microrunner.MicrocalorimeterRunner( configs, setting, output_dir, sensor_master_list, priority, no_confirm=no_confirm, validate=validate, ) as runner: # opens visa resources for every instrument runner.initialize_instruments() runner.start_monitor_mode() while not runner.done: runner.iterate() # print(time.time() - time_0) runner.output() # for dry run, just try to initialize all the config files. # and don't do anythin with them. else: output_dir = new_dir(output_dir, name) + r'//' microrunner.MicrocalorimeterRunner( configs, setting, output_dir, sensor_master_list, priority, dry_run=True, validate=validate, )
_run_cli = clitools.format_from_npdoc(run)(_run_cli) @click.command(name='parse') @click.argument('metadata', type=Path, nargs=-1) @click.option('--output-file', '-o', type=Path) @click.option('--show-plots', is_flag=True) @click.option('--save-plots', type=Path) @click.option('--plot-ext', type=str) def _parse_cli( *args, show_plots: bool = False, output_file: Path = None, save_plots: Path = None, plot_ext: str = '.png', **kwargs, ): """ Interface for the command line for parsing DC sweeps. Parameters ---------- show_plots : bool, optional If true, shows the plots in a gui and freezes the terminal, by default False. output_file : Path, optional If provided, outputs any saveable objects to an HDF5 file, by default None. save_plots : Path, optional If provided, saves plots to this directory. The default is None. plot_ext : str, optional File extension to save plots as. The default is '.png.'. Returns ------- _type_ _description_ """ kwargs['make_plots'] = bool(show_plots or save_plots) print('Parse RFSWEEP from CLI:') full_output = dict( args=[str(a) for a in args], show_plots=show_plots, output_file=str(output_file), save_plots=str(save_plots), plot_ext=plot_ext, **{k: str(v) for k, v in kwargs.items()}, ) print(json.dumps(full_output, indent=True)) outputs = clitools.run_and_show_plots( parse, *args, show_plots=show_plots, save_plots=save_plots, plot_ext=plot_ext, **kwargs, ) if output_file: clitools.save_saveable_objects(outputs[0], output_file=output_file) return outputs
[docs] def parse( metadata: list[Path], verbose: bool = False, make_plots: bool = False, plot_segments_indexes: list[int] = [0, -1], plot_all_segments_analysis: bool = False, dataframe_results: Path = None, include_time_std: bool = True, analysis_config: configs.RFSweepParserConfig = None, DUT_power_analysis: dict = None, monitor_power_analysis: dict = None, calorimeter_power_analysis: dict = None, RF_source_power_analysis: dict = None, ) -> tuple[dict[RMEMeas], list[plt.Figure]]: """ Parse a microccalorimeter run to produce data with uncertainties. This parses the initial version of the DC sweep experiment. Parameters ---------- metadata : list[Path] Path(s) to metadata files or directories containing rfsweep runs. verbose : bool, optional If True, prints more verbose info about the analysis. Default is True make_plots : bool, optional Makes review plots if True. The default is True. plot_segments_indexes : list[int] What segment of each run to plot. Each interger represents the index of every segment for every run in order of runs. For example, index 0 is the first segment of the first run Index -1 is the last segment of the last run. The default is [0, -1]. plot_all_segments_analysis : bool If True, plot every single segment's review charts and ignore plot_segment_indexes. dataframe_results : Path, optional If provided, saves a csv of intermediate calculated values to the specified path. include_time_std : bool, optional It True, includes standard deviation of stable samples used to calculate the value of a particular column as an uncertainty mechanism. The default is True. analysis_config : RFSweepParserConfig, optional Supply a .yml or .csv file with "analysis_config" or "signal_config" fields. These will overwrite the signal_config present in each runs saved signal_config definition. DUT_power_analysis : dict, optional Set the analysis settings for the DUT. Format ------ { instr_timing_tolerance : float Max expected misalignment of instrument's time column to determine when RF is turned off. Default is 5. stats_window_override : float Override the defined stats window to change the window of samples averaged over to determine off/on values. The default is None. fast_off_analysis : bool Whether or not to use fast analysis. Only matters for thermoelectric sensors power sensors (which may or may not use the fast off analysis). The default is False. coeffs : Path Provided a path to thermoelectric fit coefficients you want to use. Must be provided for thermoelectric power sensors. V_off_delay : float Relative time from RF being turned off to on to use for the V off measurement. Only relevant if fast analysis is being used. V_off_function : str What V Off fit function to use, only required if doing a fast fit. Options Format -------------- { lin_plus_exp Fit fast off measurements to a line + an exponential. linear Fit fast off measurements to a line. single_sample Treat all fast off measurements in window as realizations of the same measurement and average over them. } V_off_fit_time_window : list[float] Relative time window from RF being turned on to fit to. For example, [1, 2] would fit between 1 and 2 seconds after RF power is turned off. } monitor_power_analysis : dict, optional Set the analysis settings for the monitor. Format ------ { instr_timing_tolerance : float Max expected misalignment of instrument's time column to determine when RF is turned off. Default is 5. stats_window_override : float Override the defined stats window to change the window of samples averaged over to determine off/on values. The default is None. fast_off_analysis : bool Whether or not to use fast analysis. Only matters for thermoelectric sensors power sensors (which may or may not use the fast off analysis). The default is False. coeffs : Path Provided a path to thermoelectric fit coefficients you want to use. Must be provided for thermoelectric power sensors. V_off_delay : float Relative time from RF being turned off to on to use for the V off measurement. Only relevant if fast analysis is being used. V_off_function : str What V Off fit function to use, only required if doing a fast fit. Options Format -------------- { lin_plus_exp Fit fast off measurements to a line + an exponential. linear Fit fast off measurements to a line. single_sample Treat all fast off measurements in window as realizations of the same measurement and average over them. } V_off_fit_time_window : list[float] Relative time window from RF being turned on to fit to. For example, [1, 2] would fit between 1 and 2 seconds after RF power is turned off. } calorimeter_power_analysis : dict, optional Set the analysis settings for the calorimeter. Format ------ { instr_timing_tolerance : float Max expected misalignment of instrument's time column to determine when RF is turned off. Default is 5. stats_window_override : float Override the defined stats window to change the window of samples averaged over to determine off/on values. The default is None. coeffs : Path Provided a path to thermoelectric fit coefficients you want to use. Must be provided for thermoelectric power sensors. } RF_source_power_analysis : dict, optional Set the analysis settings for the RF Source. Format ------ { instr_timing_tolerance : float Max expected misalignment of instrument's time column to determine when RF is turned off. Default is 5. stats_window_override : float Override the defined stats window to change the window of samples averaged over to determine off/on values. The default is None. V_off_delay : float Relative time from RF being turned off to use as the off measurement for a fast off. Only relevant if fast analysis is being used. V_off_function : str What off fit function to use, only required if doing a fast fit. Options Format -------------- { lin_plus_exp Fit fast off measurements to a line + an exponential. linear Fit fast off measurements to a line. single_sample Treat all fast off measurements in window as realizations of the same measurement and average over them. } V_off_fit_time_window : list[float] Relative time window from RF being turned on to fit to. For example, [1, 2] would fit between 1 and 2 seconds after RF power is turned off. } Returns ------- parsed_rf : dict[RMEMeas] Dictionary of RFSweep datasets in a parsed RF sweep configuration. figures : list[plt.Figure] Dictionary of RFSweep datasets in a parsed RF sweep configuration. """ # wrap propagator around functions basicprop = RMEProp( sensitivity=True, montecarlo_sims=0, ) zeta_dcsub = basicprop.propagate(rfpower.zeta_dcsub) zeta_general = basicprop.propagate(rfpower.zeta_general) dc_sub = basicprop.propagate(rfpower.dc_substituted_power) openloope_te_power = basicprop.propagate(rfpower.openloop_thermoelectric_power) # if an analysis config fils is provided, load that if analysis_config is None: analysis_config = {} else: analysis_config = configs.load_config(analysis_config) def overload_analysis_config_from_fvalues(signame, d): if d is None: return # pre fill dictionairys that might be missing if 'analysis_config' not in analysis_config: analysis_config['analysis_config'] = {} if signame not in analysis_config['analysis_config']: analysis_config['analysis_config'][signame] = {} # over load values that aren't None for ckey, value in d.items(): # there fit coeffs were specified, put those in the # right place. if ckey == 'coeffs' and value is not None: if 'signal_config' not in analysis_config: analysis_config['signal_config'] = {} if signame not in analysis_config['signal_config']: analysis_config['signal_config'][signame] = {} analysis_config['signal_config'][signame]['coeffs'] = value # other rise pass it in elif value is not None: analysis_config['analysis_config'][signame][ckey] = value # use any dictionaries passed in directly to modify the # configuration file at run time. This provides the GUI an # easier way to modify these config values. overload_analysis_config_from_fvalues('DUT_power', DUT_power_analysis) overload_analysis_config_from_fvalues( 'calorimeter_power', calorimeter_power_analysis ) overload_analysis_config_from_fvalues('monitor_power', monitor_power_analysis) overload_analysis_config_from_fvalues('RF_source_power', RF_source_power_analysis) # import json # print(json.dumps(analysis_config, indent = True)) # DUT, RF_source, and calorimeter_power should always be there for always_present in ['DUT_power', 'calorimeter_power', 'RF_source_power']: if always_present not in analysis_config['analysis_config']: analysis_config['analysis_config'][always_present] = {} # set up parser metadata_dict = {} figures = [] if type(metadata) is not list: metadata = [metadata] # if a folder is pointed to, use a filed with _metadata.csv # in the name as the metadata file, so folders can be pointed to adjusted_metadata = [] for md in metadata: md = Path(md) if not md.exists(): raise FileExistsError(f'{md} doesnt exist') elif md.is_file(): adjusted_metadata.append(md) # assume folders contain a single run # 1 metadata file elif md.is_dir(): new_md = [f for f in md.glob('*_metadata.csv')] if len(new_md) == 1: adjusted_metadata.append(new_md[0]) else: raise ValueError( f'{md} must contain exactly 1 *_metadata files to be pointed to by parser.' ) metadata = adjusted_metadata runs = [] # print(metadata) ts = time.time() for metadata_path in metadata: run_dir = Path(metadata_path).parent run_file = Path(metadata_path).name # fill path to metadata run_name = Path(run_file).resolve().as_posix() print('run_name', run_name) run = microparser.NewTypeRun( working_folder=str(run_dir) + '/', data_file=run_dir / run_file, name=run_file, **analysis_config, ) run.load() run.analyze() metadata_dict.update({str(k): v for k, v in run.expt.config.items()}) runs.append(run) print('\n parse time = ', time.time() - ts) print(' Making Noise Plots...') print(' ---------------------') # make detailed analysis plots # there are so many ts = time.time() if make_plots: for run in runs: for signal, analyzer in run.analyzers.items(): try: analyzer.plot_analysis except AttributeError: print(f'{signal} {analyzer} has no plot analysis method. Skipping') continue for si, segment in enumerate(run.segments): called_out = si in plot_segments_indexes is_end = si == len(run.segments) - 1 and ( -1 in plot_segments_indexes ) if called_out or is_end or plot_all_segments_analysis: # plot the analysis for a single step # may get multiple plots for step print( f'Making review figure segment: {si} : {signal} : {type(analyzer)}' ) try: new_figures = analyzer.plot_analysis(segment) except NotImplementedError: new_figures = None print(' Not implemented') if not isinstance(new_figures, list): new_figures = [new_figures] for figure in new_figures: if figure is not None: figure.suptitle( f'{signal} signal \n run {Path(metadata_path).parent.name} ; segment {si}' ) figures += new_figures # use the last signal config signal_config = configs.RFSweepSignalConfig(runs[-1].parsed_config['signal_config']) # format data output into rmellipse objects c = microparser.Campaign(runs, Path.cwd(), 'rfsweep') # generate noise plots if make_plots: # if make_plots: data_df = c.output_segments(fmt_for='pandas') for signal, sconfig in signal_config.items(): # don't make noise plots for RF Source power # because it doesn't make sense if signal == 'RF_source_power': continue input_signals = sconfig['input_signals'] if isinstance(input_signals, str): input_signals = [input_signals] for ins in input_signals: # plot the deviation of the RF on value as a function # of the step number fig, ax = plt.subplots(2, 1) try: column = sconfig[ins]['column'] fig.suptitle(f'{column} RF On standard deviation') cmap = plt.get_cmap('viridis') norm = plt.Normalize(0, len(data_df)) for mode in ['on']: freq = data_df['frequency'] points = np.arange(len(freq)) dev = data_df[f'{column}_{mode}_dev'] mean = data_df[f'{column}_{mode}'] ppm = dev / mean * 1e6 sc = ax[0].scatter(freq, dev, marker='o', c=points) ax[1].set_xlabel('Frequency (GHz)') ax[1].scatter(freq, ppm, marker='o', c=points) ax[0].set_ylabel(f'Column RF On std ({sconfig[ins]["units"]})') ax[1].set_ylabel('Column RF On std (ppm)') sm = ScalarMappable(norm=norm, cmap=cmap) sm.set_array([]) cbar = fig.colorbar(sm, ax=ax) cbar.ax.set_title('Point Number') figures.append(fig) except Exception as e: plt.close(fig) print( f'Encountered error plotting signal {signal}:{ins} std \n {type(e)}: {e}' ) plt.show() fig, ax = plt.subplots(2, 1) try: column = sconfig[ins]['column'] fig.suptitle(f'{signal} \n {column} RF Off standard deviation') ax[1].set_xlabel('Before (left) and After (right)') ax[0].set_ylabel(f'STD of {ins} ({sconfig[ins]["units"]})') ax[1].set_ylabel(f'STD of {ins} (ppm)') for ir, run in enumerate(c.run_list): for iseg, segment in enumerate(run.segments): i_off = segment.results[f'{column}_off_i'] f_off = segment.results[f'{column}_off_f'] i_off_dev = segment.results[f'{column}_off_i_dev'] f_off_dev = segment.results[f'{column}_off_f_dev'] rname = Path(run.working_folder) # label relative to the CWD if possible, # otherwise use the full path try: rname = ( rname.absolute().relative_to(Path.cwd()).as_posix() ) rname = './' + rname except ValueError: rname = rname.as_posix() label = f'{rname} : segment {iseg}' ax[0].plot( [0, 1], [i_off_dev, f_off_dev], 'o--', label=label ) ax[1].plot( [0, 1], [i_off_dev / i_off * 1e6, f_off_dev / f_off * 1e6], 'o--', ) ax[0].legend( bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0 ) fig.tight_layout() figures.append(fig) except Exception as e: plt.close(fig) print( f'Encountered error plotting {signal}:{ins} segment off \n {type(e)}: {e}' ) print('plot time = ', time.time() - ts) # output a the dataframe results df = c.output_dataframe() if dataframe_results: print(' Outputing intermediate csv results') print(' ----------------------------------') df.to_csv(dataframe_results) # format fata for rmellipse calculataions print('generating RMEMeas of raw data...') print('---------------------------------') ts = time.time() data = c.output_segments( fmt_for='rmellipse', include_specs=True, include_time_std=include_time_std ) ep = run.expt.config print('rme time = ', time.time() - ts) # variablize the column names to make it a bit easier assert signal_config['calorimeter_power']['type'] == 'thermoelectric' e_col = signal_config['calorimeter_power']['e']['column'] source_col = signal_config['RF_source_power']['power']['column'] # do a little bit of post processing to calculate power # This calculates the inferred power flowing through the thermopile print('calculating sensor powers...') print('----------------------------') outputs = {} cal_coeffs = configs.ThermoelectricFitCoefficients( signal_config['calorimeter_power']['coeffs'] ).load() try: p_of_e_calorimeter = cal_coeffs.attrs['p_of_e'] E = openloope_te_power( cal_coeffs, data.sel(col=e_col + '_on') - data.sel(col=e_col + '_off_slow'), p_of_e=p_of_e_calorimeter, ) outputs.update({'E': E}) except AttributeError: print( "Warning: Can't compute power inferred by calorimeter, skipping. Likely a full data model of the sensitivity wasn't supplied." ) outputs.update( { 'e_on': data.sel(col=e_col + '_on'), 'e_off': data.sel(col=e_col + '_off_slow'), } ) outputs.update({'RF_source_on': data.sel(col=source_col + '_on')}) # this part of the code is trying to turn the voltage/current # measurements into power measurements of the sensor inside # calorimeter and/or on the sidearm. # this should be a class insttead of a bunch of if statements # if there is a voltage column and no thermoelectric column # it's a dc substitution sensor inside the calorimeter (port 2) # so use that. if signal_config['DUT_power']['type'] == 'bolometer': # possible no current was provided if measured on a type 4 s_v_col = signal_config['DUT_power']['vdc']['column'] if 'idc' in signal_config['DUT_power']: s_i_col = signal_config['DUT_power']['idc']['column'] I_on = (data.sel(col=s_i_col + '_on'),) I_off_slow = (data.sel(col=s_i_col + '_off_slow'),) I_off_fast = (data.sel(col=s_i_col + '_off_fast'),) p2dc_on = data.sel(col=s_v_col + '_on') * I_on p2dc_off_slow = data.sel(col=s_v_col + '_off_slow') * I_off_slow p2dc_off_fast = data.sel(col=s_v_col + '_off_fast') * I_off_fast else: s_resistance = signal_config['DUT_power']['resistance'] I_on = None I_off_slow = None I_off_fast = None p2dc_on = data.sel(col=s_v_col + '_on') ** 2 / s_resistance p2dc_off_slow = data.sel(col=s_v_col + '_off_slow') ** 2 / s_resistance p2dc_off_fast = data.sel(col=s_v_col + '_off_fast') ** 2 / s_resistance p2_fast = dc_sub( data.sel(col=s_v_col + '_on'), data.sel(col=s_v_col + '_off_fast'), I_on=I_on, I_off=I_off_fast, R=s_resistance, ) p2_slow = dc_sub( data.sel(col=s_v_col + '_on'), data.sel(col=s_v_col + '_off_slow'), I_on=I_on, I_off=I_off_slow, R=s_resistance, ) zeta = zeta_dcsub( data.sel(col=e_col + '_on'), data.sel(col=e_col + '_off_slow'), data.sel(col=s_v_col + '_on'), data.sel(col=s_v_col + '_off_fast'), data.sel(col=s_v_col + '_off_slow'), I_on, I_off_fast, I_off_slow, ) outputs.update( { 'p2_fast': p2_fast, 'p2_slow': p2_slow, 'p2dc_on': p2dc_on, 'p2dc_off_slow': p2dc_off_slow, 'p2dc_off_fast': p2dc_off_fast, 'zeta': zeta, } ) # this is a thermoelectric power sensor, so lets do that elif signal_config['DUT_power']['type'] == 'thermoelectric': # get sensor coefficients # and the calorimeter coefficients s_coeffs = configs.ThermoelectricFitCoefficients( signal_config['DUT_power']['coeffs'] ).load() dut_signals = signal_config['DUT_power'] s_e_col = dut_signals['e']['column'] # thermometer model do a temperature correction if 'therm_v' in dut_signals and 'col' in s_coeffs.dims: s_e_const = 1.0 therm_v_col = dut_signals['therm_v']['column'] therm_i_col = dut_signals['therm_i']['column'] therm_v = data.sel(col=therm_v_col + '_on') therm_i = data.sel(col=therm_i_col + '_on') temperature = therm_v / therm_i outputs.update({'temperature_p2': temperature}) # this is a polyomial fit # check if the slope of the sensor equals the slope of the # coefficients, if not then the thermoelectric sensor's RF # side has a negative polarity to the srf side and the voltage # measured needs to be multiplied by -1.\ else: temperature = None s_e_const = 1.0 measured_slope_sign = np.sign(data.nom.sel(col=s_e_col + '_on')[0]) coeff_sign = np.sign(s_coeffs.nom.sel(deg=1)) if measured_slope_sign != coeff_sign: s_e_const = -1.0 p2_slow = openloope_te_power( s_coeffs, s_e_const * (data.sel(col=s_e_col + '_on') - data.sel(col=s_e_col + '_off_slow')), p_of_e=cal_coeffs.attrs['p_of_e'], temperature=temperature, ) # if a fast off is available, use that try: p2_fast = openloope_te_power( s_coeffs, s_e_const * (data.sel(col=s_e_col + '_on') - data.sel(col=s_e_col + '_off_fast')), p_of_e=cal_coeffs.attrs['p_of_e'], temperature=temperature, ) outputs.update({'e_p2_off_fast': data.sel(col=s_e_col + '_off_fast')}) zeta = zeta_general( data.sel(col=e_col + '_on'), data.sel(col=e_col + '_off_slow'), cal_coeffs, cal_coeffs.attrs['p_of_e'], p2_slow, ) except KeyError: print('No fast off analysis for {DUT_power}') p2_fast = p2_slow outputs.update({'e_p2_off_fast': data.sel(col=s_e_col + '_off_slow')}) zeta = zeta_general( data.sel(col=e_col + '_on'), data.sel(col=e_col + '_off_slow'), cal_coeffs, cal_coeffs.attrs['p_of_e'], p2_fast, ) outputs.update( { 'e_p2_on': data.sel(col=s_e_col + '_on'), 'e_p2_off_slow': data.sel(col=s_e_col + '_off_slow'), 'p2_fast': p2_fast, 'p2_slow': p2_slow, 'zeta': zeta, } ) # dummy sensors don't do anything elif signal_config['DUT_power']['type'] == 'special': pass else: msg = f'{signal_config["DUT_power"]["type"]} not recognized' raise ValueError(msg) # now calculate the relevant powers for the sidearm # it may n ot be peresent, check for it in the signal config # first. try: signal_config['monitor_power'] calculate_monitor = True except KeyError: calculate_monitor = False if calculate_monitor and signal_config['monitor_power']['type'] == 'bolometer': s3_v_col = signal_config['monitor_power']['columns']['v_col'] if 'idc' in signal_config['monitor_power']: s3_i_col = signal_config['monitor_power']['columns']['i_col'] I3_on = (data.sel(col=s3_i_col + '_on'),) I3_off_slow = (data.sel(col=s3_i_col + '_off_slow'),) I3_off_fast = (data.sel(col=s3_i_col + '_off_fast'),) p3_fast = dc_sub( data.sel(col=s3_v_col + '_on'), data.sel(col=s3_v_col + '_off_fast'), I_on=I3_on, I_off=I3_off_fast, ) p3_slow = dc_sub( data.sel(col=s3_v_col + '_on'), data.sel(col=s3_v_col + '_off_slow'), I_on=I3_on, I_off=I3_off_slow, ) else: s3_resistance = signal_config['monitor_power']['resistance'] p3_fast = dc_sub( data.sel(col=s3_v_col + '_on'), data.sel(col=s3_v_col + '_off_fast'), ) p3_slow = dc_sub( data.sel(col=s3_v_col + '_on'), data.sel(col=s3_v_col + '_off_slow'), R=s3_resistance, ) outputs.update( { 'p3_fast': p3_fast, 'p3_slow': p3_slow, } ) # no distinguishing between a fast and slow analysis # for a commercial power meter elif calculate_monitor and signal_config['monitor_power']['type'] == 'commercial': s3_p_col = signal_config['monitor_power']['power']['column'] p3_slow = data.sel(col=s3_p_col + '_on') p3_fast = data.sel(col=s3_p_col + '_on') outputs.update( { 'p3_fast': p3_fast, 'p3_slow': p3_slow, } ) outputs.update() elif calculate_monitor and signal_config['monitor_power']['type'] == 'special': pass elif calculate_monitor: msg = f'{signal_config["monitor_power"]["type"]} not recognized' raise ValueError(msg) # plot parsed components if make_plots: plot_outputs = {k: v for k, v in outputs.items() if k in ['zeta']} for k, v in plot_outputs.items(): fig, ax = plt.subplots(1, 1) unc = v.stdunc(k=2).cov ax.errorbar( v.nom.frequency, v.nom, yerr=unc, capsize=3, label='Uncertainty k=2', ls='', ) ax.set_ylabel(k.replace('zeta', r'Uncorrected $\eta$')) ax.set_xlabel('Frequency (GHz)') ax.legend(loc='best') sidearm_name = None try: sidearm_name = ep['measurement_description']['sidearm_name'] except KeyError: pass try: fig.suptitle( rf'Internal mount: {ep["measurement_description"]["mount_name"]}, external mount: {sidearm_name} \n Uncorrected $\eta$' ) except KeyError: fig.suptitle(r'Parsed RF Sweep: Uncorrected $\eta$') fig.tight_layout() figures.append(fig) return outputs, figures
_parse_cli = clitools.format_from_npdoc(parse)(_parse_cli) def mean_last_of_point_dBm(signal, points, i: int, n_samples: int): nearest = np.searchsorted(signal[0], points[0][i]) avg = float( np.mean( signal[1][:nearest][ abs(np.diff(signal[1][:nearest], append=signal[1][-1])) > 0 ][-n_samples] ) ) return 10 * np.log10(avg) + 30
[docs] def reduce_initial_power( runlist: Path, reduce_by_dB: float, output_path: Path = None, decimals: int = 4 ): """ Reduce a runlist's initial power setting. Parameters ---------- runlist : Path Path to runlist to modify. reduce_by_dB : float How much to reduce the initial power by. Negative values will increase the initial power. output_path : Path, optional If None, uses the same name as input file with '_min{num}dB.csv' decimals : int, optional Number of decimals to use for frequency points. Default is 4. """ runlist = Path(runlist) data = pd.read_csv(runlist) ind = data.Frequency_GHz > 0 new_init = data.Initial_source_power_dBm[ind] - reduce_by_dB data.loc[ind, 'Initial_source_power_dBm'] = np.round(new_init, decimals) if output_path is None: output_path = runlist.parent / (runlist.stem + f'_min{reduce_by_dB}dB.csv') data.to_csv(output_path, index=False)
[docs] def reorder_runlist( runlist: Path, output_path: Path = None, segment_size: int = 10, off_step_length: int = 2, freq_ordering: str = 'interleave', make_plots: bool = True, ): """ Reorder a runlist. Can modify the number of and size of segments, and reorder frequencies. Parameters ---------- runlist : Path Frequency list to reorder. output_path : Path, optional Directory to output runfile. The default is the same path as the input file with '_reordered' appended to the name. segment_size : int, optional Number of steps per segment (maximum). The default is 10. off_step_length : int, optional How many steps each off period should be. Typically 2, the default is 2. freq_ordering : str, optional Determines how frequencies are sorted before being split into segments. Options Format -------------- { monotonic_increasing Sort frequencies monitonically increasing (e.g. 1,2,3,5,6). interleave_increasing Sort frequencies into 2 monitonically increasing, interleaved lists. For example: 1,2,3,4,5,6 becomes 1,3,5,2,4,6. } make_plots : bool, optional Generate review plots of runlist. The default is True. """ # get runlist, remove off points, sort by # frequency runlist = Path(runlist) if output_path is None: output_path = runlist.parent / (runlist.stem + '_reordered.csv') data = pd.read_csv(runlist) ind = data.Frequency_GHz > 0 new = data[ind].sort_values('Frequency_GHz') # seperate out columns frequency = new['Frequency_GHz'] starting_powers = new['Initial_source_power_dBm'] target_power = new['Target_source_power_dBm'] limit_powers = new['Source_power_limit_dBm'] # interleave if asked to match freq_ordering: case 'monotonic_increasing': initial_source = starting_powers.values.copy() frequencies = frequency.values.copy() targets = target_power.values.copy() limits = limit_powers.values.copy() case 'interleave_increasing': initial_source = _interleave(starting_powers.values) frequencies = _interleave(frequency.values) targets = _interleave(target_power.values) limits = _interleave(limit_powers.values) case _: raise ValueError('freq_ordering not a recognized value') output_df = { 'Frequency_GHz': [], 'Initial_source_power_dBm': [], 'Target_source_power_dBm': [], 'Source_power_limit_dBm': [], } for i, fi in enumerate(frequencies): # insert zero rows if i % segment_size == 0: for n in output_df: output_df[n] += [0] * off_step_length output_df['Frequency_GHz'].append(fi) output_df['Initial_source_power_dBm'].append(initial_source[i]) output_df['Target_source_power_dBm'].append(targets[i]) output_df['Source_power_limit_dBm'].append(limits[i]) # append with a zero row for n in output_df: output_df[n] += [0] * off_step_length output_df = pd.DataFrame(output_df) output_df.to_csv(output_path, index=False) figures = [] if make_plots: figures = review_runlist(output_path) return figures
[docs] def review_runlist(runlist: Path) -> list[plt.Figure]: """ Review a runlist for an rf sweep measurement. Parameters ---------- runlist : Path Path to runlist. Returns ------- plots : list[plt.Figures] List of review charts. """ runlist = Path(runlist) data = pd.read_csv(runlist) # srtd with 0 rows removed ind_on = data.Frequency_GHz > 0 ind_off = data.Frequency_GHz == 0 srtd = data[ind_on].sort_values('Frequency_GHz') fig1, ax = plt.subplots(1, 1) ax.plot(srtd.Frequency_GHz, srtd.Initial_source_power_dBm, label='Initial') ax.plot(srtd.Frequency_GHz, srtd.Target_source_power_dBm, label='Target') ax.plot(srtd.Frequency_GHz, srtd.Source_power_limit_dBm, label='Limit') ax.set_xlabel('Frequency GHz') ax.set_ylabel('Power (dBm)') ax.set_title(f'Runlist Review: \n {str(runlist)}') ax.legend(loc='best') fig2, ax = plt.subplots(1, 1) ax.plot(data.Frequency_GHz[ind_on], 'o', label='Power On') ax.plot(data.Frequency_GHz[ind_off], 'o', label='Power Off') ax.set_xlabel('Step Number') ax.set_ylabel('Frequency (GHz)') ax.set_title(f'Runlist Review: \n {str(runlist)}') return [fig1, fig2]
[docs] def runlist_from_loss( parsed_rf: configs.ParsedRFSweep, DUT_power: float, output_path: Path = Path('.') / 'runlist.csv', segment_size: int = 10, off_step_length: int = 2, safety_backoff_dBm: float = 3, source_hard_limit_buffer_dBm: float = 1, interleave: bool = True, ) -> list[plt.Figure]: """ Make a runlist from a parsed measurement. Estimates the approximate loss from the source to the DUT and makes a new runlist that attempts to level the source to the DUT_power as the initial power. Parameters ---------- parsed_rf : configs.ParsedRFSweep A parsed rfsweep measurement. DUT_power : float DUT power in dBm to try and level the source to. output_path : Path, optional Path to output the generate runfile. The default is 'runlist.csv'. segment_size : int, optional Number of frequency points per segment. The default is 5. off_step_length : int, optional How many steps each off period should be. Typically 2, the default is 2. safety_backoff_dBm : float, optional Back off the inintial source value by this amount to avoid starting the source at too high of a level. The default is 3.0. source_hard_limit_buffer_dBm : float, optional Add this amount of power to the estimated required power and set it as the source limit per frequency point. The default is 1 dBm. interleave : bool, optional Interleave frequency points. The default is True. Returns ------- figures : list[plt.Figure] List of generated figures. """ parsed_rf = configs.ParsedRFSweep(parsed_rf) # read in, average repeat frequencies, sort by freuqency # and convert to dBm source_power = mean_unique_values( parsed_rf['RF_source_on'].load().nom, dim='frequency' ).sortby('frequency') dut_power = 10 * np.log10( mean_unique_values(parsed_rf['p2_fast'].load().nom, dim='frequency').sortby( 'frequency' ) * 1000 ) # array of target DUT powers target_power = dut_power * 0 + DUT_power def frmt(*args): args = [float(a) for a in args] return '{:6.3f} | {:6.3f} | {:6.3f} | {:6.3f}'.format(*args) # calculate RF loss loss = source_power - dut_power # calculate required power required_power = target_power + loss # back off safely starting_powers = required_power - safety_backoff_dBm limit_powers = required_power + source_hard_limit_buffer_dBm figs = [] # make some plots fig, ax = plt.subplots() ax.set_xlabel('Frequency (GHz)') ax.set_ylabel('Power (dBm)') ax.set_title('Power Table Calculation Summary') ax.plot( loss.frequency, target_power, 'g', label='Target Power', ) ax.plot( loss.frequency, required_power, 'b--', label='Required Power', ) ax.plot(loss.frequency, starting_powers, label='New Starting Power', color='k') ax.plot(loss.frequency, limit_powers, label='Source Limit', color='r') ax.legend(loc='best') figs.append(fig) # # if no maximums hit, build a run list # # interleave the frequency points if interleave: initial_source = _interleave(starting_powers.values) frequencies = _interleave(loss.frequency.values) targets = _interleave(target_power.values) limits = _interleave(limit_powers.values) else: initial_source = starting_powers.values.copy() frequencies = loss.frequency.values.copy() targets = target_power.values.copy() limits = limit_powers.values.copy() output_df = { 'Frequency_GHz': [], 'Initial_source_power_dBm': [], 'Target_source_power_dBm': [], 'Source_power_limit_dBm': [], } for i, fi in enumerate(frequencies): # insert zero rows if i % segment_size == 0: for n in output_df: output_df[n] += [0] * off_step_length output_df['Frequency_GHz'].append(fi) output_df['Initial_source_power_dBm'].append(initial_source[i]) output_df['Target_source_power_dBm'].append(targets[i]) output_df['Source_power_limit_dBm'].append(limits[i]) # append with a zero row for n in output_df: output_df[n] += [0] * off_step_length output_df = pd.DataFrame(output_df) output_df.to_csv(output_path, index=False) return figs
def _interleave(a): out = np.append(a[0 : len(a) : 2], a[1 : len(a) : 2]) assert len(out) == len(a) return out def _smallest_neighbour(x_interp: np.array, x: np.array, y: np.array): """ Picks the smallest neighbour of y when interpolating x_interp to x. """ # make sure input arrays are sorted # and unique sort_ind = np.argsort(x) x = x[sort_ind] y = y[sort_ind] x, uniq_index = np.unique(x, return_index=True) y = y[uniq_index] # pick neighbour with smalles y value out = np.zeros(len(x_interp)) for i, xi in enumerate(x_interp): closest_i = np.argmin(abs(x_interp[i] - x)) f_closest = x[closest_i] if f_closest < xi: other_neighbour = closest_i + 1 else: other_neighbour = closest_i - 1 ytest_1 = y[closest_i] try: ytest_2 = y[other_neighbour] except IndexError: ytest_2 = np.inf out[i] = min(ytest_1, ytest_2) return out
[docs] def generate_settled_runlist( metadata: Path, analysis_config: configs.RFSweepParserConfig = None, output_dir: Folder = Path('.'), output_name: str = None, n_samples: int = 7, ): """ Generate a run list with settled source powers. Copies the run settings (frequency list, target power, initial source power, and source limit) and replaces the initial source power with the settled value from the provided run. This function does NOT check if the source was actually settled, it just assumes it was right before power was turned off. Unfinshed runs can be provided, but they may provide bad settings for the final points if the experiment never actually settled, and the frequency list may be incomplete. Parameters ---------- metadata : Path Path to the metadatafile of a measurement. output_dir : Folder, optional Folder to output new file in. The default is the current directory. output_name : str, optional What to name new file. The default is the provided metadata name + '_settled_runlist.csv' n_samples : int, optional Number of samples to average for final source value. """ dr = ExistingRecord(metadata) d_full = dr.batch_read( ['rf_power_setting', 'frequency', 'power_on', 'point_counter'] ) # get columns I care about src = d_full['rf_power_setting'] points_dr = d_full['point_counter'] def frmt(*args): args = [float(a) for a in args] return '{:6.3f} | {:6.3f} | {:6.3f} | {:6.3f}'.format(*args) # read in settings file try: settings = pd.read_csv(dr.metadata['settings_file']) except FileNotFoundError: try_file = Path(metadata).parent / Path(dr.metadata['settings_file']).name settings = pd.read_csv(try_file) # the data record counts the initial warm up as a point. # experiment needs to be finished for this if len(settings) + 1 != len(points_dr[1]): print( 'Incomplete measurement passed to metadata. Meas list may be incomplete or have bad source settings on last frequency.' ) # for each completed measurment # change the initial source to the final source points = (points_dr[0][1:], points_dr[1][1:]) new_list = {c: [] for c in settings.columns} for i, (tp, p) in enumerate(zip(points[0], points[1])): # copy row fset = settings.iloc[i] for c in settings.columns: new_list[c].append(fset[c]) # leave the zero rows alone, otherwise update initial source if not all([fs == 0 for fs in fset]): # go up a point to find end of source if i < len(points[0]): # go up a point to find end of source nearest = np.searchsorted(src[0], points[0][i]) # this is silly, but finds the start of the next point # with nearest, then looks for where the source changes state # closest to that point, and averages the previous 7 samples # to get the source value right before the fast off # happened. new = float( np.mean( src[1][:nearest][ abs(np.diff(src[1][:nearest], append=src[1][-1])) > 0 ][-n_samples] ) ) new_list['Initial_source_power_dBm'][-1] = new pass line = frmt(*[fs[-1] for fs in new_list.values()]) print(line) print('-' * len(line)) # save new dataframe df = pd.DataFrame(new_list) output_dir = Path(output_dir) if output_name is None: output_name = os.path.basename(dr.metadata['settings_file']).split('.')[0] output_name += '_settled.csv' df.to_csv(output_dir / output_name, index=False)