Source code for AFL.automation.instrument.BioSANS

import time
import pathlib
import re
import warnings
import numpy as np  # for return types in get data
import xarray as xr
import h5py  # for Nexus file writing

from AFL.automation.shared.mock_eic_client import MockEICClient
# Optional imports for hardware dependencies; real usage is gated by config
try:
    from epics import caget, caput, cainfo
except ImportError:
    caget = None
    caput = None
    cainfo = None

from AFL.automation.APIServer.Driver import Driver

try:
    from eic_client.EICClient import EICClient
except ImportError:
    EICClient = None


[docs] class BioSANS(Driver): ''' Driver for Bio-SANS instrument ORNL. ''' # confirmed config parameteters defaults = {} defaults['eic_token'] = "1" defaults['ipts_number'] = '1234' defaults['beamline'] = 'CG3' defaults['run_cycle'] = 'RC511' defaults['use_subtracted_data'] = True defaults['config'] = 'Config0' defaults['mock_mode'] = False defaults['reduction_log_data_path'] = f'/HFIR/{{INST}}/IPTS-{{IPTS}}/shared/autoreduce/{{RUN_CYCLE}}/{{CONFIG}}' defaults['reduced_file_data_path'] = f'/HFIR/{{INST}}/IPTS-{{IPTS}}/shared/autoreduce/{{RUN_CYCLE}}/{{CONFIG}}/1D' defaults['PVs_to_store'] = [] defaults['PVs_to_store'].extend([f'CG3:SE:SMPLINF:SRC{i}Comp' for i in range(1,9)]) defaults['PVs_to_store'].extend([f'CG3:SE:SMPLINF:SRC{i}Name' for i in range(1,9)]) defaults['PVs_to_store'].extend([f'CG3:SE:SMPLINF:SRC{i}Conc' for i in range(1,9)]) defaults['PVs_to_store'].extend([f'CG3:SE:SMPLINF:SRC{i}ConcUnits' for i in range(1,9)]) defaults['PVs_to_store'].extend([f'CG3:SE:CMP:S{i}Vol' for i in range(1,9)]) defaults['PVs_to_store'].extend([f'CG3:SE:URMPI:143']) defaults['PVs_to_store'].extend([f'CG3:SE:SMPLINF:SMPLTotalVol']) defaults['PVs_to_store'].extend([f'CG3:SE:SMPLINF:SMPLFinalConc{i}' for i in range(1,9)])
[docs] def __init__(self, overrides=None): self.app = None Driver.__init__(self, name='BioSANS', defaults=self.gather_defaults(), overrides=overrides) self._caget, self._caput, self._cainfo = self._resolve_epics() self._client = None self.last_scan_id = None self.__instrument_name__ = 'ORNL Bio-SANS instrument' self.status_txt = 'Just started...' self.last_measured_transmission = [0, 0, 0, 0]
@property def client(self): """ Property that returns the EIC client instance. If the client doesn't exist yet, it instantiates a new EICClient using the token and beamline from the configuration. Returns ------- EICClient The client instance for communicating with the instrument. """ if self._client is None: if self.config['mock_mode']: self._client = MockEICClient( ipts_number=str(self.config['ipts_number']), eic_token=self.config['eic_token'], beamline=self.config['beamline'] ) else: if EICClient is None: raise ImportError("eic_client is not available and mock_mode is False") self._client = EICClient( ipts_number=str(self.config['ipts_number']), eic_token=self.config['eic_token'], beamline=self.config['beamline'] ) return self._client
[docs] def reset_client(self): ''' Reset the EICClient instance ''' self._client = None
def _resolve_epics(self): """Return EPICS accessors, honoring mock_mode and availability.""" if self.config['mock_mode']: def _mock_caget(*args, **kwargs): warnings.warn("EPICS in mock mode - caget", stacklevel=2) return 0 def _mock_caput(*args, **kwargs): warnings.warn("EPICS in mock mode - caput", stacklevel=2) return True def _mock_cainfo(*args, **kwargs): warnings.warn("EPICS in mock mode - cainfo", stacklevel=2) return {} return _mock_caget, _mock_caput, _mock_cainfo if caget is None or caput is None or cainfo is None: raise ImportError("epics is not available and mock_mode is False") return caget, caput, cainfo
[docs] def getLastRunNumber(self): return self._caget('CG3:CS:RunControl:LastRunNumber')
# timeout = 120 # success_get, pv_value_read, response_data_get = self.client.get_pv(pv_name, timeout) # if not success_get: # raise RuntimeError(f'Could not read pv {pv_name}. Response: {response_data_get}') # return pv_value_read
[docs] def lastMeasuredTransmission(self): return self.last_measured_transmission
def _resolve_data_path(self, config_key): data_path = self.config[config_key] return pathlib.Path( data_path.format( INST=self.config['beamline'], IPTS=self.config['ipts_number'], RUN_CYCLE=self.config['run_cycle'], CONFIG=self.config['config'] ) ) def _latest_run_number_from_paths(self, paths): run_numbers = [] pattern = re.compile(r'^(?:S_)?r(\d+)_(\d+)_(?:reduction_log\.hdf|1D_(?:main|combined)\.txt)$') for path in paths: if not path.exists() or not path.is_dir(): continue for candidate in path.glob('*'): match = pattern.match(candidate.name) if match is None: continue first = int(match.group(1)) second = int(match.group(2)) if first == second: run_numbers.append(first) if not run_numbers: return None return max(run_numbers) def _get_last_run_number_with_fallback(self): run_number = self.getLastRunNumber() try: run_number = int(run_number) except (TypeError, ValueError): run_number = 0 if run_number > 0: return run_number fallback_run_number = self._latest_run_number_from_paths([ self._resolve_data_path('reduction_log_data_path'), self._resolve_data_path('reduced_file_data_path'), ]) if fallback_run_number is not None: warnings.warn( f'Using fallback run number {fallback_run_number} because EPICS returned {run_number}', stacklevel=2, ) return fallback_run_number return run_number def _candidate_reduced_filenames(self, run_number): return [ f'r{run_number:d}_{run_number:d}_1D_main.txt', f'S_r{run_number:d}_{run_number:d}_1D_combined.txt', f'r{run_number:d}_{run_number:d}_1D_combined.txt', ] def _select_existing_file(self, base_path, filenames): for filename in filenames: filepath = pathlib.Path(base_path) / filename if filepath.exists(): return filepath return pathlib.Path(base_path) / filenames[0]
[docs] @Driver.unqueued() def getLastReductionLogFilePath(self, **kwargs): """ get the currently set file name """ path = self._resolve_data_path('reduction_log_data_path') run_number = self._get_last_run_number_with_fallback() filename = f'r{run_number:d}_{run_number:d}_reduction_log.hdf' filepath = pathlib.Path(path) / filename return str(filepath)
def _readLastTransmission(self): filepath = self.getLastReductionLogFilePath() with h5py.File(filepath, 'r') as h5: transmission = h5['reduction_information']['special_parameters']['sample_transmission']['main']['value'][()] return transmission
[docs] @Driver.unqueued() def getLastTransmission(self, **kwargs): return self.readFileSafely(self._readLastTransmission)
[docs] def setExposure(self, exposure): if self.app is not None: self.app.logger.debug(f'Setting exposure time to {exposure}') self.config['exposure'] = exposure
[docs] @Driver.unqueued() def getLastFilePath(self, **kwargs): """ get the currently set file name """ path = self._resolve_data_path('reduced_file_data_path') run_number = self._get_last_run_number_with_fallback() filepath = self._select_existing_file(path, self._candidate_reduced_filenames(run_number)) return str(filepath)
def _readLastReducedFile(self): filepath = self.getLastFilePath() q, I, dI, dQ = np.loadtxt(filepath, skiprows=2).T return {'q': q, 'I': I, 'dI': dI, 'dQ': dQ}
[docs] @Driver.unqueued(render_hint='2d_img', log_image=True) def readFileSafely(self, file_read_function, attempts_limit=300, attempts_pause_time=1.0, **kwargs): try: data = file_read_function() except (FileNotFoundError, OSError, KeyError): nattempts = 1 while nattempts < attempts_limit: nattempts = nattempts + 1 time.sleep(attempts_pause_time) try: data = file_read_function() except (FileNotFoundError, OSError, KeyError) as e: if nattempts == attempts_limit: raise FileNotFoundError(f'Could not locate file after {nattempts} tries: {e}') else: warnings.warn(f'Failed to load file, trying again, this is try {nattempts}: {e}',stacklevel=2) else: break return data
def _validateExposureType(self, exposure_type): if exposure_type not in ['time']: raise ValueError(f'Exposure type must be one of "time", not {exposure_type}')
[docs] def blockForTableScan(self): status_sucess, is_done, state, status_response_data = self.client.get_scan_status(self.last_scan_id) loop_count = 0 while not is_done: time.sleep(0.1) status_sucess, is_done, state, status_response_data = self.client.get_scan_status(self.last_scan_id) loop_count += 1
def _simple_expose(self, exposure, name=None, block=False, exposure_type='time', tmax=1800): """ Perform a simple exposure with the specified parameters. This method sets up and performs an exposure of the sample, optionally blocking until the exposure is complete. Parameters ---------- exposure : float The exposure time or counts. name : str, optional The name of the sample (default is None). block : bool, optional If True, block until the exposure is complete (default is False). exposure_type : str, optional The type of exposure, must be one of 'time', 'detector', or 'monitor' (default is 'detector'). tmax : int, optional The maximum time to wait for the exposure in seconds (default is 1800). This is only applicable for the exposure_type 'detector'. Raises ------ ValueError If the exposure type is not one of 'time', 'detector', or 'monitor'. """ self._validateExposureType(exposure_type) self.setExposure(exposure) self.status_txt = f'Starting {exposure} count table scan named {name}' if self.app is not None: self.app.logger.debug(self.status_txt) params = { 'run_mode': 0, #???? 'desc': 'AFL submitted table scan' } headers = ['Title','Wait For','Value'] row = [name,'seconds',exposure] # headers.append('CG3:CS:SANSQUser:SampleToMain') # row.append(15.5) if exposure_type == 'time': success, self.last_scan_id,response_data = self.client.submit_table_scan( parms={ 'run_mode': 0, #???? 'headers': headers, 'rows': [row], }, desc=f'AFL submitted table scan named {name}', simulate_only=False, ) elif exposure_type == 'monitor': raise NotImplementedError('Monitor exposure is not implemented for BioSANS') elif exposure_type == 'detector': raise NotImplementedError('Monitor exposure is not implemented for BioSANS') else: raise NotImplementedError(f"exposure_type not recognized: {exposure_type}") if not success: raise RuntimeError(f'Error in EIC table scan: {response_data}') if success and block: self.blockForTableScan()
[docs] @Driver.quickbar(qb={'button_text': 'Expose', 'params': { 'name': {'label': 'Name', 'type': 'text', 'default': 'test_exposure'}, 'exposure': {'label': 'Exposure (s)', 'type': 'float', 'default': 5}, 'reduce_data': {'label': 'Reduce?', 'type': 'bool', 'default': True}, 'measure_transmission': {'label': 'Measure Trans?', 'type': 'bool', 'default': True} }}) def expose(self, name=None, exposure=None, block=True, save_reduced_data=True, save_nexus=True, exposure_type='time'): """ Perform an exposure with the specified parameters. This method performs an exposure of the sample, optionally measuring the transmission, reducing the data, and saving it in Nexus format. Parameters ---------- name : str, optional The name of the sample (default is None). exposure : float, optional The exposure time or counts (default is None). block : bool, optional If True, block until the exposure is complete (default is True). save_nexus : bool, optional If True, save the data in Nexus format (default is True). exposure_type : str, optional The type of exposure, must be one of 'time', 'detector', or 'monitor' (default is 'detector'). Raises ------ ValueError If the exposure type is not one of 'time', 'detector', or 'monitor'. FileNotFoundError If the data file cannot be located after multiple attempts. """ self._validateExposureType(exposure_type) self._simple_expose(exposure=exposure, name=name, exposure_type=exposure_type, block=block) time.sleep(15) return self.process_data()
[docs] def process_data(self, **kwargs): """Process the SANS data after exposure, writing the data to Tiled""" # Read data and create xarray Dataset data = self.readFileSafely(self._readLastReducedFile) transmission = self.readFileSafely(self._readLastTransmission) ds = xr.Dataset() ds['q'] = ('q', data['q']) ds['I'] = ('q', data['I']) ds['dI'] = ('q', data['dI']) ds['dQ'] = ('q', data['dQ']) ds.attrs['sample_transmission'] = transmission for pv in self.config['PVs_to_store']: try: value = caget(pv) ds.attrs[pv] = value except Exception as e: warnings.warn(f'Could not read PV {pv} to save in dataset attributes: {e}', stacklevel=2) ds.attrs[pv] = None self.status_txt = 'Instrument Idle' return ds
[docs] def status(self): status = [] status.append( f'Last Measured Transmission: scaled={self.last_measured_transmission[0]} using empty cell trans of {self.last_measured_transmission[3]} with {self.last_measured_transmission[1]} raw counts in open {self.last_measured_transmission[2]} sample') status.append(f'Status: {self.status_txt}') return status
_DEFAULT_PORT=5001 if __name__ == '__main__': from AFL.automation.shared.launcher import *