Source code for scan_framework.scans.extensions

# Extensions to the base scan class which provide commonly used features.

from artiq.language.core import *
from .scan import *
from scan_framework.scans.loading_interface import *
from scipy.fftpack import fft


[docs]class TimeScan(Scan): """Scan class for scanning over time values."""
[docs] def scan_arguments(self, times={}, npasses={}, nrepeats={}, nbins={}, fit_options={}, guesses=False): # assign default values for scan GUI arguments for k, v in {'start': 0, 'stop': 10 * us, 'npoints': 50, 'unit': 'us', 'scale': 1 * us, 'global_step': 10 * us, 'ndecimals': 3}.items(): times.setdefault(k, v) # create core scan arguments super().scan_arguments(npasses=npasses, nrepeats=nrepeats, nbins=nbins, fit_options=fit_options, guesses=guesses) # create scan arguments for time scans self.setattr_argument('times', Scannable( default=RangeScan( start=times['start'], stop=times['stop'], npoints=times['npoints'] ), unit=times['unit'], scale=times['scale'], global_step=times['global_step'], ndecimals=times['ndecimals'] ), group='Scan Range')
[docs] def get_scan_points(self): return self.times
[docs]class FreqScan(Scan): """Scan class for scanning over frequency values.""" _freq_center_manual = None
[docs] def scan_arguments(self, frequencies={}, npasses={}, nrepeats={}, nbins={}, fit_options={}, guesses=False): # assign default values for scan GUI arguments for k, v in {'start': -5*MHz, 'stop': 5*MHz, 'npoints': 50, 'unit': 'MHz', 'scale': 1 * MHz, 'ndecimals':4}.items(): frequencies.setdefault(k, v) # crate core scan arguments super().scan_arguments(npasses=npasses, nrepeats=nrepeats, nbins=nbins, fit_options=fit_options, guesses=guesses) # create scan arguments for frequency scans group = 'Scan Range' self.setattr_argument('frequencies', Scannable( default=RangeScan( start=frequencies['start'], stop=frequencies['stop'], npoints=frequencies['npoints'] ), unit=frequencies['unit'], scale=frequencies['scale'], ndecimals=frequencies['ndecimals'] ), group=group)
[docs] def get_scan_points(self): return self.frequencies
[docs]class TimeFreqScan(Scan): """Allows a scan experiment to scan over either a set of time values or a set of frequency values.""" frequency_center = None # default must be None so it can be overriden in the scan pulse_time = None # default must be None so it can be overriden in the scan enable_auto_tracking = True
[docs] def scan_arguments(self, times={}, frequencies={}, frequency_center={}, pulse_time={}, npasses={}, nrepeats={}, nbins={}, fit_options={}, guesses=False): # assign default values for scan GUI arguments for k,v in {'start': 0, 'stop': 10 * us, 'npoints': 50, 'unit': 'us', 'scale': 1 * us, 'global_step': 10 * us, 'ndecimals':3}.items(): times.setdefault(k, v) for k, v in {'start': -5*MHz, 'stop': 5*MHz, 'npoints': 50, 'unit': 'MHz', 'scale': 1 * MHz, 'ndecimals':4}.items(): frequencies.setdefault(k, v) for k, v in {'unit': 'MHz', 'scale': MHz, 'default': 100 * MHz, 'ndecimals': 4}.items(): frequency_center.setdefault(k, v) for k, v in {'unit': 'us', 'scale': us, 'default': 100 * us, 'ndecimals': 4}.items(): pulse_time.setdefault(k, v) # create GUI argument to select if scan is a time or a frequency scan self.setattr_argument('scan', EnumerationValue(['frequency', 'time'], default='frequency')) # create core scan arguments super().scan_arguments(npasses=npasses, nrepeats=nrepeats, nbins=nbins, fit_options=fit_options, guesses=guesses) # create remaining scan arguments for time and frequency scans if frequencies is not False: self.setattr_argument('frequencies', Scannable( default=RangeScan( start=frequencies['start'], stop=frequencies['stop'], npoints=frequencies['npoints'] ), unit=frequencies['unit'], scale=frequencies['scale'], ndecimals=frequencies['ndecimals'] ), group='Scan Range') # auto tracking is disabled... # ask user for the frequency center if not self.enable_auto_tracking: if self.frequency_center is None: if frequency_center != False: self.setattr_argument('frequency_center', NumberValue(**frequency_center), group='Scan Range') if self.pulse_time is None: if pulse_time != False: self.setattr_argument('pulse_time', NumberValue(**pulse_time), group='Scan Range') self.setattr_argument('times', Scannable( default=RangeScan( start=times['start'], stop=times['stop'], npoints=times['npoints'] ), unit=times['unit'], scale=times['scale'], global_step=times['global_step'], ndecimals=times['ndecimals'] ), group='Scan Range')
[docs] def _attach_models(self): self.__bind_models() if not self.fit_only: self.__load_frequency_center() # tell scan to offset x values by the frequency_center # this is done even when not auto-tracking in case the user has manually set frequency_center if self.scan == 'frequency' and self.frequency_center is not None: self._x_offset = self.frequency_center self._logger.debug("set _x_offset to frequency_center value of {0}".format(self.frequency_center))
def __bind_models(self): # bind each registered model to the scan type (frequency or time) for entry in self._model_registry: # tell the model if this a frequency or time scan entry['model'].type = self.scan # bind model namespace to scan type if ('bind' in entry and entry['bind']) or ( self.enable_auto_tracking and 'auto_track' in entry and entry['auto_track']): entry['model'].bind() def __load_frequency_center(self): # frequency or pulse time manually set in the scan, state that in the debug logs if self.frequency_center is not None: self.logger.debug("frequency_center manually set to {0}".format(self.frequency_center)) if self.pulse_time is not None: self.logger.debug("pulse_time manually set to {0}".format(self.pulse_time)) # the frequency center is auto loaded from the fits by this class... if self.enable_auto_tracking: for entry in self._model_registry: model = entry['model'] if 'auto_track' in entry and entry['auto_track']: # fetch the last fitted frequency and time values from the datasets fitted_freq, fitted_time = self._get_main_fits(model) # hasn't been set yet, ok to auto load if self.frequency_center is None: # set frequency center from saved fit values self.frequency_center = fitted_freq self._logger.debug("auto set frequency_center to {0} from fits".format(fitted_freq)) # hasn't been set yet, ok to auto load if self.pulse_time is None: # set pulse time from saved fit values self.pulse_time = fitted_time self._logger.debug("auto set pulse_time to {0} from fits".format(fitted_time))
[docs] def _get_main_fits(self, model): """Get's the last fitted frequency and time values""" if hasattr(model, 'type'): restore = model.type else: restore = None # get the last fitted frequency model.type = 'frequency' model.bind() freq = model.get_main_fit(archive=False) # get the last fitted time model.type = 'time' model.bind() time = model.get_main_fit(archive=False) # rebind the model to it's original namespace model.type = restore model.bind() return freq, time
[docs] def get_scan_points(self): if self.scan == 'time': return self.times if self.scan == 'frequency': return self.frequencies
[docs] @portable def do_measure(self, point): # time scan if self.scan == 'time': pulse_time = point return self.measure(pulse_time, self.frequency_center) # frequency scan if self.scan == 'frequency': return self.measure(self.pulse_time, point) return 0
[docs] def report(self, location='both'): super().report(location) if location == 'bottom': self.logger.info("Type: %s scan" % self.scan) if self.scan == 'frequency': if self.frequency_center is not None: self.logger.info('Frequency Center: %f MHz' % (self.frequency_center / MHz)) if self.pulse_time is not None: self.logger.info("Pulse Time: %f us" % (self.pulse_time / us)) if self.scan == 'time': if self.frequency_center is not None: self.logger.info('Frequency: %f MHz' % (self.frequency_center / MHz))
[docs]class ReloadingScan(Scan): """Allows detection of lost ion, pausing scan, reloading ion, and resuming scan""" # -- settings enable_reloading = True #: Reload ion when it is lost? # -- scan state _lost_ion = False #: ion has been lost # -- loading defaults loading_threshold = 1.0 loading_pi_threshold = 5 loading_timeout = 120 * s loading_windows = 2 loading_repeats = 100 # -- ion checking & loading measure_background = True perc_of_dark = 90 #: Percentage of dark rate at which background rate is set. perc_of_dark_second = 150 #: Percentage of dark rate at which ion present detection threshold is set. ion_windows = 2 #: Number of lost ion signals seen in data before a detection check is performed ion_threshold = 0.6 #: default value is always overwritten when background is measured ion_second_threshold = 2.0 # default value is always overwritten when background is measured ion_failures = 0 check_for_ion = False lose_ion_at = -1 # ====== Scan Interface Methods ======
[docs] def _scan_arguments(self): if self.enable_reloading: self.setattr_argument("check_for_ion", BooleanValue(default=False), 'Reloading')
#if self.enable_simulations: # self.setattr_argument('lose_ion_at', NumberValue(default=-1, ndecimals=0, step=1), group='Simulation')
[docs] def _map_arguments(self): """Map coarse grained attributes to fine grained options.""" if not self.enable_reloading: self.check_for_ion = False self.measure_background = False if not self.check_for_ion: self.measure_background = False if self.simulate_scan: self.measure_background = False
[docs] def _initialize(self, resume): super()._initialize(resume) self._lose_ion_at = self.lose_ion_at if self.enable_reloading and not hasattr(self, 'loading'): raise Exception( "An instance of the Loading subcomponent needs to be assigned to self.loading to use reloading.") if not hasattr(self, 'loading'): # hack so kernel methods can compile, artiq complains that there is no self.loading variable even though # the code is unreachable self.loading = LoadingInterface(self)
[docs] def _report(self, location='both'): """Print details about the scan""" if location == 'both': self._logger.debug('enable_reloading is {0}'.format(self.enable_reloading))
[docs] def _reset_state(self, resume): self._lost_ion = False self.ion_failures = 0
[docs] @portable def _before_loop(self, resume): if self.simulate_scan: if resume: self._lose_ion_at = -1 if self.measure_background and not resume: # check if ion is present before measuring background if self._check_ion_experiment(repeats=2): # measure the background self.logger.debug('> measuring background.') self._measure_ion_threshold() # no ion is present, can't measure background else: self.logger.warning("Can't measure background because no ion is present.") self._lost_ion = True self._schedule_load_ion() raise Paused else: self.logger.debug('Skipping background measurement.')
[docs] @portable def _analyze_data(self, i_point, last_pass, last_point): # lost ion? if self.check_for_ion: # force a detection check on the very last scan point as data based checks can fail # to signal an ion lost at the end of a scan if last_pass and last_point: force_detection_check = True else: force_detection_check = False if not self._check_ion(i_point, force_detection_check=force_detection_check): # rewind to the scan point where the ion was first lost. self._rewind(num_points=self.ion_windows) # set state self._lost_ion = True # schedule ion reload self.logger.error("Lost ion.") self._schedule_load_ion() # break main loop in scan.py raise Paused
[docs] def _state_string(self): return "lost_ion=%s" % self._lost_ion
# ====== Local Methods ======
[docs] def _schedule_load_ion(self): # try to load or wait if tried too many times # ion loading can't be performed for some reason if not self.loading.can_load(): # schedule a high priority experiment (e.g. ion_monitor) that will pause this scan # until the issue can be fixed. self.logger.warning("Can't load ion, scheduling blocking experiment until issue is fixed.") self.loading.schedule_wait_experiment() self._yield() self._schedule_load_ion() return else: # schedule the load ion experiment self.logger.warning("Scheduling ion reload.") self.loading.schedule_load_ion(due_date=time(), synchronous=True)
[docs] @kernel def _measure_ion_threshold(self): """Measure dark rate and set self._ion_threshold to a percentage of the measured rate""" dark_rate = self.loading.measure_dark_rate() self.ion_threshold = (self.perc_of_dark / 100.0) * dark_rate self.ion_second_threshold = (self.perc_of_dark_second / 100.0) * dark_rate self.logger.debug("dark_rate measured at") self.logger.debug(dark_rate) self.logger.debug("ion_threshold set to") self.logger.debug(self.ion_threshold) self.logger.debug("ion_second_threshold set to") self.logger.debug(self.ion_second_threshold)
[docs] @portable def _check_ion(self, i_point, force_detection_check=False): """Return true if ion is present""" if force_detection_check: do_detection_check = True # -- data based checks: # analyze data and signal for a detection based check after n successive failures else: do_detection_check = False for i_measurement in range(self.nmeasurements): if not self._check_ion_data(i_measurement, i_point): self.ion_failures += 1 if self.ion_failures == self.ion_windows: do_detection_check = True break else: self.ion_failures = 0 # -- detection based check: # check for ion via detection if do_detection_check: self.logger.debug("") self.ion_failures = 0 if self._check_ion_experiment(): present = True else: present = False else: present = True return present
[docs] @portable def _check_ion_data(self, i_measurement, i_point): """Return true if data collected indicates that the ion is still present""" #offset = self._data.address(pos=[i_measurement, i_point]) #offset = #offset = offset + i_pass * self.nrepeats sum_ = 0.0 for i in range(self.nrepeats): sum_ += self._data[self._idx][i_measurement][self.nrepeats*self._i_pass + i] mean = sum_ / self.nrepeats if mean >= self.ion_threshold: present = True else: present = False # lose ion at #if self.simulate_scan and not self._resuming: # if i_pass * self.npoints + i_point >= self._lose_ion_at - 1: # present = False return present
[docs] @portable def _check_ion_experiment(self, repeats=1): """Return true if the loading.ion_present experiment indicates that the ion is still present""" #if self.simulate_scan: # if i_pass * self.npoints + i_point == self._lose_ion_at: # return False # else: # return True self.logger.debug(">>> calling loading.ion_present() with ion threshold set to ") self.logger.debug(self.ion_second_threshold) return self.loading.ion_present(repeats, threshold=self.ion_second_threshold)