import scipy as sp
import scipy.stats
import math
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.gridspec as gs
from sklearn import decomposition as skdecomp
def idfunc(*args,**kwargs):
'''Returns exactly the function's positional arguments, ignoring any keyword arguments'''
if len(args) == 1:
return args[0] #We only passed one argument to the function. We want to return arg, not (arg)
return args
def g_formatter(x):
a,b = '{:.2e}'.format(x).split('e')
b = int(b)
a = float(a)
if not(b > 2) or (b < -1):
mult = 10**b
a = a * mult
prec = min(max(2-b,0),2)
return (r'${:.'+str(prec)+'f}$').format(a)
return r'${}\times10^{{{}}}$'.format(a,b)
def calculate_minmax(data):
"""Determines the maximum and minimum (excluding zero) of a data set. Rounds to one significant figure.
"""
data_max = data.max()
data_min = data[data > 0].min()
#Find the power of 10
log_max = math.log(data_max,10)
log_min = math.log(data_min,10)
decimals_max = int(-1*math.floor(log_max))+1
decimals_min = int(-1*math.floor(log_min))
significance_max = 10 ** decimals_max
significance_min = 10 ** decimals_min
distance_max = math.ceil(data_max * significance_max)/significance_max
distance_min = math.floor(data_min * significance_min)/significance_min
return distance_min,distance_max
[docs]class ExperimentGroup(object):
"""Container for spectral data and methods for analysis
This object contains a group of spectral measurements that are related, usually by being of the same physical object and having the same structure.
:key name: The name of the group of measurements.
:key data: Array of data to be used for the interlab analysis
:key rawdata: Array of unprocessed data, if different from data
:key data_names: List of data sets (labs) with data for each sample
:key distance_metrics: List of distance metrics. Each metric in this list will be used to create a :py:class:`DistanceMetric` object.
:key distribution_function: Which distribution will be assumed when assigning Z scores to each measurement of a sample.
"""
def __init__(self,
name=None,
xdata=None,
rawdata=None,data=None,data_names=None,
distance_metrics=None,distribution=None):
self.name = name
self.xdata = xdata
self.data = data
self.data_names = data_names
self.distance_metrics = []
self.distribution = distribution
for metric in distance_metrics:
self.distance_metrics += [
DistanceMetric(distribution=distribution,**metric)
]
return
@property
def names(self):
names = {}
for item in self.distance_metrics:
names[item.name] = item
return names
def __str__(self):
str_rep = self.name
return str_rep
def __getitem__(self,x):
if type(x) is int:
return self.distance_metrics[x]
else:
try:
return self.names[x]
except KeyError:
print('No metric with name ' + x)
return
def distance(self,metric):
"""Pass-through to :py:class:`.DistanceMetric`
"""
self[metric].distance(self.data)
return
def fit_zscores(self,metric):
"""Pass-through to :py:class:`.DistanceMetric`
"""
self[metric].fit_zscores()
return
def find_outliers(self,metric,**kwargs):
"""Pass-through to :py:class:`.DistanceMetric`
"""
self[metric].find_outliers(**kwargs)
return
def histogram(self,metric,*args,**kwargs):
"""Pass-through to :py:class:`.DistanceMetric`
"""
self[metric].histogram(*args,**kwargs)
return
def plot_zscores(self,metric,*args,**kwargs):
"""Pass-through to :py:class:`.DistanceMetric`
"""
ax = self[metric].plot_zscores(*args,**kwargs)
ax.set_xlim(-0.5,len(self.data_names)-0.5)
ax.set_xticks(np.arange(len(self.data_names)))
ax.set_xticklabels(self.data_names,rotation='vertical')
return
def minmax(self,metric):
return self[metric].population.minmax()
def plot_data(self,ax,linecolor='k',**kwargs):
"""Line plot of the spectral data in this group.
"""
numsets = self.data.shape[0]
if self.xdata is not None:
pl = ax.plot(self.xdata,
self.data.T/self.data.max(axis=1)+np.array(range(numsets))-0.5/float(1),
linecolor)
if self.xdata[0] > self.xdata[1]:
ax.invert_xaxis()
else:
pl = ax.plot(self.data.T/self.data.max(axis=1)+np.array(range(numsets))-0.5/float(1),linecolor)
ax.set_ylim([-0.5,numsets-0.5])
ax.text(0.1,0.95,self.name,ha='left', va='center',color='k',size=9,
bbox=dict(facecolor='w',alpha=1,lw=0,pad=0),transform=ax.transAxes)
return
def distance_measure_plot(self,ax_row,plot_data=False,distance_metrics=None,
data_kwargs={},distance_kwargs={},show_titles=False):
"""Heatmaps of interspectral distances and (optionally) line plots of data
"""
if distance_metrics is None:
metrics_to_plot = self.distance_metrics
else:
metrics_to_plot = []
for metric in self.distance_metrics:
for name in distance_metrics:
if metric.name == name:
metrics_to_plot += [metric]
ax_dist = ax_row
if plot_data:
ax_dist = ax_row[1:]
self.plot_data(ax_row[0],**data_kwargs)
for metric,ax in zip(metrics_to_plot,ax_dist):
im = metric.distance_plot(ax,show_titles=show_titles,
**distance_kwargs)
cb = plt.colorbar(im,ax=ax,fraction=0.2)
cb.locator = plt.MaxNLocator(nbins=4,prune='lower',integer=True,min_n_ticks=3)
cb.update_ticks()
ax.set_yticks(range(len(self.data_names)))
ax.set_yticklabels([])
ax.set_xticks([])
ax_row[0].set_yticklabels(self.data_names,size=7)
[docs]class DistanceMetric(object):
"""A distance metric and the interspectral distances associated with it
:param metric: The name of the metric function
:param function: The metric function. Can be a binary function f(x,y) or a string identifying a metric recognized by :py:func:`scipy.spatial.distance.pdist`
:key distribution_function: Which distribution will be assumed when assigning Z scores to each measurement of a sample.
:key mahalanobis_dict:
"""
def __init__(self,metric,function,distribution=None,mahalanobis_dict=None):
self.name = metric
self.function = function
self.distribution = distribution
self.distance_matrix = None
self.distance_max = None
self.distance_min = None
self.population = None
self.mahalanobis_dict = mahalanobis_dict
return
@property
def zscores(self):
return self.population.zscores
@property
def values(self):
return self.population.values
@property
def outlier_mask(self):
return self.population.outlier_mask
def distance(self,data):
"""Calculates the pairwise interspectral distances and the average interspectral distance, then loads the average distances into the :py:class:`Population`.
:param data: The data for which distances will be calculated
"""
VI=None
#significant_components=data.shape[0]
#Empty containers
distance_dict = dict()
distance_square_dict = dict()
if self.function == 'mahalanobis':
if not(self.mahalanobis_dict):
raise ValueError('Must define the inverse covariance matrix if metric is mahalanobis')
VI=self.mahalanobis_dict['VI']
#significant_components=mahalanobis_dict['significant_components']
distance_condensed = sp.spatial.distance.pdist(data,self.function,VI=VI)
self.distance_matrix = sp.spatial.distance.squareform(distance_condensed)
self.population = Population(name=self.name,
distribution=self.distribution,
values=self.distance_matrix.sum(axis=1)/self.distance_matrix.shape[0]
)
self.distance_min,self.distance_max = self.population.minmax()
return
def distance_plot(self,ax,vmin=None,vmax=None,cmap='Greys',origin='lower',
aspect='equal',show_titles=False,**kwargs):
"""Plots a heatmap of the pairwise interspectral distance.
:param ax: The axis to make the plot
:key vmin: Passed to imshow
:key vmax: Passed to imshow
:key cmap: Passed to imshow
:key origin: Passed to imshow
:key aspect: Passed to imshow
:key show_titles:
"""
if vmin is None: vmin,vmax = self.minmax()
if show_titles:
ax.set_title(self.name.replace(' ','\n'),size=10)
im = ax.imshow(self.distance_matrix,cmap=cmap,origin=origin,
vmin=vmin,vmax=vmax,aspect=aspect,**kwargs
)
return im
def fit_zscores(self,**kwargs):
"""Pass-through to :py:class:`.Population`
"""
self.population.fit_zscores(**kwargs)
return
def find_outliers(self,recursive=False,**kwargs):
"""Pass-through to :py:class:`.Population`
"""
self.population.find_outliers(recursive=recursive,**kwargs)
if recursive:
self.distance_min,self.distance_max = calculate_minmax(self.values[self.outlier_mask])
return
def histogram(self,*args,**kwargs):
"""Pass-through to :py:class:`.Population`
"""
self.population.histogram(*args,**kwargs)
return
def plot_zscores(self,*args,**kwargs):
"""Pass-through to :py:class:`.Population`
"""
ax = self.population.plot_zscores(*args,**kwargs)
return ax
def minmax(self):
return calculate_minmax(self.distance_matrix)
[docs]class Population(object):
"""Object containing some values, a distribution function fit to those values, and corresponding scores
:param name: The name assigned to this Population
:key distribution: Which distribution will be assumed when assigning Z scores to laboratories.
:key values: The values to which the distribution will be fit
"""
def __init__(self,name,distribution=None,values=None):
self.name = name
self.values = values
self.distribution = distribution
self.params = None
self.zscores = None
self.outlier_mask = None
return
def fit_zscores(self,data=None,mask=None):
"""Fit the distribution to the data using a provided mask and calculate the scores of the data
:key data: The data that will be fit to the distribution. Normally, this is the values of the Population
:key mask: If present, masks some elements of data
"""
if data is None: data = self.values
if mask is None: mask = np.ones_like(data,dtype=np.bool)
std = self.distribution(1)
self.params = self.distribution.fit(data[mask],floc=0)
rv = self.distribution(*self.params)
CDF = rv.cdf(data)
self.zscores = std.ppf(CDF)
#Explicitly calculate Z if the distribution is lognorm, because this will give more reliable results for large Z
if type(self.distribution) is type(sp.stats.lognorm):
mean = self.params[2]
stdev = self.params[0]
self.zscores = np.exp( (np.log(data) - np.log(mean)) / stdev)
#self.outlier_mask = self.zscores < zlimit
return
def find_outliers(self,recursive=False,support_fraction=0.6,final_screen=False):
"""Finds the outliers of the distribution
:key recursive: If true, finds outliers and refit until all outliers have been removed
:key support_fraction: Fraction of values that are guaranteed to be retained
"""
max_percentile = 0.9495 #95% confidence limint, respecting support fraction (respecting 3 sig figs rounding)
max_ignore_support_fraction = 0.9895 #99% confidence limit, ignoring support fraction (3 sig fig rounding)
std = self.distribution(1)
zlimit = std.ppf(max_percentile)
zlimit_ignore = std.ppf(max_ignore_support_fraction)
self.fit_zscores(self.values)
max_num_outliers = int((1-support_fraction)*len(self.values))
self.outlier_mask = np.ones_like(self.values,dtype=np.bool)
# if not recursive:
# for i in range(max_num_outliers):
# self.outlier_mask = self.zscores < zlimit #If not recursive, just get all outliers and leave
# return
# Note that, because we always use a support fraction, we always want to remove outliers one-by-one
for i in range(max_num_outliers):
zmax = self.zscores[self.outlier_mask].max() #Get the biggest z-score still considered in the distribution
if zmax < zlimit: break #If the biggest z-score is within the zlimit, break from the loop
self.outlier_mask = self.zscores < zmax
#Recalculate z-scores only if recursive
if recursive: self.fit_zscores(self.values,self.outlier_mask)
if final_screen:
self.outlier_mask = self.zscores < zlimit_ignore
if recursive: self.fit_zscores(self.values,self.outlier_mask)
return
def minmax(self):
return calculate_minmax(self.values)
def histogram(self,ax,num_bins=20,max_bin=None,min_bin=None,xvals=None):
"""Plots a histogram of the populations `values` with the corresponding distribution function
"""
if max_bin is None: min_bin,max_bin=self.minmax()
if xvals is None: xvals = np.linspace(min_bin,max_bin,num_bins*10)
binsize = (max_bin - min_bin)/float(num_bins)
hist_norm,bins = np.histogram(self.values,range=(min_bin,max_bin), bins=num_bins,density=True)
dist_vals = self.distribution.pdf(xvals,*self.params)
confidence_limit = self.distribution.ppf(0.9495,*self.params)
ax.plot(xvals,dist_vals,'k')
ax.bar(bins[:-1]+0.5*binsize,hist_norm,binsize,color='r')
ax.axvline(x=confidence_limit,color='k',ls=':')
return
def plot_zscores(self,ax,rotation=None):
"""Plots a bar chart of the populations `values` and annotates it with the corresponding zscores
"""
width = 0.4
#Get the number of data sets and number of spectra
numsets = len(self.zscores)
bar_colors = ['w'] * numsets
bar_edge = ['k'] * numsets
box_trans = [0.7] * numsets
for int_num,inlier in enumerate(self.outlier_mask):
#Color and transparency if the point is an outlier
if not(inlier):
bar_colors[int_num] = 'r'
bar_edge[int_num] = 'r'
box_trans[int_num] = 0
#bar chart of projected distances
ax.bar(np.arange(numsets),self.values,
2*width,alpha=1,color=bar_colors,edgecolor=bar_edge)
(ymin,ymax) = ax.get_ylim()
sample_label_pos = 0.05*ymax + 0.95*ymin
for int_num,(zscore,value,inlier) in enumerate(zip(self.zscores,self.values,self.outlier_mask)):
if(inlier):
label_pos = sample_label_pos + value
else:
label_pos = sample_label_pos
ax.text(int_num,label_pos,
g_formatter(zscore),
ha='center', va='bottom',
color='k',size=9,rotation=rotation,
bbox=dict(facecolor='w',alpha=box_trans[int_num],lw=0)
)
return ax
[docs]class InterlabArray(object):
"""Object for executing interlaboratory comparison
:key name: Name for the interlab object
:key data_array: Z-score array for the measurements in this interlaboratory study
:key lablist: List of laboratories that have data in this interlab
:key datasets: List of datasets that are in this interlab
:key distribution_function: Which distribution will be assumed when assigning Z scores to laboratories. The default is sp.stats.lognorm
"""
def __init__(self,
name=None,
data_array=None,
lablist=None,
datasets=None,
distribution_function=sp.stats.lognorm,):
#Data array that will be fit
self.data_array = data_array
#Distribution function to fit the zscores to
self.distribution_function = distribution_function
#Metric name
self.name = name
#PCA object
self.pca = skdecomp.PCA()
self.pca_scores = None
self.population = Population('Projected statistical distance',
distribution=self.distribution_function)
return
def __str__(self):
return self.name
@property
def zscores(self):
return self.population.zscores
@property
def values(self):
return self.population.values
@property
def outlier_mask(self):
return self.population.outlier_mask
def fit_transform(self):
"""Fits the sklearn.pca object and then removes the mean-centering. Calculates projected statistical distance and stores that in :py:class:`Population`
"""
#Fit PCA
self.pca.fit(self.data_array.T)
#Find where the zero point of our z-score space lies in PCA space
effective_zero = self.pca.transform(np.zeros_like(self.pca.mean_.reshape(1,-1)))
#Transform the scores into the translated PCA space
self.pca_scores = self.pca.transform(self.data_array.T) - effective_zero
#Get the active components (minimum number of components that explains at least 95% of variance)
#self.active_components = self.pca.explained_variance_ratio_.cumsum() < 0.95
self.active_components = self.pca.explained_variance_ratio_ > 0.01
# ^^^ this definition actually has one fewer component than what we want so we have to do some manipulation
#print(self.active_components)
self.active_components[1] = True #Obviously there should be at least one active component
#self.active_components = np.roll(self.active_components,1) #Move the active component mask one element. Combined with the statement above, this ensures that the second component will be active, and we want two active components.
#self.active_components[0] = True #Obviously the first component should be active
#print(self.active_components)
#Store the projected statistical distance
num_active_components = np.count_nonzero(self.active_components)
self.population.values = np.linalg.norm(self.pca_scores[:,0:num_active_components],axis=1)
return
#def plot_zscore_heatmap(self):
def fit_zscores(self,**kwargs):
"""Pass-through for :py:class:`.Population.fit_zscores`
"""
#Fit the projected statistical distance
self.population.fit_zscores(**kwargs)
return
def find_outliers(self,**kwargs):
"""Pass-through for :py:class:`.Population.find_outliers`
"""
self.population.find_outliers(**kwargs)
return
def plot_components(self,ax):
"""Plots the principal component loadings for the statistical distances
:param ax: A list of the distance metrics for which loadings will be plotted. If None, plot loadings for all metrics in this project
:key xlabel_buffer: Space allocated for x-axis labels (in inches)
"""
PCAsign = np.sign(self.pca.components_[0,:].mean())
#Get the number of active components in the PCA
num_active_components = np.count_nonzero(self.active_components)
#print(self.name,num_active_components,self.active_components)
#Calculate the positions where the bars will go
total_width = 0.4 # total width available for all components
width = total_width/num_active_components #width allocated to each component
positions = np.arange(-total_width,total_width,2*width) #list of offsets for all components
#Create the spectrum of colors that will be used for the plot
bar_properties = np.arange(1,0,-1.0/num_active_components)
#Get the number of data sets and number of spectra
numsets = len(self.lablist)
numspecs = len(self.datasets)
#Plot the principal components
for component_num in range(num_active_components):
#intensity = bar_properties[component_num]
#Intensity corresponds to the explained variance ratio
intensity = 1 - self.pca.explained_variance_ratio_[component_num]
#Edges should be darker to provide contrast for lighter bars
edge_intensity = (1 - self.pca.explained_variance_ratio_[component_num])/1.3
bar_colors = [[intensity] * 3] * len(self.lablist)
edge_colors = [[edge_intensity] * 3] * len(self.lablist)
position = positions[component_num]
im = ax.bar(np.arange(len(self.datasets))+position,
self.pca.components_[component_num,:]*PCAsign,
2*width,
alpha=1,color=bar_colors,edgecolor=edge_colors)
#X tick labels
ax.set_xlim(-1,numspecs + 2) # We'll want 2 labels for the explained variance ratios
ax.set_xticks(range(numspecs))
ax.set_xticklabels(self.datasets,rotation='vertical')
#Draw dividing lines between components
for spec_num,spectrum in enumerate(self.datasets):
ax.axvline(x=spec_num+0.5,color='k',ls=':')
#Print the explained variance ratios
#(ymin,ymax) = ax.get_ylim()
#center = (ymax+ymin)/2.0
#interval = (ymin-ymax)/(num_active_components + 1)
#positions = np.arange(ymax+interval,ymin,interval)
interval = -1/(num_active_components + 1)
positions = np.arange(1+interval,0,interval)
for component_number in range(num_active_components):
component_label_position = positions[component_number]
component_horizontal_position = len(self.datasets)/(len(self.datasets) + 1)
componentscore = "%4.1f" % (self.pca.explained_variance_ratio_[component_number] * 100)
componentscore_string = componentscore + " %"
ax.text(component_horizontal_position,component_label_position,componentscore_string,
ha='center', va='center',transform=ax.transAxes)
return
def plot_zscores(self,*args,**kwargs):
"""Plots the projected statistical distances annotated with the corresponding laboratory-level Z scores. Pass-through to :py:class:Population.plot_zscores.
"""
ax = self.population.plot_zscores(*args,**kwargs)
ax.set_xlim(-0.5,len(self.lablist)-0.5)
ax.set_xticks(np.arange(len(self.lablist)))
ax.set_xticklabels(self.lablist,rotation='vertical')
return
def plot_outliers(self,ax,y_component=1):
"""Plots the principal component scores for each lab along with the final distribution used to calculate the outliers
:param ax: The axis on which the plot will be made
:key y_component: Which principal component to use on the Y axis, if not the first
"""
zscores_projected = self.pca_scores
z_mask = self.outlier_mask
#Create the scatterplot, blue for outliers and red for inliers
scatterplot = ax.scatter(self.pca_scores[z_mask,0],
self.pca_scores[z_mask,y_component],
c='r',edgecolors='none',s=60,zorder=3)
scatterplot = ax.scatter(self.pca_scores[np.invert(z_mask),0],
self.pca_scores[np.invert(z_mask),y_component],
c='b',edgecolors='none',s=60,zorder=4)
#Get the x and y limits of the plot
(xmin,xmax) = ax.get_xlim()
(ymin,ymax) = ax.get_ylim()
#Make the grid
numgrid = 60
xstep = (xmax-xmin)/numgrid
ystep = (ymax-ymin)/numgrid
xdata = np.arange(xmin,xmax,xstep)
ydata = np.arange(ymin,ymax,ystep)
xx,yy = np.meshgrid(xdata,ydata)
dist = np.sqrt(xx ** 2 + yy ** 2)
rv = self.population.distribution(*self.population.params)
std = self.population.distribution(1)
pdf_for_plot = std.ppf(rv.cdf(dist))
levelsc = [1, 2, 3, 4, 5]
levelsf = [0.25, 0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5]
levelsc = list(np.arange(1,std.ppf(0.95))) + [std.ppf(0.95)]
levelsf = [0.25] + list(np.arange(0.5,std.ppf(0.95),0.5)) + [std.ppf(0.95)]
cformats = ['{:1.0f}'] * (len(levelsc) - 1)
cformats += ['{:3.1f}']
cformats_dict = {x:fmt_str.format(x) for x,fmt_str in zip(levelsc,cformats) }
ccolors = ['w'] * int(len(levelsc)/2)
ccolors += ['k'] * int( (len(levelsc) + 1)/2)
ctr1 = ax.contourf(xx,yy,pdf_for_plot,extend='min',cmap='Greys_r',levels=levelsf,zorder=1)
ctr2 = ax.contour(xx,yy,pdf_for_plot,extend='neither',colors=ccolors,levels=levelsc,zorder=2)
contourlabels = plt.clabel(ctr2,fmt=cformats_dict,colors=ccolors)
component1score = "%5.2f" % (self.pca.explained_variance_ratio_[0] * 100)
component2score = "%5.2f" % (self.pca.explained_variance_ratio_[y_component] * 100)
xaxislabel = 'Component 1: ' + component1score + " %"
yaxislabel = 'Component ' + str(y_component + 1) +': ' + component2score + " %"
ax.set_xlabel(xaxislabel,size=20)
ax.set_ylabel(yaxislabel,size=20)
for j,labeltext in enumerate(self.lablist):
if not z_mask[j]:
ax.text(self.pca_scores[j,0]+1.0,self.pca_scores[j,y_component],
labeltext,
ha='left',va='center',
size='9',
bbox = dict(facecolor='w'))