from AFL.automation.loading.SampleCell import SampleCell
from AFL.automation.APIServer.Driver import Driver
from collections import defaultdict
import warnings
import time
import numpy as np
import math
[docs]
class PneumaticSampleCell(Driver,SampleCell):
'''
Class for a sample cell consisting of a push-through, pneumatically-closed sample loader.
Driven by a syringe pump.
'''
defaults={}
defaults['load_speed'] = 2
defaults['air_speed'] = 30
defaults['withdraw_vol'] = 1.5
defaults['large_dispense_vol'] = 5
defaults['arm_move_delay'] = 0.2
defaults['vent_delay'] = 0.5
defaults['rinse_program'] = [
('rinse1',5),
(None,2),
('rinse2',5),
('blow',5),
(None,0.5),
('blow',5)
]
defaults['catch_to_cell_vol'] = 1.15
defaults['external_load_complete_trigger'] = False
[docs]
def __init__(self,pump,
relayboard,
digitalin=None,
rinse1_tank_level=950,
rinse2_tank_level=950,
waste_tank_level=0,
load_stopper=None,
overrides=None,
):
'''
pump: a pump object supporting withdraw() and dispense() methods
e.g. pump = NE1KSyringePump(port,syringe_id_mm,syringe_volume)
relayboard: a relay board object supporting string-based setChannels() method
required channels are 'arm-up','arm-down',
'rinse1','rinse2','blow','enable','piston-vent','postsample'
e.g. selector = SainSmartRelay(port,portlabels={'catch':1,'cell':2,'rinse':3,'waste':4,'air':5})
'''
self._app = None
Driver.__init__(self,name='PneumaticSampleCell',defaults=self.gather_defaults(),overrides=overrides)
self.pump = pump
self.relayboard = relayboard
self.cell_state = defaultdict(lambda: 'clean')
self.digitalin = digitalin
self.rinse1_tank_level = rinse1_tank_level
self.waste_tank_level = waste_tank_level
self.rinse2_tank_level = rinse2_tank_level
self.loadStoppedExternally = False
self.state = 'FRESH'
if 'enable' in self.relayboard.labels.keys():
self.relayboard.setChannels({'enable':True})
self._USE_ARM_LIMITS = False
self._USE_DOOR_INTERLOCK = False
if self.digitalin is not None:
if 'ARM_UP' in self.digitalin.state.keys() and 'ARM_DOWN' in self.digitalin.state.keys():
self._USE_ARM_LIMITS =True
if 'DOOR' in self.digitalin.state.keys():
self._USE_DOOR_INTERLOCK = True
self.relayboard.setChannels({'piston-vent':True})
self._arm_up()
time.sleep(0.2)
self.pump.setRate(self.config['air_speed'])
# self.pump.dispense(self.config['large_dispense_vol'])
# self.pump.withdraw(self.config['withdraw_vol'])
self.reset_pump(dispense=False)
self.state = 'READY'
self.rinse_status = 'Not Rinsing'
if load_stopper is not None:
self.load_stopper=load_stopper
# remove any load clients and push this object in for direct control
self.load_stopper.load_client = None
self.load_stopper.load_object = self
self.load_stopper.reset()# initialize and start load stopping threads
else:
self.load_stopper = None
[docs]
@Driver.quickbar(qb={'button_text':'Reset Pump',
'params':{
'dispense':{'label':'dispense','type':'bool','default':False}
}})
def reset_pump(self,dispense=False):
self.pump.setRate(self.config['air_speed'])
if dispense and (self.pump_level>0):
self.pump.dispense(self.pump_level)
self.pump.withdraw(self.config['withdraw_vol'])
self.pump_level = self.config['withdraw_vol']
[docs]
@Driver.quickbar(qb={'button_text':'Reset Tank Levels',
'params':{
'rinse1':{'label':'Rinse1 (mL)','type':'float','default':950},
'rinse2':{'label':'Rinse2 (mL)','type':'float','default':950},
'waste':{'label':'Waste (mL)','type':'float','default':0}
}})
def reset_tank_levels(self,rinse1=950,rinse2=950,waste=0):
self.rinse1_tank_level = rinse1
self.waste_tank_level = waste
self.rinse2_tank_level = rinse2
@property
def app(self):
return self._app
@app.setter
def app(self,app):
if app is None:
self._app = app
else:
self._app = app
self.pump.app = app
self.relayboard.app = app
[docs]
def status(self):
status = []
status.append(f'State: {self.state}')
status.append(f'Arm State: {self.arm_state}')
status.append(f'Rinse 1 tank: {self.rinse1_tank_level} mL')
status.append(f'Rinse 2 tank: {self.rinse2_tank_level} mL')
status.append(f'Waste tank: {self.waste_tank_level} mL')
status.append(f'Relay status: {self.relayboard.getChannels()}')
if self._USE_ARM_LIMITS:
status.append(f"Arm Up Limit: {not self.digitalin.state['ARM_UP']} / Arm Down Limit{not self.digitalin.state['ARM_DOWN']}")
if self._USE_DOOR_INTERLOCK:
status.append(f"Door closed: {not self.digitalin.state['DOOR']}")
if self.digitalin is not None:
status.append(f'DIO state: {self.digitalin.state}')
status.append(f'Pump Level: {self.pump_level} mL')
status.append(self.rinse_status)
if self.load_stopper is not None:
status.extend(self.load_stopper.status())
return status
def _arm_interlock_check(self):
if self._USE_DOOR_INTERLOCK:
oldstate = self.state
while self.digitalin.state['DOOR']:
time.sleep(0.2)
self.state = 'AWAITING DOOR CLOSED BEFORE MOVING ARM'
self.state = oldstate
def _arm_up(self):
self._arm_interlock_check()
self.relayboard.setChannels({'piston-vent':True,'arm-up':True,'arm-down':False})
if self._USE_ARM_LIMITS:
while self.digitalin.state['ARM_UP']:
time.sleep(0.1)
else:
time.sleep(self.config['arm_move_delay'])
self.arm_state = 'UP'
def _arm_down(self):
self._arm_interlock_check()
self.relayboard.setChannels({'piston-vent':True,'arm-up':False,'arm-down':True})
time.sleep(self.config['arm_move_delay'])
if self._USE_ARM_LIMITS:
while self.digitalin.state['ARM_DOWN']:
time.sleep(0.1)
else:
time.sleep(self.config['arm_move_delay'])
self.arm_state = 'DOWN'
[docs]
@Driver.quickbar(qb={'button_text':'Load Sample',
'params':{'sampleVolume':{'label':'Sample Volume (mL)','type':'float','default':0.3}}})
def loadSample(self,cellname='cell',sampleVolume=0):
if self.state != 'READY':
raise Exception('Tried to load sample but cell not READY.')
self.state = 'PREPARING TO LOAD'
self.relayboard.setChannels({'piston-vent':True,'postsample':False})
self._arm_down()
time.sleep(self.config['vent_delay'])
self.relayboard.setChannels({'piston-vent':False,'postsample':True})
print('setting pump rate...')
self.pump.setRate(self.config['load_speed'])
print('setting state...')
self.state = 'LOAD IN PROGRESS'
print('sending dispense command')
self.pump.dispense(self.config['catch_to_cell_vol']+sampleVolume/2,block=False)
while(self.pump.getStatus()[0] != 'S' and not self.loadStoppedExternally):
#print(f'awaiting pump complete, {self.pump.getStatus()}')
time.sleep(0.1)
infusion_vol = self.pump.getStatus()[1]
self.pump_level -= infusion_vol
if self.pump_level<0:
raise Exception(f'Pump level found to be less than zero: {self.pump_level}')
self.loadStoppedExternally = False
self.relayboard.setChannels({'postsample':False})
self.state = 'LOADED'
[docs]
@Driver.unqueued(render_hint='raw')
def stopLoad(self,**kwargs):
print(kwargs)
try:
if kwargs['secret'] == 'xrays>neutrons':
if self.state!= 'LOAD IN PROGRESS':
warnings.warn('Tried to stop load but load is not in progress. Doing nothing.',stacklevel=2)
return 'There is no load running.'
else:
self.pump.stop()
self.relayboard.setChannels({'postsample':False})
self.loadStoppedExternally=True
if self.data is not None:
self.data['load_stop_source'] = 'external'
return 'Load stopped successfully.'
else:
return 'Wrong secret.'
except KeyError:
return 'Need valid secret to stop load.'
[docs]
@Driver.quickbar(qb={'button_text':'Rinse Cell'})
def rinseCell(self,cellname='cell'):
if self.state != 'LOADED':
if self.state == 'READY':
warnings.warn('Rinsing despite READY state. This is OK, just a little extra. Lowering the arm to rinse.',stacklevel=2)
self._arm_down()
else:
raise Exception(f'Cell in inconsistent state: {self.state}')
self.relayboard.setChannels({'piston-vent':False,'postsample':True})
# self.pump.setRate(self.config['air_speed'])
# self.pump.dispense(self.config['large_dispense_vol'],block=False) # removed for OEM pump w/o force limiter
self.rinse_status = 'Pushing with syringe...'
#need to dispense with piston down, and then withdraw with piston up
self.pump.setRate(self.config['air_speed'])
if self.pump_level>0:
self.pump.dispense(self.pump_level)
for i,(step,waittime) in enumerate(self.config['rinse_program']):
self.rinse_status = f'Rinse Program Step {i}/{len(self.config["rinse_program"])}: {step} for {waittime}s'
if step is not None:
self.relayboard.setChannels({step:True})
time.sleep(waittime)
if step is not None:
self.relayboard.setChannels({step:False})
self.relayboard.setChannels({'postsample':False})
self._arm_up()
self.state = 'READY'
self.rinse_status = 'Not Rinsing'
# self.pump.withdraw(self.config['withdraw_vol'],block=True)
self.reset_pump(dispense=False)
[docs]
def rinseAll(self):
self.rinseCell()
[docs]
def setRinseLevel(self,vol):
self.rinse_tank_level = vol
[docs]
def setWasteLevel(self,vol):
self.waste_tank_level = vol
[docs]
@Driver.quickbar(qb={'button_text':'Prime Rinse'})
def primeRinse(self,waittime=10):
if self.state != 'READY':
raise Exception(f'Cell in inconsistent state: {self.state}')
self._arm_up()
self.relayboard.setChannels({'rinse1':True,'rinse2':False})
time.sleep(waittime)
self.relayboard.setChannels({'rinse1':False,'rinse2':True})
time.sleep(waittime)
self.relayboard.setChannels({'rinse1':False,'rinse2':False})
[docs]
@Driver.unqueued()
@Driver.quickbar(qb={'button_text':'calibrate', 'params':{}})
def calibrate_sensor(self):
if self.load_stopper is not None:
return self.load_stopper.sensor.calibrate()
[docs]
@Driver.unqueued()
def read_sensor(self):
if self.load_stopper is not None:
return self.load_stopper.sensor.read()
[docs]
@Driver.unqueued(render_hint='1d_plot',xlin=True,ylin=True,xlabel='time',ylabel='Signal (V)')
def read_sensor_poll(self,**kwargs):
if self.load_stopper is not None:
output = np.transpose(self.load_stopper.poll.read())
print('Serving sensor poll:',len(output),len(output[0]),len(output[1]))
return list(output)
[docs]
@Driver.unqueued(render_hint='1d_plot',xlin=True,ylin=True,xlabel='time',ylabel='Signal (V)')
def read_sensor_poll_load(self,**kwargs):
if self.load_stopper is not None:
output = np.transpose(self.load_stopper.poll.read_load_buffer())
return list(output)
[docs]
def set_sensor_config(self,**kwargs):
if self.load_stopper is not None:
self.load_stopper.config.update(kwargs)
self.load_stopper.reset()
[docs]
def get_sensor_config(self,**kwargs):
if self.load_stopper is not None:
return self.load_stopper.config.config
[docs]
@Driver.unqueued()
@Driver.quickbar(qb={'button_text':'Reset Sensor', 'params':{}})
def sensor_reset(self):
if self.load_stopper is not None:
self.load_stopper.reset_poll()
self.load_stopper.reset_stopper()
if self.load_stopper._app is not None:
self.load_stopper.poll.app = self._app
self.load_stopper.stopper.app = self._app
self.load_stopper.poll.start()
self.load_stopper.stopper.start()