Source code for mosaic.trajio.metaTrajIO

# -*- coding: utf-8 -*-
"""
	Read binary ionic current data into numpy arrays

	:Created:	7/17/2012
 	:Author: 	Arvind Balijepalli <arvind.balijepalli@nist.gov>
	:License:	See LICENSE.TXT	
	:ChangeLog:
	.. line-block::
		6/10/21 	AB 	Allow filter settings to be passed as keyword argument.
		4/13/17 	AB 	Negative end values enable running an analysis on all available data.
		7/29/16 	AB 	Add additional filtering when constructing a list of data files to process.
		1/27/17 	AB 	Perform a lexical sort of input data files
		9/13/15 	AB 	Updated logging to use mosaicLog class
		4/4/15		AB 	Merge changes from devel-1.0 	
		4/1/15 		AB 	Added a new property (DataLengthSec) to estimate the length of a data set.
		3/28/15 	AB 	Optimized file read interface for improved large file support.
		1/17/15 	AB 	Store names of processed files in an array.
		8/22/14 	AB 	Setup a new property ('LastDataFile') that tracks the current
						data file being processed.
		5/27/14		AB 	Added dcOffset kwarg to initialization to allow 
						for offset correction in the ionic current data.
		2/13/14		AB 	Fixed a potential infinite recursion bug in the
						initialization. 
		7/17/12		AB	Initial version
"""
import sys
from abc import ABCMeta, abstractmethod
import glob
import os
import numpy as np

import mosaic.settings as settings
from mosaic.utilities.resource_path import format_path, path_separator
import mosaic.utilities.mosaicLogging as mlog
from mosaic.utilities.mosaicLogFormat import _dprop, mosaic_property
import mosaic.utilities.mosaicTiming as mtime

# Public names exported by ``from mosaic.trajio.metaTrajIO import *``.
# InsufficientArgumentsError is part of the documented error set of metaTrajIO
# and is therefore exported alongside the other public exceptions.
__all__ = [
	"metaTrajIO",
	"IncompatibleArgumentsError",
	"InsufficientArgumentsError",
	"IncorrectDataFormat",
	"EndOfFileError",
	"SamplingRateChangedError",
	"EmptyDataPipeError",
	"FileNotFoundError",
]

# define custom exceptions
class IncompatibleArgumentsError(Exception):
	"""Raised when conflicting or missing keyword arguments are supplied."""
class InsufficientArgumentsError(Exception):
	"""Signals that required arguments were not supplied."""
class IncorrectDataFormat(Exception):
	"""Signals that data are not in the expected format."""
class EndOfFileError(Exception):
	"""Signals that the end of a data file was reached."""
class SamplingRateChangedError(Exception):
	"""Signals that the sampling rate changed between data files."""
class EmptyDataPipeError(Exception):
	"""Raised when the data pipeline has run out of data."""
class FileNotFoundError(Exception):
	"""
		Raised when no data files are found in the specified path.

		NOTE: within this module the name shadows the Python 3 builtin
		``FileNotFoundError``; this class derives directly from ``Exception``
		and is not an ``OSError``.
	"""
class InsufficientDataError(Exception):
	"""Signals that fewer data points are queued than were requested."""

# Module-level timing object: metaTrajIO.popdata is decorated with its
# FunctionTiming hook and metaTrajIO.Stop prints its statistics.
trajTimer = mtime.mosaicTiming()

# Wildcard patterns for files that are removed from the list of data files
# built from a directory (see metaTrajIO._buildFileList).
ignorelist = ["eventProcessing*.log", "*.sqlite"]

class metaTrajIO(object, metaclass=ABCMeta):
	"""
		.. warning:: |metaclass|

		Initialize a TrajIO object. The object can load all the data in a directory,
		N files from a directory or from an explicit list of filenames. In addition
		to the arguments defined below, implementations of this meta class may require
		the definition of additional arguments. See the documentation of those classes
		for what those may be. For example, the qdfTrajIO implementation of metaTrajIO also requires
		the feedback resistance (Rfb) and feedback capacitance (Cfb) to be passed at initialization.

		:Parameters:

			- `dirname` :		all files from a directory ('<full path to data directory>')
			- `nfiles` :		if requesting N files (in addition to dirname) from a specified directory
			- `fnames` : 		explicit list of filenames ([file1, file2,...]). This argument cannot be used in conjunction with dirname/nfiles. The filter argument is ignored when used in combination with fnames. The list is copied, so the caller's list is not modified.
			- `filter` :		'<wildcard filter>' (optional, filter is '*' if not specified)
			- `start` : 		Data start point in seconds.
			- `end` : 			Data end point in seconds.
			- `datafilter` :	Handle to the algorithm to use to filter the data. If no algorithm is specified, datafilter is None and no filtering is performed.
			- `dcOffset` :		Subtract a DC offset from the ionic current data.
			- `filtersettings`:	Dict containing low pass filter settings (optional: if not provided filter settings will be loaded from the settings file. If no settings are found, `datafilter` will be turned off.)

		:Properties:

			- `FsHz` :					sampling frequency in Hz. If the data was decimated, this property will hold the sampling frequency after decimation.
			- `LastFileProcessed` :		return the data file that was last processed.
			- `ProcessedFiles` :		return the list of processed data files.
			- `ElapsedTimeSeconds` :	return the analysis time in sec.
			- `DataLengthSec` :			return the approximate length of the data in sec.

		:Errors:

			- `IncompatibleArgumentsError` : 	when conflicting arguments are used, or when neither `dirname` nor `fnames` is supplied.
			- `EmptyDataPipeError` : 			when out of data.
			- `FileNotFoundError` :				when data files do not exist in the specified path.
			- `InsufficientArgumentsError` :	defined by this module for missing arguments; it is not raised by this base class itself.
	"""
	def __init__(self, **kwargs):
		"""
			Store the keyword arguments as attributes, build the list of data files,
			configure optional data filtering and DC offset correction, reset the
			data pipeline state and finally call the sub-class ``_init`` hook.
		"""
		self.CHUNKSIZE=10000
		self.dataGenerator=None

		self.logger=mlog.mosaicLogging().getLogger(name=__name__)

		# start by setting all passed keyword arguments as class attributes
		for (k,v) in kwargs.items():
			setattr(self, k, v)

		# Check if the passed arguments are sane
		if hasattr(self, 'dirname') and hasattr(self, 'fnames'):
			raise IncompatibleArgumentsError("Incompatible arguments: expect either 'dirname' or 'fnames' when initializing class {0}.".format(type(self).__name__))

		# Check for the filter arg
		if not hasattr(self, 'filter'):
			self.filter='*'

		if hasattr(self, 'fnames'):
			# Copy the list: popfnames() consumes self.dataFiles with pop(0),
			# which would otherwise empty the list object owned by the caller.
			self.dataFiles=list(self.fnames)
			delattr(self, 'fnames')
		else:
			try:
				if hasattr(self, 'dirname') and hasattr(self,'nfiles'):
					# N files from a directory
					self.dataFiles=self._buildFileList(self.dirname, self.filter)[:int(self.nfiles)]
					delattr(self, 'dirname')
					delattr(self, 'nfiles')
				elif hasattr(self, 'dirname'):
					# all files from a directory
					self.dataFiles=self._buildFileList(self.dirname, self.filter)
					delattr(self, 'dirname')
				else:
					raise IncompatibleArgumentsError("Missing arguments: 'dirname' or 'fnames' must be supplied to initialize {0}".format(type(self).__name__))
			except AttributeError as err:
				raise IncompatibleArgumentsError(err) from err

		# set additional meta-data
		self.nFiles = len(self.dataFiles)
		self.fileFormat='N/A'
		try:
			# The data path is the first data file's path with the filename removed.
			sep=path_separator()
			self.datPath=format_path(sep.join((self.dataFiles[0].split( sep ))[:-1]))
		except IndexError as err:
			# An empty file list means nothing matched.
			raise FileNotFoundError("Files not found.") from err

		# setup data filtering
		if hasattr(self, 'datafilter'):
			self.logger.info("Data filtering active.")
			# Order matters: _setupDataFilter() resets self.dataFilter to False
			# when no filter settings are available.
			self.dataFilter=True
			self.dataFilterObj=self._setupDataFilter()
		else:
			self.dataFilter=False

		if not hasattr(self, 'dcOffset'):
			self.dcOffset=0.0
		else:
			self.dcOffset=float(self.dcOffset)

		# set start to 0 if it doesn't exist
		if not hasattr(self, 'start'):
			self.start=0.

		# Track current filename
		self.currentFilename=self.dataFiles[0]

		# initialize an empty data pipeline
		self.currDataPipe=np.array([])
		# Track the start point of the queue. This var is used to manage
		# deletion more effectively, by not deleting elements every time
		# popdata is called. Instead, data is actually deleted when the index
		# exceeds 1 million data points.
		self.currDataIdx=0

		# a var that determines if the end of the data stream is imminent.
		self.nearEndOfData=0

		# A global index that tracks the number of data points retrieved.
		self.globalDataIndex=0
		self.datLenSec=0

		self.initPipe=False

		# A list that holds the names of processed files.
		self.processedFilenames=[]

		# Call sub-class init
		self._init(**kwargs)

	def Stop(self):
		"""
			Print the timing statistics collected by the module-level timer and
			signal the end of the data stream.

			:Errors:

				- `EmptyDataPipeError` : always raised.
		"""
		trajTimer.PrintStatistics()
		raise EmptyDataPipeError("End of data.")

	#################################################################
	# Public API: functions
	#################################################################
	@property
	def FsHz(self):
		"""
			.. important:: |property|

			Return the sampling frequency in Hz. When data filtering is active,
			the sampling frequency reported by the filter object is returned.
		"""
		if not self.initPipe:
			self._initPipe()

		if self.dataFilter:
			self.logger.debug(_dprop("Sampling frequency {0} ({1})", self.dataFilterObj.filterFs, type(self.dataFilterObj).__name__))
			return self.dataFilterObj.filterFs
		else:
			self.logger.debug(_dprop("Sampling frequency {0}", self.Fs))
			return self.Fs

	@mosaic_property
	def ElapsedTimeSeconds(self):
		"""
			.. important:: |property|

			Return the elapsed time in the time-series in seconds, computed from
			the number of data points retrieved past the start index.
		"""
		if not self.initPipe:
			self._initPipe()

		if not self.dataFilter:
			Fs=self.Fs
		else:
			Fs=self.dataFilterObj.filterFs

		elapsedTime=(self.globalDataIndex - self.startIndex)/float(Fs)
		return elapsedTime

	@mosaic_property
	def LastFileProcessed(self):
		"""
			.. important:: |property|

			Return the last data file that was processed
		"""
		return self.currentFilename

	@mosaic_property
	def ProcessedFiles(self):
		"""
			.. important:: |property|

			Return a list of processed data filenames. Each entry holds the
			filename, the file format and the file modification time.
		"""
		return self.processedFilenames

	@mosaic_property
	def DataLengthSec(self):
		"""
			.. important:: |property|

			Return the approximate length of data that will be processed. If the data are
			in multiple files, this property assumes that each file contains an equal amount of data.
		"""
		if not self.initPipe:
			self._initPipe()

		return self.datLenSec

	@trajTimer.FunctionTiming
	def popdata(self, n):
		"""
			Pop data points from self.currDataPipe. Data files are read automatically
			when the queue is shorter than the requested number of points. When all
			data files are read, an EmptyDataPipeError is thrown.

			:Parameters:

				- `n` : number of requested data points

			:Returns:

				- Numpy array with requested data. The last call before the data
				  stream ends may return fewer than `n` points.

			:Errors:

				- `EmptyDataPipeError` : when the data stream is exhausted or the specified end point has been passed.
		"""
		if not self.initPipe:
			self._initPipe()

		npts=int(n)

		# Loop (rather than recurse) until enough data are queued: every pass
		# appends at most one chunk, so a large request on a short queue would
		# otherwise need one stack level per chunk.
		while True:
			if self.nearEndOfData>1:
				self.Stop()

			# If the global index exceeds the specified end point, raise an EmptyDataPipeError
			if hasattr(self, "end") and self.end>0:
				if self.globalDataIndex > self.endIndex:
					self.Stop()

			# Get the elements to return: index to (index+n)
			t=self.currDataPipe[self.currDataIdx:self.currDataIdx+npts]-self.dcOffset

			if len(t) >= npts:
				# The required data points were obtained, update the queue and global indices
				self.currDataIdx+=npts
				self.globalDataIndex+=npts

				# delete them from the pipe if the index exceeds 1 million
				if self.currDataIdx>1000000:
					self.currDataPipe=np.delete(self.currDataPipe, np.s_[:self.currDataIdx:], axis=0)
					# reset the index
					self.currDataIdx=0

				# return the popped data
				return t

			if self.nearEndOfData>0:
				# No more files to read: hand back the (short) remainder and
				# flag that the next call must stop.
				self.currDataIdx+=npts
				self.globalDataIndex+=npts
				self.nearEndOfData+=1
				return t

			# Not enough data yet: queue another chunk and try again.
			self._appenddata()

	def previewdata(self, n):
		"""
			Preview data points in self.currDataPipe. This function is identical in behavior to popdata,
			except it does not remove data points from the queue. Like popdata, it automatically
			reads data files when the queue length is shorter than the requested data points.

			:Parameters:

				- `n` : number of requested data points

			:Returns:

				- Numpy array with requested data. Fewer than `n` points are
				  returned when no more data files are available.

			:Errors:

				- `EmptyDataPipeError` : when out of data.
		"""
		if not self.initPipe:
			self._initPipe()

		npts=int(n)

		# Iterative equivalent of the former recursive implementation.
		while True:
			# Get the elements to return
			t=self.currDataPipe[self.currDataIdx:self.currDataIdx+npts]-self.dcOffset

			if len(t) >= npts or self.nearEndOfData>0:
				return t

			self._appenddata()

	def formatsettings(self):
		"""
			Log the trajectory I/O settings, followed by the sub-class settings
			and, when data filtering is active, the filter settings.
		"""
		self.logger.info( '\tTrajectory I/O settings:' )

		self.logger.info( '\t\tFiles processed = {0}'.format(self.nFiles-len(self.dataFiles)) )
		self.logger.info( '\t\tData path = \'{0}\''.format(self.datPath) )
		self.logger.info( '\t\tFile format = {0}'.format(self.fileFormat) )
		self.logger.info( '\t\tSampling frequency = {0} kHz'.format(self.FsHz*1e-3) )

		# Sub-class formatted settings
		self._formatsettings()

		# add the filter settings
		if self.dataFilter:
			self.dataFilterObj.formatsettings()

	#################################################################
	# Private API: Interface functions, implemented by sub-classes.
	# Should not be called from external classes
	#################################################################
	def _appenddata(self):
		"""
			Append the next chunk of (scaled and optionally filtered) data to the
			data pipeline. When the current file is exhausted, or no file has been
			opened yet, the next data file is read first.

			:Parameters:

				- None

			.. seealso:: See implementations of metaTrajIO for specific documentation.
		"""
		try:
			data=self.scaleData(next(self.dataGenerator))

			if self.dataFilter:
				self.dataFilterObj.filterData(data, self.Fs)
				self.currDataPipe=np.hstack((self.currDataPipe, self.dataFilterObj.filteredData ))
			else:
				self.currDataPipe=np.hstack((self.currDataPipe, data ))
		# NOTE(review): StopIteration marks an exhausted file and TypeError covers
		# the initial dataGenerator=None, but this tuple also swallows
		# AttributeError/TypeError raised by scaleData or the filter and then
		# silently moves on to the next file - confirm this is intended.
		except (StopIteration, AttributeError, TypeError):
			# Read a new data file to get more data
			fname=self.popfnames()
			if fname:
				self.processedFilenames.append([fname, self.fileFormat, os.path.getmtime(fname)])
				self.rawData=self.readdata( fname )
				self.dataGenerator=self._createGenerator()
				self._appenddata()

	def scaleData(self, data):
		"""
			.. important:: |interfacemethod|

			Scale the raw data loaded with :func:`~mosaic.metaTrajIO.metaTrajIO.readdata`. Note this function will not necessarily receive the entire data array loaded with :func:`~mosaic.metaTrajIO.metaTrajIO.readdata`. Transformations must be able to process partial data chunks.

			:Parameters:

				- `data` : partial chunk of raw data loaded using :func:`~mosaic.metaTrajIO.metaTrajIO.readdata`.

			:Returns:

				- Array containing scaled data.

			:Default Behavior:

				- If not implemented by a sub-class, the default behavior is to return ``data`` to the calling function without modifications.

			:Example:

				Assuming the amplifier scale and offset values are stored in the class variables ``AmplifierScale`` and ``AmplifierOffset``, the raw data read using :func:`~mosaic.metaTrajIO.metaTrajIO.readdata` can be transformed by :func:`~mosaic.metaTrajIO.metaTrajIO.scaleData`. We can also use this function to change the array data type.

				.. code-block:: python

					def scaleData(self, data):
						return np.array(data*self.AmplifierScale-self.AmplifierOffset, dtype='f8')
		"""
		return data

	@abstractmethod
	def _formatsettings(self):
		"""
			.. important:: |abstractmethod|

			Log settings strings
		"""
		pass

	@abstractmethod
	def _init(self, **kwargs):
		"""
			.. important:: |abstractmethod|

			This function is called at the end of the class constructor to perform additional initialization specific to the algorithm being implemented. The arguments to this function are identical to those passed to the class constructor.
		"""
		pass

	@abstractmethod
	def readdata(self, fname):
		"""
			.. important:: |abstractmethod|

			Return raw data from a single data file. Set a class
			attribute Fs with the sampling frequency in Hz.

			:Parameters:

				- `fname` : filename to read

			:Returns:

				- An array object that holds raw (unscaled) data from `fname`

			:Errors:

				- None
		"""
		pass

	def popfnames(self):
		"""
			Pop a single filename from the start of ``self.dataFiles``. The first time
			the list is found empty, the end of the data stream is flagged and
			``None`` is returned; on a later attempt an ``EmptyDataPipeError`` is raised.

			:Parameters:

				- None

			:Returns:

				- A single filename if successful, ``None`` the first time the list is empty.

			:Errors:

				- `EmptyDataPipeError` : when the filename list is empty and the end of data was already flagged.
		"""
		try:
			self.currentFilename=self.dataFiles.pop(0)
			return self.currentFilename
		except IndexError:
			if self.nearEndOfData:
				self.Stop()
			else:
				self.logger.debug("Approaching the end of the data stream.")
				self.nearEndOfData+=1

	#################################################################
	# Internal Functions
	#################################################################
	def _initPipe(self):
		"""
			Load the first chunk of data so that the sampling frequency is set,
			estimate the data length, compute the end index and discard the
			data points that precede the requested start time.
		"""
		# Last, on startup load a single data file to force
		# the sampling frequency FsHz to be set on startup
		self._appenddata()

		# Must be set before the popdata calls below to avoid re-entering _initPipe.
		self.initPipe=True
		# Estimate assumes every remaining file is as long as the one just read.
		self.datLenSec=(len(self.rawData)/float(self.Fs)*(len(self.dataFiles)+1))

		# Set the end point
		if hasattr(self, 'end'):
			if self.end > 0:	# treat a negative end value the same as not setting end.
				# NOTE(review): the 1 is subtracted from the end time in seconds,
				# not from the sample index - confirm this is intended.
				self.endIndex=int((self.end-1)*self.Fs)
				self.datLenSec=self.end-self.start

		# Drop the first 'n' points specified by the start keyword
		if hasattr(self, 'start'):
			self.startIndex=int(self.start*self.Fs)
			if self.startIndex > 0:
				# Discard in CHUNKSIZE blocks, then the remainder.
				nBlks=int((self.startIndex-1)/self.CHUNKSIZE)
				for i in range(nBlks):
					self.popdata(self.CHUNKSIZE)
				self.popdata( int((self.startIndex-1)%self.CHUNKSIZE) )

	def _setupDataFilter(self):
		"""
			Build the data filter object from the `filtersettings` keyword argument
			or, if it was not supplied, from the settings found in the data path.

			:Returns:

				- The filter object, or ``None`` (with data filtering disabled) when the settings are empty.
		"""
		if hasattr(self, 'filtersettings'):
			filtsettings=self.filtersettings
		else:
			# Settings are looked up under the last component of the filter's name.
			filtername=str(self.datafilter.__name__.split('.')[-1])
			filtsettings=settings.settings( self.datPath ).getSettings(filtername)

		if filtsettings=={}:
			self.logger.warning("WARNING: No settings found for '{0}'. Data filtering is disabled".format(str(self.datafilter.__name__)))
			self.dataFilter=False
			return
		else:
			return self.datafilter(**filtsettings)

	def _createGenerator(self):
		"""
			Yield successive slices of ``self.rawData``, each at most
			``self.CHUNKSIZE`` points long.
		"""
		i=0
		while i<len(self.rawData):
			yield self.rawData[i:i+self.CHUNKSIZE]
			i+=self.CHUNKSIZE

	def _buildFileList(self, dirname, filter):
		"""
			Return a sorted list of the files in `dirname` that match the wildcard
			`filter`, excluding any file that matches a pattern in ``ignorelist``.
		"""
		flist=set(glob.glob(format_path(dirname+"/"+filter)))
		for ignorefilter in ignorelist:
			flist=flist-set(glob.glob(format_path(dirname+"/"+ignorefilter)))

		return sorted(flist)