Performing calculations¶
Calculations can be performed and written to datasets after each scan point. To use calculations, create a model
for the calculation and register it with the scan with the name calc_model
(or simple assign self.calc_model
to the model instance in your scan). After each scan point, a method named calculate()
which you define in the
model will be called and passed the index of the current scan point. The calculate()
method must then return a
tuple of (value, error) which is the calculated value and it’s error. Datasets under the calc model’s namespace and
the mirror namespace will be mutated with these calculated values after each scan point. See
scans/heating_rate_scan.py
and lib/models/heating_rate.py
for examples.
from scan_framework.scans import *
from scan_framework.models import *
class CalculationScan(Scan1D, EnvExperiment):
def build(self):
super().build()
def prepare(self):
self.register_model(RsbScanModel(), measurement='bsb')
self.register_model(BsbScanModel(), measurement='rsb')
self.register_model(BkgdScanModel(), measurement='bkgd')
self.register_model(NbarScanModel(), calculation='nbar', fit='nbar', mutate_plot=True)
class RsbScanModel(TimeModel):
""" Heating Rate RSB Model"""
# datasets
namespace = 'heating_rate.rsb.%type' #: Dataset namespace
mirror = False
persist = True
broadcast = True
# stats
enable_histograms = True
to_scan = 'wait_time'
# fits
main_fit = None
fit_function = fit_functions.Exp
@property
def simulation_args(self):
if self.to_scan == 'wait_time':
return {
'A': -5,
'b': -1/(200*us),
'y0': 7,
}
if self.to_scan == 'rf_amplitude':
return {
'A': -5,
'b': -1,
'y0': 7,
}
# plots
@property
def x_label(self):
if self.to_scan == 'wait_time':
return 'wait time'
if self.to_scan == 'rf_amplitude':
return 'rf amplitude'
@property
def x_units(self):
if self.to_scan == 'wait_time':
return 'us'
if self.to_scan == 'rf_amplitude':
return ''
@property
def x_scale(self):
if self.to_scan == 'wait_time':
return 1*us
if self.to_scan == 'rf_amplitude':
return 1
@property
def plot_title(self):
if self.to_scan == 'wait_time':
return 'rsb vs wait time'
if self.to_scan == 'rf_amplitude':
return 'rsb vs rf amplitude'
class BsbScanModel(TimeModel):
""" Heating Rate BSB Data Model """
# datasets
namespace = 'heating_rate.bsb.%type' #: Dataset namespace
persist = True
broadcast = True
mirror = False
to_scan = 'wait_time'
# stats
enable_histograms = True
# fits
fit_function = fit_functions.Exp
main_fit = None
scan = 'wait_time'
@property
def simulation_args(self):
if self.to_scan == 'wait_time':
return {
'A': 1,
'b': -1 / (200 * us),
'y0': 7,
}
if self.to_scan == 'rf_amplitude':
return {
'A': 1,
'b': -1,
'y0': 7,
}
@property
def x_label(self):
if self.to_scan == 'wait_time':
return 'wait time'
if self.to_scan == 'rf_amplitude':
return 'rf amplitude'
@property
def x_units(self):
if self.to_scan == 'wait_time':
return 'us'
if self.to_scan == 'rf_amplitude':
return ''
@property
def x_scale(self):
if self.to_scan == 'wait_time':
return 1 * us
if self.to_scan == 'rf_amplitude':
return 1
@property
def plot_title(self):
if self.to_scan == 'wait_time':
return 'bsb'
if self.to_scan == 'rf_amplitude':
return 'bsb vs rf amplitude'
class BkgdScanModel(TimeModel):
""" Heating Rate BSB Data Model """
# datasets
namespace = 'heating_rate.bkgd' #: Dataset namespace
persist = True
broadcast = True
mirror = False
# stats
enable_histograms = True
# fits
fit_function = fit_functions.Line
main_fit = None
simulation_args = {
'slope': 0,
'y_intercept': 0
}
# plots
x_units = 'us'
x_scale = 1*us
plot_title = 'bkgd'
class NbarScanModel(TimeModel):
"""Heating Rate Temp Model"""
# datasets
namespace = 'heating_rate.nbar.%type' #: Dataset namespace
mirror = True
persist = True
broadcast = True
to_scan = 'wait_time'
# stats
enable_histograms = False
# fits
fit_function = fit_functions.Line
fit_use_yerr = True
main_fit = 'heating_rate'
fit_map = {
'slope': 'heating_rate'
}
# plots
y_label = 'n bar'
scan = 'wait_time'
@property
def x_label(self):
if self.to_scan == 'wait_time':
return 'wait time'
if self.to_scan == 'rf_amplitude':
return 'rf amplitude'
if self.to_scan == 'frequency':
return 'rf frequency'
@property
def x_units(self):
if self.to_scan == 'wait_time':
return 'us'
if self.to_scan == 'rf_amplitude':
return ''
if self.to_scan == 'frequency':
return 'MHz'
@property
def x_scale(self):
if self.to_scan == 'wait_time':
return 1 * us
if self.to_scan == 'rf_amplitude':
return 1
if self.to_scan == 'frequency':
return 1 * MHz
@property
def plot_title(self):
if self.to_scan == 'wait_time':
return 'nbar vs wait time'
if self.to_scan == 'rf_amplitude':
return 'nbar vs rf amplitude'
if self.to_scan == 'frequency':
return 'nbar vs rf frequency'
def build(self, rsb_model, bsb_model, bkgd_model, **kwargs):
self.rsb_model = rsb_model
self.bsb_model = bsb_model
self.bkgd_model = bkgd_model
super().build(**kwargs)
def load_datasets(self):
"""Load previously collected data into local storage within each model"""
super().load_datasets()
self.bsb_model.load_datasets()
self.rsb_model.load_datasets()
self.bkgd_model.load_datasets()
def calculate(self, i_point):
"""Calculate nbar and the error in nbar at the given scan point index 'i'"""
bsb = self.bsb_model.stat_model.means[i_point]
bsb_error = self.bsb_model.stat_model.errors[i_point]
rsb = self.rsb_model.stat_model.means[i_point]
rsb_error = self.rsb_model.stat_model.errors[i_point]
avgBkgd = np.nanmean(self.bkgd_model.stat_model.means)
# old calculation
#ratio = (rsb - avgBkgd) / (bsb - avgBkgd)
#nbar = ratio / (1 - ratio)
# new calculation (uses an inverted ratio)
ratio = (bsb - avgBkgd) / (rsb - avgBkgd)
if isnan(ratio):
nbar = 0
error = 0
else:
nbar = 1 / (ratio - 1)
error = (ratio / (ratio - 1)**2) * ((rsb_error / rsb)**2 + (bsb_error / bsb)**2)**.5
return nbar, error