import requests
from AFL.automation.loading.SampleCell import SampleCell
from AFL.automation.APIServer.Driver import Driver
from collections import defaultdict
import warnings
import time
import numpy as np
import math
[docs]
class PneumaticPressureSampleCell(Driver,SampleCell):
'''
Class for a sample cell consisting of a push-through, pneumatically-closed sample loader.
Driven by a variable-pressure regulator.
'''
defaults={}
defaults['load_mode'] = 'static'
defaults['load_pressure'] = 2
defaults['blowout_pressure'] = 20
defaults['load_timeout'] = 60
defaults['arm_move_delay'] = 0.2
defaults['vent_delay'] = 0.5
defaults['rinse_program'] = [
('rinse1',5),
(None,2),
('rinse2',5),
('blow',5),
(None,0.5),
('blow',5)
]
defaults['external_load_complete_trigger'] = False
defaults['ramp_load_stop_pressure'] = 7
defaults['ramp_load_duration'] = 20
[docs]
def __init__(self,pctrl,
relayboard,
digitalin=None,
rinse1_tank_level=950,
rinse2_tank_level=950,
waste_tank_level=0,
load_stopper=None,
robot_interlock_host=None,
overrides=None,
):
'''
pctrl: a pressurecontroller object supporting the set_P method() and the optional p_ramp() method.
e.g. pctrl = DigitalOutPressureController(LabJackDigitalOut(...))
relayboard: a relay board object supporting string-based setChannels() method
required channels are 'arm-up','arm-down',
'rinse1','rinse2','blow','enable','piston-vent','postsample'
e.g. selector = SainSmartRelay(port,portlabels={'catch':1,'cell':2,'rinse':3,'waste':4,'air':5})
'''
self._app = None
Driver.__init__(self,name='PneumaticSampleCell',defaults=self.gather_defaults(),overrides=overrides)
self.pctrl = pctrl
self.relayboard = relayboard
self.cell_state = defaultdict(lambda: 'clean')
self.digitalin = digitalin
self.rinse1_tank_level = rinse1_tank_level
self.waste_tank_level = waste_tank_level
self.rinse2_tank_level = rinse2_tank_level
self.loadStoppedExternally = False
self.state = 'FRESH'
if 'enable' in self.relayboard.labels.values():
self.relayboard.setChannels({'enable':True})
if robot_interlock_host:
self.robot_interlock_url = f'http://{robot_interlock_host}:31950/robot/door/status'
else:
self.robot_interlock_url = None
self._USE_ARM_LIMITS = False
self._USE_DOOR_INTERLOCK = False
if self.digitalin is not None:
if 'ARM_UP' in self.digitalin.state.keys() and 'ARM_DOWN' in self.digitalin.state.keys():
self._USE_ARM_LIMITS =True
if 'DOOR' in self.digitalin.state.keys():
self._USE_DOOR_INTERLOCK = True
if self.robot_interlock_url:
self._USE_DOOR_INTERLOCK = True
self.relayboard.setChannels({'piston-vent':True})
self._arm_up()
time.sleep(0.2)
self.state = 'READY'
self.rinse_status = 'Not Rinsing'
if load_stopper is not None:
if type(load_stopper) is not list:
load_stopper = [load_stopper]
self.load_stopper=load_stopper
# remove any load clients and push this object in for direct control
for ls in load_stopper:
ls.load_client = None
ls.load_object = self
ls.reset() # initialize and start load stopping threads
else:
self.load_stopper = None
[docs]
@Driver.quickbar(qb={'button_text':'Reset Tank Levels',
'params':{
'rinse1':{'label':'Rinse1 (mL)','type':'float','default':950},
'rinse2':{'label':'Rinse2 (mL)','type':'float','default':950},
'waste':{'label':'Waste (mL)','type':'float','default':0}
}})
def reset_tank_levels(self,rinse1=950,rinse2=950,waste=0):
self.rinse1_tank_level = rinse1
self.waste_tank_level = waste
self.rinse2_tank_level = rinse2
@property
def app(self):
return self._app
@app.setter
def app(self,app):
if app is None:
self._app = app
else:
self._app = app
self.pctrl.app = app
self.relayboard.app = app
for ls in self.load_stopper:
ls.app = app
@property
def data(self):
return self._data
@data.setter
def data(self,data):
if data is None:
self._data = data
else:
self._data = data
self.pctrl.data = data
self.relayboard.data = data
for ls in self.load_stopper:
ls.data = data
[docs]
def status(self):
status = []
status.append(f'State: {self.state}')
status.append(f'Arm State: {self.arm_state}')
status.append(f'Rinse 1 tank: {self.rinse1_tank_level} mL')
status.append(f'Rinse 2 tank: {self.rinse2_tank_level} mL')
status.append(f'Waste tank: {self.waste_tank_level} mL')
status.append(f'Relay status: {self.relayboard.getChannels()}')
if self._USE_ARM_LIMITS:
status.append(f"Arm Up Limit: {self.digitalin.state['ARM_UP']} / Arm Down Limit{self.digitalin.state['ARM_DOWN']}")
if self._USE_DOOR_INTERLOCK:
status.append(f"Door closed: {not self._door_state()}")
if self.digitalin is not None:
status.append(f'DIO state: {self.digitalin.state}')
status.append(self.rinse_status)
if self.load_stopper is not None:
for ls in self.load_stopper:
status.extend(ls.status())
return status
def _arm_interlock_check(self):
if self._USE_DOOR_INTERLOCK:
oldstate = self.state
while self._door_state():
time.sleep(0.2)
self.state = 'AWAITING DOOR CLOSED BEFORE MOVING ARM'
self.state = oldstate
def _door_state(self):
if self.digitalin is not None:
if 'DOOR' in self.digitalin.state.keys():
return not self.digitalin.state['DOOR']
try:
state = requests.get(self.robot_interlock_url,headers = {
'Opentrons-Version': '2'}).json()['data']['status']
except Exception:
return True
if state == 'open':
return True
elif state == 'closed':
return False
else:
raise ValueError('could not get robot door status')
def _arm_up(self):
self._arm_interlock_check()
self.relayboard.setChannels({'piston-vent':True,'arm-up':True,'arm-down':False})
if self._USE_ARM_LIMITS:
while self.digitalin.state['ARM_UP']:
time.sleep(0.1)
else:
time.sleep(self.config['arm_move_delay'])
self.arm_state = 'UP'
def _arm_down(self):
self._arm_interlock_check()
self.relayboard.setChannels({'piston-vent':True,'arm-up':False,'arm-down':True})
time.sleep(self.config['arm_move_delay'])
if self._USE_ARM_LIMITS:
while self.digitalin.state['ARM_DOWN']:
time.sleep(0.1)
else:
time.sleep(self.config['arm_move_delay'])
self.arm_state = 'DOWN'
[docs]
@Driver.quickbar(qb={'button_text':'Load Sample',
'params':{'sampleVolume':{'label':'Sample Volume (mL)','type':'float','default':0.3}}})
def loadSample(self,cellname='cell',sampleVolume=None,load_dest_label=''):
'''
Load a sample into the cell
Params `cellname` and `sampleVolume` are kept for backward compat, but are deprecated and unused.
'''
if self.state != 'READY':
raise Exception('Tried to load sample but cell not READY.')
self.state = 'PREPARING TO LOAD'
self.relayboard.setChannels({'piston-vent':True,'postsample':False})
self._arm_down()
time.sleep(self.config['vent_delay'])
self.relayboard.setChannels({'piston-vent':False,'postsample':True})
print('setting state...')
self.loadStoppedExternally = False
if load_dest_label == '':
self.state = 'LOAD IN PROGRESS'
else:
self.state = f'LOAD IN PROGRESS to {load_dest_label}'
print('sending dispense command')
if self.config['load_mode'] == 'static':
self.pctrl.timed_dispense(self.config['load_pressure'],self.config['load_timeout'],block=False)
elif self.config['load_mode'] == 'ramp':
self.pctrl.ramp_dispense(self.config['load_pressure'],self.config['ramp_load_stop_pressure'],self.config['load_timeout'],const_time = self.config['load_timeout']-self.config['ramp_load_duration'])
else:
raise ValueError('invalid load_mode in config. cannot load. valid values are "static" or "ramp"')
while(self.pctrl.dispenseRunning() and not self.loadStoppedExternally):
time.sleep(0.02)
self.loadStoppedExternally = False
self.relayboard.setChannels({'postsample':False})
self.state = 'LOADED'
time.sleep(1) # crude hack to allow sensor to push data into packet
[docs]
@Driver.quickbar(qb={'button_text':'Advance Sample',
'params':{'sampleVolume':{'label':'Sample Volume (mL)','type':'float','default':0.3}}})
def advanceSample(self,load_dest_label=''):
'''
Move a sample from one measurement cell to the next
Params:
load_dest_label (str, default ''): a 'destination label' for this load.
labeled sensors will only stop the load if their name is in this destination label.
example:
sensor 1 labeled 'afterSANS'
sensor 2 labeled 'beforeSPEC'
sensor 3 labeled '' (default)
advanceSample(load_dest_label='afterSANS') --> sensor 1 or sensor 3 can stop it
advanceSample(load_dest_label='beforeSPEC afterSANS') --> sensor 1, sensor 2, or sensor 3 can stop it
advanceSample(load_dest_label='') --> only sensor 3 can stop it
'''
if self.state != 'LOADED':
raise Exception('Tried to advance sample but no sample is loaded.')
self.state = 'PREPARING TO Advance'
self.relayboard.setChannels({'postsample':True})
print('setting state...')
if load_dest_label == '':
self.state = 'LOAD IN PROGRESS'
else:
self.state = f'LOAD IN PROGRESS to {load_dest_label}'
print('sending dispense command')
self.pctrl.timed_dispense(self.config['load_pressure'],self.config['load_timeout'],block=False)
self.loadStoppedExternally = False
while(self.pctrl.dispenseRunning() and not self.loadStoppedExternally):
time.sleep(0.02)
self.loadStoppedExternally = False
self.relayboard.setChannels({'postsample':False})
self.state = 'LOADED'
time.sleep(1) # this is a crude hack to give the system time for the sensor data to push into the DataPacket
[docs]
@Driver.unqueued(render_hint='raw')
def stopLoad(self,**kwargs):
print(kwargs)
try:
if kwargs['secret'] == 'xrays>neutrons':
if 'LOAD IN PROGRESS' not in self.state:
warnings.warn('Tried to stop load but load is not in progress. Doing nothing.',stacklevel=2)
return 'There is no load running.'
else:
self.pctrl.stop()
self.relayboard.setChannels({'postsample':False})
self.loadStoppedExternally=True
if self.data is not None:
print(self.data)
try:
self.data['load_stop_source'] = 'external'
except AttributeError:
pass
return 'Load stopped successfully.'
else:
return 'Wrong secret.'
except KeyError:
return 'Need valid secret to stop load.'
[docs]
@Driver.quickbar(qb={'button_text':'Rinse Cell'})
def rinseCell(self,cellname='cell'):
if self.state != 'LOADED':
if self.state == 'READY':
warnings.warn('Rinsing despite READY state. This is OK, just a little extra. Lowering the arm to rinse.',stacklevel=2)
self._arm_down()
else:
raise Exception(f'Cell in inconsistent state: {self.state}')
self.relayboard.setChannels({'piston-vent':False,'postsample':True})
for i,(step,waittime) in enumerate(self.config['rinse_program']):
self.rinse_status = f'Rinse Program Step {i}/{len(self.config["rinse_program"])}: {step} for {waittime}s'
if step is not None:
if step == 'ctrlblow':
self.pctrl.set_P(self.config['blowout_pressure'])
else:
self.relayboard.setChannels({step:True})
time.sleep(waittime)
if step is not None:
if step == 'ctrlblow':
self.pctrl.set_P(0)
else:
self.relayboard.setChannels({step:False})
self.relayboard.setChannels({'postsample':False})
self._arm_up()
self.state = 'READY'
self.rinse_status = 'Not Rinsing'
[docs]
def rinseAll(self):
self.rinseCell()
[docs]
def setRinseLevel(self,vol):
self.rinse_tank_level = vol
[docs]
def setWasteLevel(self,vol):
self.waste_tank_level = vol
[docs]
@Driver.quickbar(qb={'button_text':'Prime Rinse'})
def primeRinse(self,waittime=10):
if self.state != 'READY':
raise Exception(f'Cell in inconsistent state: {self.state}')
self._arm_up()
self.relayboard.setChannels({'rinse1':True,'rinse2':False})
time.sleep(waittime)
self.relayboard.setChannels({'rinse1':False,'rinse2':True})
time.sleep(waittime)
self.relayboard.setChannels({'rinse1':False,'rinse2':False})
[docs]
@Driver.unqueued()
@Driver.quickbar(qb={'button_text':'calibrate', 'params':{}})
def calibrate_sensor(self):
if self.load_stopper is not None:
out = []
for ls in self.load_stopper:
out.append(ls.sensor.calibrate())
return out
[docs]
@Driver.unqueued()
def read_sensor(self):
if self.load_stopper is not None:
out = []
for ls in self.load_stopper:
out.append(ls.sensor.read())
return out
[docs]
@Driver.unqueued(render_hint='1d_plot',xlin=True,ylin=True,xlabel='time',ylabel='Signal (V)')
def read_sensor_poll(self,**kwargs):
if self.load_stopper is not None:
out = []
for ls in self.load_stopper:
output = np.transpose(ls.poll.read())
print('Serving sensor poll:',len(output),len(output[0]),len(output[1]))
out.append(list(output))
return out
[docs]
@Driver.unqueued(render_hint='1d_plot',xlin=True,ylin=True,xlabel='time',ylabel='Signal (V)')
def read_sensor_poll_load(self,**kwargs):
if self.load_stopper is not None:
out = []
for ls in self.load_stopper:
out.append(list(np.transpose(ls.poll.read_load_buffer())))
return out
[docs]
def set_sensor_config(self,**kwargs):
if self.load_stopper is not None:
if 'sensor_n' in kwargs:
self.load_stopper[kwargs[sensor_n]].update(kwargs)
self.load_stopper[kwargs[sensor_n]].reset()
else: # assume it should apply to all
for ls in self.load_stopper:
ls.config.update(kwargs)
ls.reset()
[docs]
def get_sensor_config(self,**kwargs):
if self.load_stopper is not None:
out = []
for ls in self.load_stopper:
out.append(ls.config.config)
return out
[docs]
@Driver.unqueued()
@Driver.quickbar(qb={'button_text':'Reset Sensor', 'params':{}})
def sensor_reset(self,sensor_n = None):
if self.load_stopper is not None:
if sensor_n is not None:
self.load_stopper[sensor_n].reset_poll()
self.load_stopper[sensor_n].reset_stopper()
if self.load_stopper[sensor_n]._app is not None:
self.load_stopper[sensor_n].poll.app = self._app
self.load_stopper[sensor_n].stopper.app = self._app
if self.load_stopper[sensor_n]._data is not None:
self.load_stopper[sensor_n].poll.data = self._data
self.load_stopper[sensor_n].stopper.data = self._data
self.load_stopper[sensor_n].poll.start()
self.load_stopper[sensor_n].stopper.start()
else:
for ls in self.load_stopper:
ls.reset_poll()
ls.reset_stopper()
if ls_app is not None:
ls.poll.app = self._app
ls.stopper.app = self._app
if ls._data is not None:
ls.poll.data = self._data
ls.stopper.data = self._data
ls.poll.start()
ls.stopper.start()
_DEFAULT_CUSTOM_CONFIG = {
'_classname': 'AFL.automation.loading.PneumaticPressureSampleCell.PneumaticPressureSampleCell',
'_args': [
{'_classname': 'AFL.automation.loading.DigitalOutPressureController.DigitalOutPressureController',
'_args': [
{'_classname': 'AFL.automation.loading.LabJackDigitalOut.LabJackDigitalOut',
'intermittent_device_handle': False,
'port_to_write': 'TDAC4',
#'shared_device' =
},
3
]},
{'_classname': 'AFL.automation.loading.PiPlatesRelay.PiPlatesRelay',
'_args': [
{
7:'arm-up',6:'arm-down',
1:'rinse1',2:'rinse2',3:'blow',4:'piston-vent',5:'postsample'
}]}
],
'digitalin': {'_classname': 'AFL.automation.loading.PiGPIO.PiGPIO',
'_args': [{4:'DOOR',14:'ARM_UP',15:'ARM_DOWN'}],
'pull_dir':'UP'
},
'load_stopper': [
{'_classname': 'AFL.automation.loading.LoadStopperDriver.LoadStopperDriver',
'_args': [{'_classname': 'AFL.automation.loading.LabJackSensor.LabJackSensor',
'port_to_read': 'AIN0',
'reset_port': 'DIO6'}],
'_add_data': 'data',
'name': 'LoadStopperDriver_sans',
'auto_initialize':False,
'sensorlabel':'afterSANS'
},
{'_classname': 'AFL.automation.loading.LoadStopperDriver.LoadStopperDriver',
'_args': [{'_classname': 'AFL.automation.loading.LabJackSensor.LabJackSensor',
'port_to_read': 'AIN1',
'reset_port': 'DIO7'}],
'_add_data': 'data',
'name': 'LoadStopperDriver_spec',
'auto_initialize':False,
'sensorlabel':'afterSPEC'
}
],
}
if __name__ == '__main__':
from AFL.automation.shared.launcher import *