Source code for AFL.automation.instrument.ScatteringInstrument

from AFL.automation.APIServer.Driver import Driver
import lazy_loader as lazy
pyFAI = lazy.load("pyFAI", require="AFL-automation[scattering-processing]")
fabio = lazy.load("fabio", require="AFL-automation[scattering-processing]")

import numpy as np
import datetime
import h5py,six

[docs] class ScatteringInstrument(): defaults = {} defaults['poni1'] = 0.0251146 defaults['poni2'] = 0.150719 defaults['rot1'] = 0 defaults['rot2'] = 0 defaults['rot3'] = 0 defaults['wavelength'] = 1.3421e-10 defaults['dist'] = 3.4925 defaults['npts'] = 500 defaults['detector_name'] = 'pilatus300kw'#set to empty for custom detector defaults['mask_path'] = '' #only used if detector_name='' (empty string) defaults['pixel1'] = 0.075 #pixel y size in m defaults['pixel2'] = 0.075 #pixel x size in m defaults['num_pixel1'] = 128 defaults['num_pixel2'] = 128
[docs] def __init__(self): self.generateIntegrator()
[docs] def cell_in_beam(self,cellid): raise NotImplementedError
[docs] def expose(self,exposuretime,nexp): raise NotImplementedError
[docs] def generateIntegrator(self): if self.config['detector_name']:#if there isn't an empty string self.detector = pyFAI.detector_factory(name=self.config['detector_name']) else: self.detector = pyFAI.detectors.Detector( pixel1=self.config['pixel1'], pixel2=self.config['pixel2'], max_shape=(self.config['num_pixel1'],self.config['num_pixel2']) ) if(self.config['mask_path'] is None): self.mask=None else: self.mask = fabio.open(self.config['mask_path']).data self.integrator = pyFAI.azimuthalIntegrator.AzimuthalIntegrator( detector=self.detector, wavelength = self.config['wavelength'], dist = self.config['dist'], poni1 = self.config['poni1'], poni2 = self.config['poni2'], rot1 = self.config['rot1'], rot2 = self.config['rot2'], rot3 = self.config['rot3'] )
[docs] def setReductionParams(self,reduction_params): self.config.update(reduction_params) self.generateIntegrator()
[docs] def setMaskPath(self,mask_path): self.config['mask_path'] = mask_path self.generateIntegrator()
[docs] def setDetectorName(self,detector_name): self.config['detector_name']=detector_name
[docs] @Driver.unqueued() def getReductionParams(self): params = ['poni1', 'poni2', 'rot1', 'rot2', 'rot3', 'wavelength', 'dist', 'npts'] return {k:self.config[k] for k in params}
[docs] @Driver.unqueued() def getMaskPath(self): return self.config['mask_path']
[docs] @Driver.unqueued() def getDetectorName(self): return self.config['detector_name']
[docs] @Driver.unqueued(render_hint='1d_plot',xlin=False,ylin=False,xlabel='q (A^-1)',ylabel='Intensity (AU)') def getReducedData(self,reduce_type='1d',write_data=False,filename=None,filename_kwargs={},**kwargs): start_time = datetime.datetime.now() img = self.getData(**kwargs) got_image = datetime.datetime.now() if self.mask is None: mask = np.zeros(np.shape(img)) else: mask = self.mask if write_data: if filename is None: filename = self.getLastFilePath(**filename_kwargs).parts[-1] filename1d = filename+'_r1d.csv' filename2d = filename+'_r2d.edf' else: filename1d = None filename2d = None normalization_factor=1 # normalized_sample_transmission = self.last_measured_transmission[0] # open_flux = self.last_measured_transmission[1] # sample_flux = self.last_measured_transmission[2] # empty_cell_transmission = self.last_measured_transmission[3] # sample_transmission = normalized_sample_transmission*empty_cell_transmission # measurement_time = self.getElapsedTime() # sample_thickness = self.config['sample_thickness'] # calibration_factor = self.config['absolute_calibration_factor'] # normalization_factor = (open_flux*sample_transmission*measurement_time*sample_thickness)/calibration_factor #pyFAI divides this value # print('getReducedData normalization calculation:') # print(f'sample_transmission={sample_transmission}') # print(f'open_flux (I0)={open_flux}') # print(f'measurement_time={measurement_time}') # print(f'sample_thickness={sample_thickness}') # print(f'calibration_factor={calibration_factor}') # print(f'normalization_factor={normalization_factor}') if reduce_type == '1d' or write_data: res = self.integrator.integrate1d(img, self.config['npts'], unit='q_A^-1', mask=mask, error_model='azimuthal', normalization_factor=normalization_factor, filename=filename1d) if reduce_type == '1d': retval = np.array(res) if reduce_type == '2d' or write_data: res = self.integrator.integrate2d(img, self.config['npts'], unit='q_A^-1', mask=mask, error_model='azimuthal', normalization_factor=normalization_factor, filename=filename2d) if reduce_type == '2d': retval = np.array(res.intensity) if reduce_type != '1d' and reduce_type != '2d': raise ValueError('unsupported return_type') reduced_image = datetime.datetime.now() try: self.app.logger.info(f'Reduced an image, image fetch took {got_image - start_time}, reduction took {reduced_image - got_image}') except AttributeError: pass return retval
def _writeNexus(self,data,filename,sample_name,transmission): timestamp = 'T'.join(str(datetime.datetime.now())) with h5py.File(filename+'.h5','w') as f: f.attrs[u'default'] = u'entry' f.attrs[u'file_name'] = filename f.attrs[u'file_time'] = timestamp f.attrs[u'instrument'] = self.__instrument_name__ f.attrs[u'creator'] = u'AFL ScatteringInstrument driver' f.attrs[u'NeXus_version'] = u'4.3.0' f.attrs[u'HDF5_version'] = six.u(h5py.version.hdf5_version) f.attrs[u'h5py_version'] = six.u(h5py.version.version) nxentry = f.create_group(u'entry') nxentry.attrs[u'NX_class'] = u'NXentry' nxentry.attrs[u'canSAS_class'] = u'SASentry' nxentry.attrs[u'default'] = u'data' nxentry.create_dataset(u'title',data=filename) nxinstr = nxentry.create_group(u'instrument') nxinstr.attrs[u'NX_class'] = u'NXinstrument' nxinstr.attrs[u'canSAS_class'] = u'SASinstrument' try: nxinstr.create_dataset(u'temp_pyfai_calib',data=self.getReductionParams()) except: pass nxsrc = nxentry.create_group(u'source') nxsrc.attrs[u'NX_class'] = u'NXsource' nxsrc.attrs[u'canSAS_class'] = u'SASsource' wl = nxsrc.create_dataset(u'wavelength',data=self.config['wavelength']) #@TODO: are these units right? wl.attrs[u'unit'] = u'm' nxsamp = nxentry.create_group(u'sample') nxsamp.attrs[u'NX_class'] = u'sample' nxsamp.attrs[u'canSAS_class'] = u'sample' nxsamp.create_dataset(u'name',data=sample_name) nxtrans = nxsamp.create_dataset(u'transmission',data=transmission[0]) nxtrans.attrs[u'open_cts'] = transmission[1] nxtrans.attrs[u'sample_cts'] = transmission[2] try: nxtrans.attrs[u'empty_trans'] = transmission[3] except IndexError: pass nxdata = nxentry.create_group('sasdata') nxdata.attrs[u'NX_class'] = u'NXdata' nxdata.attrs[u'canSAS_class'] = u'SASdata' nxdata.attrs[u'signal'] = u'I' nxdata.attrs[u'I_axes'] = u'pix_x,pix_y' ds = nxdata.create_dataset(u'I',data=data) ds.attrs[u'units'] = u'arbitrary' ds.attrs[u'long_name'] = u'Intensity (arbitrary units)' ds.attrs[u'signal'] = 1 def _appendReducedToNexus(self,data,filename,sample_name): with h5py.File(filename+'.h5','a') as f: nxentry = f['entry'] nxdata = nxentry.create_group('sasdata_reduced') nxdata.attrs[u'NX_class'] = u'NXdata' nxdata.attrs[u'canSAS_class'] = u'SASdata' nxdata.attrs[u'signal'] = u'I' nxdata.attrs[u'I_axes'] = u'Q' Ids = nxdata.create_dataset(u'I',data=data[1]) Ids.attrs[u'units'] = u'arbitrary' Ids.attrs[u'long_name'] = u'Intensity (arbitrary units)' Ids.attrs[u'signal'] = 1 Qds = nxdata.create_dataset(u'Q',data=data[0]) Qds.attrs[u'units'] = u'inverse angstrom' Qds.attrs[u'long_name'] = u'q'