Source code for AFL.automation.loading.ChemyxSyringePump

'''

This is largely duplicated from the reference code provided by Chemyx.  
Their package is a GUI and direct import of the module would be problematic.

'''

from AFL.automation.loading.SyringePump import SyringePump

import sys

import lazy_loader as lazy
serial = lazy.load("serial", require="AFL-automation[serial]")

import sys
import glob

import time
import datetime
[docs] class ChemyxSyringePump(SyringePump):
[docs] def __init__(self,port,syringe_id_mm,syringe_volume,baud=9600,flow_delay=5): ''' Initializes and verifies connection to a Chemyx syringe pump. port = serial port reference syringe_id_mm = syringe inner diameter in mm, used for absolute volume. (will re-program the pump with this diameter on connection) syringe_volume = syringe volume in mL baud = baudrate for connection ''' self.app = None self.name = 'ChemyxSyringePump' self.flow_delay = flow_delay self.pump = ChemyxConnection(port,baud) self.pump.openConnection() #syringe = self.pump.get_syringe_param() self.syringe_id_mm = self.getValueFromParams('dia') #self.piston_stroke_mm = syringe.max_piston_stroke_mm #self.pump.set_volume_unit(qmixpump.UnitPrefix.milli, qmixpump.VolumeUnit.litres) #self.pump.set_flow_unit(qmixpump.UnitPrefix.milli, qmixpump.VolumeUnit.litres, # qmixpump.TimeUnit.per_minute) self.syringe_volume_ml = self.getValueFromParams('volume') #this may not be a real attribute param_limits = self.pump.getParameterLimits()[1].split(' ') self.max_rate = float(param_limits[0]) self.min_rate = float(param_limits[1]) self.max_volume = float(param_limits[2]) self.min_volume = float(param_limits[3]) print(f'Currently loaded syringe is a {self.syringe_volume_ml}, max pump rate {self.max_rate}, ID {self.syringe_id_mm}')
def __del__(self): pass
[docs] def wait_dosage_finished(self, timeout_seconds=60): """ The function waits until the last dosage command has finished until the timeout occurs. """ start_time = datetime.datetime.now() timeout = datetime.timedelta(seconds=timeout_seconds) exit = False while (not exit and (datetime.datetime.now()-start_time)<timeout): exit = not self.getStatus() # False if stopped -> exit = True = time.sleep(0.05)
[docs] def stop(self): ''' Abort the current dispense/withdraw action. ''' self.pump.stopPump()
[docs] def withdraw(self,volume,block=True,delay=True): self.pump.stopPump() rate = self.getRate() expected_duration = volume / rate * 60 # anticipated duration of pump move in s timeout = expected_duration + 30 timeout = max((timeout),30) # in case things get really weird. if self.app is not None: self.app.logger.debug(f'Withdrawing {volume}mL at {rate} mL/min, expected to take {expected_duration} s') #if (self.getLevel()+volume) > self.max_volume: # self.app.logger.warn(f'Requested withdrawal of {volume} but current level is {self.getLevel()} of a max {self.max_volume}. Moving to {self.max_volume}') # self.setLevel(self.max_volume) self.pump.setVolume(-float(volume)) self.pump.startPump() self.pump.startPump() if block: self.wait_dosage_finished(timeout) if delay: time.sleep(self.flow_delay)
[docs] def dispense(self,volume,block=True,delay=True): self.pump.stopPump() rate = self.getRate() expected_duration = volume / rate * 60 # anticipated duration of pump move in s timeout = expected_duration + 30 timeout = max((timeout),30) # in case things get really weird. if self.app is not None: self.app.logger.debug(f'Withdrawing {volume}mL at {rate} mL/min, expected to take {expected_duration} s') #if (self.getLevel()-volume) < 0: # self.app.logger.warn(f'Requested dispense of {volume} but current level is {self.getLevel()} . Moving to 0') # self.setLevel(self.max_volume) self.pump.setVolume(float(volume)) self.pump.startPump() self.pump.startPump() if block: self.wait_dosage_finished(timeout) if delay: time.sleep(self.flow_delay)
[docs] def setRate(self,rate): #@TODO if self.app is not None: self.app.logger.debug(f'Setting pump rate to {rate} mL/min') self.pump.setRate(rate)
[docs] def getRate(self): #@TODO return self.getValueFromParams('rate')
[docs] def emptySyringe(self): self.setLevel(0)
[docs] def getLevel(self): return float(self.pump.getDisplacedVolume()[1].split(' = ')[1])
#self.assertAlmostEqual(max_volume, fill_level_is)
[docs] def setLevel(self,level): self.pump.dispense(self.getLevel()-level)
[docs] def blockUntilStatusStopped(self,pollingdelay=0.2): ''' This is a deprecated function from old serial logic. It should work, but do not use. ''' self.wait_dosage_finished(30)
[docs] def getStatus(self): #@TODO ''' query the pump status and return whether the pump is moving or not (true if moving, false if stopped) ''' status = self.pump.getPumpStatus()[1] if int(status)==4: raise Exception('Pump stalled or other error!') return bool(int(status))
[docs] def getValueFromParams(self,search_key): params = self.pump.getParameters() for entry in params: try: key,val = entry.split(' = ') if key == search_key: return float(val) except ValueError as e: pass
[docs] def getOpenPorts(): # portinfo = [] # for port in serial.tools.list_ports.comports(): # if port[2] != 'n/a': # info = [port.device, port.name, port.description, port.hwid] # portinfo.append(info) # return portinfo if sys.platform.startswith('win'): ports = ['COM%s' % (i + 1) for i in range(256)] elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'): # this excludes your current terminal "/dev/tty" ports = glob.glob('/dev/tty[A-Za-z]*') elif sys.platform.startswith('darwin'): ports = glob.glob('/dev/tty.*') else: raise EnvironmentError('Unsupported platform') result = [] for port in ports: try: s = serial.Serial(port) s.close() result.append(port) #print(port) except (OSError, serial.SerialException): pass #print(result) return result
[docs] def parsePortName(portinfo): """ On macOS and Linux, selects only usbserial options and parses the 8 character serial number. """ portlist = [] for port in portinfo: if sys.platform.startswith('win'): portlist.append(port[0]) elif sys.platform.startswith('darwin') or sys.platform.startswith('linux'): if 'usbserial' in port[0]: namelist = port[0].split('-') portlist.append(namelist[-1]) return portlist
[docs] class ChemyxConnection(object):
[docs] def __init__(self, port, baudrate, x = 0, mode = 0, verbose=False): self.port = port self.baudrate = baudrate self.x = x self.mode = mode self.verbose = verbose
[docs] def openConnection(self): try: self.ser = serial.Serial() self.ser.baudrate = self.baudrate self.ser.port = self.port self.ser.timeout = 0 self.ser.open() if self.ser.isOpen(): if self.verbose: print("Opened port") print(self.ser) self.getPumpStatus() self.ser.flushInput() self.ser.flushOutput() except Exception as e: if self.verbose: print('Failed to connect to pump') print(e) pass
[docs] def closeConnection(self): self.ser.close() if self.verbose: print("Closed connection")
[docs] def sendCommand(self, command): try: arg = bytes(str(command), 'utf8') + b'\r' if self.verbose: print(f' wrote : {arg}') self.ser.write(arg) time.sleep(0.5) response = self.getResponse() if self.verbose: print(f' response: {response}') return response except TypeError as e: if self.verbose: print(e) self.ser.close()
[docs] def getResponse(self): try: response_list = [] while True: response = self.ser.readlines() for line in response: line = line.strip(b'\n').decode('utf8') line = line.strip('\r') if self.verbose: print(line) response_list.append(line) break return response_list except TypeError as e: if self.verbose: print(e) self.closeConnection() except Exception as f: if self.verbose: print(f) self.closeConnection()
[docs] def startPump(self): command = 'start' command = self.addX(command) command = self.addMode(command) response = self.sendCommand(command) if(self.verbose): print('sent start command') return response
[docs] def stopPump(self): command = 'stop' command = self.addX(command) response = self.sendCommand(command) return response
[docs] def pausePump(self): command = 'pause' command = self.addX(command) response = self.sendCommand(command) return response
[docs] def restartPump(self): command = 'restart' response = self.sendCommand(command) return response
[docs] def setUnits(self, units): units_dict = {'mL/min': '0', 'mL/hr': '1', 'μL/min': '2', 'μL/hr': 3} command = 'set units ' + str(units_dict[units]) response = self.sendCommand(command) return response
[docs] def setDiameter(self, diameter): command = 'set diameter ' + str(diameter) response = self.sendCommand(command) return response
[docs] def setRate(self, rate): command = 'set rate ' + str(rate) response = self.sendCommand(command) return response
[docs] def setVolume(self, volume): command = 'set volume ' + str(volume) response = self.sendCommand(command) return response
[docs] def setDelay(self, delay): command = 'set delay ' + str(delay) response = self.sendCommand(command) return response
[docs] def setTime(self, timer): command = 'set time ' + str(timer) response = self.sendCommand(command) return response
[docs] def getParameterLimits(self): command = 'read limit parameter' response = self.sendCommand(command) return response
[docs] def getParameters(self): command = 'view parameter' response = self.sendCommand(command) return response
[docs] def getDisplacedVolume(self): command = 'dispensed volume' response = self.sendCommand(command) return response
[docs] def getElapsedTime(self): command = 'elapsed time' response = self.sendCommand(command) return response
[docs] def getPumpStatus(self): command = 'pump status' response = self.sendCommand(command) return response
[docs] def addMode(self, command): if self.mode == 0: return command else: return command + ' ' + str(self.mode - 1)
[docs] def addX(self, command): if self.x == 0: return command else: return str(self.x) + ' ' + command