Source code for AFL.automation.loading.LoadStopperDriver

from AFL.automation.APIServer.Driver import Driver
from AFL.automation.APIServer.Client import Client
from AFL.automation.loading.SensorPollingThread import SensorPollingThread
from AFL.automation.loading.SensorCallbackThread import StopLoadCBv1
from AFL.automation.loading.SensorCallbackThread import StopLoadCBv2
import warnings
import time
import pathlib
import numpy as np

import math

[docs] class LoadStopperDriver(Driver): ''' Driver for stopping loads ''' defaults={} defaults['load_speed'] = 2 defaults['period'] = 0.05 defaults['poll_window'] = 1000 defaults['stopper_threshold_npts'] = 50 defaults['stopper_threshold_v_step'] = 1 defaults['stopper_threshold_std'] = 3.0 defaults['stopper_min_load_time'] = 3 defaults['stopper_timeout'] = 120 defaults['stopper_loadstop_cooldown'] = 2 defaults['stopper_post_detection_sleep'] = 1 defaults['stopper_baseline_duration'] = 2 defaults['stopper_filepath'] = str(pathlib.Path.home()/'.afl/loadstopper_data/') defaults['sensorlabel'] = ''
[docs] def __init__(self,sensor,load_client=None,load_object=None,auto_initialize=True,overrides=None,data=None,sensorlabel='',name='LoadStopperDriver'): self._app = None Driver.__init__(self,name=name,defaults=self.gather_defaults(),overrides=overrides) if self.data is None: self.data = data print(f'LoadStopperDriver started with data = {self.data}') self.load_object = load_object self.load_client = load_client self.sensor = sensor self.sensorlabel = sensorlabel self.poll = None self.stopper = None if auto_initialize: self.reset()
[docs] def status(self): status = [] status.append(f'{self.stopper.status_str}') return status
@property def app(self): return self._app @app.setter def app(self,value): if value is None: self._app = value else: self._app = value if self.poll is not None: self.poll.app = value if self.stopper is not None: self.stopper.app = value #@Driver.unqueued() #@Driver.quickbar(qb={'button_text':'calibrate', 'params':{}})
[docs] def calibrate_sensor(self): return self.sensor.calibrate()
#@Driver.unqueued()
[docs] def read_sensor(self): return self.sensor.read()
#@Driver.unqueued(render_hint='1d_plot',xlin=True,ylin=True,xlabel='time',ylabel='Signal (V)')
[docs] def read_poll(self): output = np.transpose(self.poll.read()) return list(output)
#@Driver.unqueued(render_hint='1d_plot',xlin=True,ylin=True,xlabel='time',ylabel='Signal (V)')
[docs] def read_poll_load(self): output = np.transpose(self.poll.read_load_buffer()) return list(output)
#@Driver.unqueued() #@Driver.quickbar(qb={'button_text':'reset', 'params':{}})
[docs] def reset(self): self.reset_poll() self.reset_stopper() if self._app is not None: self.poll.app = self._app self.stopper.app = self._app self.poll.start() self.stopper.start()
[docs] def reset_poll(self): if self.poll is not None: self.poll.terminate() self.poll = SensorPollingThread(self.sensor,period=self.config['period'],window=self.config['poll_window'],daemon=True,data=self.data)
[docs] def reset_stopper(self): if self.stopper is not None: self.stopper.terminate() if self.poll is None: self.reset_poll() # self.stopper = StopLoadCBv1( # self.poll, # period=self.config['period'], # load_client=self.load_client, # threshold_npts = self.config['stopper_threshold_npts'], # threshold_v_step = self.config['stopper_threshold_v_step'], # threshold_std = self.config['stopper_threshold_std'], # timeout = self.config['stopper_timeout'], # loadstop_cooldown = self.config['stopper_loadstop_cooldown'], # post_detection_sleep = self.config['stopper_post_detection_sleep'] , # baseline_duration = self.config['stopper_baseline_duration'], # filepath=self.config['stopper_filepath'], # daemon=True, # sensorlabel=self.sensorlabel, # ) self.stopper = StopLoadCBv2( self.poll, period=self.config['period'], load_client=self.load_client, load_object=self.load_object, threshold_npts = self.config['stopper_threshold_npts'], threshold_v_step = self.config['stopper_threshold_v_step'], threshold_std = self.config['stopper_threshold_std'], min_load_time = self.config['stopper_min_load_time'], timeout = self.config['stopper_timeout'], loadstop_cooldown = self.config['stopper_loadstop_cooldown'], post_detection_sleep = self.config['stopper_post_detection_sleep'] , baseline_duration = self.config['stopper_baseline_duration'], filepath=self.config['stopper_filepath'], daemon=True, data = self.data, sensorlabel=self.sensorlabel, )