Source code for AFL.automation.loading.SensorCallbackThread

import numpy as np
import warnings
import threading
import time
import datetime
import pathlib

            
[docs] class SensorCallbackThread(threading.Thread):
[docs] def __init__(self,poll,period=0.1,daemon=True,filepath=None,data=None,sensorlabel=''): threading.Thread.__init__(self, name='CallbackThread', daemon=daemon) self.app = None self.data = data self.poll = poll self.period = period self.thread_start = datetime.datetime.now() self._stop = False self._lock = threading.Lock() self._signal = [] self.status_str = 'No action yet...' if filepath is not None: self.filepath = pathlib.Path(filepath) self.filepath.mkdir(parents=True, exist_ok=True) self.sensorlabel = sensorlabel
[docs] def update_status(self,value): self.status_str = value if self.app is not None: self.app.logger.info(value) else: print(value)
[docs] def terminate(self): self._stop = True
[docs] def process_signal(self,signal): pass
[docs] def run(self): print('Starting runloop for CallbackThread:') while not self._stop: dt = datetime.datetime.now()-self.thread_start # print(f'Running for {dt.seconds:010d} seconds',end='\r') self.process_signal() time.sleep(self.period)
[docs] class StopLoadCBv1(SensorCallbackThread):
[docs] def __init__( self, poll, period, load_client, threshold_npts = 20, threshold_v_step = 1, threshold_std = 2.5, timeout = 120, loadstop_cooldown = 2, post_detection_sleep = 0.2, baseline_duration = 2, daemon=True, filepath=None, data=None, sensorlabel='', ): super().__init__(poll=poll,period=period,daemon=daemon,filepath=filepath,data=data,sensorlabel=sensorlabel) self.load_client = load_client self.threshold_npts = threshold_npts self.threshold_v_step = threshold_v_step self.threshold_std = threshold_std self.loadstop_cooldown = loadstop_cooldown self.post_detection_sleep = post_detection_sleep self.timeout = datetime.timedelta(seconds=timeout) self.baseline_duration = baseline_duration
[docs] def process_signal(self): if 'PROGRESS' in getServerState(self.load_client): datestr = datetime.datetime.strftime(datetime.datetime.now(),'%y%m%d-%H:%M:%S') self.update_status(f'[{datestr}] Detected a load...') start = datetime.datetime.now() self.update_status(f'Taking baseline data for {self.baseline_duration} s.') time.sleep(self.baseline_duration) signal = self.poll.read() baseline_val = np.mean(signal[-self.threshold_npts:]) self.update_status(f'Found baseline at {baseline_val}') while True and (not self._stop): signal = self.poll.read() voltage_not_changed = np.abs(np.mean(signal[-self.threshold_npts:])-baseline_val) < self.threshold_v_step signal_unstable = np.std(signal[-self.threshold_npts:]) > self.threshold_std not_timed_out = datetime.datetime.now()-start < self.timeout if (voltage_not_changed or signal_unstable) and not_timed_out: time.sleep(self.period) else: datestr = datetime.datetime.strftime(datetime.datetime.now(),'%y%m%d-%H:%M:%S') if not_timed_out: self.update_status(f'[{datestr}] Load stopped at voltage mean = {np.mean(signal[-self.threshold_npts:])} and stdev = {np.std(signal[-self.threshold_npts:])}') else: self.update_status(f'[{datestr}] Load timed out') self.update_status(f'Elapsed time: {datetime.datetime.now()-start}') elapsed_time = datetime.datetime.now()-start time_to_sleep = (self.post_detection_sleep)*elapsed_time time.sleep(time_to_sleep.total_seconds()) # was self.post_detection_sleep) print(f'waited for {time_to_sleep.total_seconds()} based on elapsed time of {elapsed_time.total_seconds()} and ratio of {self.post_detection_sleep*100} %') self.load_client.server_cmd(cmd='stopLoad',secret='xrays>neutrons') filename = self.filepath/str('Sensor-'+datestr+'.txt') self.update_status(f'Saving signal data to {filename}') np.savetxt(filename,signal) if self.data is not None: self.data[f'{self.sensorlabel}_load_stop_trace'] = signal self.data[f'{self.sensorlabel}_final_voltage'] = np.mean(signal[-self.threshold_npts:]) time.sleep(self.loadstop_cooldown) break
[docs] class StopLoadCBv2(SensorCallbackThread):
[docs] def __init__( self, poll, period, load_client=None, load_object=None, threshold_npts = 20, threshold_v_step = 1, threshold_std = 2.5, timeout = 120, min_load_time=3, loadstop_cooldown = 2, post_detection_sleep = 0.2, baseline_duration = 2, trigger_on_end = False, instatrigger = True, daemon=True, filepath=None, data=None, sensorlabel='', ): super().__init__(poll=poll,period=period,daemon=daemon,filepath=filepath,data=data,sensorlabel=sensorlabel) self.loader_comm = LoaderCommunication(load_client=load_client,load_object=load_object) self.threshold_npts = threshold_npts self.threshold_v_step = threshold_v_step self.threshold_std = threshold_std self.loadstop_cooldown = loadstop_cooldown self.post_detection_sleep = post_detection_sleep self.min_load_time = datetime.timedelta(seconds=min_load_time) self.timeout = datetime.timedelta(seconds=timeout) self.baseline_duration = baseline_duration self.trigger_on_end = trigger_on_end self.instatrigger = instatrigger print(f'StopLoad thread starting with data = {self.data} and sensor label = {self.sensorlabel}')
[docs] def process_signal(self): if ('PROGRESS' in self.loader_comm.getServerState()) and (self.sensorlabel in self.loader_comm.getServerState()): #make sure this sensor is queued for this load datestr = datetime.datetime.strftime(datetime.datetime.now(),'%y%m%d-%H:%M:%S') self.update_status(f'[{datestr}] Detected a load...') start = datetime.datetime.now() self.poll.reset_load_buffer() self.update_status(f'Taking baseline data for {self.baseline_duration} s.') time.sleep(self.baseline_duration) signal = np.array(self.poll.read_load_buffer()) baseline_val = np.mean(signal[-self.threshold_npts:,1])#column 0 is microseconds since beginning of load self.update_status(f'Found baseline at {baseline_val}') if self.data is not None: self.data[f'{self.sensorlabel}_stopper_baseline_voltage'] = baseline_val while True and (not self._stop): signal = np.array(self.poll.read_load_buffer()) small_v_step = np.abs(np.mean(signal[-self.threshold_npts:,1])-baseline_val) < self.threshold_v_step large_std = np.std(signal[-self.threshold_npts:,1]) > self.threshold_std time_since_load_start = datetime.datetime.now()-start timed_out = time_since_load_start > self.timeout too_soon = time_since_load_start < self.min_load_time if too_soon: time.sleep(self.period) elif (small_v_step or large_std) and (not timed_out): time.sleep(self.period) else: datestr = datetime.datetime.strftime(datetime.datetime.now(),'%y%m%d-%H:%M:%S') self.update_status(f'Elapsed time: {datetime.datetime.now()-start}') if not timed_out: self.update_status(f'[{datestr}] Load stopped at voltage mean = {np.mean(signal[-self.threshold_npts:,1])} and stdev = {np.std(signal[-self.threshold_npts:,1])}') else: self.update_status(f'[{datestr}] Load timed out') elapsed_time = datetime.datetime.now()-start if self.data is not None: self.data[f'{self.sensorlabel}_elapsed_time_at_first_trigger'] = elapsed_time.total_seconds() self.data[f'{self.sensorlabel}_first_trigger_voltage'] = np.mean(signal[-self.threshold_npts:,1]) self.data[f'{self.sensorlabel}first_trigger_std'] = np.std(signal[-self.threshold_npts:,1]) if self.trigger_on_end: self.update_status(f'[{datestr}] Awaiting stabilized return to within {self.threshold_v_step} V of baseline voltage of {baseline_val} V') second_trigger_start = datetime.datetime.now() while not self._stop: time_since_second_trigger = datetime.datetime.now() - second_trigger_start timed_out = time_since_second_trigger > self.timeout mean_not_normal = np.abs(np.mean(signal[-self.threshold_npts:,1])-baseline_val) > 3*self.threshold_v_step large_std = False #np.std(signal[-self.threshold_npts:,1]) > self.threshold_std if (mean_not_normal or large_std) and (not timed_out): time.sleep(self.period/10) else: datestr = datetime.datetime.strftime(datetime.datetime.now(),'%y%m%d-%H:%M:%S') self.update_status(f'[{datestr}] End of plug triggered at voltage mean {np.mean(signal[-self.threshold_npts:,1])} and stdev = {np.std(signal[-self.threshold_npts:,1])}') if self.data is not None: self.data[f'{self.sensorlabel}_elapsed_time_at_second_trigger'] = (datetime.datetime.now()-start).total_seconds() self.data[f'{self.sensorlabel}_second_trigger_voltage'] = np.mean(signal[-self.threshold_npts:,1]) self.data[f'{self.sensorlabel}_second_trigger_std'] = np.std(signal[-self.threshold_npts:,1]) break elif self.instatrigger: pass else: time_to_sleep = (self.post_detection_sleep)*elapsed_time time.sleep(time_to_sleep.total_seconds()) # was self.post_detection_sleep) print(f'waited for {time_to_sleep.total_seconds()} based on elapsed time of {elapsed_time.total_seconds()} and ratio of {self.post_detection_sleep} %') self.loader_comm.stopLoad() try: filename = str(self.filepath/str('Sensor-'+datestr+'.txt')) # self.update_status(f'Saving signal data to {filename}') np.savetxt(filename,signal) except Exception as e: print(f"Could not write load trace to {filename}, {e}") if self.data is not None: self.data[f'{self.sensorlabel}_load_stop_trace'] = signal self.data[f'{self.sensorlabel}_final_voltage'] = np.mean(signal[-self.threshold_npts:,1]) self.data[f'{self.sensorlabel}_final_std'] = np.std(signal[-self.threshold_npts:,1]) time.sleep(self.loadstop_cooldown) break
[docs] class SimpleThresholdCB(SensorCallbackThread):
[docs] def __init__(self,poll,period,window=5,threshold=1): super().__init__(poll=poll,period=period) self.window = window self.threshold = threshold
[docs] def process_signal(self): signal = self.poll.read() mean = np.mean(signal[-window:]) if mean>threshold: print('Above threshold!') else: print(f'mean={mean}')
[docs] class LoaderCommunication():
[docs] def __init__(self,load_client=None,load_object=None): if (load_client is None) and (load_object is None): raise ValueError('Need to specify load_client or load_object!!') elif (load_client is not None) and (load_object is not None): warnings.warn('Both load_client and load_object were specified! Using load_object...') self.load_object = load_object self.load_client = None elif load_client is None: self.load_object = load_object self.load_client = None else: self.load_object = None self.load_client = load_client
[docs] def getServerState(self): if self.load_client is None: status = self.load_object.status() else: status = self.load_client.driver_status() for entry in status: if 'State: ' in entry: return entry.replace('State: ','')
[docs] def stopLoad(self): if self.load_client is None: self.load_object.stopLoad(secret='xrays>neutrons') else: self.load_client.server_cmd(cmd='stopLoad?secret=xrays>neutrons')