"""Parse, View, and Run functions for a microcalorimeter experiment."""
from rminstr.utilities.path import new_dir
from microcalorimetry.math import rfpower
from rmellipse.uobjects import RMEMeas
from rmellipse.propagators import RMEProp
from rminstr.data_structures import ExistingRecord, ExptParameters
from pathlib import Path
from os.path import join, dirname, basename
import microcalorimetry.measurements.rfsweep._parser as microparser
import microcalorimetry.measurements.rfsweep._runner as microrunner
import microcalorimetry.configs as configs
import microcalorimetry._helpers._intf_tools as clitools
from microcalorimetry._tkquick.dtypes import Folder
import matplotlib.pyplot as plt
import os
import numpy as np
import pandas as pd
import json
import click
from itertools import cycle
from decimal import Decimal
from fnmatch import fnmatch
__all__ = ['run', 'parse', 'view', 'generate_settled_runlist', 'runlist_from_loss']
[docs]
def view(
    metadata: Path,
    signal_config: configs.RFSweepSignalConfig = None,
    time_window: list[float] = None,
    plot_p_est: bool = True,
    plot_sensor_raw: bool = True,
    down_sample_n: int = 1,
    power_units: str = 'W',
    time_units: str = 'hrs',
) -> tuple[plt.Figure]:
    """
    View an ongoing rfsweep experiment.

    Parameters
    ----------
    metadata : Path
        Path to the metadata file of an active experiment.
    signal_config : RFSweepSignalConfig, optional
        Path to a signal configuration of the measurement.
        Will attempt to read the signal config from the measurement
        files if not provided, if provided will override what is in
        the metadata files.
    time_window : list[float], optional
        Time window to look at (in units of time_units), as [min, max].
        If not provided will plot entire time series. By default None.
    plot_p_est : bool, optional
        Plots the estimate of each power signal, by default True.
    plot_sensor_raw : bool, optional
        Plots the raw data that composes each signal, by default True.
    down_sample_n : int, optional
        Down sample time series by n, by default 1.
    power_units : str, optional
        Plot units for power, by default 'W'.
    time_units : str, optional
        Plot units for time ('hrs', 'min', or 's'), by default 'hrs'.

    Returns
    -------
    figures : tuple[Figure]
        Tuple of output figures.

    Raises
    ------
    NotImplementedError
        If ``time_window`` is provided (not yet supported).
    ValueError
        If a power-unit conversion is not supported.
    """
    def down_sample(ts, n=down_sample_n):
        # Keep every n-th sample of a (time, value) pair.
        # FIX: previously used down_sample_n directly, ignoring ``n``.
        index = np.arange(0, len(ts[0]), n)
        return ts[0][index], ts[1][index]

    def punit(vals, origin: str = 'W'):
        # Convert power values from ``origin`` units to ``power_units``.
        if origin == power_units:
            return vals
        if origin == 'W':
            if power_units == 'mW':
                return vals * 1000
            elif power_units == 'dBm':
                return 10 * np.log10(vals) + 30
        elif origin == 'dBm':
            if power_units == 'mW':
                return 10 ** (vals / 10)
            elif power_units == 'W':
                return 10 ** (vals / 10) / 1000
        # FIX: previously fell through and returned None silently.
        raise ValueError(f'Unsupported power conversion {origin} -> {power_units}')

    dr = ExistingRecord(metadata)
    d_full = dr.batch_read()
    try:
        ep = ExptParameters(dr.metadata['config_file'], dr.metadata['settings_file'])
    except FileNotFoundError:
        # Config/settings paths in the metadata may be stale; fall back to
        # files of the same name sitting next to the metadata file.
        newdir = dirname(metadata)
        _config_file = join(newdir, basename(dr.metadata['config_file']))
        _run_settings_file = join(newdir, basename(dr.metadata['settings_file']))
        ep = ExptParameters(_config_file, _run_settings_file)
    if time_window is not None:
        raise NotImplementedError(
            "Haven't added time windowing, zoom in to full plot for now."
        )
    cmm = signal_config
    if cmm is None:
        cmm = ep.config['signal_config']
    # time-axis scaling from seconds to the requested display unit
    time_coeffs = {'hrs': 1 / 3600, 'min': 1 / 60, 's': 1}
    tcoeff = time_coeffs[time_units]
    fig_ts = None
    if plot_p_est:
        fig_ts, ax_ts = plt.subplots(1, 1)
        for sensor in dict(cmm):
            try:
                # the e mapping corresponds to the thermopile voltage, it
                # doesnt have a power estimate
                pest = d_full[microrunner.format_pmeter_est_column(sensor)]
                # FIX: down_sample_n is now honoured (no-op when n == 1)
                t, y = down_sample((pest.t, pest.values))
                ax_ts.plot(
                    t * tcoeff, punit(y, origin='W'), 'o-', label=sensor
                )
            except KeyError:
                print(
                    f'Missing Estimated Signal power from {sensor}. Possibly from an older version of the runner.'
                )
        ax_ts.set_ylabel(f'Estimated Metered Power {power_units}')
        ax_ts.set_xlabel('Time (' + time_units + ')')
        ax_ts.legend(loc='best')
        fig_ts.suptitle('Estimated Metered Power of Sensors')
        fig_ts.tight_layout()
    # otherwise, plot each sensors raw time series in a seperate window
    sensor_figs = []
    if plot_sensor_raw:
        for sensor in dict(cmm):
            sensor_map = cmm[sensor]
            # collect only the mapping entries that actually name a data column
            sensor_cols = {}
            for k, v in dict(sensor_map).items():
                try:
                    sensor_cols.update({k: v['column']})
                except (TypeError, KeyError):
                    pass
            N = len(sensor_cols)
            fig_i, axs_i = plt.subplots(N, 1, sharex=True)
            if N == 1:
                # plt.subplots returns a bare Axes when N == 1; normalize
                axs_i = [axs_i]
            fig_i.suptitle(f'{sensor} Raw Data')
            for i, sc in enumerate(sensor_cols):
                try:
                    ts = d_full[sensor_cols[sc]]
                    t, y = down_sample((ts.t, ts.values))
                    axs_i[i].plot(t * tcoeff, y, 'o-', ds='steps-post')
                    axs_i[i].set_ylabel(sensor_cols[sc])
                except KeyError as e:
                    msg = f'{sensor_cols[sc]} not in data record, are the input_signals for {sensor} correct? maybe one of {list(d_full.keys())}'
                    raise KeyError(msg) from e
            axs_i[-1].set_xlabel('Time (' + time_units + ')')
            sensor_figs.append(fig_i)
    return fig_ts, *sensor_figs
def run_gui(
    output_dir: Folder,
    configs: list[Path],
    settings: list[Path],
    sensor_master_list: Path,
    name: str = 'rfsweep',
    no_confirm: bool = False,
    dry_run: bool = False,
    repeats: int = 1,
):
    """
    RF Sweep GUI runner.

    Builds the equivalent ``rfsweep run`` command line and hands it to the
    shared CLI dispatcher.

    Parameters
    ----------
    output_dir : Folder
        Directory to output from. The default is the current working directory.
    configs : list[Path]
        Configuration powers to set up instruments and other measurement
        settings.
    settings : list[Path]
        Settings file(s) for frequency points and power levels.
    sensor_master_list : Path
        Master list of sensor information for sanity checking, should conform
        to RFSensorMasterList spec.
    name : str, optional
        Measurement name. The default is 'rfsweep'.
    no_confirm : bool, optional
        Skip confirming instrument ID strings. The default is False.
    dry_run : bool, optional
        Try to load the settings, and do some validation. The default is False.
    repeats : int, optional
        Number of repeats to perform on all settings files. The default is 1.

    Returns
    -------
    None.
    """
    # paths are resolved and quoted so the CLI layer survives spaces
    cmd = ['rfsweep', 'run', f'"{Path(output_dir).resolve()}"']
    for cfg in configs:
        cmd.extend(['--configs', f'"{Path(cfg).resolve()}"'])
    for stg in settings:
        cmd.extend(['--settings', f'"{Path(stg).resolve()}"'])
    cmd.extend(['--sensor-master-list', f'"{Path(sensor_master_list).resolve()}"'])
    cmd.extend(['--name', name])
    # boolean flags are only appended when enabled
    if no_confirm:
        cmd.append('--no-confirm')
    if dry_run:
        cmd.append('--dry-run')
    cmd.extend(['--repeats', str(repeats)])
    clitools.ucal_cli(cmd)
@click.command(name='run')
@click.argument('output_dir', type=Path)
@click.option('--repeats', type=int, default=1)
@click.option('--configs', '-c', type=Path, required=True, multiple=True)
@click.option('--settings', '-s', type=Path, required=True, multiple=True)
@click.option('--sensor-master-list', '-m', type=Path, required=True)
@click.option('--name', '-n', type=str)
@click.option('--no-confirm', is_flag=True, default=False)
@click.option('--dry-run', is_flag=True, default=False)
def _run_cli(*args, **kwargs):
    """
    Interface for the RF sweep run function.
    """
    # click delivers multi-value options as tuples; run() expects lists
    for multi_opt in ('configs', 'settings'):
        kwargs[multi_opt] = list(kwargs[multi_opt])
    print(args)
    print(kwargs)
    return run(*args, **kwargs)
[docs]
def run(
    output_dir: str,
    repeats: int,
    configs: list[Path],
    settings: list[Path],
    sensor_master_list: configs.RFSensorMasterList,
    name: str = 'rf_sweep',
    no_confirm: bool = False,
    dry_run: bool = False,
):
    """
    Run a microcalorimetry RF Sweep experiment.

    Parameters
    ----------
    output_dir : str
        Directory to output runs into. A fresh sub-directory named after
        ``name`` is created per run.
    repeats : int
        Number of repeats to perform on all settings files.
    configs : list[Path]
        Config files for experiment settings / instruments.
        (NOTE: this parameter shadows the module-level ``configs`` import
        inside the function body.)
    settings : list[Path]
        Run settings files, looped over for experiment. A single str/Path
        is accepted and treated as a one-element list.
    sensor_master_list : configs.RFSensorMasterList
        Master list of sensors and resistance/sensitivity values to use for
        sanity checking config files.
    name : str, optional
        Name of measurement. The default is 'rf_sweep'.
    no_confirm : bool, optional
        If True, skips any confirmations that may be asked when starting
        the run.
    dry_run : bool, optional
        If True, will try to load the configurations without actually running
        anything to do a dry-check - can be used to validate some basic type
        validation of the configuration.
    """
    priority = [0] * len(configs)
    if isinstance(settings, (str, Path)):
        settings = [settings]
    for _ in range(repeats):
        for setting in settings:
            # FIX: create each run directory under the ORIGINAL output_dir.
            # Previously ``output_dir`` itself was rebound each iteration,
            # nesting every subsequent run inside the previous run's folder.
            run_dir = new_dir(output_dir, name) + r'//'
            if not dry_run:
                with microrunner.MicrocalorimeterRunner(
                    configs,
                    setting,
                    run_dir,
                    sensor_master_list,
                    priority,
                    no_confirm=no_confirm,
                ) as runner:
                    # opens visa resources for every instrument
                    runner.initialize_instruments()
                    runner.start_monitor_mode()
                    while not runner.done:
                        runner.iterate()
                    runner.output()
            else:
                # for dry run, just try to initialize all the config files
                # and don't do anything with them.
                microrunner.MicrocalorimeterRunner(
                    configs,
                    setting,
                    run_dir,
                    sensor_master_list,
                    priority,
                    dry_run=True,
                )
# Attach help text generated from run()'s numpydoc docstring to the CLI command.
_run_cli = clitools.format_from_npdoc(run)(_run_cli)
@click.command(name='parse')
@click.argument('metadata', type=Path, nargs=-1)
@click.option('--output-file', '-o', type=Path)
@click.option('--show-plots', is_flag=True)
@click.option('--save-plots', type=Path)
@click.option('--plot-ext', type=str)
def _parse_cli(
    *args,
    show_plots: bool = False,
    output_file: Path = None,
    save_plots: Path = None,
    plot_ext: str = '.png',
    **kwargs,
):
    """
    Interface for the command line for parsing RF sweeps.

    Parameters
    ----------
    show_plots : bool, optional
        If true, shows the plots in a gui and freezes the terminal, by default False.
    output_file : Path, optional
        If provided, outputs any saveable objects to an HDF5 file, by default None.
    save_plots : Path, optional
        If provided, saves plots to this directory. The default is None.
    plot_ext : str, optional
        File extension to save plots as. The default is '.png'.

    Returns
    -------
    outputs : tuple
        Whatever ``parse`` returns: the parsed datasets and the figures.
    """
    # plots only need to be generated when they will be shown or saved
    kwargs['make_plots'] = bool(show_plots or save_plots)
    print('Parse RFSWEEP from CLI:')
    # echo the full effective call for traceability in terminal logs
    full_output = dict(
        args=[str(a) for a in args],
        show_plots=show_plots,
        output_file=str(output_file),
        save_plots=str(save_plots),
        plot_ext=plot_ext,
        **{k: str(v) for k, v in kwargs.items()},
    )
    print(json.dumps(full_output, indent=True))
    outputs = clitools.run_and_show_plots(
        parse,
        *args,
        show_plots=show_plots,
        save_plots=save_plots,
        plot_ext=plot_ext,
        **kwargs,
    )
    if output_file:
        clitools.save_saveable_objects(outputs[0], output_file=output_file)
    return outputs
[docs]
def parse(
    metadata: list[Path],
    analysis_config: configs.RFSweepParserConfig = None,
    verbose: bool = False,
    make_plots: bool = False,
    plot_segments_analysis: list[int] = [0, -1],
    plot_all_segments_analysis: bool = False,
    dataframe_results: Path = None,
    format_matlab: Path = None,
) -> tuple[dict[RMEMeas], list[plt.Figure]]:
    """
    Parse a microcalorimeter run to produce data with uncertainties.

    Parameters
    ----------
    metadata : list[Path]
        Path to metadata file for experiment. Can be multiples. A directory
        may be given instead, provided it contains exactly one
        ``*_metadata.csv`` file.
    analysis_config : RFSweepParserConfig, optional
        Path to sensor configuration file, overrides any sensor
        configuration in the metadata if provided.
    verbose : bool, optional
        If True, prints info about the analysis. Default is False.
    make_plots : bool, optional
        Makes plots if True.
    plot_segments_analysis : list[int]
        What segment of each run to plot.
    plot_all_segments_analysis : bool
        If True, when making plots plot every segment's analysis.
    dataframe_results : Path, optional
        If provided, saves a csv of intermediate calculated values.
    format_matlab : Path, optional
        If provided, saves a matlab version of the output results.
        NOTE(review): currently unused in this implementation - TODO confirm
        whether matlab export was dropped intentionally.

    Returns
    -------
    parsed_rf : dict[RMEMeas]
        Dictionary of RFSweep datasets in a parsed RF sweep configuration.
    figures : list[plt.Figure]
        Figures generated during parsing (empty unless make_plots is True).
    """
    # wrap the uncertainty propagator around the raw math functions
    basicprop = RMEProp(
        sensitivity=True,
        montecarlo_sims=0,
    )
    zeta_dcsub = basicprop.propagate(rfpower.zeta_dcsub)
    zeta_general = basicprop.propagate(rfpower.zeta_general)
    dc_sub = basicprop.propagate(rfpower.dc_substituted_power)
    openloope_te_power = basicprop.propagate(rfpower.openloop_thermoelectric_power)
    if analysis_config is None:
        analysis_config = {}
    else:
        analysis_config = configs.load_config(analysis_config)
    # set up parser
    metadata_dict = {}
    figures = []
    if not isinstance(metadata, list):
        metadata = [metadata]
    # if a folder is pointed to, use a file with _metadata.csv
    # in the name as the metadata file, so folders can be pointed to
    adjusted_metadata = []
    for md in metadata:
        md = Path(md)
        if not md.exists():
            # NOTE(review): FileNotFoundError would be the conventional type
            # here; kept as-is in case callers catch FileExistsError.
            raise FileExistsError(f'{md} doesnt exist')
        elif md.is_file():
            adjusted_metadata.append(md)
        # assume folders contain a single run
        # 1 metadata file
        elif md.is_dir():
            new_md = [f for f in md.glob('*_metadata.csv')]
            if len(new_md) == 1:
                adjusted_metadata.append(new_md[0])
            else:
                raise ValueError(
                    f'{md} must contain exactly 1 *_metadata files to be pointed to by parser.'
                )
    metadata = adjusted_metadata
    runs = []
    for metadata_path in metadata:
        run_dir = Path(metadata_path).parent
        run_file = Path(metadata_path).name
        run = microparser.NewTypeRun(
            str(run_dir) + '/', run_dir / run_file, run_file, **analysis_config
        )
        run.load()
        run.analyze()
        metadata_dict.update({str(k): v for k, v in run.expt.config.items()})
        # make detailed analysis plots
        if make_plots:
            for signal, analyzer in run.analyzers.items():
                try:
                    analyzer.plot_analysis
                except AttributeError:
                    print(f'{signal} {analyzer} has no plot analysis method. Skipping')
                    continue
                for si, segment in enumerate(run.segments):
                    called_out = si in plot_segments_analysis
                    # -1 in the request list means "also plot the last segment"
                    is_end = si == len(run.segments) - 1 and (
                        -1 in plot_segments_analysis
                    )
                    if called_out or is_end or plot_all_segments_analysis:
                        # plot the analysis for a single step
                        figure = analyzer.plot_analysis(segment)
                        figure.suptitle(
                            f'{signal} signal \n run {Path(metadata_path).parent.name} ; segment {si}'
                        )
                        figures.append(figure)
        runs.append(run)
    # use the last signal config
    signal_config = configs.RFSweepSignalConfig(runs[-1].parsed_config['signal_config'])
    # format data output into rmellipse objects
    c = microparser.Campaign(runs, Path.cwd(), 'rfsweep')
    # generate noise plots
    if make_plots:
        data_df = c.output_segments(fmt_for='pandas')
        for signal, sconfig in signal_config.items():
            input_signals = sconfig['input_signals']
            if isinstance(input_signals, str):
                input_signals = [input_signals]
            for ins in input_signals:
                # plot the deviation of the RF on value as a function
                # of the step number
                fig, ax = plt.subplots(2, 1)
                try:
                    column = sconfig[ins]['column']
                    fig.suptitle(f'{column} RF On standard deviation')
                    for mode in ['on']:
                        dev = data_df[f'{column}_{mode}_dev']
                        mean = data_df[f'{column}_{mode}']
                        ppm = dev / mean * 1e6
                        ax[0].plot(dev, 'o')
                        ax[1].set_xlabel('Step Number')
                        ax[1].plot(ppm, 'o')
                        ax[0].set_ylabel(f'Column RF On std ({sconfig[ins]["units"]})')
                        ax[1].set_ylabel('Column RF On std (ppm)')
                    figures.append(fig)
                except Exception as e:
                    # best-effort plotting: discard the figure and keep going
                    plt.close(fig)
                    if verbose:
                        print(
                            f'Encountered error plotting signal {signal}:{ins} std \n {type(e)}: {e}'
                        )
                fig, ax = plt.subplots(2, 1, figsize=(8, 8))
                try:
                    column = sconfig[ins]['column']
                    fig.suptitle(f'{signal} \n {column} standard deviation')
                    ax[1].set_xlabel('Before (left) and After (right)')
                    ax[0].set_ylabel(f'STD of {ins} ({sconfig[ins]["units"]})')
                    ax[1].set_ylabel(f'STD of {ins} (ppm)')
                    for ir, run in enumerate(c.run_list):
                        for iseg, segment in enumerate(run.segments):
                            i_off = segment.results[f'{column}_off_i']
                            f_off = segment.results[f'{column}_off_f']
                            i_off_dev = segment.results[f'{column}_off_i_dev']
                            f_off_dev = segment.results[f'{column}_off_f_dev']
                            label = f'{run.name} : segment {iseg}'
                            ax[0].plot(
                                [0, 1], [i_off_dev, f_off_dev], 'o--', label=label
                            )
                            ax[1].plot(
                                [0, 1],
                                [i_off_dev / i_off * 1e6, f_off_dev / f_off * 1e6],
                                'o--',
                            )
                    ax[0].legend(
                        bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0
                    )
                    fig.tight_layout()
                    figures.append(fig)
                except Exception as e:
                    plt.close(fig)
                    if verbose:
                        print(
                            f'Encountered error plotting {signal}:{ins} segment off \n {type(e)}: {e}'
                        )
        # Plot if the source think it is in
        # compression
        compression = {}
        freq = np.array([])
        try:
            for run in c.run_list:
                for segment in run.segments:
                    if segment.results['complete']:
                        for step in segment.steps:
                            signals = [
                                k
                                for k in step.raw_data.keys()
                                if fnmatch(k, '*power_signal*')
                                and 'timestamp' not in k
                                and 'calorimeter' not in k
                            ]
                            source = step.raw_data['RF_source_power_signal (W)']
                            source = source[2:].astype(float)
                            pow_signals = {
                                s: step.raw_data[s]
                                for s in signals
                                if 'source' not in s
                            }
                            # calculate compression
                            good_freq = False
                            for k, sig in pow_signals.items():
                                sig = sig[2:].astype(float)
                                max_i = np.argmax(sig)
                                sig_dB_change = 10 * np.log10(sig[max_i] / sig)
                                # closest to 1dB of change
                                sig_1dB_closest_i = np.argmin(abs(sig_dB_change - 1))
                                sig_1dB_closest = sig_dB_change[sig_1dB_closest_i]
                                # if the smallest change was < 0.5 dB, skip it:
                                # not enough dynamic range to estimate compression
                                if sig_1dB_closest < 0.5:
                                    continue
                                # estimate the compression
                                max_source = source[max_i]
                                source_1dB_closest = source[sig_1dB_closest_i]
                                compress_i = sig_1dB_closest - 10 * np.log10(
                                    max_source / source_1dB_closest
                                )
                                try:
                                    compression[k] = np.append(
                                        compression[k], compress_i
                                    )
                                except KeyError:
                                    compression[k] = np.array([compress_i])
                                good_freq = True
                            if good_freq:
                                freq = np.append(freq, step.frequency)
            fig, ax = plt.subplots(1, 1)
            for k in compression:
                ax.plot(
                    freq,
                    compression[k],
                    'o',
                    label=f'Measured By: {k.split("_power_signal")[0]} Est.',
                )
            ax.axhline(0.4, color='r', ls='--', lw=3, label='Limit')
            ax.legend(loc='best')
            ax.set_xlabel('Frequency (GHz)')
            ax.set_ylabel('Compression (dB)')
            ax.set_title('Source Compression Check')
            figures.append(fig)
        except Exception as e:
            print(f"Warning: Couldn't estimate compression for : {e}")
    # output the dataframe results
    if dataframe_results:
        df = c.output_dataframe()
        df.to_csv(dataframe_results)
    # format data for rmellipse calculations
    data = c.output_segments(fmt_for='rmellipse', include_specs=True)
    # config of the last run processed
    ep = run.expt.config
    # variablize the column names to make it a bit easier
    assert signal_config['calorimeter_power']['type'] == 'thermoelectric'
    e_col = signal_config['calorimeter_power']['e']['column']
    # do a little bit of post processing to calculate power
    # This calculates the inferred power flowing through the thermopile
    outputs = {}
    cal_coeffs = configs.ThermoelectricFitCoefficients(
        signal_config['calorimeter_power']['coeffs']
    ).load()
    try:
        p_of_e_calorimeter = cal_coeffs.attrs['p_of_e']
        E_on = openloope_te_power(
            cal_coeffs, data.sel(col=e_col + '_on'), p_of_e=p_of_e_calorimeter
        )
        E_off = openloope_te_power(
            cal_coeffs, data.sel(col=e_col + '_off'), p_of_e=p_of_e_calorimeter
        )
        outputs.update({'E_on': E_on, 'E_off': E_off})
    except AttributeError:
        print(
            "Warning: Can't compute power inferred by calorimeter, skipping. Likely a full data model of the sensitivity wasn't supplied."
        )
    outputs.update(
        {'e_on': data.sel(col=e_col + '_on'), 'e_off': data.sel(col=e_col + '_off')}
    )
    # this part of the code is trying to turn the voltage/current
    # measurements into power measurements of the sensor inside
    # calorimeter and/or on the sidearm.
    # this should be a class instead of a bunch of if statements
    if signal_config['DUT_power']['type'] == 'bolometer':
        # possible no current was provided if measured on a type 4
        s_v_col = signal_config['DUT_power']['vdc']['column']
        if 'idc' in signal_config['DUT_power']:
            s_i_col = signal_config['DUT_power']['idc']['column']
            # FIX: trailing commas previously made each of these a 1-tuple,
            # corrupting the power products and the dc_sub/zeta inputs.
            I_on = data.sel(col=s_i_col + '_on')
            I_off_slow = data.sel(col=s_i_col + '_off_slow')
            I_off_fast = data.sel(col=s_i_col + '_off_fast')
            # FIX: s_resistance was undefined on this branch but referenced
            # by the dc_sub calls below; currents are supplied instead.
            s_resistance = None
            p2dc_on = data.sel(col=s_v_col + '_on') * I_on
            p2dc_off_slow = data.sel(col=s_v_col + '_off_slow') * I_off_slow
            p2dc_off_fast = data.sel(col=s_v_col + '_off_fast') * I_off_fast
        else:
            s_resistance = signal_config['DUT_power']['resistance']
            I_on = None
            I_off_slow = None
            I_off_fast = None
            p2dc_on = data.sel(col=s_v_col + '_on') ** 2 / s_resistance
            p2dc_off_slow = data.sel(col=s_v_col + '_off_slow') ** 2 / s_resistance
            p2dc_off_fast = data.sel(col=s_v_col + '_off_fast') ** 2 / s_resistance
        p2_fast = dc_sub(
            data.sel(col=s_v_col + '_on'),
            data.sel(col=s_v_col + '_off_fast'),
            I_on=I_on,
            I_off=I_off_fast,
            R=s_resistance,
        )
        p2_slow = dc_sub(
            data.sel(col=s_v_col + '_on'),
            data.sel(col=s_v_col + '_off_slow'),
            I_on=I_on,
            I_off=I_off_slow,
            R=s_resistance,
        )
        zeta = zeta_dcsub(
            data.sel(col=e_col + '_on'),
            data.sel(col=e_col + '_off'),
            data.sel(col=s_v_col + '_on'),
            data.sel(col=s_v_col + '_off_fast'),
            data.sel(col=s_v_col + '_off_slow'),
            I_on,
            I_off_fast,
            I_off_slow,
        )
        outputs.update(
            {
                'p2_fast': p2_fast,
                'p2_slow': p2_slow,
                'p2dc_on': p2dc_on,
                'p2dc_off_slow': p2dc_off_slow,
                'p2dc_off_fast': p2dc_off_fast,
                'zeta': zeta,
            }
        )
    # this is a thermoelectric power sensor, so lets do that
    elif signal_config['DUT_power']['type'] == 'thermoelectric':
        # get sensor coefficients
        # and the calorimeter coefficients
        s_coeffs = configs.ThermoelectricFitCoefficients(
            signal_config['DUT_power']['coeffs']
        ).load()
        s_e_col = signal_config['DUT_power']['e']['column']
        # check if the slope of the sensor equals the slope of the
        # coefficients, if not then the thermoelectric sensor's RF
        # side has a negative polarity to the srf side and the voltage
        # measured needs to be multiplied by -1.
        s_e_const = 1.0
        measured_slope_sign = np.sign(data.nom.sel(col=s_e_col + '_on')[0])
        coeff_sign = np.sign(s_coeffs.nom.sel(deg=1))
        if measured_slope_sign != coeff_sign:
            s_e_const = -1.0
        p2_on = openloope_te_power(
            s_coeffs,
            s_e_const * data.sel(col=s_e_col + '_on'),
            p_of_e=s_coeffs.attrs['p_of_e'],
        )
        # NOTE(review): p2_off uses cal_coeffs' p_of_e with s_coeffs, and
        # no s_e_const polarity correction - confirm this is intentional.
        p2_off = openloope_te_power(
            s_coeffs,
            data.sel(col=s_e_col + '_off'),
            p_of_e=cal_coeffs.attrs['p_of_e'],
        )
        p2_slow = p2_on - p2_off
        zeta = zeta_general(
            data.sel(col=e_col + '_on'),
            data.sel(col=e_col + '_off'),
            cal_coeffs,
            cal_coeffs.attrs['p_of_e'],
            p2_slow,
        )
        outputs.update(
            {
                'e_p2_off': data.sel(col=s_e_col + '_off'),
                'e_p2_on': data.sel(col=s_e_col + '_on'),
                # thermoelectric sensors have no fast/slow distinction
                'p2_fast': p2_slow,
                'p2_slow': p2_slow,
                'zeta': zeta,
            }
        )
    # dummy sensors don't do anything
    elif signal_config['DUT_power']['type'] == 'special':
        pass
    else:
        msg = f'{signal_config["DUT_power"]["type"]} not recognized'
        raise ValueError(msg)
    # now calculate the relevant powers for the sidearm
    # it may not be present, check for it in the signal config first.
    try:
        signal_config['monitor_power']
        calculate_monitor = True
    except KeyError:
        calculate_monitor = False
    if calculate_monitor and signal_config['monitor_power']['type'] == 'bolometer':
        s3_v_col = signal_config['monitor_power']['columns']['v_col']
        if 'idc' in signal_config['monitor_power']:
            s3_i_col = signal_config['monitor_power']['columns']['i_col']
            # FIX: trailing commas previously made each of these a 1-tuple.
            I3_on = data.sel(col=s3_i_col + '_on')
            I3_off_slow = data.sel(col=s3_i_col + '_off_slow')
            I3_off_fast = data.sel(col=s3_i_col + '_off_fast')
            p3_fast = dc_sub(
                data.sel(col=s3_v_col + '_on'),
                data.sel(col=s3_v_col + '_off_fast'),
                I_on=I3_on,
                I_off=I3_off_fast,
            )
            p3_slow = dc_sub(
                data.sel(col=s3_v_col + '_on'),
                data.sel(col=s3_v_col + '_off_slow'),
                I_on=I3_on,
                I_off=I3_off_slow,
            )
        else:
            s3_resistance = signal_config['monitor_power']['resistance']
            # FIX: p3_fast previously omitted R, inconsistent with p3_slow.
            p3_fast = dc_sub(
                data.sel(col=s3_v_col + '_on'),
                data.sel(col=s3_v_col + '_off_fast'),
                R=s3_resistance,
            )
            p3_slow = dc_sub(
                data.sel(col=s3_v_col + '_on'),
                data.sel(col=s3_v_col + '_off_slow'),
                R=s3_resistance,
            )
        outputs.update(
            {
                'p3_fast': p3_fast,
                'p3_slow': p3_slow,
            }
        )
    # no distinguishing between a fast and slow analysis
    # for a commercial power meter
    elif calculate_monitor and signal_config['monitor_power']['type'] == 'commercial':
        s3_p_col = signal_config['monitor_power']['power']['column']
        p3_slow = data.sel(col=s3_p_col + '_on')
        p3_fast = data.sel(col=s3_p_col + '_on')
        outputs.update(
            {
                'p3_fast': p3_fast,
                'p3_slow': p3_slow,
            }
        )
    elif calculate_monitor and signal_config['monitor_power']['type'] == 'special':
        pass
    elif calculate_monitor:
        msg = f'{signal_config["monitor_power"]["type"]} not recognized'
        raise ValueError(msg)
    # plot parsed components
    if make_plots:
        plot_outputs = {k: v for k, v in outputs.items() if k in ['zeta']}
        for k, v in plot_outputs.items():
            fig, ax = plt.subplots(1, 1)
            unc = v.stdunc().cov
            ax.errorbar(v.nom.frequency, v.nom, yerr=unc, capsize=3, label='k=1', ls='')
            ax.set_ylabel(k.replace('zeta', 'Uncorrected Eta'))
            ax.set_xlabel('Frequency (GHz)')
            sidearm_name = None
            try:
                sidearm_name = ep['measurement_description']['sidearm_name']
            except KeyError:
                pass
            try:
                fig.suptitle(
                    f'Internal mount: {ep["measurement_description"]["mount_name"]}, external mount: {sidearm_name} \n {k}'
                )
            except KeyError:
                fig.suptitle(f'Parsed RF Sweep: {k}')
            fig.tight_layout()
            figures.append(fig)
    return outputs, figures
# Attach help text generated from parse()'s numpydoc docstring to the CLI command.
_parse_cli = clitools.format_from_npdoc(parse)(_parse_cli)
def mean_last_of_point_dBm(signal, points, i: int, n_samples: int):
    """
    Average the last ``n_samples`` changing samples before point ``i``, in dBm.

    Parameters
    ----------
    signal : tuple[np.ndarray, np.ndarray]
        (timestamps, values-in-W) pair for one recorded signal.
    points : tuple[np.ndarray, ...]
        Point-counter record; ``points[0][i]`` is the timestamp where
        point ``i`` begins.
    i : int
        Index of the point whose preceding samples are averaged.
    n_samples : int
        Number of trailing samples to average.

    Returns
    -------
    float
        Mean power converted from watts to dBm.
    """
    # index of the first sample at/after the start of point i
    nearest = np.searchsorted(signal[0], points[0][i])
    history = signal[1][:nearest]
    # keep only samples where the value actually changed, dropping
    # repeated held-over readings
    changed = history[abs(np.diff(history, append=signal[1][-1])) > 0]
    # FIX: was ``changed[-n_samples]`` (a single element), which averaged
    # nothing; take the last n_samples values as intended.
    avg = float(np.mean(changed[-n_samples:]))
    return 10 * np.log10(avg) + 30
[docs]
def runlist_from_loss(
    metadata: Path,
    output_dir: Path = Path('.'),
    level_to: str = 'DUT_power',
    DUT_power_max_dBm: float = 10,
    monitor_power_max_dBm: float = 0,
    max_source_dBm: float = 15,
    output_name: str = 'runlist.csv',
    n_samples: int = 1,
    frequencies: np.ndarray[float] = None,
    segment_size: int = 10,
    off_step_length: int = 2,
    safety_backoff_dBm: float = 3,
) -> list[plt.Figure]:
    """
    Generate a runlist from the approximate RF Loss of measurement signals.

    Parameters
    ----------
    metadata : Path
        Path to measurement metadata.
    output_dir : Path, optional
        Directory to output runfiles. The default is Path('.').
    level_to : str, optional
        Which signal to level to. The default is 'DUT_power'.
    DUT_power_max_dBm : float, optional
        Maximum DUT power in dBm. The default is 10.
    monitor_power_max_dBm : float, optional
        Maximum monitor power in dBm. The default is 0.
    max_source_dBm : float, optional
        Maximum allowed source value in dBm. The default is 15.
    output_name : str, optional
        Name to output the file as. The default is 'runlist.csv'.
    n_samples : int, optional
        Number of samples to average over for calculations. The default is 1.
    frequencies : np.ndarray[float], optional
        Frequency list to interpolate the RF loss values to and produce
        the runlist. If None, uses the frequencies in the provided
        measurement. The default is None.
    segment_size : int, optional
        Number of frequency points per segment. The default is 10.
    off_step_length : int, optional
        How many steps each off period should be. Typically 2, the default
        is 2.
    safety_backoff_dBm : float, optional
        Back off the start value by this amount to avoid over sourcing.
        The levelling feature will converge to the correct value during a
        measurement. The default is 3.0.

    Returns
    -------
    figures : list[plt.Figure]
        List of generated figures.
    """
    dr = ExistingRecord(metadata)
    # FIX: only sort when a frequency grid was supplied; np.sort(None)
    # raises, which broke the documented default of frequencies=None.
    if frequencies is not None:
        frequencies = np.sort(frequencies)
    # PARSE THE MEASURMENT
    max_powers = {
        'DUT_power_signal (W)': DUT_power_max_dBm,
        'monitor_power_signal (W)': monitor_power_max_dBm,
        'RF_source_power_signal (W)': max_source_dBm,
    }
    signal_columns = ['DUT_power_signal (W)']
    if 'monitor_power_signal (W)' in dr.columns:
        signal_columns.append('monitor_power_signal (W)')
    print('Present signals ', signal_columns)
    d_full = dr.batch_read(
        ['frequency', 'power_on', 'point_counter', 'RF_source_power_signal (W)']
        + signal_columns
    )
    points_dr = d_full['point_counter']
    # read in settings file; fall back to a same-named file next to the
    # metadata when the recorded path is stale
    try:
        settings = pd.read_csv(dr.metadata['settings_file'])
    except FileNotFoundError:
        try_file = Path(metadata).parent / Path(dr.metadata['settings_file']).name
        settings = pd.read_csv(try_file)
    # the data record counts the initial warm up as a point.
    # experiment needs to be finished for this
    if len(settings) + 1 != len(points_dr[1]):
        print(
            'Incomplete measurement passed to metadata. Meas list may be incomplete or have bad source settings on last frequency.'
        )
    # drop the warm-up point so settings rows align with measured points
    points = (points_dr[0][1:], points_dr[1][1:])
    read_frequencies = np.array([], float)
    losses = {s: np.array([], float) for s in signal_columns}
    # build an RF loss table
    for i, (tp, p) in enumerate(zip(points[0], points[1])):
        # copy row
        fset = settings.iloc[i]
        this_avg = {}
        this_loss = {}
        # leave the zero (RF-off padding) rows alone
        if not all([fs == 0 for fs in fset]):
            # go up a point to find end of source (always true for enumerate
            # over points[0]; kept for safety)
            if i < len(points[0]):
                src_dBm = mean_last_of_point_dBm(
                    d_full['RF_source_power_signal (W)'], points, i, n_samples
                )
            # this is silly, but finds the start of the next point
            # with nearest, then looks for where the source changes state
            # closest to that point, and averages the previous samples
            # to get the source value right before the fast off happened.
            for signal_name in signal_columns:
                this_avg[signal_name] = mean_last_of_point_dBm(
                    d_full[signal_name], points, i, n_samples
                )
                this_loss[signal_name] = src_dBm - this_avg[signal_name]
                losses[signal_name] = np.append(
                    losses[signal_name], this_loss[signal_name]
                )
            read_frequencies = np.append(read_frequencies, fset.Frequency_GHz)
    # interpolate losses to the requested frequency grid
    # use the measured frequencies by default
    if frequencies is None:
        frequencies = read_frequencies
    interp_losses = {}
    for signal_name in signal_columns:
        interp_losses[signal_name] = _smallest_neighbour(
            frequencies, read_frequencies, losses[signal_name]
        )
    figs = []
    # make some plots
    fig, ax = plt.subplots()
    figs.append(fig)
    ax.set_xlabel('Frequency (GHz)')
    ax.set_ylabel('RF Loss (dBm)')
    colors = cycle(['b', 'C1', 'm'])
    for signal_name in signal_columns:
        color = next(colors)
        print(signal_name, interp_losses[signal_name])
        ax.plot(
            read_frequencies,
            losses[signal_name],
            'o',
            color=color,
            label=signal_name.replace('(W)', 'Measured'),
        )
        ax.plot(
            frequencies,
            interp_losses[signal_name],
            '-',
            color=color,
            label=signal_name.replace('(W)', 'Interpolated'),
        )
    ax.legend(loc='best')
    # resolve the level_to alias to its full column name
    for s in signal_columns:
        if level_to in s:
            level_to = s
    # calculate source power to achieve target power
    initial_source = interp_losses[level_to] + max_powers[level_to] - safety_backoff_dBm
    # calculate expected powers
    expected_powers = {'RF_source_power_signal (W)': initial_source}
    for signal_name in signal_columns:
        expected_powers[signal_name] = initial_source - interp_losses[signal_name]
    # plot the expected power levels
    fig, ax = plt.subplots()
    figs.append(fig)
    ax.set_xlabel('Frequency (GHz)')
    ax.set_ylabel('Expected Initial Power (dBm)')
    for signal_name in signal_columns + ['RF_source_power_signal (W)']:
        color = next(colors)
        ax.plot(
            frequencies,
            expected_powers[signal_name],
            color=color,
            label=signal_name.replace('(W)', 'expected'),
        )
        ax.axhline(
            max_powers[signal_name],
            color=color,
            ls='--',
            label=signal_name.replace('(W)', 'limit'),
        )
    ax.legend(loc='best')
    # check no maximums are exceeded
    no_maximums = True
    for signal_name in signal_columns + ['RF_source_power_signal (W)']:
        if (expected_powers[signal_name] > max_powers[signal_name]).any():
            no_maximums = False
            print(
                f'Maximum power exceeded for {signal_name}, no runlist will be generated'
            )
    # if no maximums hit, build a run list
    # interleave the frequency points
    initial_source = _interleave(initial_source)
    frequencies = _interleave(frequencies)
    if no_maximums:
        output_df = {
            'Frequency_GHz': [],
            'Initial_source_power_dBm': [],
            'Target_source_power_dBm': [],
            'Source_power_limit_dBm': [],
        }
        for i, fi in enumerate(frequencies):
            # insert zero (RF-off) rows at the start of every segment
            if i % segment_size == 0:
                for n in output_df:
                    output_df[n] += [0] * off_step_length
            output_df['Frequency_GHz'].append(fi)
            output_df['Initial_source_power_dBm'].append(initial_source[i])
            output_df['Target_source_power_dBm'].append(max_powers[level_to])
            output_df['Source_power_limit_dBm'].append(
                max_powers['RF_source_power_signal (W)']
            )
        # append with a zero row
        for n in output_df:
            output_df[n] += [0] * off_step_length
        output_df = pd.DataFrame(output_df)
        output_df.to_csv(Path(output_dir) / output_name, index=False)
    return figs
def _interleave(a):
out = np.append(a[0 : len(a) : 2], a[1 : len(a) : 2])
assert len(out) == len(a)
return out
def _smallest_neighbour(x_interp: np.array, x: np.array, y: np.array):
"""
Picks the smallest neighbour of y when interpolating x_interp to x.
"""
# make sure input arrays are sorted
# and unique
sort_ind = np.argsort(x)
x = x[sort_ind]
y = y[sort_ind]
x, uniq_index = np.unique(x, return_index=True)
y = y[uniq_index]
# pick neighbour with smalles y value
out = np.zeros(len(x_interp))
for i, xi in enumerate(x_interp):
closest_i = np.argmin(abs(x_interp[i] - x))
f_closest = x[closest_i]
if f_closest < xi:
other_neighbour = closest_i + 1
else:
other_neighbour = closest_i - 1
ytest_1 = y[closest_i]
try:
ytest_2 = y[other_neighbour]
except IndexError:
ytest_2 = np.inf
out[i] = min(ytest_1, ytest_2)
return out
[docs]
def generate_settled_runlist(
    metadata: Path,
    analysis_config: configs.RFSweepParserConfig = None,
    output_dir: Path = Path('.'),
    output_name: str = None,
    n_samples: int = 7,
):
    """
    Generate a run list with settled source powers.

    Copies the run settings (frequency list, target power, initial source
    power, and source limit) and replaces the initial source power
    with the settled value from the provided run.

    This function does NOT check if the source was actually
    settled, it just assumes it was right before power was turned off.
    Check that yourself by inspecting the dashboard.

    Unfinished runs can be provided, but they may provide bad settings for
    the final points if the experiment never actually settled, and the
    frequency list may be incomplete.

    Parameters
    ----------
    metadata : Path
        Metadata file of a previously recorded run.
    analysis_config : RFSweepParserConfig, optional
        Currently unused; accepted for interface consistency with the
        other entry points in this module.
    output_dir : Path, optional
        Folder to output new file in. The default is the current directory.
    output_name : str, optional
        What to name new file. Defaults to the settings-file name
        + '_settled.csv'.
    n_samples : int, optional
        Number of samples to average for final source value.
    """
    dr = ExistingRecord(metadata)
    d_full = dr.batch_read(
        ['rf_power_setting', 'frequency', 'power_on', 'point_counter']
    )
    # get columns I care about
    src = d_full['rf_power_setting']
    points_dr = d_full['point_counter']

    def frmt(*args):
        # fixed-width console table row (assumes 4 columns in the run list)
        args = [float(a) for a in args]
        return '{:6.3f} | {:6.3f} | {:6.3f} | {:6.3f}'.format(*args)

    # read in settings file; fall back to a same-named file next to the
    # metadata if the recorded absolute path no longer exists
    try:
        settings = pd.read_csv(dr.metadata['settings_file'])
    except FileNotFoundError:
        try_file = Path(metadata).parent / Path(dr.metadata['settings_file']).name
        settings = pd.read_csv(try_file)
    # the data record counts the initial warm up as a point.
    # experiment needs to be finished for this
    if len(settings) + 1 != len(points_dr[1]):
        print(
            'Incomplete measurement passed to metadata. Meas list may be incomplete or have bad source settings on last frequency.'
        )
    # for each completed measurement, change the initial source to the
    # final (settled) source value. Drop the warm-up point first.
    points = (points_dr[0][1:], points_dr[1][1:])
    new_list = {c: [] for c in settings.columns}
    for i, (tp, p) in enumerate(zip(points[0], points[1])):
        # copy row
        fset = settings.iloc[i]
        for c in settings.columns:
            new_list[c].append(fset[c])
        # leave the zero (power-off) rows alone, otherwise update initial source
        if not all([fs == 0 for fs in fset]):
            # find where this point ends in the source record
            nearest = np.searchsorted(src[0], points[0][i])
            # this is silly, but finds the start of the next point
            # with nearest, then looks for where the source changes state
            # closest to that point, and averages the previous n_samples
            # values to get the source value right before the fast off
            # happened.
            changing = src[1][:nearest][
                abs(np.diff(src[1][:nearest], append=src[1][-1])) > 0
            ]
            # BUGFIX: slice [-n_samples:] so the mean runs over the last
            # n_samples values; the old [-n_samples] picked a single
            # sample, making np.mean (and the n_samples parameter) a no-op.
            new = float(np.mean(changing[-n_samples:]))
            new_list['Initial_source_power_dBm'][-1] = new
        line = frmt(*[fs[-1] for fs in new_list.values()])
        print(line)
        print('-' * len(line))
    # save new dataframe
    df = pd.DataFrame(new_list)
    output_dir = Path(output_dir)
    if output_name is None:
        output_name = os.path.basename(dr.metadata['settings_file']).split('.')[0]
        output_name += '_settled.csv'
    df.to_csv(output_dir / output_name, index=False)