import numpy as np
import xarray as xr
import itertools
from numpy.polynomial import chebyshev, legendre, polynomial
# Import lazy_loader for optional dependencies
import lazy_loader as lazy
# Machine learning dependencies
tf = lazy.load("tensorflow", require="AFL-automation[ml]")
gpflow = lazy.load("gpflow", require="AFL-automation[ml]")
AFLagent = lazy.load("AFL.agent", require="AFL-agent")
# Geometry dependencies
alphashape = lazy.load("alphashape", require="AFL-automation[geometry]")
[docs]
class Interpolator():
[docs]
def __init__(self, dataset):
self.defaults = {}
self.defaults['X_data_pointer'] = 'components'
self.defaults['X_data_exclude'] = 'P188' #typically the solutes components
self.defaults['Y_data_filter'] = ('SAS_savgol_xlo', 'SAS_savgol_xhi')
self.defaults['Y_data_pointer'] = 'SAS'
self.defaults['Y_data_coord'] = 'q'
self.dataset = dataset
self.set_trainable = lazy.load("gpflow.set_trainable", require="AFL-automation[ml]")
self.NaturalGradient = lazy.load("gpflow.optimizers.NaturalGradient", require="AFL-automation[ml]")
self.STRtree = lazy.load("shapely.STRtree", require="AFL-automation[geometry]")
self.unary_union = lazy.load("shapely.unary_union", require="AFL-automation[geometry]")
self.Point = lazy.load("shapely.Point", require="AFL-automation[geometry]")
#construct a kernel for fitting a GP model
self.optimizer = tf.optimizers.Adam(learning_rate=0.005)
self.kernel = gpflow.kernels.Matern52(variance=1.0, lengthscales=(1e-1))
self.opt_HPs = []
[docs]
def get_defaults(self):
return self.defaults
[docs]
def set_defaults(self,default_dict):
self.defaults = default_dict
[docs]
def load_data(self, dataset=None):
if isinstance(self.dataset,xr.core.dataset.Dataset)==False:
self.dataset = dataset
print('default params')
for k,v in self.defaults.items():
print(k,v)
self.defaults['X_data_range'] = [f'{component}_range' for component in self.dataset.attrs[self.defaults['X_data_pointer']] if component not in self.defaults['X_data_exclude']]
try:
#produces an N x M array N is the number of samples, M is the number of dimensions N is the number of input points
self.X_raw = xr.DataArray(np.array([self.dataset[i].values for i in self.dataset.attrs[self.defaults['X_data_pointer']] if i not in self.defaults['X_data_exclude']]).T)
self.X_ranges = [self.dataset.attrs[i] for i in self.defaults['X_data_range']]
#produces an N x K array N is the number of training data points, K is the number of scattering values
if None not in self.defaults['Y_data_filter']:
print('-----------------------')
print("nones in Y_data_filter")
self.Y_raw = self.dataset[self.defaults['Y_data_pointer']].sel({self.defaults['Y_data_coord']:slice(self.dataset.attrs[self.defaults['Y_data_filter'][0]],self.dataset.attrs[self.defaults['Y_data_filter'][1]])}).T
if self.defaults['Y_data_coord'] == []:
self.Y_coord = []
else:
self.Y_coord = self.dataset[self.defaults['Y_data_coord']]
else:
self.Y_raw = self.dataset[self.defaults['Y_data_pointer']].T
if self.defaults['Y_data_coord'] == []:
self.Y_coord = []
else:
try:
self.Y_coord = self.dataset[self.defaults['Y_data_coord']]
except:
raise ValueError('check the Y_data_coord it can be none or linked to another coordinate/dimension')
# print('The set Y_coords')
# print(self.Y_coord)
except:
raise ValueError("One or more of the inputs X or Y are not correct. Check the defaults")
# print(self.X_raw.shape, self.Y_raw.shape)
[docs]
def standardize_data(self):
#####
#if there is only one data point, there is an issue in how the X_train data standardizes. It needs to be set to a default that falls between the range.
#####
#the grid of compositions does not always start at 0, this shifts the minimum
if len(self.X_raw) > 1:
self.Y_mean = self.Y_raw.mean(dim='sample') #this should give a mean and stdev for each q value or coefficient
self.Y_std = self.Y_raw.std(dim='sample') #this should give a mean and stdev for each q value or coefficient
#sets X_train to be from 0. to 1. along each dimension
#should be determined based on the range of the dataset....
self.X_train = np.array([(i - self.X_ranges[idx][0])/(self.X_ranges[idx][1] - self.X_ranges[idx][0]) for idx, i in enumerate(self.X_raw.T)]).T
#sets Y_train to be within -1. to 1. for every target parameter
self.Y_train = (self.Y_raw - self.Y_mean)/self.Y_std
print(self.Y_train.shape)
if len(self.Y_train.shape) == 1:
self.Y_train = np.expand_dims(self.Y_train.values,axis=1).T
self.Y_train = self.Y_train.T
print('data standardized')
print('Y_train shape',self.Y_train.shape)
print('X_train shape',self.X_train.shape)
print('Y_coord',self.Y_coord)
else:
self.Y_mean = self.Y_raw.mean(dim='sample')
self.Y_std = self.Y_raw.std(dim='sample')
self.X_train = np.array([(i - self.X_ranges[idx][0])/(self.X_ranges[idx][1] - self.X_ranges[idx][0]) for idx, i in enumerate(self.X_raw.T)]).T
## the best I can do is subtract the mean of the data. If the training data are scalars, not spectra, then there will be no stdev for one point
self.Y_train = self.Y_raw.values.T
print('X_train shape', self.X_train.shape)
print('Y_train shape', self.Y_train.shape)
[docs]
def construct_model(self, kernel=None, noiseless=False, heteroscedastic=False):
if kernel != None:
self.kernel = kernel
### Due to the difficulty of doing the heteroscedastic modeling, I would avoid this for now
if (heteroscedastic):# & (self.Y_unct!=None):
likelihood = AFLagent.HscedGaussianProcess.HeteroscedasticGaussian()
data = (self.X_train,np.stack((self.Y_train,self.Y_unct),axis=1))
# print(data[0].shape,data[1].shape)
self.model = gpflow.models.VGP(
data = data,
kernel = kernel,
likelihood = likelihood,
num_latent_gps=1
)
self.natgrad = self.NaturalGradient(gamma=0.5)
self.adam = tf.optimizers.Adam()
self.set_trainable(self.model.q_mu, False)
self.set_trainable(self.model.q_sqrt, False)
else:
## this is the standard GPR model
self.model = gpflow.models.GPR(
data = (self.X_train,self.Y_train),
kernel = self.kernel
)
## this will force the model to go through the training points
if noiseless:
# print("assuming noiseless data")
self.model.likelihood.variance = gpflow.likelihoods.Gaussian(variance=1.00001e-6).parameters[0]
# print(self.model.parameters)
self.set_trainable(self.model.likelihood.variance, False)
return self.model
[docs]
def train_model(self,kernel=None, optimizer=None, noiseless=False, heteroscedastic=False, tol=1e-4, niter=21):
#print(self.X_train.shape,self.Y_train.shape)
if kernel != None:
self.kernel = kernel
if optimizer != None:
self.optimizer = optimizer
## load the model if it is not there already (because it wasn't instantiated on __init__ not sure how to use isinstance() here
if 'X_train' not in list(self.__dict__):#isinstance(self.X_train, type(np.ndarray)) == False:
print('standardizing X_data and constructing model')
self.standardize_data()
self.construct_model(kernel=kernel, noiseless=noiseless, heteroscedastic=heteroscedastic)
if 'model' not in list(self.__dict__):
print('constructing model')
self.construct_model(kernel=kernel, noiseless=noiseless, heteroscedastic=heteroscedastic)
print('training data shapes')
print(self.model.data[0].shape,self.model.data[1].shape)
## optimize the model
# print(self.kernel,self.optimizer)
print(self.model.parameters)
print(self.model)
print(self.optimizer)
print()
i = 0
break_criteria = False
while (i <= niter) or (break_criteria==True):
# for i in range(niter):
if heteroscedastic == False:
pre_step_HPs = np.array([i.numpy() for i in self.model.parameters])
self.optimizer.minimize(self.model.training_loss, self.model.trainable_variables)
self.opt_HPs.append([i.numpy() for i in self.model.parameters])
post_step_HPs = np.array([i.numpy() for i in self.model.parameters])
i+=1
if all(abs(pre_step_HPs-post_step_HPs) <= tol):
break_criteria=True
break
else:
self.natgrad.minimize(self.model.training_loss, [(self.model.q_mu, self.model.q_sqrt)])
self.adam.minimize(self.model.training_loss, self.model.trainable_variables)
i+=1
# print('test_prediction')
# mean,var = self.model.predict_f(np.array([[1.,0.5,0.5]]))
# print('mean shape', mean.shape)
return self.model
[docs]
def predict(self, X_new=[[0,0,0],[1,2,3]]):
"""
Returns the simulated scattering pattern given the specified coordinates and polynomial type if reduced
"""
#convert the requested X_new into standardized coordinates
print('X_new non-standardized should be DxM', np.array(X_new), np.array(X_new).shape)
print('X_ranges should be length D', self.X_ranges)
try:
X_new = np.array([(i - min(self.X_ranges[idx]))/(max(self.X_ranges[idx]) - min(self.X_ranges[idx])) for idx, i in enumerate(np.array(X_new))]).T
except:
raise ValueError("Check the dimensions of X_new and compare to the input dimensions on X_raw")
print('X_new standardized', X_new)
# for col in X_new.T:
# print(min(col),max(col))
#check to see if the input coordinates and dimensions are correct:
if np.any(np.round(X_new,5)< 0.) or np.any(np.round(X_new,5)> 1.00001):
raise ValueError('check requested values for X_new, data not within model range')
print('requested point ', X_new, X_new.shape)
self.predictive_mean, self.predictive_variance = self.model.predict_f(X_new)
self.predictive_mean = self.predictive_mean.numpy()[0]
self.predictive_variance = self.predictive_variance.numpy()[0]
#un-standardize
# print(self.predictive_mean,self.predictive_variance)
# print(range(len(self.predictive_mean)), range(len(self.predictive_variance)))
mean = np.array(self.predictive_mean*self.Y_std.values + self.Y_mean.values)
variance = np.array(self.predictive_variance*self.Y_std.values + self.Y_mean.values)
# mean = np.array([self.predictive_mean[i] * self.Y_std + self.Y_mean for i in range(len(self.predictive_mean))])
# variance = np.array([self.predictive_variance[i] * self.Y_std + self.Y_mean for i in range(len(self.predictive_variance))])
# print(mean, variance)
return mean, variance
[docs]
def print_diagnostics(self):
for item in self.__dict__:
print(item, self.__dict__[item])
print('')
return
[docs]
class ClusteredGPs():
"""
This is the wrapper class that enables scattering interpolation within the classifier labels. It presupposes that the dataset or manifest contains a datavariable called labels and that it is
"""
[docs]
def __init__(self, dataset):
self.ds_manifest = dataset
self.datasets = [dataset.where(dataset.labels == cluster).dropna(dim='sample') for cluster in np.unique(dataset.labels)]
self.independentGPs = [Interpolator(dataset=dataset.where(dataset.labels == cluster).dropna(dim='sample')) for cluster in np.unique(dataset.labels)]
self.concat_GPs = None
#####
#Note: Edge cases can make this difficult. overlapping clusters are bad (should be merged), and clusters of size N < (D + 1) input dimensions will fault
# on concave hull concstruction. so the regular point, or line constructions will be used instead. (for two input dimensions)
#####
[docs]
def get_defaults(self):
"""
returns a list of dictionaries corresponding to the default data pointers for each GP model
"""
return self.independentGPs[0].defaults
[docs]
def set_defaults(self,default_dict):
"""
check the default params, but these are pointers to the values in the dataset
"""
for idx in range(len(self.independentGPs)):
self.independentGPs[idx].defaults = default_dict
[docs]
def load_datasets(self,gplist=None):
"""
This function instantiates the X_raw, Y_raw data with the appropriate filtering given the defaults dictionaries
"""
if isinstance(gplist,type(None)):
gplist = self.independentGPs
for gpmodel in gplist:
gpmodel.load_data()
gpmodel.standardize_data()
gpmodel.construct_model()
[docs]
def define_domains(self, gplist=None, alpha=0.1, buffer=0.01):
"""
define domains will generate the shapely geometry objects for the given datasets X_raw data points.
"""
## this isn't written well for arbitrary dimensions...
self.domain_geometries = []
if isinstance(gplist,type(None)):
gplist = self.independentGPs
for idx, gpmodel in enumerate(gplist):
# print(len(gpmodel.X_raw))
if len(gpmodel.X_raw.values) == 1:
domain_polygon = geometry.point.Point(gpmodel.X_raw.values)
elif len(gpmodel.X_raw.values) == 2:
domain_polygon = geometry.LineString(gpmodel.X_raw.values)
else:
domain_polygon = alphashape.alphashape(gpmodel.X_raw.values,alpha)
self.domain_geometries.append(domain_polygon.buffer(buffer))
return self.domain_geometries
[docs]
def unionize(self,gplist=None, geomlist=None, dslist=None, buffer=0.01):
"""
determines the union of and indices of potentially conflicting domains. Creates the new indpendent_GPs list that corresponds.
Note! buffer is important here! if a cluster contains a single point or a line, there is a bunch of stuff that breaks.
specifying a non-zero buffer will force these lesser dimensional objects to be polygonal and help with all the calculations.
a large buffer will potentially merge polygons though. exercise caution
"""
#### this bit of header is for generalizability. probably needs to be corrected
if isinstance(gplist,type(None)):
print('setting GP list')
gplist = self.independentGPs
if isinstance(geomlist,type(None)):
print('setting geometries')
geomlist = self.domain_geometries
if isinstance(dslist,type(None)):
print('setting dataset list')
dslist = self.datasets
### this finds the union between the list of shapely geometries and does the apapropriate tree search for all combinations
self.union = self.unary_union(geomlist).buffer(buffer)
tree = self.STRtree(list(self.union.geoms))
common_indices = []
for gpmodel in gplist:
test_point = gpmodel.X_raw[0]
common_indices.append(tree.query(self.Point(test_point), predicate='intersects')[0])
### this is to concatinate ovelapping datasets into one
self.union_datasets = []
for i in np.unique(common_indices):
#print(i)
store = [ds for j, ds in zip(common_indices,dslist) if j==i]
if len(store)>1:
#print(len(store))
ds = xr.concat(store,dim='sample')
else:
ds = store[0]
self.union_datasets.append(ds)
self.union_geometries = self.union
self.concat_GPs = [Interpolator(dataset=ds) for ds in self.union_datasets]
return self.concat_GPs, self.union_geometries, common_indices
[docs]
def train_all(self,kernel=None, optimizer=None, noiseless=True, heteroscedastic=False, niter=21, tol=1e-4, gplist=None):
# if isinstance(gplist,type(None)):
# gplist = self.independentGPs
if isinstance(self.concat_GPs, type(None)):
gplist = self.independentGPs
else:
gplist = self.concat_GPs
if isinstance(optimizer, type(None)) or isinstance(optimizer, type(tf.optimizers.Adam())):
optimizerlist = [tf.optimizers.Adam(learning_rate=0.005) for i in range(len(gplist))]
else:
optimizerlist = optimizer
# for op in optimizerlist
self.all_models = [gpmodel.train_model(
kernel=kernel,
optimizer=optimizerlist[idx],
noiseless=noiseless,
heteroscedastic=heteroscedastic,
niter=niter,
tol=tol) for idx, gpmodel in enumerate(gplist)]
[docs]
def predict(self, X_new=None,gplist=None,domainlist=None, shapely_domains=False):
"""
Returns posterior mean and uncertainty of the pattern for the model closest to the input coordinate. Note that the coordinate should be in natural units specified by the Interpolator.predict function
"""
if isinstance(gplist,type(None)):
gplist = self.independentGPs
### first find the model with boundaries closest to the requested input
distances = []
### 2D input data will work but not for highter dimensions. this needs to be an option, otherwise it should just call the
# GP model that has a point closest to the X_new
if shapely_domains:
if isinstance(domainlist,type(None)):
domains = self.domain_geometries
for idx, geom in enumerate(list(self.union_geometries.geoms)):
dist = geom.exterior.distance(self.Point(X_new))
distances.append(dist)
# print(distances)
else:
# print("not using shapely shapes")
# print('X_new = ', X_new.T)
for idx, model in enumerate(gplist):
# print(X_new.shape, model.X_raw.values[0].shape)
subset_X = model.X_raw
sub_min = min([np.linalg.norm(X_new.T[0] - point) for point in model.X_raw.values])
print(f'model {idx} min: {sub_min}')
distances.append(sub_min)
model_idx = np.argmin(distances)
print(f"found model index = {model_idx}")
gpmodel = gplist[model_idx]
print(gplist[model_idx])
if len(X_new.shape) < 2:
X_new = np.expand_dims(X_new,axis=1)
print(X_new.T.shape, type(X_new))
mean, variance = gpmodel.predict(X_new=X_new)
print('mean shape',mean.shape)
return mean, variance, model_idx
[docs]
def print_diagnostics(self):
for item in self.__dict__:
print(self.dict[item])