Source code for mosaic.process.cusumPlus

# -*- coding: utf-8 -*-
"""
	Analyze a multi-step event with the CUSUM+ algorithm. Implements a modified version of the CUSUM algorithm 
	(used by OpenNanopore for example) in MOSAIC. This approach sacrifices including system information in the 
	analysis in favor of much faster fitting of single- and multi-level events.

	:Created:	2/10/2015
	:Author: 	Kyle Briggs <kbrig035@uottawa.ca>
	:License:	See LICENSE.TXT
	:ChangeLog:             
	.. line-block::
				11/15/19    JR  Updated output: calculates adn includes blockade standard deviation; simplified tab in code
				6/3/17		AB 	Updated docstring.
				8/24/15 	AB 	Rename algorithm to CUSUM+   
				3/20/15 	AB 	Added a new metadata column (mdStateResTime) that saves the residence time of each state to the database.
				3/18/15		KB	Implemented rise time skipping
				3/17/15		KB	Implemented adaptive threshold
				2/12/15		AB	Updated metadata representation to be consistent with stepResponseAnalysis and multiStateAnalysis
				2/10/15		KB	Initial version
"""
import mosaic.commonExceptions
import mosaic.process.metaEventProcessor as metaEventProcessor
import mosaic.utilities.util as util
import mosaic.utilities.mosaicLogging as mlog
import mosaic.utilities.fit_funcs as fit_funcs
import sys
import math
from scipy.optimize import minimize, fsolve
import numpy as np

__all__ = ["cusumPlus", "InvalidEvent"]

[docs]class InvalidEvent(Exception): pass
class datblock: """ Smart data block that holds a time-series of data and keeps track of its mean and SD. """ def __init__(self, dat): self.data=dat self.mean=util.avg(dat) self.sd=util.sd(dat)
[docs]class cusumPlus(metaEventProcessor.metaEventProcessor): """ CUSUM+ will detect jumps that are smaller than `StepSize`, but they will have to be sustained longer. Threshold can be thought of, very roughly, as proportional to the length of time a subevent must be sustained for it to be detected. The algorithm will adjust the actual threshold used on a per-event basis in order to minimize false positive detection of current jumps This algorithm is based on code used in OpenNanopore, which you can read about here: http://pubs.rsc.org/en/Content/ArticleLanding/2012/NR/c2nr30951c#!divAbstract Some known issues with CUSUM+: 1. If the duration of a sub-event is shorter than than the MinLength parameter, CUSUM+ will be unable to detect it. CUSUM+ will not detect events within MinLength of a previous event. 2. CUSUM assumes an instantaneous transition between current states. As a result, if the RC rise time of the system is large, CUSUM+ can trigger and detect intermediate states during the change time. This can be avoided by choosing a number of samples to skip equal to about 2-5RC. 3. As a consequence of using a statistical t-test, CUSUM can have false positives. The algorithm has an adaptive threshold that tries to minimize the chances of this happening while maintaining good sensitivity (expected number of false positives within an event is less than 1). :Keyword Args: In addition to :class:`~mosaic.metaEventProcessor.metaEventProcessor` args, - `StepSize` : The number of baseline standard deviations are considered significant (3 is usually a good starting point). - `MinThreshold` : One of two sensitivity parameters (lower is more sensitive). A good starting point is to set `MinThreshold` equal to `StepSize`. - `MaxThreshold` : One of two sensitivity parameters (lower is more sensitive). Set `MaxThreshold` about 3x higher than `MinThreshold`. - `MinLength` : The number of samples to skip after detecting a jump, in order to avoid triggering during the rise time and returning an artificially high number of states. This number of points is also skipped when averaging levels. About 4 times the RC constant of the system is a good starting value. :Errors: When an event cannot be analyzed, all metadata are set to -1. To use it requires four settings: .. code-block:: javascript "cusumPlus": { "StepSize": 3.0, "MinThreshold": 3.0, "MaxThreshold": 10.0, "MinLength" : 10, } """ def _init(self, **kwargs): """ Initialize the single step analysis class. """ # initialize the object's metadata (to -1) as class attributes self.mdOpenChCurrent=-1 self.mdCurrentStep=[-1] self.mdNStates=-1 self.mdBlockDepth=[-1] self.mdBlockSTD=[-1] self.mdEventDelay=[-1] self.mdStateResTime=[-1] self.mdEventStart=-1 self.mdEventEnd=-1 self.mdResTime = -1 self.mdAbsEventStart = -1 self.mdThreshold=-1 self.nStates=-1 self.cusumLogger=mlog.mosaicLogging().getLogger(__name__) # Settings for detection of changed in current level try: self.StepSize=float(self.settingsDict.pop("StepSize", 3.0)) self.MinThreshold=float(self.settingsDict.pop("MinThreshold", 2.0)) self.MaxThreshold=float(self.settingsDict.pop("MaxThreshold", 10.0)) self.MinLength=float(self.settingsDict.pop("MinLength", 10)) except ValueError as err: raise mosaic.commonExceptions.SettingsTypeError( err ) self.mdThreshold = self.MinThreshold ########################################################################### # Interface functions implemented starting here ########################################################################### def _processEvent(self): """ This function implements the core logic to analyze one single step-event. """ try: # Run CUSUM+ to detect changes in level self.__FitEvent() except: raise def _mdList(self): """ Return a list of meta-data from the analysis of single step events. We explicitly control the order of the data to keep formatting consistent. """ return [ self.mdProcessingStatus, self.mdOpenChCurrent, self.mdNStates, self.mdCurrentStep, self.mdBlockDepth, self.mdBlockSTD, self.mdEventStart, self.mdEventEnd, self.mdEventDelay, self.mdStateResTime, self.mdResTime, self.mdAbsEventStart, self.mdThreshold ] def _mdHeadingDataType(self): """ Return a list of meta-data tags data types. """ return [ 'TEXT', 'REAL', 'INTEGER', 'REAL_LIST', 'REAL_LIST', 'REAL_LIST', 'REAL', 'REAL', 'REAL_LIST', 'REAL_LIST', 'REAL', 'REAL', 'REAL' ] def _mdHeadings(self): """ Explicity set the metadata to print out. """ return [ 'ProcessingStatus', 'OpenChCurrent', 'NStates', 'CurrentStep', 'BlockDepth', 'BlockSTD', 'EventStart', 'EventEnd', 'EventDelay', 'StateResTime', 'ResTime', 'AbsEventStart', 'Threshold' ]
[docs] def mdAveragePropertiesList(self): """ Return a list of meta-data properties that will be averaged and displayed at the end of a run. """ pass
[docs] def formatsettings(self): """ Return a formatted string of settings for display """ self.cusumLogger.info( '\tEvent processing settings:' ) self.cusumLogger.info( '\t\tAlgorithm = CUSUM+' ) self.cusumLogger.info( '\t\tJump Size = {0}'.format(self.StepSize) ) self.cusumLogger.info( '\t\tMin. State Length = {0}'.format(self.MinLength) ) self.cusumLogger.info( '\t\tMin. Threshold = {0}'.format(self.MinThreshold) ) self.cusumLogger.info( '\t\tMax. Threshold = {0}'.format(self.MaxThreshold) )
########################################################################### # Local functions ########################################################################### def _GetThreshold(self, ARL, sigma, mun): ARL = 2*ARL #double since we are doing two-sided CUSUM f = lambda h: (np.exp(-2.0*mun*(h/sigma+1.166))-1.0+2.0*mun*(h/sigma+1.166))/(2.0*mun*mun)-ARL if f(self.MinThreshold)*f(self.MaxThreshold) < 0: #if a root exists in the specified range opth, info, ier, mesg = fsolve(f,self.MaxThreshold,full_output=True) if ier==1: #fit success, return the root Threshold = opth[0] else: #fit failure, default to min Threshold = self.MinThreshold else: #if no root exists, we use the min value f = lambda h: np.abs((np.exp(-2.0*mun*(h/sigma+1.166))-1.0+2.0*mun*(h/sigma+1.166))/(2.0*mun*mun)-ARL) #absolute value to minimize opth = minimize(f,self.MaxThreshold,bounds=((self.MinThreshold,self.MaxThreshold),)) #Find the min within the requested range if opth.success==False: Threshold = self.MinThreshold #Default to more sensitive else: Threshold = opth.x[0] return Threshold def __FitEvent(self): try: dt = 1000./self.Fs # time-step in ms. edat=self.dataPolarity*np.asarray( self.eventData, dtype='float64' ) #make data into an array and make it abs val Threshold = self._GetThreshold(len(edat),self.StepSize,-self.StepSize/2.0) # control numpy error reporting np.seterr(invalid='ignore', over='ignore', under='ignore') #set up variables for the main CUSUM+ loop logp = 0 #instantaneous log-likelihood for positive jumps logn = 0 #instantaneous log-likelihood for negative jumps cpos = np.zeros(len(edat), dtype='float64') #cumulative log-likelihood function for positive jumps cneg = np.zeros(len(edat), dtype='float64') #cumulative log-likelihood function for negative jumps gpos = np.zeros(len(edat), dtype='float64') #decision function for positive jumps gneg = np.zeros(len(edat), dtype='float64') #decision function for negative jumps edges = np.array([0], dtype='int64') #initialize an array with the position of the first subevent - the start of the event anchor = 0 #the last detected change length = len(edat) mean = edat[0] variance = self.baseSD * self.baseSD k = 0 self.nStates = 0 varM = edat[0] varS = 0 mean = edat[0] while k < length-1: k += 1 varOldM = varM #algorithm to calculate running variance, details here: http://www.johndcook.com/blog/standard_deviation/ varM = varM + (edat[k] - varM)/float(k+1-anchor) varS = varS + (edat[k] - varOldM) * (edat[k] - varM) variance = varS / float(k+1-anchor) mean = ((k-anchor) * mean + edat[k])/float(k+1-anchor) if (variance == 0): # with low-precision data sets it is possible that two adjacent values are equal, in which case there is zero variance for the two-vector of sample if this occurs next to a detected jump. This is very, very rare, but it does happen. variance = self.baseSD*self.baseSD # in that case, we default to the local baseline variance, which is a good an estimate as any. logp = self.StepSize*self.baseSD/variance * (edat[k] - mean - self.StepSize*self.baseSD/2.) #instantaneous log-likelihood for current sample assuming local baseline has jumped in the positive direction logn = -self.StepSize*self.baseSD/variance * (edat[k] - mean + self.StepSize*self.baseSD/2.) #instantaneous log-likelihood for current sample assuming local baseline has jumped in the negative direction cpos[k] = cpos[k-1] + logp #accumulate positive log-likelihoods cneg[k] = cneg[k-1] + logn #accumulate negative log-likelihoods gpos[k] = max(gpos[k-1] + logp, 0) #accumulate or reset positive decision function gneg[k] = max(gneg[k-1] + logn, 0) #accumulate or reset negative decision function if (gpos[k] > Threshold or gneg[k] > Threshold): if (gpos[k] > Threshold): #significant positive jump detected jump = anchor + np.argmin(cpos[anchor:k+1]) #find the location of the start of the jump if jump - edges[self.nStates] > self.MinLength: edges = np.append(edges, jump) self.nStates += 1 if (gneg[k] > Threshold): #significant negative jump detected jump = anchor + np.argmin(cneg[anchor:k+1]) if jump - edges[self.nStates] > self.MinLength: edges = np.append(edges, jump) self.nStates += 1 anchor = k cpos[0:len(cpos)] = 0 #reset all decision arrays cneg[0:len(cneg)] = 0 gpos[0:len(gpos)] = 0 gneg[0:len(gneg)] = 0 mean = edat[anchor] varM = edat[anchor] varS = 0 edges = np.append(edges, len(edat)) #mark the end of the event as an edge self.nStates += 1 cusum = dict() if (self.nStates < 3): self.rejectEvent('eInvalidStates') else: minstepflag = 0 while minstepflag == 0: minstepflag = 1 currentlevels = [np.average(edat[int(edges[i]+self.MinLength):int(edges[i+1])]) for i in range(self.nStates)] #detect current levels during detected sub-events currentSTD = [np.std(edat[int(edges[i]+self.MinLength):int(edges[i+1])]) for i in range(self.nStates)] toosmall = np.absolute(np.diff(currentlevels)) < self.StepSize*self.baseSD/2 for i in range(len(toosmall)): if toosmall[i] == True: edges = np.delete(edges,i+1) minstepflag = 0 self.nStates -= 1 break if (self.nStates < 3): self.rejectEvent('eInvalidStates') else: cusum['CurrentLevels'] = currentlevels cusum['CurrentSTD'] = currentSTD cusum['EventDelay'] = edges * dt #locations of sub-events in the data cusum['Threshold'] = Threshold #record the threshold used self.__recordevent(cusum) except KeyboardInterrupt: self.rejectEvent('eFitUserStop') raise except InvalidEvent: self.rejectEvent('eInvalidEvent') except BaseException: self.rejectEvent('eUnknownError') def __recordevent(self, cusum): dt = 1000./self.Fs # time-step in ms. if self.nStates<3: self.rejectEvent('eInvalidStates') else: self.mdOpenChCurrent = 0.5*(cusum['CurrentLevels'][0] + cusum['CurrentLevels'][self.nStates-1]) #this assumes that the event returns to baseline after self.mdCurrentStep = np.diff(np.hstack(([self.mdOpenChCurrent], cusum['CurrentLevels'][1:]))) #these current levels are relative to the open state self.mdNStates = self.nStates - 1 #this does not count padding as separate states. Note also that states can be triggered inside the padding self.mdBlockDepth = cusum['CurrentLevels'][1:-1]/self.mdOpenChCurrent #percentage blockage of each state self.mdBlockSTD = cusum['CurrentSTD'][1:-1] self.mdEventDelay = cusum['EventDelay'][1:-1] # first and last states (or baseline) are removed self.mdStateResTime = np.diff(self.mdEventDelay) self.mdEventStart = self.mdEventDelay[0] #the first nonzero event self.mdEventEnd = self.mdEventDelay[-1] #the last non zero event, this assumes no events triggered in the padding self.mdResTime = self.mdEventEnd - self.mdEventStart self.mdAbsEventStart = self.mdEventStart + self.absDataStartIndex * dt self.mdThreshold = cusum['Threshold'] if math.isnan(self.mdOpenChCurrent): print(self.mdOpenChCurrent) self.rejectEvent('eInvalidOpenChCurr')