# Source code for ml_uncertainty

import numpy as np
import sklearn.decomposition
import sklearn.cross_decomposition
import sklearn.model_selection
import sklearn.pipeline as skpipe
import math
import tqdm
import scipy

def get_probabilities(class_predicted=None,data_error=None,PLS_model=None,class_value=0.5):
    """Return, for each prediction, the probability that the sample lies below class_value.

    Each prediction is treated as a Gaussian with mean ``class_predicted[i]`` and
    standard deviation ``data_error[i] / 2``; the returned value is the normal CDF
    evaluated at ``class_value`` (i.e. the probability mass on the class-0 side).

    :key class_predicted: per-sample model predictions
    :key data_error: per-sample uncertainty; treated as two standard deviations
    :key PLS_model: unused here (kept for call-signature compatibility)
    :key class_value: the class-separating threshold (default 0.5)
    :returns: ndarray of probabilities, one per prediction
    """
    # CDF of a normal: 0.5 * (1 + erf((x - mu) / (sigma * sqrt(2)))), with sigma = error/2
    probs = [
        0.5 * (1 + math.erf((class_value - pred) / (data_error[i] / 2 * math.sqrt(2))))
        for i, pred in enumerate(class_predicted)
    ]
    return np.array(probs)
def class_assignment(data=None,data_error=None,PLS_model=None,class_value=0.5):
    """Assign binary class labels by thresholding continuous predictions.

    If a model is supplied, predictions are generated from ``data``; otherwise
    ``data`` itself is treated as the predictions.

    :key data: feature matrix (when PLS_model is given) or raw predictions
    :key data_error: unused here (kept for call-signature compatibility)
    :key PLS_model: optional fitted scikit-learn regressor with a .predict method
    :key class_value: threshold separating class 0 from class 1 (default 0.5)
    :returns: (class_assigned, class_predicted) — int labels and the raw predictions
    """
    if PLS_model is None:
        predictions = data
    else:
        predictions = PLS_model.predict(data)  # model output for this data set
    # 1 strictly above the threshold, 0 otherwise
    labels = np.where(predictions > class_value, 1, 0)
    return labels, predictions
def class_assignment_boot(class_predicted_bootstrap=None,data_error=None,PLS_model=None,class_value=0.5):
    """Assign binary class labels from a set of bootstrap predictions.

    The per-sample median over the bootstrap axis (axis 1) is used as the
    point prediction, then thresholded at ``class_value``.

    :key class_predicted_bootstrap: array of shape (n_samples, n_boot, ...) of predictions
    :key data_error: unused here (kept for call-signature compatibility)
    :key PLS_model: unused here (kept for call-signature compatibility)
    :key class_value: threshold separating class 0 from class 1 (default 0.5)
    :returns: (class_assigned, median_prediction, class_predicted_bootstrap)
    """
    # keepdims=True preserves the bootstrap axis as length 1
    median_prediction = np.median(class_predicted_bootstrap, axis=1, keepdims=True)
    labels = np.where(median_prediction > class_value, 1, 0)
    return labels, median_prediction, class_predicted_bootstrap
def find_misclassified(true_class=None,assigned_class=None):
    """Locate misclassified samples by comparing true and assigned labels.

    :key true_class: array of true 0/1 labels
    :key assigned_class: array of assigned 0/1 labels (transposed before comparison)
    :returns: (misclass, misclass_mask) — |difference| per sample and a boolean
              mask that is True where the labels disagree
    """
    # Matching labels give 0; any disagreement gives 1 after the abs
    difference = np.abs(true_class - assigned_class.T)
    misclass = np.squeeze(difference)
    mask = misclass == 1
    return misclass, mask
def _estimate_class_boundary(predicted_class,actual_class):
    #Find which samples correspond to actual classes 1 and 0
    class_one = predicted_class[actual_class==1]
    class_zero = predicted_class[actual_class==0]
    
    #Find the mean prediction and standard deviation for all of the class 1 samples
    class_one_mean = class_one.mean()
    class_one_std = class_one.std()
    
    #Find the mean prediction and standard deviation for all of the class 0 samples
    class_zero_mean = class_zero.mean()
    class_zero_std = class_zero.std()
    
    #Compute the location where class 1 and class 0 probability density functions are equal
    log_std_ratio = np.log(class_one_std / class_zero_std)
    mean_diff = (class_one_mean - class_zero_mean) / 2
    mean_avg = (class_one_mean + class_zero_mean) / 2
    
    class_boundary = ((class_one_std) ** 2) * ((class_zero_std) ** 2) / mean_diff * log_std_ratio + mean_avg
    return class_boundary

def estimate_class_boundary(predicted_class,actual_class):
    """Estimate the class boundary as the equal-density point of two Gaussians.

    Fits a Gaussian to the predictions of each true class and solves the
    quadratic for the point(s) where the two probability density functions are
    equal. Of the two roots, the one between the class means is returned.

    NOTE(review): requires the two class standard deviations to differ — the
    quadratic coefficient is their variance difference, so equal stds divide
    by zero. The final branch also appears to assume the class-1 mean exceeds
    the class-0 mean; confirm against callers.

    :param predicted_class: continuous model predictions
    :param actual_class: true 0/1 labels, same length as predicted_class
    :returns: scalar boundary estimate
    """
    # Split predictions by their true class label
    ones = predicted_class[actual_class == 1]
    zeros = predicted_class[actual_class == 0]

    # Gaussian summary statistics for each class
    mu1, s1 = ones.mean(), ones.std()
    mu0, s0 = zeros.mean(), zeros.std()

    # Constants of the quadratic a*x^2 - minus_b*x + c = 0 for equal densities
    log_ratio = np.log(s0 / s1)
    var_prod = (s0 * s1) ** 2
    var_diff = s0 ** 2 - s1 ** 2          # quadratic coefficient a
    gap_sq = (mu1 - mu0) ** 2

    neg_b = 2 * (mu1 * (s0 ** 2) - mu0 * (s1 ** 2))
    # Simplified discriminant of the quadratic
    disc = 2 * np.sqrt(2 * log_ratio * var_prod * var_diff + var_prod * gap_sq)

    # Two roots of the quadratic; order them
    root_a = (neg_b + disc) / (2 * var_diff)
    root_b = (neg_b - disc) / (2 * var_diff)
    lower, upper = min(root_a, root_b), max(root_a, root_b)

    # Pick the root that sits below the class-1 mean (equivalent to choosing
    # by the sign of the variance difference)
    if upper < mu1:
        return upper
    return lower

def bootstrap_stratified(data,classes,samples=1000,axis=0):
    """Bootstrap each class separately so every resample keeps the class sizes fixed.

    :param data: data matrix to resample (rows are samples)
    :param classes: one-hot class matrix of shape (n_samples, n_classes)
    :key samples: number of bootstrap resamples (default 1000)
    :key axis: axis of data to resample along (default 0)
    :returns: (data_boot, boot_indices) — the per-class bootstraps concatenated
              along axis 1, and the matching row indices into the full matrix
    """
    n_rows, n_cols = classes.shape
    row_ids = np.arange(n_rows)  # global row indices, used to map back from each class subset

    per_class_boots = []
    per_class_ids = []
    for col in range(n_cols):
        in_class = classes[:, col] == 1  # membership mask for this class
        boot, local_ids = bootstrap_data(data[in_class], samples=samples, axis=axis)
        per_class_boots.append(boot)
        # Translate subset-local bootstrap indices back to full-matrix indices
        per_class_ids.append(row_ids[in_class][local_ids])

    return np.concatenate(per_class_boots, axis=1), np.concatenate(per_class_ids, axis=1)


def bootstrap_data(data,samples=1000,axis=0):
    """Draw bootstrap resamples of ``data`` by sampling row indices with replacement.

    :param data: array to resample; indexed along its first dimension
    :key samples: number of bootstrap resamples (default 1000)
    :key axis: axis whose length sets the resample size (default 0)
    :returns: (boot_out, boot_indices) — resampled data of at least 3 dimensions
              (a trailing singleton axis is added for 1-D/2-D inputs) and the
              (samples, n) index matrix used to build it
    """
    n = data.shape[axis]
    # Each row of indices is one bootstrap draw of size n, with replacement
    boot_indices = np.random.randint(0, n, (samples, n))
    resampled = data[boot_indices]
    # Guarantee a 3-D result so downstream code can rely on a feature axis
    if resampled.ndim < 3:
        resampled = np.expand_dims(resampled, axis=-1)
    return resampled, boot_indices


def simple_bootstrap(xdata=None,ydata=None,
                     PLS_model=None,PLS_cv=None,PLS_bootstrap=None,sk_model=sklearn.cross_decomposition.PLSRegression,
                     cv_object=None,class_value=0.5,samples=1000,PLS_kw=None,return_boot=False):
    """Conducts a simple residual bootstrap analysis on a set of data. Computes cross-validation uncertainty.

    This function relies on the Y-data being bootstrapped to be one-dimensional. It also requires the
    model to accept two-dimensional data. The bootstrapping is done by generating ``samples`` random
    variations on the Y-data and then concatenating them into a two-dimensional array.

    If PLS_model is None, then PLS_cv and PLS_bootstrap are ignored and independent instances of
    ``sk_model`` are created for each of PLS_model, PLS_cv, and PLS_bootstrap. If PLS_model is not
    None but PLS_cv or PLS_bootstrap is, PLS_model is reused for the missing one(s).

    :key xdata: The X data used to fit the model (default None)
    :key ydata: The Y data used to fit the model (default None)
    :key PLS_model: The scikit-learn model that will be fit using X and Y
    :key PLS_cv: The scikit-learn model that will be used for cross-validation
    :key PLS_bootstrap: The scikit-learn model that will be used for bootstrapping
    :key sk_model: If PLS_model, PLS_cv, or PLS_bootstrap is None, this scikit-learn model class is used to create them
    :key cv_object: The cross-validation splitter used for cross-validation statistics
    :key class_value: The value separating the classes in PLS-DA
    :key samples: The number of samples for bootstrapping
    :key PLS_kw: The keyword arguments passed to sk_model
    :key return_boot: If True, also returns the fitted PLS_bootstrap model
    :type xdata: ndarray
    :type ydata: 1-d array
    :type PLS_model: scikit-learn model instance
    :type PLS_cv: scikit-learn model instance
    :type PLS_bootstrap: scikit-learn model instance
    :type sk_model: scikit-learn model class
    :type cv_object: scikit-learn model selection instance
    :type class_value: scalar
    :type samples: int
    :type PLS_kw: dict
    :type return_boot: Boolean
    :returns: (residual_cv, err_cv, msecv, class_predicted_boot, class_predicted_train)
              plus PLS_bootstrap when return_boot is True
    """
    if PLS_model is None:
        # Build three independent estimator instances from the supplied class
        PLS_model = sk_model(**PLS_kw)
        PLS_cv = sk_model(**PLS_kw)
        PLS_bootstrap = sk_model(**PLS_kw)
        print(PLS_model)
    if PLS_cv is None:
        PLS_cv = PLS_model
    if PLS_bootstrap is None:
        PLS_bootstrap = PLS_model

    tpls = PLS_model.fit(xdata, ydata)  # fit result kept but unused afterward

    # Get class assignments for the base PLS model
    class_assigned_train, class_predicted_train = class_assignment(data=xdata, PLS_model=PLS_model, class_value=class_value)

    if cv_object is None:
        cv_object = sklearn.model_selection.StratifiedKFold(n_splits=6)

    # Cross-validate
    class_assigned_cv, class_predicted_cv = cross_validate(xdata=xdata, ydata=ydata,
                                                           PLS_model=PLS_cv, cv_object=cv_object, class_value=class_value)

    # Calculate residuals and mean squared errors of regression
    residual, err, mse = get_residual_stats(ydata, class_predicted_train)
    residual_cv, err_cv, msecv = get_residual_stats(ydata, class_predicted_cv)

    print('Y data shape', ydata.shape)
    print('Output shapes', class_predicted_train.shape, class_predicted_cv.shape)
    print('Meas squared error:', mse, msecv)

    # Calculate the pseudo degrees of freedom and the corresponding bootstrap weighting factor
    num_train = len(ydata)
    pseudo_dof = num_train * (1 - np.sqrt(mse / msecv))
    bootstrap_weight = np.sqrt(1 - pseudo_dof / num_train)

    # Calculate the weighted residual vector and bootstrap it to generate the bootstrap perturbations
    residual_weighted = residual / bootstrap_weight
    residual_boot, boot_indices = bootstrap_data(residual_weighted, samples=samples)

    # Get the new y data generated by bootstrapping and fit the PLS model to it
    class_y_boot = np.transpose(np.squeeze(class_predicted_train) + np.squeeze(residual_boot))
    print('predicted_train = ', class_predicted_train.shape)
    print('residuals shape = ', residual_boot.shape)
    print('boot data shape = ', class_y_boot.shape)
    print('x data shape = ', xdata.shape)
    PLS_bootstrap.fit(xdata, class_y_boot)

    # Get the bootstrapped predictions from the PLS model
    class_predicted_boot = PLS_bootstrap.predict(xdata)

    return_out = (residual_cv, err_cv, msecv, class_predicted_boot, class_predicted_train,)
    if return_boot:
        return_out += (PLS_bootstrap,)
    return return_out
def bootstrap(xdata=None,ydata=None,validdata=None,
              PLS_model=None,PLS_cv=None,PLS_bootstrap=None,sk_model=sklearn.cross_decomposition.PLSRegression,regression=False,
              cv_object=None,class_value=0.5,samples=1000,PLS_kw=None,return_scores=False,return_loadings=False,tq=True):
    """Conducts a full residual bootstrap analysis on a set of data. Computes cross-validation uncertainty.

    This function performs a full bootstrap and makes no assumption about the shape or structure of
    the Y data. Each bootstrap sample has an independent model fit to it. Bootstrap scores and
    loadings are aligned to the base model via an orthogonal Procrustes rotation.

    If PLS_model is None, then PLS_cv and PLS_bootstrap are ignored and independent instances of
    ``sk_model`` are created for each. If PLS_model is not None but PLS_cv or PLS_bootstrap is,
    PLS_model is reused for the missing one(s).

    :key xdata: The X data used to fit the model (default None)
    :key ydata: The Y data used to fit the model (default None)
    :key validdata: Additional data not used to fit the model but for which uncertainty is calculated
    :key PLS_model: The scikit-learn model fit using X and Y; created from sk_model when None
    :key PLS_cv: The scikit-learn model used for cross-validation; PLS_model when None
    :key PLS_bootstrap: The scikit-learn model used for bootstrapping; PLS_model when None
    :key sk_model: Model class used to create any missing estimator instance
    :key regression: unused in the visible body (kept for call-signature compatibility)
    :key cv_object: The cross-validation splitter used for cross-validation statistics
    :key class_value: The value separating the classes in PLS-DA
    :key samples: The number of samples for bootstrapping
    :key PLS_kw: The keyword arguments passed to sk_model
    :key return_scores: If True, returns the (Procrustes-aligned) bootstrap scores
    :key return_loadings: If True, returns the (Procrustes-aligned) bootstrap loadings
    :key tq: If True, wrap the bootstrap loop in a tqdm notebook progress bar
    :returns: (residual_cv, err_cv, msecv, class_predicted_boot, class_predicted_train)
              with class_valid_boot / scores / loadings appended per the flags above
    """
    if PLS_model is None:
        PLS_model = sk_model(**PLS_kw)
        PLS_cv = sk_model(**PLS_kw)
        PLS_bootstrap = sk_model(**PLS_kw)
    if PLS_cv is None:
        PLS_cv = PLS_model
    if PLS_bootstrap is None:
        PLS_bootstrap = PLS_model
    if cv_object is None:
        cv_object = sklearn.model_selection.StratifiedKFold(n_splits=6)

    tpls = PLS_model.fit(xdata, ydata)  # fit result kept but unused afterward

    scores_base = None
    try:
        scores_base = PLS_model.transform(xdata)
    except AttributeError:
        # Not all classification models have a transform attribute
        print('No transform function, skipping scores')

    # Get class assignments for the base PLS model (not meaningful for regression models)
    class_assigned_train, class_predicted_train = class_assignment(data=xdata, PLS_model=PLS_model, class_value=class_value)

    # Cross-validate
    class_assigned_cv, class_predicted_cv = cross_validate(xdata=xdata, ydata=ydata,
                                                           PLS_model=PLS_cv, cv_object=cv_object, class_value=class_value)

    # Calculate residuals and mean squared errors of regression
    residual, err, mse = get_residual_stats(ydata, class_predicted_train.T)
    residual_cv, err_cv, msecv = get_residual_stats(ydata, class_predicted_cv.T)

    # Calculate the pseudo degrees of freedom and the corresponding bootstrap weighting factor
    num_train = len(ydata)
    pseudo_dof = num_train * (1 - np.sqrt(mse / msecv))
    bootstrap_weight = np.sqrt(1 - pseudo_dof / num_train)

    # Calculate the weighted residual vector and bootstrap it to generate the bootstrap perturbations
    residual_weighted = residual / bootstrap_weight
    residual_boot, boot_indices = bootstrap_data(residual_weighted, samples=samples)

    # Accumulators for the per-bootstrap results
    class_predicted_boot = []
    if validdata is not None:
        class_valid_boot = []
    scores_boot = []
    scores_valid = []
    load_boot = []

    if tq:
        iterate = tqdm.tqdm_notebook(residual_boot)
    else:
        iterate = residual_boot

    for bt in iterate:
        # Align the perturbation shape with the base predictions before adding
        if len(class_predicted_train.shape) > 1:
            class_y_boot = class_predicted_train + bt
        else:
            class_y_boot = class_predicted_train + np.squeeze(bt)

        PLS_bootstrap.fit(xdata, class_y_boot)
        cp_boot = PLS_bootstrap.predict(xdata)
        array_dims = len(cp_boot.shape)
        if array_dims > 1:
            # The model gives a 2-D array
            class_predicted_boot += [cp_boot]
        else:
            # The model gives a 1-D array; add a new singleton axis to it
            class_predicted_boot += [cp_boot[:, np.newaxis]]

        try:
            # Rotate the bootstrap scores/loadings onto the base model's scores
            sc_star = PLS_bootstrap.transform(xdata)
            procrustes_rotation, procrustes_scale = scipy.linalg.orthogonal_procrustes(sc_star, scores_base)
            sc_boot = np.dot(sc_star, procrustes_rotation)
            scores_boot += [sc_boot]
            ld_star = PLS_bootstrap.x_loadings_
            ld_boot = np.dot(ld_star, procrustes_rotation)
            load_boot += [ld_boot]
        except AttributeError:
            pass

        if validdata is not None:
            valid_boot = PLS_bootstrap.predict(validdata)
            if array_dims > 1:
                class_valid_boot += [valid_boot]
            else:
                class_valid_boot += [valid_boot[:, np.newaxis]]
            try:
                sc_v = PLS_bootstrap.transform(validdata)
                scores_valid += [sc_v]
            except AttributeError:
                pass

    class_predicted_boot = np.concatenate(class_predicted_boot, axis=1)
    return_out = (residual_cv, err_cv, msecv, class_predicted_boot, class_predicted_train,)
    if validdata is not None:
        class_valid_boot = np.concatenate(class_valid_boot, axis=1)
        return_out += (class_valid_boot,)
    if return_scores:
        scores_boot = np.concatenate(scores_boot, axis=0)
        return_out += (scores_boot,)
        # NOTE(review): nesting of this validdata branch under return_scores is
        # inferred from bootstrap_unc's 8-element unpacking — confirm
        if validdata is not None:
            scores_valid = np.concatenate(scores_valid, axis=0)
            return_out += (scores_valid,)
    if return_loadings:
        load_boot = np.concatenate(load_boot, axis=1)
        return_out += (load_boot,)
    return return_out
def bootstrap_unc(xdata=None,ydata=None,valid_data=None,
                  cv_object=None,
                  samples=1000,class_value=0.5,
                  PLS_kw=None,return_scores=False,tq=True):
    """Computes the uncertainty in a bootstrap analysis by leave-one-out cross-validation.

    For each sample, the uncertainty is calculated by fitting the other samples to the model,
    calculating the bootstrap uncertainty, and then calculating the uncertainty in the
    held-out sample.

    :key xdata: The X data used to fit the model (default None)
    :key ydata: The Y data used to fit the model (default None)
    :key valid_data: unused in the visible body (kept for call-signature compatibility)
    :key cv_object: The cross-validation splitter; each split's test fold is the held-out sample
    :key samples: The number of samples for bootstrapping
    :key class_value: The value separating the classes in PLS-DA
    :key PLS_kw: The keyword arguments passed to PLSRegression
    :key return_scores: Forwarded to :py:func:`bootstrap`; the 8-element unpacking below
                        requires the bootstrap call to return scores
    :key tq: If True, wrap the CV loop in a tqdm notebook progress bar
    :returns: (ci_boot, predict_boot, scores_full) — per-sample confidence intervals,
              recentered bootstrap predictions, and validation scores
    """
    pls_comps = PLS_kw['n_components']
    # These array pre-allocations are immediately superseded by the list versions below
    ci_boot = np.empty((0, 2))
    predict_boot = np.empty((0, samples))
    scores_full = np.empty((0, pls_comps))
    ci_boot = []
    predict_boot = []
    scores_full = []

    # This is the base PLS model, fit on the full data set
    base_pls = sklearn.cross_decomposition.PLSRegression(**PLS_kw)
    _ = base_pls.fit(xdata, ydata)
    class_predicted_train = base_pls.predict(xdata)

    if tq:
        iterate = tqdm.tqdm_notebook(cv_object.split(xdata, ydata), total=len(ydata))
    else:
        iterate = cv_object.split(xdata, ydata)

    for train_index, test_index in iterate:
        # This is the element being removed and having its uncertainty calculated
        this_x = xdata[test_index]
        this_y = ydata[test_index]
        # This is everything else
        not_this_x = xdata[train_index]
        not_this_y = ydata[train_index]

        # Create the local PLS models and the stratified K-fold cross-validation object
        this_pls = sklearn.cross_decomposition.PLSRegression(**PLS_kw)
        this_pls_cv = sklearn.cross_decomposition.PLSRegression(**PLS_kw)
        this_pls_boot = sklearn.cross_decomposition.PLSRegression(**PLS_kw)
        this_cv = sklearn.model_selection.StratifiedKFold(n_splits=6)

        # Boot the straps
        boot_out = bootstrap(xdata=not_this_x,
                             ydata=not_this_y,
                             validdata=this_x,
                             PLS_model=this_pls,
                             PLS_cv=this_pls_cv,
                             PLS_bootstrap=this_pls_boot,
                             PLS_kw=PLS_kw,
                             cv_object=this_cv,
                             samples=samples,
                             class_value=class_value,
                             return_scores=return_scores,
                             tq=tq)
        # boot_ydata: bootstrap predictions for not_this_x
        # train_ydata: single-model prediction for not_this_x (overwritten below)
        # boot_this_x: bootstrap predictions for the held-out this_x
        rcv, ecv, msecv, boot_ydata, train_ydata, boot_this_x, scores_boot, scores_valid = boot_out

        # Compute the confidence interval, recentered on the full-data prediction
        train_ydata = np.median(boot_this_x, axis=1)
        ci = np.percentile(boot_this_x, [2.5, 97.5], axis=1)
        ci = np.array([ci])
        ci = ci - train_ydata + class_predicted_train[test_index]
        ci_boot += [ci]

        # Recenter the raw bootstrap predictions the same way
        class_y_boot_predict = boot_this_x - train_ydata + class_predicted_train[test_index]
        predict_boot += [class_y_boot_predict]
        scores_full += [scores_valid]

    ci_boot = np.concatenate(ci_boot, axis=0)
    predict_boot = np.concatenate(predict_boot, axis=0)
    scores_full = np.concatenate(scores_full, axis=0)
    return ci_boot, predict_boot, scores_full
def pca_bootstrap(xdata=None,ydata=None,groups=None,validdata=None,
                  PCA_model=None,PCA_cv=None,PCA_bootstrap=None,skmodel=sklearn.decomposition.PCA,scaler=None,
                  cv_object=None,samples=1000,PCA_kw=None,tq=True):
    """Conducts a bootstrap analysis on a set of data for unsupervised models such as PCA.

    This is the unsupervised analogue of :py:func:`bootstrap`. Each bootstrap resample of X
    is fit with a fresh pipeline, and the resulting scores and components are aligned to the
    base model via an orthogonal Procrustes rotation of the stacked scores+components matrix.

    :key xdata: The X data used to fit the model (default None)
    :key ydata: Class labels carried through to the output, one copy per bootstrap sample
    :key groups: unused in the visible body (kept for call-signature compatibility)
    :key validdata: unused in the visible body (kept for call-signature compatibility)
    :key PCA_model: The scikit-learn estimator fit using X; created from skmodel when None
    :key PCA_cv: Estimator for cross-validation; created from skmodel when PCA_model is None
    :key PCA_bootstrap: Estimator refit on each bootstrap resample
    :key skmodel: Estimator class used when PCA_model is None
    :key scaler: Optional scikit-learn preprocessing step prepended to each pipeline
    :key cv_object: unused in the visible body (cross-validation code is commented out)
    :key samples: The number of samples for bootstrapping
    :key PCA_kw: The keyword arguments passed to skmodel
    :key tq: If True, wrap the bootstrap loop in a tqdm notebook progress bar
    :returns: (scores, scores_boot, class_boot, comps_boot)
    """
    estimator_step_name = 'PCA'
    scaler_step_name = 'scaler'
    if PCA_model is None:
        PCA_model = skmodel(**PCA_kw)
        PCA_cv = skmodel(**PCA_kw)
        PCA_bootstrap = skmodel(**PCA_kw)

    # Build pipelines, optionally with the scaler as a first step
    steps_model = [(estimator_step_name, PCA_model)]
    steps_cv = [(estimator_step_name, PCA_cv)]
    steps_bootstrap = [(estimator_step_name, PCA_bootstrap)]
    if scaler is not None:
        steps_model = [(scaler_step_name, scaler), (estimator_step_name, PCA_model)]
        steps_cv = [(scaler_step_name, scaler), (estimator_step_name, PCA_cv)]
        steps_bootstrap = [(scaler_step_name, scaler), (estimator_step_name, PCA_bootstrap)]
    pipe_model = skpipe.Pipeline(steps_model)
    pipe_cv = skpipe.Pipeline(steps_cv)
    pipe_bootstrap = skpipe.Pipeline(steps_bootstrap)

    tpca = pipe_model.fit(xdata)
    scores = pipe_model.transform(xdata)
    x_trans = pipe_model.inverse_transform(scores)  # reconstruction; currently unused

    # Resample X directly (the weighted-residual variant is commented out in the original)
    x_boot, boot_indices = bootstrap_data(xdata, samples=samples)

    if tq:
        iterate = tqdm.tqdm_notebook(zip(x_boot, boot_indices))
    else:
        iterate = zip(x_boot, boot_indices)

    scores_boot = []
    class_boot = []
    comps_boot = []

    components_base = pipe_model.named_steps[estimator_step_name].components_.T
    base_scores_boot = pipe_model.transform(xdata)  # scores for the base X under the base model

    for bt, index in iterate:
        x_bt = bt
        y_bt = ydata
        bpca = pipe_bootstrap.fit(x_bt)  # fit the pipeline to the current bootstrap X
        # Scores for the base X under the bootstrap model
        scores_boot_this = pipe_bootstrap.transform(xdata)

        # Procrustes target: base scores stacked on base components
        procrustes_target = np.concatenate((base_scores_boot, components_base), axis=0)
        # Procrustes operand: bootstrap scores stacked on bootstrap components
        procrustes_operand = np.concatenate((scores_boot_this,
                                             pipe_bootstrap.named_steps[estimator_step_name].components_.T), axis=0)
        # Rotate so the bootstrap scores/components match the base model's orientation
        procrustes_rotation, procrustes_scale = scipy.linalg.orthogonal_procrustes(procrustes_operand, procrustes_target)
        sc_boot = np.dot(scores_boot_this, procrustes_rotation)
        cp_boot = np.dot(pipe_bootstrap.named_steps[estimator_step_name].components_.T, procrustes_rotation)

        scores_boot += [sc_boot]
        class_boot += [y_bt]
        comps_boot += [cp_boot.T]

    scores_boot = np.concatenate((scores_boot), axis=0)
    class_boot = np.concatenate((class_boot), axis=0)
    comps_boot = np.concatenate((comps_boot), axis=0)
    return scores, scores_boot, class_boot, comps_boot
def pca_cross_validate(xdata=None,ydata=None,groups=None,PCA_model=None,cv_object=None,PCA_kw=None):
    """Conducts a cross-validation analysis on a set of data using an unsupervised algorithm.

    For each CV split, the model is fit on the training fold and the test fold is
    projected and reconstructed; the reconstructions are stacked and returned.

    :key xdata: The X data (default None)
    :key ydata: Labels passed to the splitter (for stratified/grouped splitters)
    :key groups: Optional group labels forwarded to cv_object.split
    :key PCA_model: Estimator to use; a new sklearn.decomposition.PCA is created when None
    :key cv_object: The cross-validation splitter
    :key PCA_kw: Keyword arguments used when creating a new PCA
    :returns: ndarray of test-fold reconstructions, stacked in split order
    """
    if PCA_model is None:
        PCA_model = sklearn.decomposition.PCA(**PCA_kw)
    num_features = xdata.shape[1]

    # Empty (0, num_features) array to accumulate the cross-validation results
    x_transcv = np.transpose(np.array([[]] * num_features))

    if groups is None:
        iterate = cv_object.split(xdata, ydata)
    else:
        iterate = cv_object.split(xdata, ydata, groups)

    for train_index, test_index in iterate:
        # Break out the training and test sets for cross-validation
        x_train, x_test = xdata[train_index], xdata[test_index]
        # Fit on the training fold, then project and reconstruct the test fold
        PCA_model.fit(x_train)
        scores = PCA_model.transform(x_test)
        x_trans = PCA_model.inverse_transform(scores)
        x_transcv = np.concatenate((x_transcv, x_trans))
    return x_transcv

def _cross_validate(xdata=None,ydata=None,PLS_model=None,cv_object=None,
                    sk_model=sklearn.cross_decomposition.PLSRegression,PLS_kw=None,class_value=0.5):
    """Conducts a cross-validation analysis on a set of data using a regression algorithm.

    Legacy manual-loop version (superseded by :py:func:`cross_validate`).

    :key xdata: The X data used to fit the model
    :key ydata: The Y data used to fit the model
    :key PLS_model: Estimator to use; created from sk_model when None
    :key cv_object: The cross-validation splitter
    :key sk_model: Estimator class used when PLS_model is None
    :key PLS_kw: Keyword arguments used when creating a new estimator
    :key class_value: The value (or values) separating the classes in PLS-DA
    :returns: (class_assigned_cv, class_predicted_cv) stacked over all test folds
    """
    if PLS_model is None:
        PLS_model = sk_model(**PLS_kw)
    try:
        # The number of classes is equal to the number of class values supplied
        num_classes = len(class_value)
    except TypeError:
        # A scalar class_value has no len(); assume a single class
        num_classes = 1

    # Accumulate per-fold results, then concatenate once at the end
    class_predicted_cv = []
    class_assigned_cv = []
    for train_index, test_index in cv_object.split(xdata, ydata):
        # Break out the training and test sets for cross-validation
        x_train, x_test = xdata[train_index], xdata[test_index]
        y_train, y_test = ydata[train_index], ydata[test_index]
        # Calibrate the model against the CV training set and get the test-set assignments
        PLS_model.fit(x_train, y_train)
        this_assigned, this_y = class_assignment(data=x_test, PLS_model=PLS_model, class_value=class_value)
        class_predicted_cv += [this_y]
        class_assigned_cv += [this_assigned]
    class_predicted_cv = np.concatenate(class_predicted_cv)
    class_assigned_cv = np.concatenate(class_assigned_cv)
    return class_assigned_cv, class_predicted_cv
def cross_validate(xdata=None,ydata=None,PLS_model=None,cv_object=None,
                   sk_model=sklearn.cross_decomposition.PLSRegression,PLS_kw=None,class_value=0.5):
    """Conducts a cross-validation analysis on a set of data using a regression algorithm.

    This function is essentially a pass-through to
    :py:func:`sklearn.model_selection.cross_val_predict`, followed by PLS-DA class assignment.

    :key xdata: The X data used to fit the model (default None)
    :key ydata: The Y data used to fit the model (default None)
    :key PLS_model: The scikit-learn model fit using X and Y; created from sk_model when None
    :key cv_object: The cross-validation splitter used for cross-validation statistics
    :key sk_model: Estimator class used when PLS_model is None
    :key PLS_kw: The keyword arguments passed to sk_model
    :key class_value: The value (or values) separating the classes in PLS-DA
    :type xdata: ndarray
    :type ydata: ndarray
    :type PLS_model: scikit-learn model instance
    :type sk_model: scikit-learn model class
    :type cv_object: scikit-learn model selection instance
    :type class_value: scalar
    :returns: class_assigned_cv, the dummy-variable array in PLS-DA, and
              class_predicted_cv, the array of PLS predictions
    """
    if PLS_model is None:
        PLS_model = sk_model(**PLS_kw)
    try:
        # The number of classes is equal to the number of class values supplied
        num_classes = len(class_value)  # NOTE: computed but unused in the visible body
    except TypeError:
        # A scalar class_value has no len(); assume a single class
        num_classes = 1
    # Out-of-fold predictions for every sample
    y_cv = sklearn.model_selection.cross_val_predict(PLS_model, xdata, ydata, cv=cv_object)
    class_assigned_cv, class_predicted_cv = class_assignment(data=y_cv, class_value=class_value)
    return class_assigned_cv, class_predicted_cv
def get_residual_stats(true_y,predicted_y):
    """Calculate residual statistics for a set of model predictions.

    :param true_y: the observed ("true") y values
    :param predicted_y: the model-estimated y values
    :returns: (residual, err, mse) -- the element-wise residuals
        ``predicted_y - true_y``, the element-wise squared errors, and the
        mean squared error
    """
    # Squeeze both arrays to avoid length-1 trailing-dimension mismatches
    residual = np.squeeze(predicted_y) - np.squeeze(true_y)
    # (A previous revision called np.expand_dims(residual, axis=-1) here for
    # 1-D residuals but discarded the result, so it was a no-op and has been
    # removed.)
    err = residual ** 2
    mse = err.sum() / len(true_y)
    return residual,err,mse

def _bootstrap_uncertainty(element_number, xdata=None,ydata=None,valid_data=None,
                           samples=1000,class_value=0.5, PLS_kw=None,return_scores=False):
    """Conduct a residual bootstrap uncertainty analysis based on leave-one-out
    cross-validation: leave one sample out, conduct the bootstrap on the
    remaining samples, and use the spread of the bootstrap predictions for the
    left-out sample as a measure of its uncertainty.

    :param element_number: index of the sample to leave out
    :key xdata: X data for the full data set
    :key ydata: Y data for the full data set
    :key valid_data: optional validation X data to also predict (default None)
    :key samples: number of bootstrap samples
    :key class_value: the value separating the classes in PLS-DA
    :key PLS_kw: keyword arguments passed to PLSRegression (default None, treated as {})
    :key return_scores: if True, also return the X scores of the full data set
    :returns: (ci, median_prediction, bootstrap_predictions
        [, validation_predictions][, x_scores]) where ci is the 95% bootstrap
        confidence interval for the left-out sample
    """
    if PLS_kw is None:
        # Guard: PLSRegression(**None) would raise TypeError
        PLS_kw = {}
    # Create the array mask to remove one element from the model array;
    # this is the element whose uncertainty we are calculating
    mask = np.ones(ydata.shape[0],dtype=bool)
    mask[element_number] = False
    # The data set against which we will do PLS and bootstrap
    not_this_x = xdata[mask]
    not_this_y = ydata[mask]
    # The left-out element whose uncertainty we are calculating
    this_x = xdata[~mask]
    this_y = ydata[~mask]
    # Create the PLS, cross-validation, and bootstrap models
    this_pls = sklearn.cross_decomposition.PLSRegression(**PLS_kw)
    this_pls_cv = sklearn.cross_decomposition.PLSRegression(**PLS_kw)
    this_pls_boot = sklearn.cross_decomposition.PLSRegression(**PLS_kw)
    this_cv = sklearn.model_selection.KFold(n_splits=6)
    # Fit the base PLS model, leaving out the element of interest
    this_pls.fit(not_this_x,not_this_y)
    boot_out = simple_bootstrap(xdata=not_this_x,
                                ydata=not_this_y,
                                PLS_model=this_pls,
                                PLS_cv=this_pls_cv,
                                PLS_bootstrap=this_pls_boot,
                                cv_object=this_cv,
                                samples=samples,
                                class_value=class_value)
    residual_cv,err_cv,msecv,class_predicted_boot,class_predicted_train = boot_out
    # Generate predictions for the element whose uncertainty we want
    class_y_boot_predict = this_pls_boot.predict(this_x)
    class_y_base_predict = this_pls.predict(this_x)
    x_score_this = this_pls_boot.transform(xdata)
    class_y_validation = None
    # Get the 95% confidence interval for this element from the bootstrap
    ci = np.percentile(class_y_boot_predict,[2.5,97.5])
    ci = np.array([ci])
    # Report the bootstrap median as the point prediction
    class_y_base_predict = np.median(class_y_boot_predict)
    return_out = (ci,class_y_base_predict,class_y_boot_predict,)
    if valid_data is not None:
        class_y_validation = this_pls_boot.predict(valid_data)
        return_out += (class_y_validation,)
    if return_scores:
        return_out += (x_score_this,)
    return return_out
def misclass_probability(probability_zero,misclass_mask):
    """Estimate the misclassification probability of each sample.

    The misclassification probability is the distance of the prediction's
    cumulative probability from certainty: for p > 0.5 it is 1 - p, otherwise
    it is p itself.

    :param probability_zero: array of probabilities that each sample belongs to class zero
    :param misclass_mask: boolean mask of misclassified samples (iterated in
        lockstep with probability_zero; its values do not alter the result)
    :returns: array of misclassification probabilities
    """
    misclass_prob = np.zeros_like(probability_zero)
    for idx,(p_zero,flagged) in enumerate(zip(probability_zero,misclass_mask)):
        misclass_prob[idx] = 1.0 - p_zero if p_zero > 0.5 else p_zero
    return misclass_prob
class bootstrap_estimator(object):
    """Residual-bootstrap wrapper around a scikit-learn style estimator class.

    Holds one base estimator plus ``nsamples`` bootstrap replica estimators.
    When y data is stored, :meth:`fit` perturbs the stored y with bootstrap
    resamples of the cross-validation-weighted residuals; when no y data is
    stored, the bootstrap resamples the X data directly.

    NOTE(review): depends on ``bootstrap_data`` and ``get_residual_stats``
    defined elsewhere in this module, and on ``sklearn.exceptions`` being an
    importable attribute of ``sklearn`` -- confirm that submodule is imported
    somewhere in the package.
    """
    def __init__(self,estimator=None,X=None,y=None,nsamples=1000,cv=None,estimator_kw={},
                 samples=None):
        # `samples` is a legacy alias that overrides `nsamples` when supplied.
        # NOTE(review): estimator_kw={} is a mutable default argument.
        self.nsamples = nsamples
        if samples is not None:
            self.nsamples = samples
        self.data_ = X               # stored X data
        self.ydata_ = y              # stored y data (may be None)
        self.estimator_ = estimator  # the estimator *class*, not an instance
        self.cv_= cv                 # CV splitter handed to cross_val_predict
        self.boot_data_ = None       # bootstrap resamples, filled by bootstrap()
        self.boot_indices_ = None    # resampled indices, filled by bootstrap()
        #if cv is None:
        #    self.cv = sklearn.model_selection.StratifiedKFold(n_splits=6)
        #if type(cv) is type(int):
        #    self.cv = sklearn.model_selection.StratifiedKFold(n_splits=cv)
        self.base_estimator_ = estimator(**estimator_kw)
        self.estimators_ = []
        if estimator is not None:
            # NOTE(review): uses the `nsamples` argument rather than
            # self.nsamples, so a `samples=` override does not change the
            # replica count here -- confirm intent.
            self.estimators_ = [estimator(**estimator_kw) for i in range(nsamples)]
        #for i in range(nsamples):
        #    self.estimator_list += [estimator(**estimator_kw)]
    def fit(self,X=None,y=None,*args,**kwargs):
        """Fit the base estimator and every bootstrap replica.

        With y data, each replica is fit on the stored X against the stored y
        plus one bootstrap residual resample; without y data, each replica is
        fit directly on one bootstrap resample of X.
        """
        if X is not None:
            self.data_ = X
        if y is not None:
            self.ydata_ = y
        if self.boot_data_ is None:
            # Lazily generate the bootstrap resamples on first fit
            self.bootstrap(samples=self.nsamples)
        if self.ydata_ is None:
            self.base_fit(self.data_,self.ydata_,*args,**kwargs)
            for est,data in zip(self.estimators_,self.boot_data_):
                est.fit(data,*args,**kwargs)
        else:
            self.base_fit(self.data_,self.ydata_,*args,**kwargs)
            for est,data in zip(self.estimators_,self.boot_data_):
                # Perturb the *observed* y with the resampled residuals
                est.fit(self.data_,self.ydata_+data,*args,**kwargs)
    def predict(self,data=None,with_boot=False,with_median=False,*args,**kwargs):
        """Predict with the base estimator and optionally every replica.

        :key with_boot: also return the stacked replica predictions
        :key with_median: report the replica median as the base prediction
        :returns: base_predict, or (base_predict, boot_predict) if with_boot
        """
        #If no data is given, use the stored x data
        if data is None:
            data = self.data_
        if with_median:
            with_boot=True #with_median implies with_boot
        base_predict = self.base_estimator_.predict(data,*args,**kwargs)
        boot_predict = []
        if not with_boot:
            return base_predict
        boot_predict = [est.predict(data,*args,**kwargs) for est in self.estimators_ ]
        #for est in self.estimators_:
        #    boot_predict += [est.predict(data,*args,**kwargs)]
        boot_predict = np.stack(boot_predict, axis=0)
        if with_median:
            base_predict = np.median(boot_predict,axis=0)
        return base_predict,boot_predict
    def transform(self,X=None,y=None,with_boot=False,with_y=False,*args,**kwargs):
        """Transform X (scores) with the base estimator and optionally the replicas.

        When y is passed (or with_y requests the stored y), the underlying
        transform is assumed to return (x_scores, y_scores) pairs, which are
        stacked and returned separately.
        """
        #If no data is given, use the stored x data
        if X is None:
            X = self.data_
        if y is not None:
            args = (y,) + args
        if with_y and y is None:
            args = (self.ydata_,) + args
        base_scores = self.base_estimator_.transform(X,*args,**kwargs)
        if not with_boot:
            return base_scores
        boot_scores = []
        boot_scores = [est.transform(X,*args,**kwargs) for est in self.estimators_]
        #for est in self.estimators_:
        #    boot_scores += [est.transform(data,*args,**kwargs)]
        if y is None:
            # NOTE(review): with_y=True with y=None still takes this branch,
            # stacking (x, y) score tuples as one array -- confirm intent.
            boot_scores = np.stack(boot_scores, axis=0)
            return base_scores,boot_scores
        else:
            boot_x_scores = [x for x,y in boot_scores]
            boot_y_scores = [y for x,y in boot_scores]
            boot_x_scores = np.stack(boot_x_scores, axis=0)
            boot_y_scores = np.stack(boot_y_scores, axis=0)
            return base_scores,boot_x_scores,boot_y_scores
        #return base_scores,boot_scores
    def procrustes_transform(self,data=None,*args,**kwargs):
        """Transform data and rotate each replica's scores onto the base scores.

        An orthogonal Procrustes rotation is computed from each replica's
        scores+loadings against the base scores+loadings, then applied to the
        replica scores, removing arbitrary rotation/sign differences between
        bootstrap fits.
        """
        base_scores,boot_scores = self.transform(data,with_boot=True)
        components_base = self.base_estimator_.components_.T
        # Include the loadings in the target so the rotation aligns both
        # scores and loadings of each replica with the base model
        procrustes_target = np.concatenate((base_scores,components_base),axis=0)
        boot_scores_procrustes = []
        for est,boot_score in zip(self.estimators_,boot_scores):
            procrustes_operand = np.concatenate(
                (boot_score,est.components_.T),
                axis=0)
            procrustes_rotation,procrustes_scale = scipy.linalg.orthogonal_procrustes(
                procrustes_operand,
                procrustes_target)
            boot_scores_procrustes += [np.dot(boot_score,procrustes_rotation)]
        boot_scores_procrustes = np.stack(boot_scores_procrustes, axis=0)
        return base_scores,boot_scores,boot_scores_procrustes
    def base_fit(self,*args,**kwargs):
        """Fit only the base estimator."""
        self.base_estimator_.fit(*args,**kwargs)
    def base_predict(self,*args,**kwargs):
        """Predict with only the base estimator."""
        return self.base_estimator_.predict(*args,**kwargs)
    def bootstrap(self,*args,samples=1000,fit_params={},**kwargs):
        """Generate the bootstrap resamples stored in boot_data_/boot_indices_.

        NOTE(review): the `samples` keyword is accepted but self.nsamples is
        what is actually passed to bootstrap_data -- confirm intent.
        """
        #If there is no stored y data, bootstrapping should be done on the
        #x data (and bootstrap_weight is just 1)
        squeeze = False
        if self.ydata_ is None:
            data = self.data_
            bootstrap_weight = 1.0
        else:
            #If there is y data, we need to calculate cross-validation and
            #residual statistics, as well as the bootstrap weighting
            #If y has a trailing singleton dimension, it should be preserved
            #in boot_data, otherwise it should not be preserved
            if len(self.ydata_.shape) < 2:
                squeeze = True
            # Get base-model predictions; fit first if not yet fitted
            try:
                y_pred = self.base_predict(self.data_, *args,**fit_params)
            except sklearn.exceptions.NotFittedError:
                self.base_fit(self.data_,self.ydata_, *args,**fit_params)
                y_pred = self.base_predict(self.data_, *args,)#**fit_params)
            y_cv = self.cross_validate()
            residual,err,mse = get_residual_stats(self.ydata_,y_pred)
            residual_cv,err_cv,msecv = get_residual_stats(self.ydata_,y_cv)
            #Calculate the pseudo degrees of freedom and the corresponding
            #bootstrap weighting factor
            num_train = len(self.ydata_)
            pseudo_dof = num_train * (1 - np.sqrt(mse/msecv))
            bootstrap_weight = np.sqrt(1 - pseudo_dof/num_train)
            #If using stored y data, bootstrapping is done on the weighted residuals
            data = residual / bootstrap_weight
        self.boot_data_,self.boot_indices_ = bootstrap_data(data,samples=self.nsamples)
        if squeeze:
            self.boot_data_ = np.squeeze(self.boot_data_)
    def cross_validate(self,):
        """Return cross-validated predictions for the stored data."""
        return sklearn.model_selection.cross_val_predict(self.base_estimator_,
                                                         self.data_,
                                                         self.ydata_,
                                                         cv=self.cv_)
    def bootstrap_uncertainty_bounds(self,data=None,*args,**kwargs):
        """Predict with 95% bootstrap uncertainty bounds.

        :returns: (ypred, ypboot, bounds, error) where bounds stacks the
            2.5/97.5 percentile curves and error stacks the minus/plus
            offsets relative to the base prediction
        """
        #If no data is given, use the stored x data
        if data is None:
            data = self.data_
        ypred,ypboot = self.predict(data,with_boot=True,*args,**kwargs)
        y_lower = np.percentile(ypboot,2.5,axis=0)
        y_upper = np.percentile(ypboot,97.5,axis=0)
        yplus = y_upper - ypred
        yminus = ypred - y_lower
        bounds = np.squeeze(np.stack((y_lower,y_upper)))
        error = np.squeeze(np.stack((yminus,yplus)))
        return ypred,ypboot,bounds,error

class _bootstrap_estimator(object):
    """Older private variant of :class:`bootstrap_estimator`.

    Differences from the public class: no `nsamples` alias, replicas are fit
    against base-model predictions plus residuals (``ymodel+data``) rather
    than the observed y, transform has no with_boot/with_y switches, the
    bootstrap honors its `samples` argument, boot_data is squeezed
    unconditionally, and uncertainty offsets are measured from the bootstrap
    median rather than the base prediction.
    """
    def __init__(self,estimator=None,X=None,y=None,samples=1000,cv=None,estimator_kw={}):
        self.samples = samples
        self.data = X                # stored X data
        self.ydata = y               # stored y data (may be None)
        self.estimator = estimator   # the estimator *class*, not an instance
        self.cv = cv                 # CV splitter handed to cross_val_predict
        self.boot_data = None        # bootstrap resamples, filled by bootstrap()
        self.boot_indices = None     # resampled indices, filled by bootstrap()
        #if cv is None:
        #    self.cv = sklearn.model_selection.StratifiedKFold(n_splits=6)
        #if type(cv) is type(int):
        #    self.cv = sklearn.model_selection.StratifiedKFold(n_splits=cv)
        self.base_estimator = estimator(**estimator_kw)
        self.estimator_list = []
        if estimator is not None:
            for i in range(samples):
                self.estimator_list += [estimator(**estimator_kw)]
    def fit(self,X=None,y=None,*args,**kwargs):
        """Fit the base estimator and every bootstrap replica."""
        if X is not None:
            self.data = X
        if y is not None:
            self.ydata = y
        if self.boot_data is None:
            # Lazily generate the bootstrap resamples on first fit
            self.bootstrap(samples=self.samples)
        if self.ydata is None:
            self.base_fit(self.data,self.ydata,*args,**kwargs)
            for est,data in zip(self.estimator_list,self.boot_data):
                est.fit(data,*args,**kwargs)
        else:
            self.base_fit(self.data,self.ydata,*args,**kwargs)
            # Perturb the *fitted* predictions (not the observed y) with the
            # resampled residuals
            ymodel = np.squeeze(self.base_predict(self.data,*args,**kwargs))
            for est,data in zip(self.estimator_list,self.boot_data):
                est.fit(self.data,ymodel+data,*args,**kwargs)
    def predict(self,data=None,with_boot=False,with_median=False,*args,**kwargs):
        """Predict with the base estimator and optionally every replica."""
        #If no data is given, use the stored x data
        if data is None:
            data = self.data
        if with_median:
            with_boot=True #with_median implies with_boot
        base_predict = self.base_estimator.predict(data,*args,**kwargs)
        boot_predict = []
        if not with_boot:
            return base_predict
        for est in self.estimator_list:
            boot_predict += [est.predict(data,*args,**kwargs)]
        boot_predict = np.stack(boot_predict, axis=0)
        return base_predict,boot_predict
        # NOTE(review): with_median sets with_boot but the median is never
        # substituted here (unlike bootstrap_estimator.predict) -- confirm.
    def transform(self,data=None,ydata=None,*args,**kwargs):
        """Transform data (scores) with the base estimator and all replicas."""
        #If no data is given, use the stored x data
        if data is None:
            data = self.data
        if ydata is None:
            if self.ydata is not None:
                # Fall back to the stored y data when available
                args = (self.ydata,) + args
        base_scores = self.base_estimator.transform(data,*args,**kwargs)
        boot_scores = []
        for est in self.estimator_list:
            boot_scores += [est.transform(data,*args,**kwargs)]
        boot_scores = np.stack(boot_scores, axis=0)
        return base_scores,boot_scores
    def procrustes_transform(self,data=None,*args,**kwargs):
        """Transform data and rotate each replica's scores onto the base scores."""
        base_scores,boot_scores = self.transform(data)
        components_base = self.base_estimator.components_.T
        # Include the loadings so the rotation aligns scores and loadings
        procrustes_target = np.concatenate((base_scores,components_base),axis=0)
        boot_scores_procrustes = []
        for est,boot_score in zip(self.estimator_list,boot_scores):
            procrustes_operand = np.concatenate(
                (boot_score,est.components_.T),
                axis=0)
            procrustes_rotation,procrustes_scale = scipy.linalg.orthogonal_procrustes(
                procrustes_operand,
                procrustes_target)
            boot_scores_procrustes += [np.dot(boot_score,procrustes_rotation)]
        boot_scores_procrustes = np.stack(boot_scores_procrustes, axis=0)
        return base_scores,boot_scores,boot_scores_procrustes
    def base_fit(self,*args,**kwargs):
        """Fit only the base estimator."""
        self.base_estimator.fit(*args,**kwargs)
    def base_predict(self,*args,**kwargs):
        """Predict with only the base estimator."""
        return self.base_estimator.predict(*args,**kwargs)
    def bootstrap(self,samples=1000,fit_params={},*args,**kwargs):
        """Generate the bootstrap resamples stored in boot_data/boot_indices."""
        #If there is no stored y data, bootstrapping should be done on the
        #x data (and bootstrap_weight is just 1)
        if self.ydata is None:
            data = self.data
            bootstrap_weight = 1.0
        else:
            #If there is y data, we need to calculate cross-validation and
            #residual statistics, as well as the bootstrap weighting
            #Get the base model predictions. If the base model is not fitted,
            #fit it first.
            try:
                y_pred = self.base_predict(self.data, *args,**fit_params)
            except sklearn.exceptions.NotFittedError:
                self.base_fit(self.data,self.ydata, *args,**fit_params)
                y_pred = self.base_predict(self.data, *args,)#**fit_params)
            y_cv = self.cross_validate()
            residual,err,mse = get_residual_stats(self.ydata,y_pred)
            residual_cv,err_cv,msecv = get_residual_stats(self.ydata,y_cv)
            #Calculate the pseudo degrees of freedom and the corresponding
            #bootstrap weighting factor
            num_train = len(self.ydata)
            pseudo_dof = num_train * (1 - np.sqrt(mse/msecv))
            bootstrap_weight = np.sqrt(1 - pseudo_dof/num_train)
            #If using stored y data, bootstrapping is done on the weighted residuals
            data = residual / bootstrap_weight
        self.boot_data,self.boot_indices = bootstrap_data(data,samples=samples)
        # Unconditional squeeze here (the public class squeezes only for 1-D y)
        self.boot_data = np.squeeze(self.boot_data)
    def cross_validate(self,):
        """Return cross-validated predictions for the stored data."""
        return sklearn.model_selection.cross_val_predict(self.base_estimator,self.data,self.ydata,cv=self.cv)
    def bootstrap_uncertainty_bounds(self,data=None,*args,**kwargs):
        """Predict with 95% bootstrap uncertainty bounds.

        Offsets (yplus/yminus) are measured from the bootstrap *median*,
        while ypred remains the base-estimator prediction.
        """
        #If no data is given, use the stored x data
        if data is None:
            data = self.data
        ypred,ypboot = self.predict(data,with_boot=True,*args,**kwargs)
        y_lower = np.percentile(ypboot,2.5,axis=0)
        y_upper = np.percentile(ypboot,97.5,axis=0)
        ypm = np.median(ypboot,axis=0)
        yplus = y_upper - ypm
        yminus = ypm - y_lower
        bounds = np.squeeze(np.stack((y_lower,y_upper)))
        error = np.squeeze(np.stack((yminus,yplus)))
        return ypred,ypboot,bounds,error