Source code for AFL.automation.loading.PressureControllerAsPump
from AFL.automation.loading.SyringePump import SyringePump
from AFL.automation.loading.SerialDevice import SerialDevice
import time
import threading
import warnings
[docs]
class PressureControllerAsPump(SyringePump):
[docs]
def __init__(self,pressure_controller,dispense_pressure=3.5,implied_flow_rate = 50):
'''
Initialize a pressure controller as a syringe pump.
pressure_controller: controller to connect to
dispense_pressure: pressure at which dispense should run
implied_flow_rate: flow rate which should be used to convert to dispense times, mL/min
'''
self.app = None
self.name = 'PressureControllerAsPump'
self.controller = pressure_controller
self.dispense_pressure = dispense_pressure
self.implied_flow_rate = implied_flow_rate
self.active_callback = None
[docs]
def stop(self):
'''
Abort the current dispense/withdraw action. Equivalent to pressing stop button on panel.
'''
print(f'Pump stop was called, callback status {self.controller.dispenseRunning()}')
self.controller.stop()
def __del__(self):
self.stop()
[docs]
def withdraw(self,volume,block=True,delay=True):
warnings.warn('dispense only for Pressure controllers at this time')
#raise NotImplementedError('dispense only for Pressure controllers at this time')
[docs]
def dispense(self,volume,block=True,delay=True):
if self.app is not None:
rate = self.getRate()
self.app.logger.debug(f'Dispensing {volume}mL at {rate} mL/min')
dispense_time = volume / self.implied_flow_rate
dispense_time = dispense_time * 60 # convert from min to s as flow rate is in mL/min
self.controller.timed_dispense(self.dispense_pressure,dispense_time,block=block)
[docs]
def setRate(self,rate):
if self.app is not None:
self.app.logger.debug(f'Setting pump rate to {rate} mL/min')
self.implied_flow_rate = rate
if self.getRate()!=rate:
raise ValueError('Pump rate change failed')
[docs]
def getRate(self):
return self.implied_flow_rate
[docs]
def emptySyringe(self):
raise NotImplementedError('this makes no sense for a pressure controller')
[docs]
def blockUntilStatusStopped(self,pollingdelay=0.2):
self.controller.blockUntilStatusStopped()
[docs]
def getStatus(self):
'''
query the pump status and return a tuple of the status character,
infused volume, and withdrawn volume)
'''
if self.controller.active_callback.is_alive():
return ('disp',0,0)
else:
return ('S',0,0)