Source code for AFL.automation.instrument.CHESSID3B

import gc
import time
import datetime
from AFL.automation.APIServer.Driver import Driver
from AFL.automation.instrument.ScatteringInstrument import ScatteringInstrument
from AFL.automation.instrument.PySpecClient import PySpecClient
import numpy as np # for return types in get data
import h5py #for Nexus file writing
import os
import pathlib
import PIL
import warnings
import json

[docs] class CHESSID3B(ScatteringInstrument,Driver): defaults = {} defaults['sample axis'] = 'Z-stage' defaults['sample in'] = 26.5 defaults['sample out'] = 25.0 defaults['empty transmission'] = None defaults['transmission strategy'] = 'sum' defaults['reduced_data_dir'] = '/mnt/home/chess_id3b/beaucage/211012-data' defaults['exposure'] = 1. defaults['i0_counter'] = 'ic3' defaults['diode_counter'] = 'diode' defaults['preferred_det'] = 'PIL5' defaults['sample_thickness'] = 1.58 defaults['absolute_calibration_factor'] = 1 defaults['measurement_positions'] = [-91.47,-101.47,-126.47]
[docs] def __init__(self,overrides=None): ''' connect to spec ''' self.app = None Driver.__init__(self,name='CHESSID3B',defaults=self.gather_defaults(),overrides=overrides) ScatteringInstrument.__init__(self) self.client = PySpecClient(address='id3b.classe.cornell.edu',port='spec') self.client.connect() if self.config['reduced_data_dir'] is not None: os.chdir(self.config['reduced_data_dir']) self.__instrument_name__ = 'CHESS ID3B instrument' self.status_txt = 'Just started...' self.last_measured_transmission = [0,0,0,0]
[docs] def pre_execute(self,**kwargs): pass
[docs] def setReducedDataDir(self,path): self.config['reduced_data_dir'] = path os.chdir(path)
[docs] @Driver.quickbar(qb={'button_text':'Measure Transmission', 'params':{ 'set_empty_transmission':{'label':'Set Empty Trans?','type':'boolean','default':False} }}) def measureTransmission(self,set_empty_transmission=False,return_full=False): warnings.warn('measureTransmission is ill-defined on instruments with beamstop diodes. Returning the last measured transmission. To avoid this warning, call lastTransmission directly.',stacklevel=2) return self.lastTransmission(set_empty_transmission=set_empty_transmission,return_full=return_full)
[docs] def lastTransmission(self,set_empty_transmission=False,return_full=False): open_beam = self.client.get_counter(self.config['i0_counter']) trans_beam = self.client.get_counter(self.config['diode_counter']) print(f"lastTransmission: open_beam={open_beam}") print(f"lastTransmission: trans_beam={trans_beam}") try: trans = trans_beam / open_beam except ZeroDivisionError: trans=0 if set_empty_transmission: #XXX! Should this be stored in config? self.config['empty transmission'] = trans retval = (trans,open_beam,trans_beam,self.config['empty transmission']) elif self.config['empty transmission'] is not None: if return_full: retval = (trans / self.config['empty transmission'],open_beam,trans_beam,self.config['empty transmission']) else: retval = trans / self.config['empty transmission'] self.app.logger.info(f'Scaling raw transmission of {trans*100}% using empty transmission of {self.config["empty transmission"]*100} % for reported sample transmission of {trans / self.config["empty transmission"]*100}%') else: if return_full: retval=(trans,open_beam,trans_beam.sum()) else: retval = trans self.last_measured_transmission = (trans/self.config['empty transmission'],open_beam,trans_beam,self.config['empty transmission']) self.status_txt = 'Idle' return retval
[docs] def measureTransmissionQuick(self,exp=0.05,fn='align'): self._simple_expose(exposure=exp,name= fn,block=True) retval = self.lastTransmission() if self.config['open beam intensity'] is not None: retval = retval / self.config['open beam intensity'] return retval
[docs] def interactiveLoad(self,name='lineup',cutoff_trans = 0.8,timeout=120): self.client.run_cmd('pil_off') self.client.run_cmd('opens') self.client.run_cmd(f'newfile {name}') start = datetime.datetime.now() timeout=datetime.timedelta(seconds=timeout) self.client.run_cmd('ct 0.1') baseline_trans = self.lastTransmission() while(datetime.datetime.now() - start)<timeout: self.client.run_cmd('ct 0.1') trans_ratio = self.lastTransmission()/baseline_trans if trans_ratio < cutoff_trans_ratio: self.app.logger.debug(f'Stopping load at transmission ratio = {trans_ratio}') break self.client.run_cmd('closes') self.client.run_cmd('pil_on')
[docs] @Driver.unqueued() def getExposure(self): ''' get the currently set exposure time ''' return self.config['exposure']
[docs] @Driver.unqueued() def getFilename(self): ''' get the currently set file name ''' return self.client.get_variable('DATAFILE').get()
[docs] @Driver.unqueued() def getLastFilePath(self,**kwargs): ''' get the currently set file name ''' specdir = self.client.get_variable('CWD').get() datafile = self.getFilename() scan_n = int(self.client.get_variable('SCAN_N').get()) datadir = pathlib.Path(specdir) / (f'{datafile}_{scan_n:03d}') try: det_str = kwargs['preferred_det'] except KeyError: det_str = self.config['preferred_det'] files = [x for x in datadir.iterdir() if det_str in str(x)] filepath = max(files,key=lambda x: int(x.parts[-1][-8:-5])) return filepath
[docs] def setExposure(self,exposure): if self.app is not None: self.app.logger.debug(f'Setting exposure time to {exposure}') self.config['exposure'] = exposure
[docs] def setFilename(self,name): if self.app is not None: self.app.logger.debug(f'Setting filename to {name}') name = name.replace('\\','').replace('/','').replace(':','').replace('%','') self.client.run_cmd(f'newfile {name}')
[docs] def getElapsedTime(self): isFlyScan = bool(self.client.get_variable('scanType_Fly').get() & self.client.get_variable('_stype').get() ) if isFlyScan: return self.client.get_variable('_ctime').get()/self.client.get_variable('NPTS').get() else: return self.client.get_variable('_ctime').get()
[docs] @Driver.unqueued(render_hint='2d_img',log_image=True) def getData(self,**kwargs): try: filepath = self.getLastFilePath() data = np.array(PIL.Image.open(filepath)) except FileNotFoundError: nattempts = 1 while nattempts<11: nattempts = nattempts +1 time.sleep(0.2) try: filepath = self.getLastFilePath() data = np.array(PIL.Image.open(filepath)) except FileNotFoundError: if nattempts == 10: raise FileNotFoundError(f'could not locate file after {nattempts} tries') else: warnings.warn(f'failed to load file, trying again, this is try {nattempts}') else: break return np.nan_to_num(data)
[docs] def getMotorPosition(self,name): return self.client.get_motor(name).get_position()
[docs] def moveAxis(self,axis,value,block=False): ''' Moves a single axis using a connection to a labview vi. @param axis: the axis id or name of the motor to move @param value: the position of the motor to move to @param block: if True, this function will not return until the move is complete. ''' mot = self.client.get_motor(axis).move(value) mot.move(value) if block: while(mot.moving): pass
def _simple_expose(self,name=None,exposure=None,block=False): if name is None: name=self.getFilename() else: self.setFilename(name) if exposure is None: exposure=self.getExposure() else: self.setExposure(exposure) self.status_txt = f'Starting {exposure} s count named {name}' if self.app is not None: self.app.logger.debug(f'Starting exposure with name {name} for {exposure} s') self.client.run_cmd(f'opens') self.client.run_cmd(f'tseries 1 {exposure}') self.client.run_cmd(f'closes') if block: raise NotImplementedError() self.status_txt = 'Accessing Image' return self.getData(lv=lv)
[docs] @Driver.quickbar(qb={'button_text':'Expose', 'params':{ 'name':{'label':'Name','type':'text','default':'test_exposure'}, 'exposure':{'label':'Exposure (s)','type':'float','default':5}, 'reduce_data':{'label':'Reduce?','type':'bool','default':True}, 'measure_transmission':{'label':'Measure Trans?','type':'bool','default':True} }}) def expose(self,name=None,exposure=None,nexp=1,block=True,reduce_data=True,measure_transmission=True,save_nexus=True): if name is None: name=self.getFilename() else: self.setFilename(name) if exposure is None: exposure=self.getExposure() else: self.setExposure(exposure) self.status_txt = f'Starting {exposure} s count named {name}' if self.app is not None: self.app.logger.debug(f'Starting exposure with name {name} for {exposure} s') pos_trans = [] pos_raw_data = [] pos_simple_trans = [] pos_reduced_data = [] for idx,pos in enumerate(self.config['measurement_positions']): self.client.run_cmd(f'umv samx {pos}') self.client.run_cmd(f'opens') self.client.run_cmd(f'tseries {nexp} {exposure/10}') self.client.run_cmd(f'tseries {nexp} {exposure}',block=True) self.client.run_cmd(f'closes') pos_raw_data.append(self.getData()) pos_reduced_data.append(self.getReducedData(write_data=True,filename=f'{name}_{idx}')) pos_trans.append(self.lastTransmission(return_full=True)) pos_simple_trans.append(pos_trans[-1][0]) #time.sleep(0.5) if block or reduce_data or save_nexus: chosen_id = np.argmin(pos_simple_trans) self.client.block_for_ready() self.status_txt = 'Accessing Image' self.app.logger.debug(f'Min transmission was {np.min(pos_simple_trans)} at index {np.argmin(pos_simple_trans)}; returning that data. Other transmissions were {pos_simple_trans}') data = pos_raw_data[chosen_id] transmission = pos_trans[chosen_id] #transmission = self.lastTransmission(return_full=True) if save_nexus: self.status_txt = 'Writing Nexus' self._writeNexus(data,name,name,transmission) if reduce_data: self.status_txt = 'Reducing Data' reduced = pos_reduced_data[chosen_id] print(np.shape(reduced)) np.savetxt(f'{name}_chosen_r1d.csv',np.transpose(reduced),delimiter=',') out_dict = {} out_dict['transmissions'] = pos_trans out_dict['chosen'] = int(chosen_id) with open(f'{name}_trans.json','w') as f: json.dump(out_dict,f) #self.getReducedData(write_data=True,filename=name) #if save_nexus: #self._appendReducedToNexus(reduced,name,name) self.status_txt = 'Instrument Idle' return transmission
[docs] def scan(self,axis,npts,start,step,name=None,exposure=None,block=False): if name is not None: self.setFilename(name) else: name=self.getFilename() if exposure is not None: self.setExposure(exposure) else: exposure=self.getExposure() self.setNScans(npts) self.setSweepAxis(axis) self.setSweepStart(start) self.setSweepStep(step) if self.app is not None: self.app.logger.debug(f'Starting exposure with name {name} for {exposure} s') with LabviewConnection() as lv: lv.main_vi.setcontrolvalue('Expose Pilatus',True) time.sleep(0.5) if block: while(self.getStatus() != 'Success'): time.sleep(0.1)
[docs] def status(self): status = [] status.append(f'Last Measured Transmission: scaled={self.last_measured_transmission[0]} using empty cell trans of {self.last_measured_transmission[3]} with {self.last_measured_transmission[1]} raw counts in open/ {self.last_measured_transmission[2]} sample') status.append(f'Status: {self.status_txt}') #lmj = self._getLabviewValue("LMJ Status") #status.append(f'LMJ status: {"running, power on target = "+str(lmj[0]*lmj[1])+"W" if lmj[6]==1 else "not running"}') #status.append(f'Vacuum (mbar): {self._getLabviewValue("Pressure (mbar)")}') status.append(f'<a href="getData" target="_blank">Live Data (2D)</a>') status.append(f'<a href="getReducedData" target="_blank">Live Data (1D)</a>') status.append(f'<a href="getReducedData?render_hint=2d_img&reduce_type=2d">Live Data (2D, reduced)</a>') return status
if __name__ == '__main__': from AFL.automation.shared.launcher import *