import lazy_loader as lazy
opentrons = lazy.load("opentrons", require="AFL-automation[opentrons]")
from AFL.automation.APIServer.Driver import Driver
from AFL.automation.shared.utilities import listify
import warnings
from math import ceil,sqrt
import os,json,pathlib
serial = lazy.load("serial", require="AFL-automation[serial]")
import numpy
'''
Things we want to fix:
- pipette mixing should have separate aspirate/dispense settings
- tip saving e.g., the last transfer before a load should re-use a tip
-
'''
[docs]
class OT2_Driver(Driver):
defaults = {}
defaults['shaker_port'] = '/dev/ttyACM0'
[docs]
def __init__(self,overrides=None):
self.app = None
Driver.__init__(self,name='OT2_Driver',defaults=self.gather_defaults(),overrides=overrides)
self.name = 'OT2_Driver'
self.protocol = opentrons.execute.get_protocol_api('2.0')
self.max_transfer = None
self.min_transfer = None
self.prep_targets = []
self.has_tip = False #replace with pipette object check
self.last_pipette = None
self.modules = {}
[docs]
def reset_prep_targets(self):
self.prep_targets = []
[docs]
def add_prep_targets(self,targets,reset=False):
if reset:
self.reset_prep_targets()
self.prep_targets.extend(targets)
[docs]
def get_prep_target(self):
return self.prep_targets.pop(0)
[docs]
def status(self):
status = []
if len(self.prep_targets)>0:
status.append(f'Next prep target: {self.prep_targets[0]}')
status.append(f'Remaining prep targets: {len(self.prep_targets)}')
else:
status.append('No prep targets loaded')
for k,v in self.protocol.loaded_instruments.items():
aspirate = v.flow_rate.aspirate
dispense = v.flow_rate.dispense
flow_str = f' @ {aspirate}/{dispense} uL/s'
status.append(str(v)+flow_str)
status.append(f'Gantry Speed: {v.default_speed} mm/s')
n_tips_remain = 0
for rack in v.tip_racks:
for well in rack.wells():
if well.has_tip:
n_tips_remain += 1
status.append(f'{str(v)}: {n_tips_remain} tips left')
for k,v in self.protocol.loaded_labwares.items():
status.append(str(v))
return status
[docs]
@Driver.quickbar(qb={'button_text':'Refill Tipracks',
'params':{
'mount':{'label':'Which Pipet left/right/both','type':'text','default':'both'},
}})
def reset_tipracks(self,mount='both'):
for k,pipette in self.protocol.loaded_instruments.items():
if (mount.lower()=='both') or (k==mount.lower()):
pipette.reset_tipracks()
[docs]
def reset(self):
self.app.logger.info('Resetting the protocol context')
raise NotImplementedError('This method doesn\'t work yet. For now, just restart the flask server')
# opentrons.robot.reset() doesnt work
# #XXX HACK! asyncio event loop finding borks without this
# # self.app.logger.debug(opentrons.execute._HWCONTROL)
# # del opentrons.execute._HWCONTROL
# # opentrons.execute._HWCONTROL = None
# self.app.logger.debug(opentrons.execute._HWCONTROL)
# self.protocol = opentrons.execute.get_protocol_api('2.0')
[docs]
@Driver.quickbar(qb={'button_text':'Home',
})
def home(self,**kwargs):
self.app.logger.info('Homing the robot\'s axes')
self.protocol.home()
[docs]
def parse_well(self,loc):
for i,loc_part in enumerate(list(loc)):
if loc_part.isalpha():
break
slot = loc[:i]
well = loc[i:]
return slot,well
[docs]
def get_wells(self,locs):
self.app.logger.debug(f'Converting locations to well objects: {locs}')
wells = []
for loc in listify(locs):
slot,well = self.parse_well(loc)
labware = self.get_labware(slot)
wells.append(labware[well])
self.app.logger.debug(f'Created well objects: {wells}')
return wells
[docs]
def get_labware(self,slot):
self.app.logger.debug(f'Getting labware from slot \'{slot}\'')
contents = self.protocol.deck[slot]
if contents is not None:
if type(contents) == opentrons.protocols.geometry.module_geometry.ModuleGeometry:
labware = contents.labware
else:
labware = opentrons.protocol_api.labware.Labware(contents) #need Labware() to convert to public interface
self.app.logger.debug(f'Found labware \'{labware}\'')
return labware
else:
raise ValueError('Specified slot ({slot}) is empty of labware')
[docs]
def load_labware(self,name,slot,module=None,**kwargs):
'''Load labware (containers,tipracks) into the protocol'''
if self.protocol.deck[slot] is not None:
try:
if self.protocol.deck[slot].get_name() == name: #get_name() is part of the LabwareImplementation interface
self.app.logger.info(f'Labware \'{name}\' already loaded into slot \'{slot}\'.\n')
#do nothing
except AttributeError:
if self.protocol.deck[slot].labware.load_name == name:
self.app.logger.info(f'Labware \'{name}\' already loaded into module on slot \'{slot}\'/\n')
#do nothing
else:
raise RuntimeError(f'''Attempted to load labware \'{name}\' into slot \'{slot}\'.
Slot is already filled loaded with {self.protocol.deck[slot].get_display_name()}.''')
else:
self.app.logger.debug(f'Loading labware \'{name}\' into slot \'{slot}\' into the protocol context')
if module is not None:
self.modules[slot] = self.protocol.load_module(module,slot)
loadee = self.modules[slot]
else:
loadee = self.protocol
try:
loadee.load_labware(name,slot)
except FileNotFoundError:
CUSTOM_PATH = pathlib.Path(os.environ.get('NISTOROBOTO_CUSTOM_LABWARE'))
with open(CUSTOM_PATH / name / '1.json') as f:
labware_def = json.load(f)
loadee.load_labware_from_definition(labware_def,slot)
[docs]
def set_temp(self,slot,temp):
'''Set the temperature of a tempdeck in slot slot'''
self.modules[slot].set_temperature(temp)
[docs]
def get_temp(self,slot):
'''Get the temperature of a tempdeck in slot slot'''
return (self.modules[slot].target,self.modules[slot].temperature)
[docs]
def deactivate_temp(self,slot):
'''Disablea tempdeck in slot slot'''
return self.modules[slot].deactivate()
[docs]
def set_shake(self,rpm):
with serial.Serial(self.config['shaker_port'],115200) as p:
p.write(f'M3 S{str(int(rpm))}\r\n'.encode())
[docs]
def stop_shake(self):
with serial.Serial(self.config['shaker_port'],115200) as p:
p.write(f'G28\r\n'.encode())
[docs]
def set_shaker_temp(self,temp):
with serial.Serial(self.config['shaker_port'],115200) as p:
p.write(f'M104 S{str(int(temp))}\r\n'.encode())
[docs]
def unlatch_shaker(self):
with serial.Serial(self.config['shaker_port'],115200) as p:
p.write(f'M242\r\n'.encode())
[docs]
def latch_shaker(self):
with serial.Serial(self.config['shaker_port'],115200) as p:
p.write(f'M243\r\n'.encode())
[docs]
def get_shaker_temp(self):
with serial.Serial(self.config['shaker_port'],115200) as p:
p.write(f'M105\r\n'.encode())
resp = p.readline()
return resp
[docs]
def get_shake_rpm(self):
with serial.Serial(self.config['shaker_port'],115200) as p:
p.write(f'M123\r\n'.encode())
resp = p.readline()
return resp
[docs]
def get_shake_latch_status(self):
with serial.Serial(self.config['shaker_port'],115200) as p:
p.write(f'M241\r\n'.encode())
resp = p.readline()
return resp
[docs]
def load_instrument(self,name,mount,tip_rack_slots,**kwargs):
'''Load a pipette into the protocol'''
if mount in self.protocol.loaded_instruments:
if self.protocol.loaded_instruments[mount].name == name:
self.app.logger.info(f'Instrument \'{name}\' already loaded into mount \'{mount}\'.\n')
#do nothing
else:
raise RuntimeError(f'''Attempted to load instrument \'{name}\' into mount \'{mount}\'.
mount is already loaded with {self.protocol.loaded_instruments[mount]}.''')
else:
self.app.logger.debug(f'Loading pipette \'{name}\' into mount \'{mount}\' with tip_racks in slots {tip_rack_slots}')
tip_racks = []
for slot in listify(tip_rack_slots):
tip_rack = opentrons.protocol_api.labware.Labware(self.protocol.deck[slot]) #need Labware() to convert from LabwareImplementation to public interface
if not tip_rack.is_tiprack:
raise RuntimeError('Supplied slot doesn\'t contain a tip_rack!')
tip_racks.append(tip_rack)
self.protocol.load_instrument(name,mount,tip_racks=tip_racks)
#update min/max transfer values
self.min_largest_pipette=None
self.max_smallest_pipette=None
for mount,pipette in self.protocol.loaded_instruments.items():
if (self.min_transfer is None) or (self.min_transfer>pipette.min_volume):
self.min_transfer = pipette.min_volume
self.app.logger.info(f'Setting mininum transfer to {self.min_transfer}')
if (self.max_transfer is None) or (self.max_transfer<pipette.max_volume):
self.max_transfer = pipette.max_volume
self.app.logger.info(f'Setting maximum transfer to {self.max_transfer}')
#update min/max transfer values
min_vols = [pipette.min_volume for mount,pipette in self.protocol.loaded_instruments.items()]
max_vols = [pipette.max_volume for mount,pipette in self.protocol.loaded_instruments.items()]
largest_pipette_index = argmax(max_vols)
smallest_pipette_index = argmin(max_vols)
self.min_largest_pipette = min_vols[largest_pipette_index]
self.max_smallest_pipette = max_vols[smallest_pipette_index]
[docs]
def mix(self,volume, location, repetitions=1,**kwargs):
self.app.logger.info(f'Mixing {volume}uL {repetitions} times at {location}')
#get pipette based on volume
pipette = self.get_pipette(volume)
#modify source well dispense location
location_well = self.get_wells(location)[0]
pipette.mix(repetitions,volume,location_well)
[docs]
@Driver.quickbar(qb={'button_text':'Transfer',
'params':{
'source':{'label':'Source Well','type':'text','default':'1A1'},
'dest':{'label':'Dest Well','type':'text','default':'1A1'},
'volume':{'label':'Volume (uL)','type':'float','default':300}
}})
def transfer(
self,
source,dest,
volume,
mix_before=None,
mix_after=None,
air_gap=0,
aspirate_rate=None,
dispense_rate=None,
mix_aspirate_rate=None,
mix_dispense_rate=None,
blow_out=False,
post_aspirate_delay=0.0,
aspirate_equilibration_delay=0.0,
post_dispense_delay=0.0,
drop_tip=True,
force_new_tip=False,
to_top=True,
to_center=False,
to_top_z_offset=0,
fast_mixing=False,
**kwargs):
'''Transfer fluid from one location to another
Arguments
---------
source: str
Source well to transfer from. Wells should be specified as three
character strings with the first character being the slot number.
dest: str
Destination well to transfer to. Wells should be specified as
three character strings with the first character being the slot
number.
volume: float
volume of fluid to transfer
'''
self.app.logger.info(f'Transfering {volume}uL from {source} to {dest}')
if aspirate_rate is not None:
self.set_aspirate_rate(aspirate_rate)
if dispense_rate is not None:
self.set_dispense_rate(dispense_rate)
#get pipette based on volume
pipette = self.get_pipette(volume)
#get source well object
source_wells = self.get_wells(source)
if len(source_wells)>1:
raise ValueError('Transfer only accepts one source well at a time!')
else:
source_well = source_wells[0]
#get dest well object
dest_wells = self.get_wells(dest)
if len(dest_wells)>1:
raise ValueError('Transfer only accepts one dest well at a time!')
else:
dest_well = dest_wells[0]
last_dest_well = None
if to_top and to_center:
raise ValueError('Cannot dispense to_top and to_center simulaneously')
if (to_top) and (mix_after is None):
dest_well = dest_well.top(z=to_top_z_offset)
elif (to_center) and (mix_after is None):
dest_well = dest_well.center()
elif (to_top or to_center) and (mix_after is not None) and (not fast_mixing):
raise ValueError('Cannot mix_after if dispensing to_top or to_center unless using fast mixing.')
elif (to_top or to_center) and (mix_after is not None) and fast_mixing: # a very special case - dispense to top on first and intermediate transfers, then on final transfer dispense to bottom and mix_after
last_dest_well = dest_well
if to_top:
dest_well = dest_well.top(z=to_top_z_offset)
elif to_center:
dest_well = dest_well.center()
transfers = self.split_up_transfers(volume)
user_drop_tip = drop_tip #store user set value for last transfer
user_mix_before = mix_before
user_mix_after = mix_after
if self.data is not None:
self.data['transfer_strategy'] = transfers
for i,sub_volume in enumerate(transfers):
#get pipette based on volume
pipette = self.get_pipette(sub_volume)
if (self.last_pipette is not pipette) and self.has_tip:
# need to drop tip on last pipette
self.last_pipette.drop_tip(self.protocol.deck[12]['A1'])
self.has_tip = False
# ensure that intermediate transfers don't drop tip
# during sub-volume transfers.
# Note that this will be overriden in _transfer if
# force_new_tip is set
if len(transfers) == 1:
drop_tip = user_drop_tip
mix_before = user_mix_before
mix_after = user_mix_after
if last_dest_well is not None:
dest_well = last_dest_well
elif i==0: # first transfer
if (not to_top) or ((mix_after is not None) and (not fast_mixing)):
drop_tip = True
else:
drop_tip = False
if fast_mixing:
mix_before = user_mix_before
mix_after = None
elif i==(len(transfers)-1): # last sub-volume transfer
drop_tip = user_drop_tip
if fast_mixing:
mix_after = user_mix_after
mix_before = None
drop_tip = user_drop_tip
if last_dest_well is not None:
dest_well = last_dest_well
else: # intermediate transfers
if (not to_top) or ((mix_after is not None) and (not fast_mixing)):
drop_tip = True
else:
drop_tip = False
if fast_mixing:
mix_before = None
mix_after = None
self._transfer(
pipette,
sub_volume,
source_well,
dest_well,
mix_before=mix_before,
mix_after=mix_after,
air_gap=air_gap,
blow_out=blow_out,
post_aspirate_delay=post_aspirate_delay,
aspirate_equilibration_delay=aspirate_equilibration_delay,
post_dispense_delay=post_dispense_delay,
drop_tip=drop_tip,
force_new_tip=force_new_tip,
mix_aspirate_rate=mix_aspirate_rate,
mix_dispense_rate=mix_dispense_rate)
self.last_pipette = pipette
[docs]
def split_up_transfers(self,vol):
transfers = []
while True:
if sum(transfers)<vol:
transfer = min(self.max_transfer,vol-sum(transfers))
if transfer<self.min_transfer and (len(transfers)>0) and (transfers[-1]>=(2*(self.min_transfer))):
transfers[-1]-=(self.min_transfer-transfer)
transfer = self.min_transfer
if (transfer<self.min_largest_pipette) and (transfer>self.max_smallest_pipette): # Valley of death
#n_transfers = np.ceil(transfer/self.max_smallest_pipette)
transfer = self.max_smallest_pipette # I fear no evil
transfers.append(transfer)
else:
break
return transfers
def _transfer(
self,
pipette,
volume,
source_well,
dest_well,
mix_before=None,
mix_after=None,
air_gap=0,
blow_out=False,
post_aspirate_delay=0.0,
aspirate_equilibration_delay=0.0,
post_dispense_delay=0.0,
drop_tip=True,
force_new_tip=False,
mix_aspirate_rate=None,
mix_dispense_rate=None):
if force_new_tip and self.has_tip:
pipette.drop_tip(self.protocol.deck[12]['A1'])
self.has_tip = False
if not self.has_tip:
pipette.pick_up_tip()
self.has_tip = True
#need to mix before final aspirate
if mix_before is not None:
if mix_aspirate_rate is not None:
aspirate_rate = pipette.flow_rate.aspirate# store current rates
pipette.flow_rate.aspirate = mix_aspirate_rate
if mix_dispense_rate is not None:
dispense_rate = pipette.flow_rate.dispense# store current rates
pipette.flow_rate.dispense = mix_dispense_rate
nmixes,mix_volume = mix_before
pipette_max_volume = pipette.max_volume
if(mix_volume>pipette_max_volume):
warnings.warn(f'Requested mix volume {mix_volume} > pipette max volume {pipette_max_volume}. Using the max volume. This may result in unexpected behavior.',stacklevel=2)
mix_volume = min(mix_volume,pipette_max_volume)
for _ in range(nmixes):
pipette.aspirate(mix_volume,location=source_well)
pipette.dispense(mix_volume,location=source_well)
if mix_aspirate_rate is not None:
pipette.flow_rate.aspirate = aspirate_rate
if mix_dispense_rate is not None:
pipette.flow_rate.dispense = dispense_rate
pipette.aspirate(volume+air_gap,location=source_well)
self.protocol.delay(seconds=aspirate_equilibration_delay)
if post_aspirate_delay>0.0:
try:
pipette.move_to(source_well.top())
except AttributeError:
# if location is already specified
pipette.move_to(source_well)
self.protocol.delay(seconds=post_aspirate_delay)
# need to dispense before mixing
pipette.dispense(volume+air_gap,location=dest_well)
# mix sample after dispensing
if mix_after is not None:
if mix_aspirate_rate is not None:
aspirate_rate = pipette.flow_rate.aspirate# store current rates
pipette.flow_rate.aspirate = mix_aspirate_rate
if mix_dispense_rate is not None:
dispense_rate = pipette.flow_rate.dispense# store current rates
pipette.flow_rate.dispense = mix_dispense_rate
nmixes,mix_volume = mix_after
pipette_max_volume = pipette.max_volume
if(mix_volume>pipette_max_volume):
warnings.warn(f'Requested mix volume {mix_volume} > pipette max volume {pipette_max_volume}. Using the max volume. This may result in unexpected behavior.',stacklevel=2)
mix_volume = min(mix_volume,pipette_max_volume)
for _ in range(nmixes):
pipette.aspirate(mix_volume,location=dest_well)
pipette.dispense(mix_volume,location=dest_well)
if mix_aspirate_rate is not None:
pipette.flow_rate.aspirate = aspirate_rate
if mix_dispense_rate is not None:
pipette.flow_rate.dispense = dispense_rate
if blow_out:
pipette.blow_out()
if post_dispense_delay>0.0:
try:
pipette.move_to(dest_well.top())
except AttributeError:
# if location is already specified
pipette.move_to(dest_well)
self.protocol.delay(seconds=post_dispense_delay)
if self.has_tip and drop_tip:
pipette.drop_tip(self.protocol.deck[12]['A1'])
self.has_tip=False
[docs]
def set_aspirate_rate(self,rate=150):
'''Set aspirate rate of both pipettes in uL/s. Default is 150 uL/s'''
for mount,pipette in self.protocol.loaded_instruments.items():
pipette.flow_rate.aspirate = rate
[docs]
def set_dispense_rate(self,rate=300):
'''Set dispense rate of both pipettes in uL/s. Default is 300 uL/s'''
for mount,pipette in self.protocol.loaded_instruments.items():
pipette.flow_rate.dispense = rate
[docs]
def set_gantry_speed(self,speed=400):
'''Set movement speed of gantry. Default is 400 mm/s'''
for mount,pipette in self.protocol.loaded_instruments.items():
pipette.default_speed = speed
[docs]
def get_pipette(self,volume,method='min_transfers'):
self.app.logger.debug(f'Looking for a pipette for volume {volume}')
pipettes = []
minVolStr = ''
for mount,pipette in self.protocol.loaded_instruments.items():
if volume>=pipette.min_volume:
pipettes.append({'object':pipette})
for pipette in pipettes:
max_volume = pipette['object'].max_volume
ntransfers = ceil(volume/max_volume)
vol_per_transfer = volume / ntransfers
pipette['ntransfers'] = ntransfers
# **Peter** personally apologizes for these impressively long lines,
# which he believes to be correct. The systematic error is added
# straight (i.e, not sumsq) while the random is added as sumsq,
# then the two are combined as sumsq
pipette['uncertainty'] = sqrt(
(ntransfers*self._pipette_uncertainty(max_volume,vol_per_transfer,'random')**2)+
(ntransfers*self._pipette_uncertainty(max_volume,vol_per_transfer,'systematic'))**2
)
# last_transfer_vol_maxmin = volume - max_volume*(ntransfers-1)
# pipette['total_uncertainty_maxmin'] = sqrt(
# (ntransfers-1*_pipette_uncertainty(max_volume,max_volume,'random')**2+
# _pipette_uncertainty(max_volume,last_transfer_vol_maxmin,'random')**2)+
# ((ntransfers-1)*_pipette_uncertainty(max_volume,vol_per_transfer,'systematic')+
# _pipette_uncertainty(max_volume,last_transfer_vol_maxmin,'systematic')**2)
# )
self.app.logger.debug(f'Found pipettes with suitable minimum volume and computed uncertainties: {pipettes}')
if not pipettes:
raise ValueError('No suitable pipettes found!\n')
if self.data is not None:
self.data['transfer_method'] = method
self.data['pipette_options'] = str(pipettes)
if(method == 'uncertainty'):
pipette = min(pipettes,key=lambda x: x['uncertainty'])
elif(method == 'min_transfers'):
min_xfers = min(pipettes,key=lambda x: x['ntransfers'])['ntransfers']
acceptable_pipettes = filter(lambda x: x['ntransfers']==min_xfers,pipettes)
pipette = min(acceptable_pipettes,key=lambda x: x['object'].max_volume)
else:
raise ValueError(f'Pipette selection method {method} was not recognized.')
self.app.logger.debug(f'Chosen pipette: {pipette}')
if self.data is not None:
self.data['chosen_pipette'] = str(pipette)
return pipette['object']
def _pipette_uncertainty(self,maxvolume,volume,errortype):
'''pipette uncertainties from the opentrons gen 1 whitepaper
@ https://opentrons.com/publications/OT-2-Pipette-White-Paper-GEN1.pdf
pipette moving uL random error uL ± systematic error uL ±
P10 single 10 0.1 0.2
P10 single 5 0.15 0.25
P10 single 1 0.05 0.15
P50 single 50 0.2 0.5
P50 single 25 0.15 0.375
P50 single 5 0.25 0.25
P300 single 300 0.9 1.8
P300 single 150 0.6 1.5
P300 single 30 0.45 0.9
P1000 single 1000 1.5 7
P1000 single 500 1.0 5
P1000 single 100 1.0 2
'''
#dict of uncertainty data where params are error = a * volume + b
pipette_uncertainties = [
{'size':10, 'gen':1,'random_a':0.00491803278688525, 'random_b':0.0737704918032787,'systematic_a':0.00491803278688525,'systematic_b':0.173770491803279},
{'size':20, 'gen':1,'random_a':0.00491803278688525, 'random_b':0.0737704918032787,'systematic_a':0.00491803278688525,'systematic_b':0.173770491803279},#XXX fudging 20 for now, will fix...
{'size':50, 'gen':1,'random_a':-0.000983606557377049,'random_b':0.226229508196721, 'systematic_a':0.0055327868852459, 'systematic_b':0.227459016393443},
{'size':300, 'gen':1,'random_a':0.00168032786885246, 'random_b':0.381147540983607, 'systematic_a':0.00327868852459016,'systematic_b':0.875409836065574},
{'size':1000,'gen':1,'random_a':0.000573770491803279, 'random_b':0.860655737704918, 'systematic_a':0.00549180327868852,'systematic_b':1.73770491803279}
]
for pu in pipette_uncertainties:
if pu['size'] == maxvolume:
#I think sum squares is the correct way to combine these errors, but not 100%
if errortype == 'random':
return pu['random_a'] * volume + pu['random_b']
elif errortype == 'systematic':
return pu['systematic_a'] * volume + pu['systematic_b']
else:
return None
#argmax because numpy cannot be installed
[docs]
def argmax(array):
return array.index(max(array))
#argmin because numpy cannot be installed
[docs]
def argmin(array):
return array.index(min(array))
if __name__ == '__main__':
from AFL.automation.shared.launcher import *