Source code for pymcr.constraints

"""
Built-in constraints

All classes need a transform class. Note, unlike sklearn, transform can copy
or overwrite input depending on copy attribute.
"""

from abc import (ABC as _ABC, abstractmethod as _abstractmethod)

import numpy as _np

__all__ = ['Constraint', 'ConstraintNonneg', 'ConstraintCumsumNonneg',
           'ConstraintZeroEndPoints', 'ConstraintZeroCumSumEndPoints',
           'ConstraintNorm', 'ConstraintCutBelow', 'ConstraintCutAbove',
           'ConstraintCompressBelow', 'ConstraintCutAbove',
           'ConstraintCompressAbove', 'ConstraintReplaceZeros',
           'ConstraintPlanarize']


[docs]class Constraint(_ABC): """ Abstract class for constraints Parameters ---------- copy : bool Make copy of input data, A; otherwise, overwrite (if mutable) """ def __init__(self, copy=True): self.copy = copy
[docs] @_abstractmethod def transform(self, A): """ Transform A input based on constraint """
[docs]class ConstraintNonneg(Constraint): """ Non-negativity constraint. All negative entries made 0. Parameters ---------- copy : bool Make copy of input data, A; otherwise, overwrite (if mutable) """ def __init__(self, copy=False): """ A must be non-negative""" super().__init__(copy)
[docs] def transform(self, A): """ Apply nonnegative constraint""" if self.copy: return A*(A > 0) else: A *= (A > 0) return A
[docs]class ConstraintCumsumNonneg(Constraint): """ Cumulative-Summation non-negativity constraint. All negative entries made 0. Parameters ---------- copy : bool Make copy of input data, A; otherwise, overwrite (if mutable) """ def __init__(self, axis=-1, copy=False): """ A must be non-negative""" super().__init__(copy) self.axis = axis
[docs] def transform(self, A): """ Apply cumsum nonnegative constraint""" if self.copy: return A*(_np.cumsum(A, self.axis) > 0) else: A *= (_np.cumsum(A, self.axis) > 0) return A
[docs]class ConstraintZeroEndPoints(Constraint): """ Enforce the endpoints (or the mean over a range) is zero Parameters ---------- copy : bool Make copy of input data, A; otherwise, overwrite (if mutable) axis : int Axis to operate on span : int Number of pixels along the ends to average. """ def __init__(self, axis=-1, span=1, copy=False): """ A must be non-negative""" super().__init__(copy) if [0, 1, -1].count(axis) != 1: raise TypeError('Axis must be 0, 1, or -1') self.axis = axis self.span = span
[docs] def transform(self, A): """ Apply cumsum nonnegative constraint""" pix_vec = _np.arange(A.shape[self.axis]) if (self.axis == 0): if self.span == 1: slope = (A[-1, :] - A[0, :]) / (pix_vec[-1] - pix_vec[0]) intercept = A[0, :] else: slope = ((A[-self.span:, :].mean(axis=0) - A[:self.span, :].mean(axis=0)) / (pix_vec[-self.span:].mean() - pix_vec[:self.span].mean())) intercept = (A[:self.span, :] - _np.dot(pix_vec[:self.span, None], slope[None, :])).mean(axis=0) if self.copy: return A - _np.dot(pix_vec[:, None], slope[None, :]) - intercept[None, :] else: A -= (_np.dot(pix_vec[:, None], slope[None, :]) + intercept[None, :]) return A else: if self.span == 1: slope = (A[:, -1] - A[:, 0]) / (pix_vec[-1] - pix_vec[0]) intercept = A[:, 0] else: slope = ((A[:, -self.span:].mean(axis=1) - A[:, :self.span].mean(axis=1)) / (pix_vec[-self.span:].mean() - pix_vec[:self.span].mean())) intercept = (A[:, :self.span] - _np.dot(slope[:, None], pix_vec[None, :self.span])).mean(axis=1) if self.copy: return A - _np.dot(slope[:, None], pix_vec[None, :]) - intercept[:, None] else: A -= (_np.dot(slope[:, None], pix_vec[None, :]) + intercept[:, None]) return A
[docs]class ConstraintZeroCumSumEndPoints(Constraint): """ Enforce the endpoints of the cumsum (or the mean over a range) is near-zero. Note: this is an approximation. Parameters ---------- copy : bool Make copy of input data, A; otherwise, overwrite (if mutable) nodes : list of int In addition to end-points, other points to ensure are approximately 0 axis : int Axis to operate on span : int Number of pixels along the ends to average. """ def __init__(self, nodes=None, axis=-1, copy=False): """ A must be non-negative""" super().__init__(copy) self.nodes = nodes if [0, 1, -1].count(axis) != 1: raise TypeError('Axis must be 0, 1, or -1') self.axis = axis
[docs] def transform(self, A): """ Apply cumsum nonnegative constraint""" meaner = A.mean(self.axis) if self.nodes: self.nodes = set(self.nodes) self.nodes.update({0, A.shape[self.axis]}) self.nodes.discard(-1) self.nodes = list(self.nodes) if (self.axis == 0): if self.copy: temp = 1.0*A for num in range(len(self.nodes) - 1): n0 = self.nodes[num] n1 = self.nodes[num+1] temp[n0:n1, :] -= temp[n0:n1, :].mean(self.axis)[None, :] return temp else: for num in range(len(self.nodes) - 1): n0 = self.nodes[num] n1 = self.nodes[num+1] A[n0:n1, :] -= A[n0:n1, :].mean(self.axis)[None, :] return A else: if self.copy: temp = 1.0*A for num in range(len(self.nodes) - 1): n0 = self.nodes[num] n1 = self.nodes[num+1] temp[:, n0:n1] -= temp[:, n0:n1].mean(self.axis)[:, None] return temp else: for num in range(len(self.nodes) - 1): n0 = self.nodes[num] n1 = self.nodes[num+1] A[:, n0:n1] -= A[:, n0:n1].mean(self.axis)[:, None] return A else: if (self.axis == 0): if self.copy: return A - meaner[None, :] else: A -= meaner[None, :] return A else: if self.copy: return A - meaner[:, None] else: A -= meaner[:, None] return A
[docs]class ConstraintNorm(Constraint): """ Normalization constraint. Parameters ---------- axis : int Which axis of input matrix A to apply normalization acorss. fix : list Keep fix-axes as-is and normalize the remaining axes based on the residual of the fixed axes. set_zeros_to_feature : int Set all samples which sum-to-zero across axis to 1 for a particular feature (See Notes) copy : bool Make copy of input data, A; otherwise, overwrite (if mutable) Notes ----- - For set_zeros_to_feature, assuming the data represents concentration with a matrix [n_samples, n_features] and the axis is across the features, for every sample that sums to 0 across axis, would be replaced with a vector [n_features] of zeros except at set_zeros_to_feature, which would equal 1. I.e., this pixel is now pure substance of index value set_zeros_to_feature. """ def __init__(self, axis=-1, fix=None, copy=False): """Normalize along axis""" super().__init__(copy) if fix is None: self.fix = fix elif isinstance(fix, int): self.fix = [fix] elif isinstance(fix, (list, tuple)): self.fix = fix elif isinstance(fix, _np.ndarray): if _np.issubdtype(fix.dtype, _np.integer): self.fix = fix.tolist() else: raise TypeError('ndarrays must be of dtype int') else: raise TypeError('Parameter fix must be of type None, int, list,', 'tuple, ndarray') if not ((axis == 0) | (axis == 1) | (axis == -1)): raise ValueError('Axis must be 0,1, or -1') self.axis = axis
[docs] def transform(self, A): """ Apply normalization constraint """ if self.copy: if self.axis == 0: if not self.fix: # No fixed axes return A / A.sum(axis=self.axis)[None, :] else: # Fixed axes fix_locs = self.fix not_fix_locs = [v for v in _np.arange(A.shape[0]).tolist() if self.fix.count(v) == 0] scaler = _np.ones(A.shape) div = A[not_fix_locs, :].sum(axis=0)[None, :] div[div == 0] = 1 scaler[not_fix_locs, :] = ((1 - A[fix_locs, :].sum(axis=0)[None, :]) / div) return A * scaler else: # Axis = 1 / -1 if not self.fix: # No fixed axes return A / A.sum(axis=self.axis)[:, None] else: # Fixed axis fix_locs = self.fix not_fix_locs = [v for v in _np.arange(A.shape[-1]).tolist() if self.fix.count(v) == 0] scaler = _np.ones(A.shape) div = A[:, not_fix_locs].sum(axis=-1)[:, None] div[div == 0] = 1 scaler[:, not_fix_locs] = ((1 - A[:, fix_locs].sum(axis=-1)[:, None]) / div) return A * scaler else: # Overwrite original data if A.dtype != _np.float: raise TypeError('A.dtype must be float for', 'in-place math (copy=False)') if self.axis == 0: if not self.fix: # No fixed axes A /= A.sum(axis=self.axis)[None, :] return A else: # Fixed axes fix_locs = self.fix not_fix_locs = [v for v in _np.arange(A.shape[0]).tolist() if self.fix.count(v) == 0] scaler = _np.ones(A.shape) div = A[not_fix_locs, :].sum(axis=0)[None, :] div[div == 0] = 1 scaler[not_fix_locs, :] = ((1 - A[fix_locs, :].sum(axis=0)[None, :]) / div) A *= scaler return A else: # Axis = 1 / -1 if not self.fix: # No fixed axes A /= A.sum(axis=self.axis)[:, None] return A else: # Fixed axis fix_locs = self.fix not_fix_locs = [v for v in _np.arange(A.shape[-1]).tolist() if self.fix.count(v) == 0] scaler = _np.ones(A.shape) div = A[:, not_fix_locs].sum(axis=-1)[:, None] div[div == 0] = 1 scaler[:, not_fix_locs] = ((1 - A[:, fix_locs].sum(axis=-1)[:, None]) / div) A *= scaler return A
[docs]class ConstraintReplaceZeros(Constraint): """ Samples that sum-to-zero across axis are replaced with a vector of 0's except for a 1 at feature if a single value. In a concentration context, e.g., samples with no concentration are replaced with 100% concentration of a set feature. If multiple features given, equal amounts of each feature (summing to 1) are used. Parameters ---------- axis : int Which axis of input matrix A to apply normalization acorss. feature : int, list, tuple Set all samples which sum-to-zero across axis to fval for a particular feature (or fractional) for multiple features. fval : float Value of summation across axis of replacement vector. copy : bool Make copy of input data, A; otherwise, overwrite (if mutable) """ def __init__(self, axis=-1, feature=None, fval=1, copy=False): """Replace sum-to-zero samples with new feature vector along axis""" super().__init__(copy) self.fval = fval if feature is None: self.feature = feature elif isinstance(feature, int): self.feature = [feature] elif isinstance(feature, (list, tuple)): self.feature = feature elif isinstance(feature, _np.ndarray): if _np.issubdtype(feature.dtype, _np.integer): self.feature = feature.tolist() else: raise TypeError('ndarrays must be of dtype int') else: raise TypeError('Parameter feature must be of type None, int, list, tuple, ndarray') if not ((axis == 0) | (axis == 1) | (axis == -1)): raise ValueError('Axis must be 0,1, or -1') self.axis = axis
[docs] def transform(self, A): """ Apply constraint """ if self.feature: replacement = _np.zeros(A.shape[self.axis]) replacement[self.feature] = self.fval replacement /= replacement.sum() replacement *= self.fval if self.copy: A_out = 1*A if self.axis == 0: A_out[:, A_out.sum(axis=0) == 0] = replacement[:, None] else: # Axis 1 / -1 A_out[A_out.sum(axis=-1) == 0] = replacement return A_out else: if self.axis == 0: A[:, A.sum(axis=0) == 0] = replacement[:, None] else: # Axis 1 / -1 A[A.sum(axis=-1) == 0] = replacement return A else: return A
class _CutExclude(Constraint): """ Parent class for methods that cut and can exclude Parameters ---------- value : float Cutoff value axis_sumnz : int If not None, cut below value only applied where sum across specified axis does not go to 0, i.e. all values cut. exclude : int, list , tuple, ndarray Exclude targets exclude_axis : int Along which axis to enumerate targets copy : bool Make copy of input data, A; otherwise, overwrite (if mutable) """ def __init__(self, value=0, axis_sumnz=None, exclude=None, exclude_axis=-1, copy=False): """ """ super().__init__(copy) self.value = value self.axis = axis_sumnz self.exclude = exclude self.exclude_axis = exclude_axis self._excl_mat = None def _make_excl_mat(self, A_shape): X, Y = _np.meshgrid(_np.arange(A_shape[1]), _np.arange(A_shape[0])) if self.exclude is None: self._excl_mat = _np.zeros(X.shape, dtype=_np.bool) else: if self.exclude_axis == 0: self._excl_mat = _np.in1d(Y.ravel(), self.exclude).reshape(Y.shape) else: self._excl_mat = _np.in1d(X.ravel(), self.exclude).reshape(X.shape)
[docs]class ConstraintCutBelow(_CutExclude): """ Cut values below (and not-equal to) a certain threshold. Parameters ---------- value : float Cutoff value axis_sumnz : int If not None, cut below value only applied where sum across specified axis does not go to 0, i.e. all values cut. exclude : int, list , tuple, ndarray Exclude targets exclude_axis : int Along which axis to enumerate targets copy : bool Make copy of input data, A; otherwise, overwrite (if mutable) """ def __init__(self, value=0, axis_sumnz=None, exclude=None, exclude_axis=-1, copy=False): """ Initialize """ super().__init__(value=value, axis_sumnz=axis_sumnz, exclude=exclude, exclude_axis=exclude_axis, copy=copy)
[docs] def transform(self, A): """ Apply cut-below value constraint""" if self._excl_mat is None: self._make_excl_mat(A.shape) if self.axis is None: if self.copy: return A*((A >= self.value) | self._excl_mat) else: A *= ((A >= self.value) | self._excl_mat) return A else: if self.copy: return A*(_np.alltrue(A < self.value, axis=self.axis, keepdims=True) + (A >= self.value) + self._excl_mat) else: A *= (_np.alltrue(A < self.value, axis=self.axis, keepdims=True) + (A >= self.value) + self._excl_mat) return A
[docs]class ConstraintCompressBelow(Constraint): """ Compress values below (and not-equal to) a certain threshold (set to value) Parameters ---------- value : float Cutoff value copy : bool Make copy of input data, A; otherwise, overwrite (if mutable) """ def __init__(self, value=0, copy=False): """ """ super().__init__(copy) self.value = value
[docs] def transform(self, A): """ Apply compress-below value constraint""" if self.copy: return A*(A >= self.value) + self.value*(A < self.value) else: temp = self.value*(A < self.value) A *= (A >= self.value) A += temp return A
[docs]class ConstraintCutAbove(_CutExclude): """ Cut values above (and not-equal to) a certain threshold Parameters ---------- value : float Cutoff value axis_sumnz : int If not None, cut above value only applied where sum across specified axis does not go to 0, i.e. all values cut. exclude : int, list , tuple, ndarray Exclude targets exclude_axis : int Along which axis to enumerate targets copy : bool Make copy of input data, A; otherwise, overwrite (if mutable) """ def __init__(self, value=0, axis_sumnz=None, exclude=None, exclude_axis=-1, copy=False): """ """ super().__init__(value=value, axis_sumnz=axis_sumnz, exclude=exclude, exclude_axis=exclude_axis, copy=copy)
[docs] def transform(self, A): """ Apply cut-above value constraint""" if self._excl_mat is None: self._make_excl_mat(A.shape) if self.axis is None: if self.copy: return A*((A <= self.value) | self._excl_mat) else: A *= ((A <= self.value) | self._excl_mat) return A else: if self.copy: return A*(_np.alltrue(A > self.value, axis=self.axis, keepdims=True) + (A <= self.value) + self._excl_mat) else: A *= (_np.alltrue(A > self.value, axis=self.axis, keepdims=True) + (A <= self.value) + self._excl_mat) return A
[docs]class ConstraintCompressAbove(Constraint): """ Compress values above (and not-equal to) a certain threshold (set to value) Parameters ---------- value : float Cutoff value copy : bool Make copy of input data, A; otherwise, overwrite (if mutable) """ def __init__(self, value=0, copy=False): """ """ super().__init__(copy) self.value = value
[docs] def transform(self, A): """ Apply compress-above value constraint""" if self.copy: return A*(A <= self.value) + self.value*(A > self.value) else: temp = self.value*(A > self.value) A *= (A <= self.value) A += temp return A
[docs]class ConstraintPlanarize(Constraint): """ Set a particular target to a plane Parameters ---------- target : int, list, tuple Target numbers to set to a fitted plane shape : tuple, list Shape of array (M,N) which is (Y,X) use_vals_above : float Only calculate based on values above (not including) use_vals_below : float Only calculate based on values below (not including) lims_to_plane : bool The returned plane will be limited to the range of the optionally supplied use_vals_below, use_vals above. scaler : float A large value that is much bigger than any values in the input array. Needed to ensure SVD properly creates plane. If None, auto-calculates. recalc_scaler : bool Auto-calculate for every new input (does not use previously provided or calculated value) copy : bool Make copy of input data, A; otherwise, overwrite (if mutable) Notes ----- - This uses an SVD to calculate the vector normal to the plane that fits the input data. It assumes that the 3rd singular vector is the normal; thus, the x and y vectors for the data need be larger than the variance of the input data. Scaler enables this by scaling the auto-generated x and y vectors to be much larger than the max-min of the input data """ def __init__(self, target, shape, use_vals_above=None, use_vals_below=None, lims_to_plane=True, scaler=None, recalc_scaler=False, copy=False): super().__init__(copy) if isinstance(target, int): self.target = [target] elif isinstance(target, (list, tuple, _np.ndarray)): self.target = target else: raise TypeError('target must be an int, list, 2D ndarray, or tuple') self.shape = shape self.copy = copy self.scaler = scaler self.recalc = recalc_scaler self.use_above = use_vals_above self.use_below = use_vals_below self.lims_to_plane = lims_to_plane self._x = None self._y = None self._X = None self._Y = None if scaler is not None: self._setup_xy(scaler)
[docs] def _setup_xy(self, scaler): self.scaler = scaler self._x = scaler*_np.arange(self.shape[1], dtype=_np.float) self._y = scaler*_np.arange(self.shape[0], dtype=_np.float) self._X, self._Y = _np.meshgrid(self._x, self._y) self._X = self._X.ravel() self._Y = self._Y.ravel()
[docs] def transform(self, A): """ Set targets, t, to fit planes """ if (self.scaler is None) | (self.recalc): self._setup_xy(1e3 * _np.abs(A.max() - A.min())) if self.copy: A2 = 1*A for t in self.target: X2 = 1*self._X Y2 = 1*self._Y Z2 = 1*A[:, t] Stack = _np.vstack((X2, Y2, Z2)) if self.use_above is not None: X2 = X2[Z2 > self.use_above] Y2 = Y2[Z2 > self.use_above] Z2 = Z2[Z2 > self.use_above] Stack = _np.vstack((X2, Y2, Z2)) if self.use_below is not None: X2 = X2[Z2 < self.use_below] Y2 = Y2[Z2 < self.use_below] Z2 = Z2[Z2 < self.use_below] Stack = _np.vstack((X2, Y2, Z2)) Stack = Stack - Stack.mean(axis=-1)[:, None] U, s, Vh = _np.linalg.svd(Stack, full_matrices=False) norm_to_plane = 1*U[:, -1] plane = (((-norm_to_plane[0] * (self._X - X2.mean())) - (norm_to_plane[1] * (self._Y - Y2.mean()))) / norm_to_plane[2]) + Z2.mean() if self.lims_to_plane: if self.use_above is not None: plane[plane < self.use_above] = self.use_above if self.use_below is not None: plane[plane > self.use_below] = self.use_below if not self.copy: A[:, t] = plane else: A2[:, t] = plane if self.copy: return A2 else: return A
if __name__ == '__main__': # pragma: no cover A = _np.array([[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 10]]).astype(_np.float) A_transform = _np.array([[1, 2, 3, 4], [4, 0, 0, 0], [0, 0, 0, 0]]).astype(_np.float) constr = ConstraintCutAbove(copy=True, value=4) out = constr.transform(A) from numpy.testing import assert_allclose as _assert_allclose _assert_allclose(out, A_transform)