Source code for AFL.automation.loading.PressureController

import threading,time

[docs] class PressureController(): ''' Abstract superclass for pressure controllers that provides timed dispensing '''
[docs] def timed_dispense(self,dispense_pressure,dispense_time,block=True): ''' Perform a pressure dispense at pressure `dispense_pressure`, stopping after `dispense_time`. This dispense can be interrupted by calling self.stop(). ''' self.active_callback = threading.Timer(dispense_time,self.stop) self.set_P(dispense_pressure) self.active_callback.start() while not self.active_callback.is_alive(): time.sleep(0.01) if block: self.blockUntilStatusStopped()
[docs] def blockUntilStatusStopped(self,pollingdelay=0.2): ''' block execution until the controller finishes a dispense ''' if self.active_callback is not None: status = self.active_callback.is_alive() else: status = False while status: time.sleep(pollingdelay) status = self.active_callback.is_alive()
[docs] def dispenseRunning(self): ''' Returns true if a timed dispense is running, false otherwise. ''' if self.active_callback is not None: return self.active_callback.is_alive() else: return False
[docs] def ramp_dispense(self,dispense_start_pressure, dispense_stop_pressure, dispense_time,const_time = 0,block=True): ''' Perform a pressure dispense with a linear ramp in pressure between `dispense_start_pressure` and `dispense_stop_pressure`, stopping after `dispense_time`. This dispense can be interrupted by calling self.stop(). If const_time is set, the last `const_time` seconds of the dispense will be at constant pressure, with the ramp occurring in the remaining time. ''' self.start_time = time.time() self.stop_flag = threading.Event() self.active_callback = threading.Thread(target= self._ramp_pressure,args=(dispense_start_pressure,dispense_stop_pressure,dispense_time,const_time,self.stop_flag)) self.set_P(dispense_start_pressure) self.active_callback.start() while not self.active_callback.is_alive(): time.sleep(0.01) if block: self.blockUntilStatusStopped()
def _ramp_pressure(self,dispense_start_pressure,dispense_stop_pressure,dispense_time,const_time,stop_flag): pressure_ramp_rate = (dispense_stop_pressure - dispense_start_pressure) / (dispense_time-const_time) elapsed_time = time.time() - self.start_time while elapsed_time<dispense_time and not stop_flag.is_set(): elapsed_time = time.time() - self.start_time elapsed_time = min(elapsed_time, (dispense_time - const_time)) self.set_P(dispense_start_pressure + pressure_ramp_rate * elapsed_time) time.sleep(0.05) self.stop()
[docs] def stop(self): ''' Abort the current timed dispense action. ''' print(f'Dispense stop was called, callback status {self.dispenseRunning()}') self.set_P(0) try: self.active_callback.cancel() except AttributeError: # this is a ramp thread, not a timer self.stop_flag.set() time.sleep(0.05) self.set_P(0)