import lazy_loader as lazy
win32com = lazy.load("win32com", require="AFL-automation[win32]")
gc = lazy.load("gc", require="AFL-automation[win32]")
pythoncom = lazy.load("pythoncom", require="AFL-automation[win32]")
import time
import datetime
from AFL.automation.APIServer.Driver import Driver
from AFL.automation.instrument.ScatteringInstrument import ScatteringInstrument
import numpy as np # for return types in get data
import h5py #for Nexus file writing
import os
import scipy.ndimage as ndi
import pandas as pd
[docs]
class CDSAXSLabview(ScatteringInstrument,Driver):
defaults = {}
defaults['beamstop axis'] = 'Beamstop-z'
defaults['beamstop in'] = 12.5
defaults['beamstop out'] = 3
defaults['sample axis'] = 'Z-stage'
defaults['sample in'] = 26.5
defaults['sample out'] = 25.0
defaults['use_new_motion_conventions'] = False
defaults['nmc_beamstop_in'] = {'Beamstop-z':12.5}
defaults['nmc_beamstop_out'] = {'Beamstop-z':3}
defaults['nmc_sample_out'] = {'Z-stage':20.0,'X-stage':18.0}
defaults['nmc_sample_in'] = {'Z-stage':26.5,'X-stage':0}
defaults['empty transmission'] = None
defaults['transmission strategy'] = 'sum'
defaults['vi'] = 'C:\saxs_control\GIXD controls.vi'
defaults['reduced_data_dir'] = r'Y:\\CDSAXS data\\autoreduce\\'
defaults['absolute_pressure_limit']=1
defaults['relative_pressure_ratio_limit']=10
defaults['sample_thickness']=0.18 #cm
defaults['absolute_calibration_factor']=1.0
defaults['empty_filename'] = None
axis_name_to_id_lut = {
'X-stage' : 0,
'Z-stage' : 1,
'Z-gonio' : 2,
'Phi' : 3,
'Theta' : 4,
'Y-gonio' : 5,
'Y-stage' : 6,
'Beamstop-z': 7,
'Beamstop-y': 8,
'Tungsten-y': 9 ,
'Detector-z': 10,
'Temperature': 11,
'None': 12
}
axis_id_to_name_lut = {value:key for key, value in axis_name_to_id_lut.items()}
[docs]
def __init__(self,overrides=None):
'''
connect to locally running labview vi with win32com and
parameters:
vi: (str) the path to the LabView virtual instrument file for the main interface
'''
import win32com.client
from win32process import SetProcessWorkingSetSize
from win32api import GetCurrentProcessId,OpenProcess
from win32con import PROCESS_ALL_ACCESS
self.client = win32com.client
self.SetProcessWorkingSetSize = SetProcessWorkingSetSize
self.GetCurrentProcessId = GetCurrentProcessId
self.OpenProcess = OpenProcess
self.PROCESS_ALL_ACCESS = PROCESS_ALL_ACCESS
self.app = None
Driver.__init__(self,name='CDSAXSLabview',defaults=self.gather_defaults(),overrides=overrides)
ScatteringInstrument.__init__(self)
if self.config['reduced_data_dir'] is not None:
os.chdir(self.config['reduced_data_dir'])
self.__instrument_name__ = 'NIST CDSAXS instrument'
self.status_txt = 'Just started...'
self.last_measured_transmission = [0,0,0,0]
[docs]
def pre_execute(self,**kwargs):
pressure = self._getLabviewValue("Pressure (mbar)")
#historical_pressure = self._getLabviewValue("Pressure Level Chart",lv=lv)
print(f'Pressure: {pressure}')
#print(f'Hist Press: {historical_pressure}')
abs_trip = pressure > self.config['absolute_pressure_limit']
#rel_trip = (pressure/historical_pressure.mean())>self.config['relative_pressure_ratio_limit']
if abs_trip:# or rel_trip:
#something has gone wrong -- vacuum excursion. raise an exception so the queue pauses.
raise Exception(f'Vacuum excursion!! Trip reason {"abs_trip" if abs_trip else "rel_trip"} Current pressure is {pressure}')#, historical average{historical_pressure.mean()}.')
[docs]
def setReducedDataDir(self,path):
self.config['reduced_data_dir'] = path
os.chdir(path)
[docs]
def measureTransmission(self,exp=5,fn='trans',set_empty_transmission=False,return_full=False,update_beam_center=True,lv=None):
'''
Perform a transmission measurement on the currently loaded sample.
Parameters
-------------
exp: float, default=5
exposure time
fn: string, default='trans'
file name to save the measurement in
set_empty_transmission: bool, default=False
set the currently-held empty transmission to the outcome of this measurement
return_full: bool, default=False
return more data. breaks some old API parts.
update_beam_center: bool, default=True
fit and update the beam center based on this measurement
lv: LabViewConnection, default=None
use this labview connection rather than making a new one if given
'''
with (LabviewConnection() if lv is None else lv) as lv:
self.status_txt = 'Moving beamstop out for transmission...'
self.moveAxis(self.config['nmc_beamstop_out'],block=True,lv=lv)
self.moveAxis(self.config['nmc_sample_out'],block=True,lv=lv)
self.status_txt = 'Measuring open beam intensity...'
open_beam = self._simple_expose(exposure=exp,filename = 't-open-beam-'+fn,block=True,lv=lv)
self.status_txt = 'Measuring sample direct beam intensity...'
self.moveAxis(self.config['nmc_sample_in'],block=True)
sample_transmission = self._simple_expose(exposure=exp,filename = 't-'+fn,block=True,lv=lv)
self.status_txt = 'Moving beamstop back in...'
self.moveAxis(self.config['nmc_beamstop_in'],lv=lv,block=True)
self.status_txt = 'Processing transmission measurement...'
trans = np.nan_to_num(sample_transmission).sum() / np.nan_to_num(open_beam).sum()
self.app.logger.info(f'Measured raw transmission of {trans*100}%, with {np.nan_to_num(sample_transmission).sum()} counts on samp and {np.nan_to_num(open_beam).sum()} in open beam.')
if set_empty_transmission:
#XXX! Should this be stored in config?
self.config['empty transmission'] = trans
retval = (trans,np.nan_to_num(open_beam).sum(),np.nan_to_num(sample_transmission).sum(),self.config['empty transmission'])
elif self.config['empty transmission'] is not None:
if return_full:
# sample transmission, open flux, sample flux, empty transmission
retval = (trans / self.config['empty transmission'],np.nan_to_num(open_beam).sum(),np.nan_to_num(sample_transmission).sum(),self.config['empty transmission'])
else:
retval = trans / self.config['empty transmission']
self.app.logger.info(f'Scaling raw transmission of {trans*100}% using empty transmission of {self.config["empty transmission"]*100} % for reported sample transmission of {trans / self.config["empty transmission"]*100}%')
else:
if return_full:
retval=(trans,np.nan_to_num(open_beam).sum(),np.nan_to_num(sample_transmission).sum())
else:
retval = trans
com = ndi.center_of_mass(np.nan_to_num(open_beam))
print(f'Fit center of mass {com} in open beam')
print(f'That converts to poni1 = {1.72e-4*com[0]} and poni2 = {1.72e-4*com[1]}')
print(f'poni1 should be {self.config["poni1"]} and poni2 {self.config["poni2"]}')
if update_beam_center:
self.config['poni1'] = 1.72e-4*com[0]
self.config['poni2'] = 1.72e-4*com[1]
self.last_measured_transmission = retval
self.status_txt = 'Idle'
return retval
[docs]
def measureTransmissionQuick(self,exp=1,fn='trans',setup=False,restore=False,lv=None):
with (LabviewConnection() if lv is None else lv) as lv:
if setup:
self.moveAxis(self.config['nmc_beamstop_out'],block=True,lv=lv)
self.moveAxis(self.config['nmc_sample_out'],block=True,lv=lv)
open_beam = self._simple_expose(exposure=exp,filename = 't-open-beam-'+fn,block=True,lv=lv)
self.config['open beam intensity'] = np.nan_to_num(open_beam).sum()
#self.config['open beam intensity updated'] = datetime.datetime.now()
self.moveAxis(self.config['nmc_sample_in'],block=True,lv=lv)
self.status_txt = 'Staged for rapid transmission measurement'
sample_transmission = self._simple_expose(exposure=exp,filename = 't-'+fn,block=True,lv=lv)
retval = np.nan_to_num(sample_transmission).sum()
if self.config['open beam intensity'] is not None:
retval = retval / self.config['open beam intensity']
if restore:
self.moveAxis(self.config['nmc_beamstop_in'],lv=lv,block=True)
self.status_txt = 'Idle'
return retval
[docs]
@Driver.unqueued()
def getExposure(self,lv=None):
'''
get the currently set exposure time
'''
return self._getLabviewValue('Single Pilatus Parameters',lv=lv)[0]
[docs]
@Driver.unqueued()
def getFilename(self,lv=None):
'''
get the currently set file name
'''
return self._getLabviewValue('Single Pilatus Parameters',lv=lv)[1]
[docs]
def setExposure(self,exposure,lv=None):
if self.app is not None:
self.app.logger.debug(f'Setting exposure time to {exposure}')
fileName = self.getFilename(lv=lv)
self._setLabviewValue('Single Pilatus Parameters',(exposure,fileName),lv=lv)
[docs]
def setFilename(self,name,lv=None):
if self.app is not None:
self.app.logger.debug(f'Setting filename to {name}')
name = name.replace('\\','').replace('/','').replace(':','').replace('%','')
exposure = self.getExposure()
self._setLabviewValue('Single Pilatus Parameters',(exposure,name),lv=lv)
[docs]
@Driver.unqueued(render_hint='2d_img',log_image=True,lv=None)
def getData(self,lv=None,**kwargs):
data = np.array(self._getLabviewValue('Pilatus Data',lv=lv))
return data
[docs]
def setPath(self,path,lv=None):
if self.app is not None:
self.app.logger.debug(f'Setting file path to {path}')
self.path = str(path)
self._setLabviewValue('FilePath',path,lv=lv)
[docs]
@Driver.unqueued()
def getStatus(self,lv=None):
return self._getLabviewValue('Process Status Message',lv=lv)
[docs]
@Driver.unqueued()
def getNScans(self,lv=None):
return self._getLabviewValue('# of Scans',lv=lv)
[docs]
def setNScans(self,nscans,lv=None):
self._setLabviewValue('# of Scans',nscans,lv=lv)
[docs]
@Driver.unqueued()
def getSweepAxis(self,lv=None):
return self.axis_id_to_name_lut[self._getLabviewValue('Sweep Axis',lv=lv)]
[docs]
def setSweepAxis(self,axis,lv=None):
print(f'setting sweep axis to {axis}')
if type(axis) is str:
axis=self.axis_name_to_id_lut[axis]
self._setLabviewValue('Sweep Axis',axis,lv=lv) # this is an enum, 6 is Y-stage
[docs]
@Driver.unqueued()
def getYStagePos(self,lv=None):
return self._getLabviewValue('Y-stage',lv=lv)
[docs]
@Driver.unqueued()
def getZStagePos(self,lv=None):
return self._getLabviewValue('Z-stage',lv=lv)
[docs]
@Driver.unqueued()
def getXStagePos(self,lv=None):
return self._getLabviewValue('X-Stage',lv=lv)
# def moveAxis(self,axis,value,exposure=0.001,filename = 'axis-move',return_data=False,block=True,lv=None):
# #this is very hacky, I apologize. The strategy is simply to set a sweep and do an exposure. This is because of labview-COM issue where we can't click the 'move axis' button.
#
# with (LabviewConnection() if lv is None else lv) as lv:
# self.setSweepAxis(axis,lv=lv)
# self.setSweepStart(value,lv=lv)
# self.setExposure(exposure,lv=lv)
# self.setFilename(filename,lv=lv)
# self.setNScans(1,lv=lv)
# self._setLabviewValue('Expose Pilatus',True,lv=lv)
# if block or return_data:
# while(self.getStatus(lv=lv) != 'Loading Image'):
# time.sleep(0.1)
# while(self.getStatus(lv=lv) != 'Success' and self.getStatus(lv=lv) != 'Collection Aborted'):
# time.sleep(0.1)
# if return_data:
# return self.getData(lv=lv)
def __moveAxis(self,axis,value,block=True,lv=None):
'''
Moves a single axis using a connection to a labview vi.
@param axis: the axis id or name of the motor to move
@param value: the position of the motor to move to
@param block: if True, this function will not return until the move is complete.
@param lv: a LabviewConnection object for use in integration with broader functions. If None (default) will make a new connection.
'''
with LabviewConnection(vi=r'C:\saxs_control\Move for Peter.vi') as lvm:
if type(axis) is str:
axis = self.axis_name_to_id_lut[axis]#this converts the axis name to an ID number used by labview
lvm.main_vi.setcontrolvalue('Axis',axis) #set the ‘axis’ drop-down box to the right axis
lvm.main_vi.setcontrolvalue('New Position',value) # type the position into the destination position box
lvm.main_vi.setcontrolvalue('Motor Move',True) # click the motor move button
if block:
with (LabviewConnection() if lv is None else lv) as lv: # this makes a connection to the main vi or uses an existing one if passed in
while('Moving' not in self.getStatus(lv=lv)): # wait for the instrument status to actually go to ‘moving’
time.sleep(0.1) #this just keeps the traffic down on the labview interface by only checking every 100 ms
while(self.getStatus(lv=lv) != 'Finished'): # wait for the instrument status to change to ‘finished’
time.sleep(0.1)
[docs]
def moveAxis(*args,block=True,lv=None):
self=args[0]
if len(args)==2:
self.nmcMoveAxis(args[1],block=block,lv=lv)
else:
self.__moveAxis(args[1],args[2],block=block,lv=lv)
[docs]
def nmcMoveAxis(self,dest,block=True,lv=None):
for motor,pos in dest.items():
self.__moveAxis(motor,pos,block=block,lv=None)
[docs]
def setSweepStart(self,start,lv=None):
self._setLabviewValue('Start Value',start,lv=lv)
[docs]
@Driver.unqueued()
def getSweepStart(self,lv=None):
return self._getLabviewValue('Start Value',lv=lv)
[docs]
def setSweepStep(self,step,lv=None):
self._setLabviewValue('Step',step,lv=lv)
[docs]
@Driver.unqueued()
def getSweepStep(self,lv=None):
return self._getLabviewValue('Step',lv=lv)
[docs]
@Driver.unqueued()
def getElapsedTime(self,lv=None):
return self._getLabviewValue('Elapsed Time',lv=lv)
def _getLabviewValue(self,val_name,lv=None):
with (LabviewConnection() if lv is None else lv) as lv:
return lv.main_vi.getcontrolvalue(val_name)
def _setLabviewValue(self,val_name,val_val,lv=None):
with (LabviewConnection() if lv is None else lv) as lv:
lv.main_vi.setcontrolvalue(val_name,val_val)
def _simple_expose(self,filename=None,exposure=None,block=True,lv=None):
with (LabviewConnection() if lv is None else lv) as lv:
if filename is None:
filename=self.getFilename(lv=lv)
if exposure is None:
exposure=self.getExposure(lv=lv)
self.status_txt = f'Starting {exposure} s count named {filename}'
self.setFilename(filename,lv=lv)
self.setExposure(exposure,lv=lv)
self.setNScans(1,lv=lv)
self.setSweepAxis('None',lv=lv)
if self.app is not None:
self.app.logger.debug(f'Starting exposure with name {filename} for {exposure} s')
self._setLabviewValue('Expose Pilatus',True,lv=lv)
#time.sleep(0.5)
if block:
while(self.getStatus(lv=lv) != 'Loading Image'):
time.sleep(0.1)
#name = self._getLabviewValue('Displayed Image P') # read back the actual name, including sequence number, for later steps.
while(self.getStatus(lv=lv) != 'Success' and self.getStatus(lv=lv) != 'Collection Aborted'):
time.sleep(0.1)
self.status_txt = 'Accessing Image'
return self.getData(lv=lv)
[docs]
def expose(self,name=None,exposure=None,block=True,reduce_data=True,measure_transmission=True,save_nexus=True,sample_position=None,sample_thickness=None,subtract_empty=False,lv=None):
with (LabviewConnection() if lv is None else lv) as lv:
if name is None:
name=self.getFilename(lv=lv)
if exposure is None:
exposure=self.getExposure(lv=lv)
if sample_position is not None:
cached_pos = self.config['nmc_sample_in']
self.config['nmc_sample_in'] = sample_position
if sample_thickness is not None:
cached_thickness = self.config['sample_thickness']
self.config['sample_thickness'] = sample_thickness
self.status_txt = f'Starting {exposure} s count named {name}'
if measure_transmission:
self.status_txt = f'Measuring transmission...'
transmission = self.measureTransmission(return_full=True,lv=lv)
self.status_txt = f'Starting {exposure} s count named {name} with transmission {str(transmission)}'
else:
transmission = [1,1,1,1]
self.setFilename(name,lv=lv)
self.setExposure(exposure,lv=lv)
self.setNScans(1,lv=lv)
self.setSweepAxis('None',lv=lv)
if self.app is not None:
self.app.logger.debug(f'Starting exposure with name {name} for {exposure} s')
self._setLabviewValue('Expose Pilatus',True,lv=lv)
#time.sleep(0.5)
if block or reduce_data or save_nexus:
while(self.getStatus(lv=lv) != 'Loading Image'):
time.sleep(0.1)
#name = self._getLabviewValue('Displayed Image P') # read back the actual name, including sequence number, for later steps.
while(self.getStatus(lv=lv) != 'Success' and self.getStatus(lv=lv) != 'Collection Aborted'):
time.sleep(0.1)
self.status_txt = 'Accessing Image'
data = self.getData(lv=lv)
if save_nexus:
self.status_txt = 'Writing Nexus'
self._writeNexus(data,name,name,transmission)
if reduce_data:
self.status_txt = 'Reducing Data'
reduced = self.getReducedData(write_data=True,filename=name,filename_kwargs={'lv':lv})
#if save_nexus:
#self._appendReducedToNexus(reduced,name,name)
self.status_txt = 'Instrument Idle'
if sample_position is not None:
self.config['nmc_sample_in'] = cached_pos
if sample_thickness is not None:
self.config['sample_thickness'] = cached_thickness
if subtract_empty:
if self.config['empty_filename'] is not None:
empty_data = pd.read_csv(self.config['empty_filename'],comment='#',header=None,delim_whitespace=True)
empty_data.columns = ['q','I']
self.data.add_array('raw_I',reduced[1])
reduced[1] = reduced[1] - empty_data.I
else:
self.app.logger.error('Asked for subtracted data, but no empty filename set. Returning UNSUBTRACTED DATA.')
print(np.shape(reduced))
self.data['q'] = reduced[0]
self.data['filename'] = name
self.data['transmission'] = transmission[0]
self.data['open_direct_beam_cts'] = transmission[1]
self.data['sample_direct_beam_cts'] = transmission[2]
self.data['empty_cell_transmission'] = transmission[3]
self.data.add_array('q',reduced[0])
self.data.add_array('I',reduced[1])
#self.data.add_array('dI',reduced[2])
return transmission
[docs]
def scan(self,axis,npts,start,step,name=None,exposure=None,block=False):
if name is not None:
self.setFilename(name)
else:
name=self.getFilename()
if exposure is not None:
self.setExposure(exposure)
else:
exposure=self.getExposure()
self.setNScans(npts)
self.setSweepAxis(axis)
self.setSweepStart(start)
self.setSweepStep(step)
if self.app is not None:
self.app.logger.debug(f'Starting exposure with name {name} for {exposure} s')
with LabviewConnection() as lv:
lv.main_vi.setcontrolvalue('Expose Pilatus',True)
time.sleep(0.5)
if block:
while(self.getStatus() != 'Success'):
time.sleep(0.1)
[docs]
def status(self):
status = []
status.append(f'Last Measured Transmission: scaled={self.last_measured_transmission[0]} using empty cell trans of {self.last_measured_transmission[3]} with {self.last_measured_transmission[2:3]} raw counts in open/sample')
status.append(f'Status: {self.status_txt}')
#lmj = self._getLabviewValue("LMJ Status")
#status.append(f'LMJ status: {"running, power on target = "+str(lmj[0]*lmj[1])+"W" if lmj[6]==1 else "not running"}')
#status.append(f'Vacuum (mbar): {self._getLabviewValue("Pressure (mbar)")}')
status.append(f'<a href="getData" target="_blank">Live Data (2D)</a>')
status.append(f'<a href="getReducedData" target="_blank">Live Data (1D)</a>')
status.append(f'<a href="getReducedData?render_hint=2d_img&reduce_type=2d">Live Data (2D, reduced)</a>')
return status
[docs]
class LabviewConnection():
[docs]
def __init__(self,vi=r'C:\saxs_control\GIXD controls.vi'):
'''
connect to locally running labview vi with win32com and
parameters:
vi: (str) the path to the LabView virtual instrument file for the main interface
'''
#print(f'init labview context...')
self.vi = vi
#print(f'Entering Labview context...')
pythoncom.CoInitialize()
self.labview = win32com.client.dynamic.Dispatch("Labview.Application")
self.main_vi = self.labview.getvireference(self.vi)
#self.main_vi.setcontrolvalue('Measurement',3) # 3 should bring the single Pilatus tab to the front
def __enter__(self):
return self
def __exit__(self,exittype,value,traceback):
pass #print(f'Exiting LabView context...')
def __del__(self):
#print(f'Deleting Labview object...')
self.main_vi = None
self.labview = None
gc.collect()
pythoncom.CoUninitialize()
if(pythoncom._GetInterfaceCount()>0):
print(f'Closed COM connection, but had remaining objects: {pythoncom._GetInterfaceCount()}')
self.SetProcessWorkingSetSize(self.OpenProcess(self.PROCESS_ALL_ACCESS,True,self.GetCurrentProcessId()),-1,-1)
if __name__ == '__main__':
from AFL.automation.shared.launcher import *