Source code for AFL.automation.instrument.VirtualSpec_data

import time
import datetime
from AFL.automation.APIServer.Driver import Driver
import numpy as np  # for return types in get data
import h5py  # for Nexus file writing
import os
import pathlib
import PIL
import uuid

from AFL.automation.instrument.GPInterpolator import Interpolator, ClusteredGPs
import lazy_loader as lazy
# Lazy load ML dependencies
gpflow = lazy.load("gpflow", require="AFL-automation[ml]")
tf = lazy.load("tensorflow", require="AFL-automation[ml]")
# class DummySAS(ScatteringInstrument,Driver):
class VirtualSpec_data(Driver):
    defaults = {}
    defaults['save_path'] = '/home/afl642/2305_SINQ_SANS_path'
    def __init__(self, overrides=None, clustered=False):
        '''Generates smoothly interpolated scattering data via a noiseless GPR from an experiment's netcdf file.'''
        self.app = None
        Driver.__init__(self, name='VirtualSpec_data', defaults=self.gather_defaults(), overrides=overrides)
        # ScatteringInstrument.__init__(self)
        self.clustered = clustered  # other methods branch on this flag, so set it for both cases
        self.sg = None
        self.kernel = None
        self.optimizer = None
        self.dataset = None
        self.params_dict = {}
        self.len_GPs = 0
    def set_params_dict(self, params_dict):
        self.sg.set_defaults(params_dict)
        self.params_dict = params_dict
    def get_params_dict(self):
        self.params_dict = self.sg.get_defaults()
        print(self.params_dict)
        # self.params_dict = self.sg.defaults
    def generate_model(self, alpha=0.1):
        if self.clustered:
            try:
                self.sg.load_datasets()
                self.sg.define_domains(alpha=alpha)
                new_gplist, union, common_idx = self.sg.unionize()
                self.sg.load_datasets(gplist=new_gplist)
            except Exception:
                # fall back to loading the datasets without domain unionization
                self.sg.load_datasets()
        else:
            self.sg.load_data()
    def load_model_dataset(self):
        # This class uses the information in dataset, specifically 'SAS_savgol_xlo' and
        # 'SAS_savgol_xhi', to determine the q range. It also points to the 'components'
        # attribute of the dataset to get the composition range and dimensions.
        # The dataset is stored in the scattering generator object.
        if self.dataset is None:
            raise ValueError("must set variable dataset in driver before load_model_dataset")

        # instantiate the interpolators
        if self.clustered:
            self.sg = ClusteredGPs(dataset=self.dataset)
        else:
            self.sg = Interpolator(dataset=self.dataset)
        self.kernel = gpflow.kernels.Matern52(lengthscales=0.1, variance=1.)
        self.optimizer = tf.optimizers.Adam(learning_rate=0.005)
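    # Sketch of the dataset fields this method relies on. The comment above names
    # 'SAS_savgol_xlo'/'SAS_savgol_xhi' and a 'components' attribute; the layout shown here is
    # an assumption about an xarray-style dataset, not a confirmed schema:
    #
    #     dataset.attrs['components']   # list of composition variables spanning the input space
    #     dataset['SAS_savgol_xlo']     # lower edge of the q range used for interpolation
    #     dataset['SAS_savgol_xhi']     # upper edge of the q range used for interpolation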
    def measure(self, name=None, exposure=None, nexp=1, block=True, write_data=False, return_data=True, save_nexus=True):
        ## sample_data is a protected key in the self.data dictionary from Driver.py
        ## composition, which is required to reproduce scattering data, has to be a parameter in the composition dictionary
        if 'sample_composition' not in self.data:
            raise ValueError("'sample_composition' is not in self.data")

        ## subject to change when data structure is finalized. X must have the shape (M, D) where M is the
        ## number of evaluation points and D is the number of dimensions; missing axes are expanded here
        components = None
        if isinstance(self.data['sample_composition'], dict):
            components = list(self.data['sample_composition'])
            X = np.array([self.data['sample_composition'][component]['values'] for component in components])
            if len(X.shape) < 2:
                X = np.expand_dims(X, axis=1)
                print('correcting array dims')
            print('New Data point requested')
            print(X, X.shape)
        elif isinstance(self.data['sample_composition'], list):
            X = np.array(self.data['sample_composition'])
        else:
            print('something went wrong on import')
            X = np.array([[1.5, 7]]).T

        ### predict from the model and add to the self.data dictionary
        print("X input dimensions should be D rows representing the dimensionality of the space (2-many) by N columns (typically 1 point being predicted)")
        print("X input is the following ", X, X.shape, type(X))

        ### scattering output is MxD where M is the number of points to evaluate the model over and D is the number of dimensions
        if self.clustered:
            if self.sg.concat_GPs is None:
                gplist = self.sg.independentGPs
            else:
                gplist = self.sg.concat_GPs
            mean, var, idx = self.sg.predict(X_new=X, gplist=gplist)
        else:
            mean, var = self.sg.predict(X_new=X)
        self.data['model_mu'], self.data['model_var'] = mean.squeeze(), var.squeeze()

        data_pointers = self.sg.get_defaults()
        print(data_pointers['Y_data_coord'])
        if self.clustered:
            self.data[data_pointers['Y_data_coord']] = self.sg.independentGPs[0].Y_coord.values
        else:
            try:
                self.data[data_pointers['Y_data_coord']] = self.sg.Y_coord.values
            except Exception:
                pass
        self.data['X_*'] = X
        self.data['components'] = components

        ### store just the predicted mean for now...
        data = self.data['model_mu']
        self.data['main_array'] = self.data['model_mu']
        print(self.data['main_array'].shape)

        ### write out the data to disk as a csv or h5?
        if write_data:
            self._writedata(data)
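    # Minimal sketch of the dict form of self.data['sample_composition'] that measure() accepts.
    # The component names and values are hypothetical; only the 'values' entry of each component
    # is read above:
    #
    #     self.data['sample_composition'] = {
    #         'component_A': {'values': 1.5},
    #         'component_B': {'values': 7.0},
    #     }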
    def status(self):
        status = ['Dummy SPECTROSCOPY data']
        return status
    def train_model(self, kernel=None, niter=1000, optimizer=None, noiseless=True, tol=1e-6, heteroscedastic=False):
        ### Hyperparameter evaluation and model "training". Can consider augmenting these in a separate call.
        if kernel is not None:
            self.kernel = kernel
        if optimizer is not None:
            self.optimizer = optimizer

        if self.clustered:
            self.sg.train_all(
                kernel=self.kernel,
                niter=niter,
                optimizer=self.optimizer,
                noiseless=noiseless,
                tol=tol,
                heteroscedastic=heteroscedastic,
                gplist=self.sg.concat_GPs,
            )
        else:
            print('not clustered')
            self.sg.train_model(
                kernel=self.kernel,
                niter=niter,
                optimizer=self.optimizer,
                noiseless=noiseless,
                tol=tol,
                heteroscedastic=heteroscedastic,
            )
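    # Sketch of swapping in a different kernel/optimizer before training. gpflow.kernels.RBF and
    # tf.optimizers.Adam are standard gpflow/TensorFlow constructors; the hyperparameter values
    # below are placeholders, not tuned defaults:
    #
    #     driver.train_model(
    #         kernel=gpflow.kernels.RBF(lengthscales=0.2, variance=1.0),
    #         optimizer=tf.optimizers.Adam(learning_rate=0.01),
    #         niter=500,
    #     )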
    def _writedata(self, data):
        filename = pathlib.Path(self.config['filename'])
        filepath = pathlib.Path(self.config['filepath'])
        print(f'writing data to {filepath/filename}')
        with h5py.File(filepath/filename, 'w') as f:
            f.create_dataset(str(uuid.uuid1()), data=data)
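# Minimal end-to-end usage sketch (assumptions: 'experiment.nc' is a hypothetical netcdf file
# compatible with the Interpolator, xarray is available, and self.data behaves like a plain dict
# when the driver is used outside the APIServer). It mirrors the call order
# load_model_dataset -> generate_model -> train_model -> measure.
if __name__ == "__main__":
    import xarray as xr

    driver = VirtualSpec_data(overrides=None, clustered=False)
    driver.dataset = xr.load_dataset('experiment.nc')  # hypothetical experiment file
    driver.load_model_dataset()      # instantiate the Interpolator plus default kernel/optimizer
    driver.generate_model()          # load the data into the interpolator
    driver.train_model(niter=1000)   # optimize the GP hyperparameters

    # request a prediction at a hypothetical composition
    driver.data['sample_composition'] = {
        'component_A': {'values': 1.5},
        'component_B': {'values': 7.0},
    }
    driver.measure(write_data=False)
    print(driver.data['model_mu'], driver.data['model_var'])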