Source code for AFL.automation.instrument.APSUSAXS_Phantom

import time
import numpy as np
from AFL.automation.instrument.APSUSAXS import APSUSAXS
from AFL.automation.APIServer.Driver import Driver

[docs] class APSUSAXS_Phantom(APSUSAXS):
[docs] def __init__(self, overrides=None): # We bypass APSUSAXS.__init__ to avoid any hardware initialization it might do # (though currently it seems mostly safe, it loads Matilda which we might not want) # Driver.__init__ will call gather_defaults which walks MRO. # So we get APSUSAXS.defaults automatically. self.app = None Driver.__init__(self, name='APSUSAXS_Phantom', defaults=self.gather_defaults(), overrides=overrides) self.__instrument_name__ = 'APS USAXS Phantom Instrument' self.status_txt = 'Phantom Initialized' self.filename = 'default' self.filename_prefix = 'AFL' self.project = 'AFL' self.xpos = 0 self.ypos = 0 self._run_in_progress = False
def _writeUSAXSScript(self): pass def _safe_read_file(self, filepath, filename, is_usaxs=True, is_blank=False): pass
[docs] @Driver.quickbar(qb={'button_text':'Expose Phantom', 'params':{ 'name':{'label':'Name','type':'text','default':'test_exposure'}, 'exposure':{'label':'Exposure (s)','type':'float','default':5}, 'reduce_data':{'label':'Reduce?','type':'bool','default':True}, 'measure_transmission':{'label':'Measure Trans?','type':'bool','default':True} }}) def expose(self, name=None, block=True, read_USAXS=True, read_SAXS=True, **kwargs): if name is None: name = self.getFilenamePrefix() else: self.setFilenamePrefix(name) self.status_txt = f'Starting Phantom USAXS/SAXS/WAXS scan named {name}' if self.app is not None: self.app.logger.debug(f'Starting Phantom exposure {name}') self._run_in_progress = True self.status_txt = 'Phantom Running...' # Simulate exposure time # Use exposure from kwargs if available, default to 1s exposure = float(kwargs.get('exposure', 1.0)) time.sleep(exposure) self._run_in_progress = False self.status_txt = 'Instrument Idle' if "blank" in name.lower(): is_blank = True else: is_blank = False # Generate fake data # USAXS q range approx 1e-4 to 1e-1 # SAXS q range approx 1e-2 to 1.0 if read_USAXS: q_usaxs = np.logspace(np.log10(1e-4), np.log10(1e-1), 200) I_usaxs, dI_usaxs = self._generate_fake_signal(q_usaxs, is_blank) self.data.add_array('USAXS_q', q_usaxs) self.data.add_array('USAXS_I', I_usaxs) self.data.add_array('USAXS_dI', dI_usaxs) self.data['USAXS_Filepath'] = '/tmp/phantom/usaxs' self.data['USAXS_Filename'] = f'{name}.h5' self.data['USAXS_name'] = name self.data['USAXS_blank'] = is_blank if read_SAXS: q_saxs = np.logspace(np.log10(1e-2), np.log10(1.0), 200) I_saxs, dI_saxs = self._generate_fake_signal(q_saxs, is_blank) self.data.add_array('SAXS_q', q_saxs) self.data.add_array('SAXS_I', I_saxs) self.data.add_array('SAXS_dI', dI_saxs) self.data['SAXS_Filepath'] = '/tmp/phantom/saxs' self.data['SAXS_Filename'] = f'{name}.h5' self.data['SAXS_name'] = name self.data['SAXS_blank'] = is_blank
def _generate_fake_signal(self, q, is_blank): if is_blank: # White noise around low value I = np.random.normal(1e-4, 1e-5, size=q.shape) I = np.abs(I) else: # Simple scattering model: Guinier + Power law + Background # Parameters Rg = 50.0 # Radius of gyration I0 = 100.0 # Forward scattering P = 4.0 # Power law slope (Porod) B = 1e-3 # Background # Unified fit-ish # Guinier term I_guinier = I0 * np.exp(-(q * Rg)**2 / 3.0) # Power law term (with cutoff to avoid singularity) # Using simple crossover q_crossover = 1.0/Rg I_porod = (I0 * np.exp(-1/3)) * ((q/q_crossover)**(-P)) # Smooth combine (very approximate) I_signal = np.where(q < q_crossover, I_guinier, I_porod) I = I_signal + B + np.random.normal(0, 1e-4, size=q.shape) I = np.abs(I) dI = 0.05 * I # 5% error return I, dI
[docs] def block_for_run_finish(self): while self.getRunInProgress(): time.sleep(0.1)
[docs] def getRunStatus(self): return "Phantom Running" if self._run_in_progress else "Phantom Idle"
[docs] def getRunInProgress(self): return self._run_in_progress
[docs] def setPosition(self, plate, row, col, x_offset=0, y_offset=0): # Calculate coordinates using parent method logic but update local state (self.xpos, self.ypos) = self._coords_from_tuple(plate, row, col) self.xpos += x_offset self.ypos += y_offset if self.app is not None: self.app.logger.debug(f'Phantom: Position set to {self.xpos}, {self.ypos}')
[docs] def status(self): status = [] status.append(f'Status: {self.status_txt}') status.append(f'Phantom Status: {self.getRunStatus()}') status.append(f'Next X: {self.xpos}') status.append(f'Next Y: {self.ypos}') status.append(f'Next filename: {self.filename}') status.append(f'Next project: {self.project}') return status
if __name__ == '__main__': from AFL.automation.shared.launcher import * # This allows the file to be run directly to start a server for this driver