Source code for AFL.automation.instrument.I22SAXS

import time
import datetime
from AFL.automation.APIServer.Driver import Driver

import h5py #for Nexus file writing
import os
import pathlib
import warnings
import json
import datetime
import xarray as xr
import numpy as np

import lazy_loader as lazy
# SSH client for remote instrument connections
paramiko = lazy.load("paramiko", require="AFL-automation[remote-access]") #general ssh connection client

[docs] class I22SAXS(Driver): defaults = {} defaults['username'] = '' defaults['address'] = '' defaults['port'] = 2222 defaults['ssh_key_path'] = '' defaults['processed_base_path'] = '/mnt/i22_processed/i22-' defaults['reduced_data_suffix'] = '_saxs_Transmission_Averaged_Subtracted_IvsQ_processed.nxs' defaults['nframes'] = 1 defaults['acq_time'] = 1 defaults['acq_timeout'] = 120 defaults['file_read_timeout'] = 30 defaults['pos_list'] = [] defaults['empty_scan_id'] = '' defaults['data_read_cooldown'] = 5
[docs] def __init__(self,overrides=None,**kwargs): self.app = None Driver.__init__(self,name='I22SAXS',defaults=self.gather_defaults(),overrides=overrides) self.I22_client = paramiko.SSHClient() self.I22_client.load_system_host_keys() try: self.I22_client.connect(self.config['address'],username=self.config['username'],port=self.config['port'],key_filename=self.config['ssh_key_path']) except: raise ValueError("cannot connect to host. check the username and address") self.raw_written=False self.integrated_written=False self.filename=None
def _expose(self, filename, xpos, ypos, empty = False, nframes=None, acq_time=None): """ Perform an AFL acquision sequence on I22 """ if nframes is not None: self.config['nframes'] = nframes if acq_time is not None: self.config['acq_time'] = acq_time if not empty: self.run_command = f"AFL_Acquisition(scan_title='{filename}',background_frame = '{self.config['empty_scan_id']}' ,x_position={xpos},y_position={ypos},num_frames={self.config['nframes']},frame_time={self.config['acq_time']})" else: self.run_command = f"AFL_Background(scan_title='{filename}',x_position={xpos},y_position={ypos},num_frames={self.config['nframes']},frame_time={self.config['acq_time']})" self.raw_written = False self.app.logger.info(f'initiated run with command {self.run_command}') ssh_stdin, ssh_stdout, ssh_stderr = self.I22_client.exec_command(self.run_command) start_time = datetime.datetime.now() timeout = start_time + datetime.timedelta(seconds=self.config['acq_timeout']) ssh_stdout.channel.recv_exit_status() counter_headers = [] counter_values = [] counters = {} for output in ssh_stdout.readlines(): if len(output) > 0: self.app.logger.info(f' {output}') output.replace('\n','') if len(output.split('\t'))>1: if len(counter_headers) > 0: # these are values counter_values = output.split('\t') assert(len(counter_values) == len(counter_headers),'Mismatch or error parsing counters') for i,name in enumerate(counter_headers): counters[name] = float(counter_values[i]) else: counter_headers = output.split('\t') elif 'successfully' in output: scanid = int(output.split(". Scan ")[1].split(" ended successfully")[0]) time.sleep(0.1) return (scanid,counters)
[docs] def expose(self, name, empty = False, nframes=None, acq_time=None, set_empty = False): ''' Perform a sequence of exposures at positions defined in self.config['pos_list']. Return the integrated data of the measurement with the lowest measured sample transmission. ''' scan_id_list = [] counters_list = [] transmission_list = [] for (xpos,ypos) in self.config['pos_list']: scanid,counters = self._expose(filename=name, xpos=xpos, ypos=ypos, empty = empty, nframes = nframes, acq_time = acq_time) scan_id_list.append(scanid) counters_list.append(counters) transmission_list.append(counters['transmission']) if not empty: selected_run = np.argmin(transmission_list) else: selected_run = np.argmax(transmission_list) self.app.logger.info(f'Selected scan #{selected_run} | {scan_id_list[selected_run]} with transmission of {transmission_list[selected_run]}. Other options were {transmission_list}.') self.data['scan_ids'] = scan_id_list self.data['counters'] = counters_list self.data['transmissions'] = transmission_list if empty and set_empty: self.config['empty_scan_id'] = scan_id_list[selected_run] self.data['scan_id'] = scan_id_list[selected_run] self.data['counter'] = counters_list[selected_run] self.data['transmission'] = transmission_list[selected_run] self.read_integrated(scan_id_list[selected_run]) return transmission_list[selected_run]
[docs] def read_integrated(self,scanid): """ Scans the appropriate directory for the filename of the data collected with filename """ fname = f"{self.config['processed_base_path']}{scanid}{self.config['reduced_data_suffix']}" #check if file exists start_time = datetime.datetime.now() timeout = start_time + datetime.timedelta(seconds = self.config['file_read_timeout']) while not os.path.isfile(fname) and datetime.datetime.now() < timeout: time.sleep(0.1) while datetime.datetime.now() < timeout: try: with h5py.File(fname) as f: I = f['processed']['result']['data'][:] break except Exception: pass with h5py.File(fname) as f: I = f['processed']['result']['data'][:] q = f['processed']['result']['q'][:] err = f['processed']['result']['errors'][:] # construct an xarray with this and return it? data = xr.Dataset(data_vars={'I':(['position','frameid','q'],I),'dI':(['position','frameid','q'],err)},coords={'q':q}) self.data['q'] = data['q'].values self.data.add_array('I',data['I'].values.squeeze()) self.data.add_array('dI',data['dI'].values.squeeze()) return scanid
_DEFAULT__PORT = 5001 if __name__ == '__main__': from AFL.automation.shared.launcher import *