Source code for AFL.automation.loading.NE1kSyringePump

from AFL.automation.loading.SyringePump import SyringePump
from AFL.automation.loading.SerialDevice import SerialDevice
import time

[docs] class NE1kSyringePump(SyringePump):
[docs] def __init__(self,port,syringe_id_mm,syringe_volume,baud=9600,daisy_chain=None,pumpid=None,flow_delay=5): ''' Initializes and verifies connection to a New Era 1000 syringe pump. port = serial port reference syringe_id_mm = syringe inner diameter in mm, used for absolute volume. (will re-program the pump with this diameter on connection) syringe_volume = syringe volume in mL baud = baudrate for connection daisy_chain = used for the 'party-line' mode on these pumps where a string of pumps is on one serial port. when setting up daisy chaining: connect to the first pump on a port with daisy_chain = False on subsequent pumps, set daisy_chain to the pump with a hardware connection (the first pump) or any other pump on the string. note: when daisy chaining you should probably set pumpid explicitly rather than autodiscovering as most likely the autodiscovery will return the first pump id each time. pumpid = the ID configured in the pump firmware. If not set, will attempt to auto-discover a pump. setting pumpid will save some time on connection and probably result in more reproducible behavior. Practically mandatory for daisy chain mode. ''' self.app = None self.name = 'NE1kSyringePump' self.pumpid = pumpid self.flow_delay = flow_delay if daisy_chain is not None: self.serial_device = daisy_chain.serial_device else: self.serial_device = SerialDevice(port,baudrate=baud,timeout=0.5) # try to connect if self.pumpid is None: for i in range(75): if len(self.serial_device.sendCommand('%iADR\x0D'%i))>0: self.pumpid = i break else: if len(self.serial_device.sendCommand('%iADR\x0D'%self.pumpid))==0: raise NoDeviceFoundException assert self.pumpid is not None, "Error: no answer from any of the first 75 pumps. Is speed correct?" # reset diameter self.syringe_id_mm = syringe_id_mm self.syringe_volume = syringe_volume self.stop()#stop the pump self.serial_device.sendCommand('%iDIA %.02f\x0D'%(self.pumpid,syringe_id_mm)) #set the diameter readback = self.serial_device.sendCommand('%iDIA\x0D'%self.pumpid) #readback dia = float(readback[4:-1]) assert dia==syringe_id_mm, "Warning: syringe diameter set failed. Commanded diameter "+str(syringe_id_mm)+", read back "+str(dia)
[docs] def stop(self): ''' Abort the current dispense/withdraw action. Equivalent to pressing stop button on panel. ''' self.serial_device.sendCommand('%iSTP\x0D'%self.pumpid,questionmarkOK=True)
[docs] def withdraw(self,volume,block=True,delay=True): if self.app is not None: rate = self.getRate() self.app.logger.debug(f'Withdrawing {volume}mL at {rate} mL/min') self.serial_device.sendCommand('%iVOLML\x0D'%self.pumpid) self.serial_device.sendCommand('%iVOL %.03f\x0D'%(self.pumpid,volume)) self.serial_device.sendCommand('%iDIRWDR\x0D'%self.pumpid) self.serial_device.sendCommand('%iRUN\x0D'%self.pumpid) if block: self.blockUntilStatusStopped() if delay: time.sleep(self.flow_delay)
[docs] def dispense(self,volume,block=True,delay=True): if self.app is not None: rate = self.getRate() self.app.logger.debug(f'Dispensing {volume}mL at {rate} mL/min') self.serial_device.sendCommand('%iVOLML\x0D'%self.pumpid) self.serial_device.sendCommand('%iVOL%.03f\x0D'%(self.pumpid,volume)) self.serial_device.sendCommand('%iDIRINF\x0D'%self.pumpid) self.serial_device.sendCommand('%iRUN\x0D'%self.pumpid) if block: self.blockUntilStatusStopped() if delay: time.sleep(self.flow_delay)
[docs] def setRate(self,rate): if self.app is not None: self.app.logger.debug(f'Setting pump rate to {rate} mL/min') self.serial_device.sendCommand('%iRAT%.02fMM\x0D'%(self.pumpid,rate),debug=True) if self.getRate()!=rate: raise ValueError('Pump rate change failed')
[docs] def getRate(self): output = self.serial_device.sendCommand('%iRAT\x0D'%self.pumpid) units = output[-3:-1] if units=='MM': rate = float(output[4:-3]) elif units=='UM': rate = float(output[4:-3])/1000 elif units=='MH': rate = float(output[4:-3])/60 elif units=='UH': rate = float(output[4:-3])/60/1000 else: raise ValueError(f'Rate units not supported in getRate: {units}') return rate
[docs] def emptySyringe(self): self.dispense(self.syringe_volume*0.9)
[docs] def blockUntilStatusStopped(self,pollingdelay=0.2): statuschar = 'X' while statuschar is not 'S': time.sleep(pollingdelay) statuschar = self.getStatus()[0]
[docs] def getStatus(self): ''' query the pump status and return a tuple of the status character, infused volume, and withdrawn volume) ''' dispensed = self.serial_device.sendCommand('%iDIS\x0D'%self.pumpid) # example answer: 10SI0.000W20.00ML statuschar = dispensed[3] infusedvol = float(dispensed[5:10]) withdrawnvol = float(dispensed[11:16]) return(statuschar,infusedvol,withdrawnvol)