""" MCR Main Class for Computation"""
import sys as _sys
import numpy as _np
import logging as _logging
from pymcr.regressors import OLS, NNLS
from pymcr.constraints import ConstraintNonneg
from pymcr.metrics import mse
# Module-level logger for pymcr.mcr; default level INFO. fit(verbose=True)
# temporarily raises verbosity to DEBUG. Handlers are left to the caller
# (or to the __main__ demo below), per standard logging practice.
_logger = _logging.getLogger(__name__)
_logger.setLevel(_logging.INFO)
class McrAR:
    """
    Multivariate Curve Resolution - Alternating Regression

    D = CS^T

    Parameters
    ----------
    c_regr : str, class
        Instantiated regression class (or string, see Notes) for calculating
        the C matrix
    st_regr : str, class
        Instantiated regression class (or string, see Notes) for calculating
        the S^T matrix
    c_fit_kwargs : dict
        kwargs sent to c_regr.fit method
    st_fit_kwargs : dict
        kwargs sent to st_regr.fit method
    c_constraints : list
        List of constraints applied to calculation of C matrix
    st_constraints : list
        List of constraints applied to calculation of S^T matrix
    max_iter : int
        Maximum number of iterations. One iteration calculates both C and S^T
    err_fcn : function
        Function to calculate error/differences after each least squares
        calculation (ie twice per iteration). Outputs to err attribute.
    tol_increase : float
        Factor increase to allow in err attribute. Set to 0 for no increase
        allowed. E.g., setting to 1.0 means the err can double per iteration.
    tol_n_increase : int
        Number of consecutive iterations for which the err attribute can
        increase
    tol_err_change : float
        If err changes less than tol_err_change, per iteration, break.
    tol_n_above_min : int
        Number of half-iterations that can be performed without reaching a
        new error-minimum

    Attributes
    ----------
    err : list
        List of calculated errors (from err_fcn) after each least squares (ie
        twice per iteration)
    C_ : ndarray [n_samples, n_targets]
        Most recently calculated C matrix (that did not cause a tolerance
        failure)
    ST_ : ndarray [n_targets, n_features]
        Most recently calculated S^T matrix (that did not cause a tolerance
        failure)
    C_opt_ : ndarray [n_samples, n_targets]
        [Optimal] C matrix for lowest err attribute
    ST_opt_ : ndarray [n_targets, n_features]
        [Optimal] ST matrix for lowest err attribute
    n_iter : int
        Total number of iterations performed
    n_features : int
        Total number of features, e.g. spectral frequencies.
    n_samples : int
        Total number of samples (e.g., pixels)
    n_targets : int
        Total number of targets (e.g., pure analytes)
    n_iter_opt : int
        Iteration when optimal C and ST calculated
    exit_max_iter_reached : bool
        Exited iterations due to maximum number of iteration reached (max_iter
        parameter)
    exit_tol_increase : bool
        Exited iterations due to maximum fractional increase in error metric
        (via err_fcn)
    exit_tol_n_increase : bool
        Exited iterations due to maximum number of consecutive increases in
        error metric (via err fcn)
    exit_tol_err_change : bool
        Exited iterations due to error metric change that is smaller than
        tol_err_change
    exit_tol_n_above_min : bool
        Exited iterations due to maximum number of half-iterations for which
        the error metric increased above the minimum error

    Notes
    -----
    - Built-in regressor classes (str can be used): OLS (ordinary least
      squares), NNLS (non-negatively constrained least squares). See
      mcr.regressors.
    - Built-in regressor methods can be given as a string to c_regr, st_regr;
      though instantiating an imported class gives more flexibility.
    - Setting any tolerance to None turns that check off
    - Passing None for c_fit_kwargs/st_fit_kwargs, c_constraints/
      st_constraints, or err_fcn selects the documented defaults ({},
      [ConstraintNonneg()], and mse respectively). Using None sentinels
      avoids sharing one mutable default (or one fitted regressor instance)
      across every McrAR object.
    """

    def __init__(self, c_regr='OLS', st_regr='OLS', c_fit_kwargs=None,
                 st_fit_kwargs=None, c_constraints=None,
                 st_constraints=None,
                 max_iter=50, err_fcn=None,
                 tol_increase=0.0, tol_n_increase=10, tol_err_change=None,
                 tol_n_above_min=10
                 ):
        """
        Multivariate Curve Resolution - Alternating Regression
        """
        self.max_iter = max_iter

        self.tol_increase = tol_increase
        self.tol_n_increase = tol_n_increase
        self.tol_err_change = tol_err_change
        self.tol_n_above_min = tol_n_above_min

        # None sentinel -> default error metric (mse); avoids evaluating the
        # default at class-definition time.
        self.err_fcn = err_fcn if err_fcn is not None else mse
        self.err = None

        # None sentinels -> fresh default constraint lists per instance
        # (a shared mutable default list would couple all McrAR objects)
        self.c_constraints = (c_constraints if c_constraints is not None
                              else [ConstraintNonneg()])
        self.st_constraints = (st_constraints if st_constraints is not None
                               else [ConstraintNonneg()])

        # 'OLS' strings instantiate a fresh regressor per McrAR object;
        # a default OLS() instance would be shared (with its fitted coef_)
        # by every McrAR that relied on the default.
        self.c_regressor = self._check_regr(c_regr)
        self.st_regressor = self._check_regr(st_regr)
        self.c_fit_kwargs = c_fit_kwargs if c_fit_kwargs is not None else {}
        self.st_fit_kwargs = (st_fit_kwargs if st_fit_kwargs is not None
                              else {})

        self.C_ = None
        self.ST_ = None
        self.C_opt_ = None
        self.ST_opt_ = None
        self.n_iter_opt = None

        self.n_iter = None
        self.n_increase = None
        self.n_above_min = None

        self.exit_max_iter_reached = False
        self.exit_tol_increase = False
        self.exit_tol_n_increase = False
        self.exit_tol_err_change = False
        self.exit_tol_n_above_min = False

        # Saving every C or S^T matrix at each iteration
        # Could create huge memory usage
        self._saveall_st = False
        self._saveall_c = False
        self._saved_st = []
        self._saved_c = []

    def _check_regr(self, mth):
        """
        Check regressor method. If acceptable strings, instantiate and
        return object. If instantiated class, make sure it has a fit
        attribute.
        """
        if isinstance(mth, str):
            if mth.upper() == 'OLS':
                return OLS()
            elif mth.upper() == 'NNLS':
                return NNLS()
            else:
                raise ValueError('{} is unknown. Use NNLS or OLS.'.format(mth))
        elif hasattr(mth, 'fit'):
            return mth
        else:
            raise ValueError('Input class '
                             '{} does not have a \'fit\' method'.format(mth))

    @property
    def D_(self):
        """ D matrix with current C and S^T matrices """
        return _np.dot(self.C_, self.ST_)

    @property
    def D_opt_(self):
        """ D matrix with optimal C and S^T matrices """
        return _np.dot(self.C_opt_, self.ST_opt_)

    @property
    def n_features(self):
        """ Number of features """
        if self.ST_ is not None:
            return self.ST_.shape[-1]
        else:
            return None

    @property
    def n_targets(self):
        """ Number of targets """
        if self.C_ is not None:
            return self.C_.shape[1]
        else:
            return None

    @property
    def n_samples(self):
        """ Number of samples """
        if self.C_ is not None:
            return self.C_.shape[0]
        else:
            return None

    def _ismin_err(self, val):
        """ Is the current error the minimum (ties count as a minimum) """
        if len(self.err) == 0:
            return True
        else:
            return ([val > x for x in self.err].count(True) == 0)

    def fit(self, D, C=None, ST=None, st_fix=None, c_fix=None, c_first=True,
            verbose=False, post_iter_fcn=None, post_half_fcn=None):
        """
        Perform MCR-AR. D = CS^T. Solve for C and S^T iteratively.

        Parameters
        ----------
        D : ndarray
            D matrix
        C : ndarray
            Initial C matrix estimate. Only provide initial C OR S^T.
        ST : ndarray
            Initial S^T matrix estimate. Only provide initial C OR S^T.
        st_fix : list
            The spectral component numbers to keep fixed.
        c_fix : list
            The concentration component numbers to keep fixed.
        c_first : bool
            Calculate C first when both C and ST are provided. c_fix and st_fix
            must also be provided in this circumstance.
        verbose : bool
            Log iteration and per-least squares err results. See Notes.
        post_iter_fcn : function
            Function to perform after each iteration
        post_half_fcn : function
            Function to perform after half-iteration

        Notes
        -----
        - pyMCR (>= 0.3.1) uses the native Python logging module
          rather than print statements; thus, to see the messages, one will
          need to log-to-file or stream to stdout. More info is available in
          the docs.
        """
        if verbose:
            _logger.setLevel(_logging.DEBUG)
        else:
            _logger.setLevel(_logging.INFO)

        # Ensure only C or ST provided (both allowed only with both fixes)
        if (C is None) and (ST is None):
            raise TypeError('C or ST estimate must be provided')
        elif (C is not None) and (ST is not None) and \
                ((c_fix is None) or (st_fix is None)):
            err_str1 = 'Only C or ST estimate must be provided, '
            raise TypeError(
                err_str1 + 'unless c_fix and st_fix are both provided')
        else:
            self.C_ = C
            self.ST_ = ST

        self.n_increase = 0
        self.n_above_min = 0
        self.err = []

        # Both C and ST provided. special_skip_c comes into play below
        both_condition = (self.ST_ is not None) and (self.C_ is not None)

        for num in range(self.max_iter):
            self.n_iter = num + 1

            # Both st and c provided, but c_first is False: skip the C
            # half-iteration once so S^T is regressed first
            if both_condition and (num == 0) and (not c_first):
                special_skip_c = True
            else:
                special_skip_c = False

            if (self.ST_ is not None) and (not special_skip_c):
                # Debugging feature -- saves every S^T matrix in a list
                # Can create huge memory usage
                if self._saveall_st:
                    self._saved_st.append(self.ST_)

                # * Target is the feature of the regression
                self.c_regressor.fit(self.ST_.T, D.T, **self.c_fit_kwargs)
                C_temp = self.c_regressor.coef_

                # Apply fixed C's
                if c_fix:
                    C_temp[:, c_fix] = self.C_[:, c_fix]

                # Apply c-constraints
                for constr in self.c_constraints:
                    C_temp = constr.transform(C_temp)

                # Apply fixed C's again (constraints may have altered them)
                if c_fix:
                    C_temp[:, c_fix] = self.C_[:, c_fix]

                D_calc = _np.dot(C_temp, self.ST_)
                err_temp = self.err_fcn(C_temp, self.ST_, D, D_calc)

                # Track the optimum; reset the above-minimum counter on a
                # new minimum (1 * x forces a copy of ndarrays)
                if self._ismin_err(err_temp):
                    self.C_opt_ = 1 * C_temp
                    self.ST_opt_ = 1 * self.ST_
                    self.n_iter_opt = num + 1
                    self.n_above_min = 0
                else:
                    self.n_above_min += 1

                if self.tol_n_above_min is not None:
                    if self.n_above_min > self.tol_n_above_min:
                        err_str1 = 'Half-iterated {} times since ' \
                                   'min '.format(self.n_above_min)
                        err_str2 = 'error. Exiting.'
                        _logger.info(err_str1 + err_str2)
                        self.exit_tol_n_above_min = True
                        break

                # Calculate error fcn and check for tolerance increase
                if len(self.err) == 0:
                    self.err.append(1 * err_temp)
                    self.C_ = 1 * C_temp
                elif self.tol_increase is None:
                    self.err.append(1 * err_temp)
                    self.C_ = 1 * C_temp
                elif err_temp <= self.err[-1] * (1 + self.tol_increase):
                    self.err.append(1 * err_temp)
                    self.C_ = 1 * C_temp
                else:
                    # BUGFIX: message previously rendered as
                    # 'fractionalctol_increase' (missing space, stray 'c')
                    err_str1 = 'Error increased above fractional ' \
                               'tol_increase (C iter). Exiting'
                    _logger.info(err_str1)
                    self.exit_tol_increase = True
                    break

                # Check if err went up
                if len(self.err) > 1:
                    if self.err[-1] > self.err[-2]:  # Error increased
                        self.n_increase += 1
                    else:
                        self.n_increase = 0

                # Break if too many error-increases in a row
                if self.tol_n_increase is not None:
                    if self.n_increase > self.tol_n_increase:
                        out_str1 = 'Maximum error increases reached '
                        _logger.info(
                            out_str1 + '({}) (C iter). '
                            'Exiting.'.format(self.tol_n_increase))
                        self.exit_tol_n_increase = True
                        break

                _logger.debug('Iter: {} (C)\t{}: '
                              '{:.4e}'.format(self.n_iter,
                                              self.err_fcn.__name__,
                                              err_temp))

                if post_half_fcn is not None:
                    post_half_fcn(self.C_, self.ST_, D, D_calc)

            if self.C_ is not None:
                # Debugging feature -- saves every C matrix in a list
                # Can create huge memory usage
                if self._saveall_c:
                    self._saved_c.append(self.C_)

                # * Target is the feature of the regression
                self.st_regressor.fit(self.C_, D, **self.st_fit_kwargs)
                ST_temp = self.st_regressor.coef_.T

                # Apply fixed ST's
                if st_fix:
                    ST_temp[st_fix] = self.ST_[st_fix]

                # Apply ST-constraints
                for constr in self.st_constraints:
                    ST_temp = constr.transform(ST_temp)

                # Apply fixed ST's again (constraints may have altered them)
                if st_fix:
                    ST_temp[st_fix] = self.ST_[st_fix]

                D_calc = _np.dot(self.C_, ST_temp)
                err_temp = self.err_fcn(self.C_, ST_temp, D, D_calc)

                # Calculate error fcn and check for tolerance increase
                if self._ismin_err(err_temp):
                    self.ST_opt_ = 1 * ST_temp
                    self.C_opt_ = 1 * self.C_
                    self.n_iter_opt = num + 1
                    self.n_above_min = 0
                else:
                    self.n_above_min += 1

                if self.tol_n_above_min is not None:
                    if self.n_above_min > self.tol_n_above_min:
                        err_str1 = 'Half-iterated {} times ' \
                                   'since min '.format(self.n_above_min)
                        err_str2 = 'error. Exiting.'
                        _logger.info(err_str1 + err_str2)
                        self.exit_tol_n_above_min = True
                        break

                if len(self.err) == 0:
                    self.err.append(1 * err_temp)
                    self.ST_ = 1 * ST_temp
                elif self.tol_increase is None:
                    self.err.append(1 * err_temp)
                    self.ST_ = 1 * ST_temp
                elif err_temp <= self.err[-1] * (1 + self.tol_increase):
                    self.err.append(1 * err_temp)
                    self.ST_ = 1 * ST_temp
                else:
                    err_str1 = 'Error increased above fractional ' \
                               'tol_increase (ST iter). Exiting'
                    _logger.info(err_str1)
                    self.exit_tol_increase = True
                    break

                # Check if err went up
                if len(self.err) > 1:
                    if self.err[-1] > self.err[-2]:  # Error increased
                        self.n_increase += 1
                    else:
                        self.n_increase = 0

                # Break if too many error-increases in a row
                if self.tol_n_increase is not None:
                    if self.n_increase > self.tol_n_increase:
                        out_str = 'Maximum error increases reached '
                        _logger.info(out_str +
                                     '({}) (ST iter). '
                                     'Exiting.'.format(self.tol_n_increase))
                        self.exit_tol_n_increase = True
                        break

                _logger.debug('Iter: {} (ST)\t{}: '
                              '{:.4e}'.format(self.n_iter,
                                              self.err_fcn.__name__, err_temp))

                if post_half_fcn is not None:
                    post_half_fcn(self.C_, self.ST_, D, D_calc)

                if post_iter_fcn is not None:
                    post_iter_fcn(self.C_, self.ST_, D, D_calc)

            if self.n_iter >= self.max_iter:
                _logger.info('Max iterations reached ({}).'.format(num + 1))
                self.exit_max_iter_reached = True
                break

            self.n_iter = num + 1

            # Check if err changed (absolute value), per iteration, less
            # than abs(tol_err_change)
            if (self.tol_err_change is not None) and (len(self.err) > 2):
                # err[-1] vs err[-3]: one full iteration apart (two halves)
                err_differ = _np.abs(self.err[-1] - self.err[-3])
                if err_differ < _np.abs(self.tol_err_change):
                    _logger.info('Change in err below tol_err_change '
                                 '({:.4e}). Exiting.'.format(err_differ))
                    self.exit_tol_err_change = True
                    break
if __name__ == '__main__':  # pragma: no cover
    # pyMCR logs through the logging facility; attach a stdout handler so
    # this demo's messages are actually printed.
    _handler = _logging.StreamHandler(stream=_sys.stdout)
    _handler.setFormatter(_logging.Formatter('%(message)s'))
    _logger.addHandler(_handler)

    M = 21
    N = 21
    P = 101
    n_components = 2

    # Two concentration maps that linearly trade off across the image
    C_img = _np.zeros((M, N, n_components))
    C_img[..., 0] = _np.dot(_np.ones((M, 1)), _np.linspace(0, 1, N)[None, :])
    C_img[..., 1] = 1 - C_img[..., 0]

    # Two non-overlapping box-shaped pure spectra
    St_known = _np.zeros((n_components, P))
    St_known[0, 40:60] = 1
    St_known[1, 60:80] = 2

    C_known = C_img.reshape((-1, n_components))
    D_known = _np.dot(C_known, St_known)

    # Retrieve C from the known spectra...
    mcrar = McrAR()
    mcrar.fit(D_known, ST=St_known, verbose=True)
    assert ((mcrar.D_ - D_known) ** 2).mean() < 1e-10
    assert ((mcrar.D_opt_ - D_known) ** 2).mean() < 1e-10

    # ...and S^T from the known concentrations
    mcrar = McrAR()
    mcrar.fit(D_known, C=C_known)
    assert ((mcrar.D_ - D_known) ** 2).mean() < 1e-10
    assert ((mcrar.D_opt_ - D_known) ** 2).mean() < 1e-10