import gc
import time
import datetime
from AFL.automation.APIServer.Driver import Driver
import numpy as np # for return types in get data
import h5py #for Nexus file reading
import lazy_loader as lazy
epics = lazy.load("epics", require="AFL-automation[neutron-scattering]")
import os
import pathlib
import warnings
Matilda = lazy.load("Matilda", require="AFL-automation[usaxs]")
[docs]
class APSUSAXS(Driver):
defaults = {}
defaults['sample_thickness'] = 1.58
defaults['run_initiate_pv'] = '9idcLAX:AutoCollectionStart'
defaults['script_name_pv'] = '9idcLAX:AutoCollectionStrInput'
defaults['instrument_status_pv'] = '9idcLAX:state'
defaults['script_path'] = '/mnt/share1/USAXS_data/2022-06/'
defaults['instrument_running_pv'] = '9idcLAX:dataColInProgress'
defaults['script_template_file'] = 'AFL-template.mac'
defaults['script_file'] = 'AFL.mac'
defaults['magic_project_key'] = '!!AFL-SETNAME!!'
defaults['magic_filename_key'] = '!!AFLREPLACEME!!'
defaults['magic_xpos_key'] = '!!AFLXPOS!!'
defaults['magic_ypos_key'] = '!!AFLYPOS!!'
defaults['active_holder'] = '6A'
defaults['script_write_cooldown'] = 1
defaults['platemap'] = {
'6A':{
'SlotA':
{
'x0': 0,
'y0': 0,
'x_step': 9,
'y_step': 9,
'x_per_y_skew': .5,
'y_per_x_skew': -(8/12)*.5,
},
'SlotB':
{
'x0': -150,
'y0': -150,
'x_step': 9,
'y_step': 9,
'x_per_y_skew': 1,
'y_per_x_skew': -(8/12)*1,
},
'SlotC':
{
'x0': 150,
'y0': 150,
'x_step': 9,
'y_step': 9,
'x_per_y_skew': -8,
'y_per_x_skew': -(8/12)*-8,
},
}
}
defaults['file_read_max_retries'] = 20
defaults['file_read_retry_sleep'] = 15.0
defaults['file_read_check_key'] = 'CalibratedData' # Dictionary key to check for None
defaults['userdir_pv'] = 'usxLAX:userDir'
defaults['datadir_pv'] = 'usxLAX:sampleDir'
defaults['next_fs_order_n_pv'] = 'usxLAX:USAXS:FS_OrderNumber'
[docs]
def __init__(self,overrides=None):
'''
connect to usaxs via EPICS
'''
self.readMyNXcanSAS = lazy.load("Matilda.hdf5code.readMyNXcanSAS", require="AFL-automation[usaxs]")
self.app = None
Driver.__init__(self,name='APSUSAXS',defaults=self.gather_defaults(),overrides=overrides)
self.__instrument_name__ = 'APS USAXS instrument'
self.status_txt = 'Just started...'
self.filename = 'default'
self.filename_prefix = 'AFL'
self.project = 'AFL'
self.xpos = 0
self.ypos = 0
[docs]
def pre_execute(self,**kwargs):
pass
[docs]
@Driver.unqueued()
def getFilenamePrefix(self):
'''
get the currently set file name prefix
'''
return self.filename_prefix
[docs]
@Driver.unqueued()
def setFilenamePrefix(self,prefix):
'''
set the currently set file name prefix
'''
self.filename_prefix = prefix
if self.app is not None:
self.app.logger.debug(f'Setting filename prefix to {prefix}')
[docs]
@Driver.unqueued()
def getFilename(self):
'''
get the currently set file name
'''
return self.filename
[docs]
def setFilename(self,name):
if self.app is not None:
self.app.logger.debug(f'Setting filename to {name}')
self.filename = name
[docs]
@Driver.unqueued()
def getProject(self):
'''
get the currently set file name
'''
return self.project
def _coords_from_tuple(self,slot,row,col):
if len(row)>1:
raise ValueError('row must be a single letter')
row = ord(row) & 31
col = int(col)
geom = self.config['platemap'][self.config['active_holder']][slot]
return (geom['x0'] + (row-1)*geom['x_step'] + geom['x_per_y_skew']*(col-1),
geom['y0'] + (col-1)*geom['y_step'] + geom['y_per_x_skew']*(row-1))
[docs]
def setProject(self,name):
if self.app is not None:
self.app.logger.debug(f'Setting filename to {name}')
self.project = name
def _writeUSAXSScript(self):
'''
burn the current filename and project name into a USAXS script
'''
lines = []
with open(pathlib.Path(self.config['script_path'])/self.config['script_template_file'],'r') as f:
for line in f:
s = line.replace(self.config['magic_project_key'],self.project)
s = s.replace(self.config['magic_filename_key'],self.filename)
s = s.replace(self.config['magic_xpos_key'],str(self.xpos))
s = s.replace(self.config['magic_ypos_key'],str(self.ypos))
s = s.replace('\r','')
s = s.replace('\n','')
lines.append(s)
with open(pathlib.Path(self.config['script_path'])/self.config['script_file'],'w') as f:
for line in lines:
f.write(line+'\r\n')
def _safe_read_file(self, filepath, filename,is_usaxs=True):
'''
Safely read a USAXS file with retry logic.
Checks if file exists, then calls readMyNXcanSAS. If the specified
dictionary entry is None, retries after sleeping. Continues until
max retries are reached or the file is successfully read.
Parameters
----------
filepath : pathlib.Path
Path to the directory containing the file
filename : str
Name of the file to read
Returns
-------
dict
Dictionary returned by readMyNXcanSAS
Raises
------
FileNotFoundError
If the file does not exist
RuntimeError
If the file cannot be read successfully after max retries
'''
full_path = filepath / filename
# Check if file exists
if not full_path.exists():
raise FileNotFoundError(f'File does not exist: {full_path}')
max_retries = self.config['file_read_max_retries']
retry_sleep = self.config['file_read_retry_sleep']
check_key = self.config['file_read_check_key']
for attempt in range(max_retries):
try:
data_dict = self.readMyNXcanSAS(filepath, filename,is_usaxs=is_usaxs)
# Check if the specified key is None
if check_key not in data_dict:
if self.app is not None:
self.app.logger.warning(
f'Key "{check_key}" not found in data dictionary. '
f'Available keys: {list(data_dict.keys())}'
)
# If key doesn't exist, treat as failure and retry
if attempt < max_retries - 1:
time.sleep(retry_sleep)
continue
else:
raise RuntimeError(
f'Key "{check_key}" not found in data dictionary after {max_retries} attempts'
)
if data_dict[check_key] is None:
if attempt < max_retries - 1:
if self.app is not None:
self.app.logger.debug(
f'Key "{check_key}" is None, retrying in {retry_sleep}s '
f'(attempt {attempt + 1}/{max_retries})'
)
time.sleep(retry_sleep)
continue
else:
raise RuntimeError(
f'Key "{check_key}" is None after {max_retries} attempts. '
f'File may not be fully written: {full_path}'
)
# Success - data is valid
if self.app is not None:
self.app.logger.debug(f'Successfully read file {filename} on attempt {attempt + 1}')
return data_dict
except Exception as e:
if attempt < max_retries - 1:
if self.app is not None:
self.app.logger.warning(
f'Error reading file {filename} (attempt {attempt + 1}/{max_retries}): {e}. '
f'Retrying in {retry_sleep}s...'
)
time.sleep(retry_sleep)
continue
else:
raise RuntimeError(
f'Failed to read file {full_path} after {max_retries} attempts. '
f'Last error: {e}'
)
# Should never reach here, but just in case
raise RuntimeError(f'Failed to read file {full_path} after {max_retries} attempts')
[docs]
@Driver.quickbar(qb={'button_text':'Expose',
'params':{
'name':{'label':'Name','type':'text','default':'test_exposure'},
'exposure':{'label':'Exposure (s)','type':'float','default':5},
'reduce_data':{'label':'Reduce?','type':'bool','default':True},
'measure_transmission':{'label':'Measure Trans?','type':'bool','default':True}
}})
def expose(self, name = None, block = True, read_USAXS = True, read_SAXS = True):
if name is None:
name=self.getFilenamePrefix()
else:
self.setFilenamePrefix(name)
self.status_txt = f'Starting USAXS/SAXS/WAXS scan named {name}'
if self.app is not None:
self.app.logger.debug(f'Starting USAXS/SAXS/WAXS exposure with name {name}')
self.status_txt = 'Writing script...'
self._writeUSAXSScript()
self.status_txt = 'Waiting for script save...'
time.sleep(self.config['script_write_cooldown'])
epics.caput(self.config['script_name_pv'],self.config['script_file'])
time.sleep(0.1)
epics.caput(self.config['run_initiate_pv'],1)
self.status_txt = 'Run started!'
time.sleep(0.5)
if block or reduce_data:
time.sleep(20)
self.block_for_run_finish()
self.status_txt = 'Instrument Idle'
user_dir = epics.caget(self.config['userdir_pv'],as_string=True)
data_dir = epics.caget(self.config['datadir_pv'],as_string=True)
fs_order_n = epics.caget(self.config['next_fs_order_n_pv']) - 1.0 # need to subtract 1 because the order number is incremented after the scan starts
filename= f"{self.filename_prefix}_{fs_order_n:04d}.h5"
if read_USAXS:
filepath_usaxs = pathlib.Path(user_dir) / (str(data_dir) + '_usaxs') / filename
data_dict_usaxs = self._safe_read_file(filepath_usaxs, filename,is_usaxs=True)
self.data.add_array('USAXS_q',data_dict_usaxs['CalibratedData']['Q'])
self.data.add_array('USAXS_I',data_dict_usaxs['CalibratedData']['Intensity'])
self.data.add_array('USAXS_dI',data_dict_usaxs['CalibratedData']['Error'])
self.data['USAXS_Filepath'] = str(filepath_usaxs)
self.data['USAXS_Filename'] = filename
self.data['USAXS_name'] = name
if read_SAXS:
filepath_saxs = pathlib.Path(user_dir) / (str(data_dir) + '_saxs') / filename
data_dict_saxs = self._safe_read_file(filepath_saxs, filename,is_usaxs=False)
self.data.add_array('SAXS_q',data_dict_saxs['CalibratedData']['Q'])
self.data.add_array('SAXS_I',data_dict_saxs['CalibratedData']['Intensity'])
self.data.add_array('SAXS_dI',data_dict_saxs['CalibratedData']['Error'])
self.data['SAXS_Filepath'] = str(filepath_saxs)
self.data['SAXS_Filename'] = filename
self.data['SAXS_name'] = name
[docs]
def block_for_run_finish(self):
while self.getRunInProgress():
time.sleep(5)
[docs]
def getRunStatus(self):
return epics.caget(self.config['instrument_status_pv'],as_string=True)
[docs]
def setPosition(self,plate,row,col,x_offset=0,y_offset=0):
(self.xpos,self.ypos) = self._coords_from_tuple(plate,row,col)
self.xpos+=x_offset
self.ypos+=y_offset
[docs]
def getRunInProgress(self):
return epics.caget(self.config['instrument_running_pv'])
[docs]
def status(self):
status = []
status.append(f'Status: {self.status_txt}')
status.append(f'EPICS status: {self.getRunStatus()}')
status.append(f'Next X: {self.xpos}')
status.append(f'Next Y: {self.ypos}')
status.append(f'Next filename: {self.filename}')
status.append(f'Next project: {self.project}')
return status
if __name__ == '__main__':
from AFL.automation.shared.launcher import *