Source code for AFL.automation.EpicsADLiveProcess.AreaDetectorLive

import lazy_loader as lazy
epics = lazy.load("epics", require="AFL-automation[neutron-scattering]")
import numpy as np
import queue


[docs] class AreaDetectorLive():
[docs] def __init__(self,basepv="PIL5:",cam="cam1:",filewriter="TIFF1:",image="image1:"): self.imgqueue = queue.Queue() self.size_x = epics.caget(basepv+cam+"ArraySizeX_RBV") self.size_y = epics.caget(basepv+cam+"ArraySizeY_RBV") self.det_model = epics.caget(basepv+cam+"Model_RBV") self.det_mfgr = epics.caget(basepv+cam+"Manufacturer_RBV") self.filename = "notset" self.expt = 0 self.arraydata = None self.PVimagearray = epics.PV(basepv+image+"ArrayData",callback=self.cbfunc,auto_monitor=True) self.PVfilename = epics.PV(basepv+cam+"FileName_RBV",callback=self.cbfunc,auto_monitor=True) self.PVexpt = epics.PV(basepv+cam+"AcquireTime",callback=self.cbfunc,auto_monitor=True) #@TODO: this could track all the metadata in, e.g., the HDF5 filewriter, and might need to for complex experiments. For now this is probably fine. print(f'Connected to a {self.det_mfgr} {self.det_model}, {self.size_x} x {self.size_y}')
[docs] def cbfunc(self,pvname=None,value=None,char_value=None,**kwargs): self.imgqueue.put((pvname,value))
[docs] def queuehandler(self): data = self.imgqueue.get(block=True,timeout=None) if(data[0] == self.PVfilename.pvname): self.filename = data[1].tobytes().decode('ascii') return None elif(data[0] == self.PVexpt.pvname): self.expt = data[1] return None elif(data[0] == self.PVimagearray.pvname): if(len(data[1])>0): self.arraydata = np.reshape(data[1],(self.size_y,self.size_x)) return (self.filename,self.expt,self.arraydata) return None else: raise NotImplementedError
[docs] def status(self): print(f'Last filename: {self.filename}') print(f'Last exposure: {self.expt}') print(f'Queue contains {self.imgqueue.qsize()} unprocessed items')