Source code for AFL.automation.instrument.SeabreezeUVVis

from AFL.automation.APIServer.Driver import Driver
#from AFL.automation.instrument.Instrument import Instrument
import numpy as np # for return types in get data
import lazy_loader as lazy
seabreeze = lazy.load("seabreeze", require="AFL-automation[seabreeze]")
import time
import datetime
import h5py
from pathlib import Path
import uuid
import pathlib
import copy
import warnings

from typing import Optional

try:
    from tiled.queries import Eq
except ImportError:
    warnings.warn("Cannot import from tiled...reduction will not work",stacklevel=2)


[docs] class SeabreezeUVVis(Driver): defaults = {} defaults['correctDarkCounts'] = False defaults['correctNonlinearity'] = False defaults['exposure'] = 0.010 defaults['exposure_delay'] = 0 defaults['saveSingleScan'] = False defaults['filename'] = 'test.h5' defaults['filepath'] = '.' defaults['reference_uuid'] = '' #sample_uuid in tiled defaults['air_uuid'] = '' #sample_uuid in tiled
[docs] def __init__(self,backend='cseabreeze', device_serial=None,overrides=None): self.app = None self.name = 'SeabreezeUVVis' Driver.__init__(self,name='SeabreezeUVVis',defaults = self.gather_defaults(),overrides=overrides) print(f'configuring Seabreeze using backend {backend}') seabreeze.use(backend) from seabreeze.spectrometers import Spectrometer,list_devices print(f'attempting to list spectrometers...') print(f'seabreeze sees devices: {list_devices()}') if device_serial is None: print(f'Connecting to first available...') self.spectrometer = Spectrometer.from_first_available() else: print(f'Connecting to fixed serial, {device_serial}') self.spectrometer = Spectrometer.from_serial_number(device_serial) print(f'Connected successfully, to a {self.spectrometer}') self.wl = self.spectrometer.wavelengths() self.setExposure(self.config['exposure'])
[docs] @Driver.unqueued() def getExposure(self): return self.config['exposure']
[docs] @Driver.unqueued() def getExposureDelay(self): return self.config['exposure_delay']
[docs] @Driver.unqueued() def getFilename(self): return self.config['filename']
[docs] @Driver.unqueued() def getSaveSingleScan(self): return self.config['saveSingleScan']
[docs] @Driver.unqueued() def getFilepath(self): return self.config['filepath']
[docs] def setFilepath(self,filepath): self.config['filepath'] = Path(filepath)
[docs] def setFilename(self,filename): self.config['filename'] = filename
[docs] def setSaveSingleScan(self,saveSingleScan): self.config['saveSingleScan'] = saveSingleScan
[docs] def setExposureDelay(self,time): self.config['exposure_delay'] = time
[docs] def setExposure(self,time): self.config['exposure'] = time self.spectrometer.integration_time_micros(1e6*time)
[docs] def collectContinuous(self,duration,start=None,return_data=False,**kwargs): warnings.warn('collectContinuous should be replaced with collect', DeprecationWarning, stacklevel=2) data = [] duration = datetime.timedelta(seconds=duration) if start is None: start = datetime.datetime.now() #print(start) #print(datetime.timedelta(0,duration)) while datetime.datetime.now() < start: pass while datetime.datetime.now() < (start + duration): data.append(self.spectrometer.intensities( correct_dark_counts=self.config['correctDarkCounts'], correct_nonlinearity=self.config['correctNonlinearity'])) time.sleep(self.config['exposure_delay']) if self.data is not None: self.data['mode'] = 'continuous' self.data['wavelength'] = self.wl.tolist() self.data.add_array('wavelength',self.wl.tolist()) self.data.add_array('spectra',data[0]) self._writedata(data) if not return_data: data = f'data written to file: {self.config["filename"]}' return data else: return data
[docs] @Driver.unqueued() def collectSingleSpectrum(self, set_reference=False, set_air=False, **kwargs): warnings.warn('collectSingleSpectrum should be replaced with collect', DeprecationWarning, stacklevel=2) wl = self.wl[1:] raw_data = self.spectrometer.intensities( correct_dark_counts=self.config['correctDarkCounts'], correct_nonlinearity=self.config['correctNonlinearity'] ) raw_data = raw_data[1:] if self.config['saveSingleScan']: self._writedata(raw_data) if set_reference: self.config['reference_uuid'] = copy.deepcopy(self.data['sample_uuid']) if set_air: self.config['air_uuid'] = copy.deepcopy(self.data['sample_uuid']) if self.data is not None: self.data['mode'] = 'single' self.data['wavelength'] = wl self.data['reduced'] = False self.data.add_array('wavelength',wl) self.data.add_array('spectrum_raw',raw_data)
def _writedata(self,data): filepath = pathlib.Path(self.config['filepath']) filename = pathlib.Path(self.config['filename']) data = np.array([self.wl,data]) with h5py.File(filepath/filename, 'w') as f: dset = f.create_dataset(str(uuid.uuid1()), data=data)
[docs] def collect( self, nframes: int, reduced: bool= False, absorbance: bool=True, set_reference: bool =False, set_air: bool=False, exposure: Optional[float] = None, return_data: bool=False, **kwargs ): if exposure is not None: self.setExposure(exposure) wl = self.wl[1:] # remove internal dark reference data_raw = [] for frame in range(nframes): I = self.spectrometer.intensities( correct_dark_counts=self.config['correctDarkCounts'], correct_nonlinearity=self.config['correctNonlinearity'] ) I = I[1:] # remove internal dark reference data_raw.append(I) time.sleep(self.config['exposure_delay']) data_raw_mean = np.mean(data_raw,axis=0) data_raw_std = np.std(data_raw,axis=0) if reduced: data_mean, data_std = self.reduced(data_raw_mean, data_raw_std, absorbance=absorbance) #reduce against air as a 'miss-check' if self.config['air_uuid']: #make sure its set data_mean_air, data_std_air = self.reduced(data_raw_mean, data_raw_std, absorbance=True, reference_uuid='air_uuid') mean_air = np.mean(data_mean_air) std_air = np.mean(data_std_air) else: mean_air = None std_air = None if self.config['saveSingleScan']: self._writedata(data_raw) if set_reference: self.config['reference_uuid'] = copy.deepcopy(self.data['sample_uuid']) if set_air: self.config['air_uuid'] = copy.deepcopy(self.data['sample_uuid']) if self.data is not None: self.data['mode'] = 'collect' self.data['wavelength'] = wl self.data['reference_uuid'] = self.config['reference_uuid'] self.data['air_uuid'] = self.config['air_uuid'] self.data['reduced'] = reduced self.data['absorbance'] = absorbance self.data['mean_air'] = mean_air self.data['std_air'] = std_air self.data.add_array('wavelength',wl) self.data.add_array('all_spectra',data_raw) self.data.add_array('spectrum_raw',data_raw_mean) self.data.add_array('spectrum_raw_std',data_raw_std) if reduced: self.data.add_array('spectrum',data_mean) self.data.add_array('spectrum_std',data_std) if reduced: data_out = [wl.tolist(),data_mean.tolist()] else: data_out = [wl.tolist(),data_raw_mean.tolist()] if return_data: return data_out
[docs] def reduced(self,data_raw_mean,data_raw_std,absorbance=True, reference_uuid='reference_uuid'): if self.data is None: raise ValueError("Cannot reduce without DataTiled...please set tiled parameters in server_script") tiled_result = self.data.tiled_client.search(Eq('sample_uuid',self.config[reference_uuid])) if len(tiled_result)==0: raise ValueError(f"Can't reduce! Could not find tiled entry for measurement {reference_uuid}={self.config[reference_uuid]}") ref_spectrum = tiled_result.search(Eq('array_name','spectrum_raw')).items()[-1][-1][()] #grabs the last entry that matches ref_spectrum_std = tiled_result.search(Eq('array_name','spectrum_raw_std')).items()[-1][-1][()] data_mean = data_raw_mean/ref_spectrum data_std = data_mean*(data_raw_std/np.abs(data_raw_mean) + ref_spectrum_std/np.abs(ref_spectrum_std)) if absorbance: data_mean = 1.0 - data_mean return data_mean,data_std
_DEFAULT_CUSTOM_CONFIG = { '_classname': 'AFL.automation.instrument.SeabreezeUVVis.SeabreezeUVVis', 'backend': 'pyseabreeze' } _DEFAULT_PORT=5051 if __name__ == '__main__': from AFL.automation.shared.launcher import *