Source code for mosaic.utilities.util

"""
	A collection of utility functions
"""
import sys
import ast
import numpy
from functools import reduce

__all__=["avg", "sd", "filter", "partition", "decimate", "commonest", "selectS", "flat2", "WindowSizeError"]

class WindowSizeError:
	pass
	
def str_(s):
	if type(s)==bytes:
		# return str(s, 'utf-8')
		return str(s, 'latin-1')
	else:
		return s

def bytes_(s):
	if type(s)==str:
		# return str(s, 'utf-8')
		return bytes(s, 'latin-1')
	else:
		return s

[docs]def avg(dat): """ Calculate the average of a list of reals """ try: return sum(dat)/len(dat) except ZeroDivisionError: return 0.0
[docs]def sd(dat): """ Wrapper for numpy std """ return numpy.std(dat)
[docs]def filter(dat, windowSz): """ Filter the data using a convolution. Returns an array of size len(dat)-windowSz+1 if dat is longer than windowSz. If len(dat) < windowSz, raise WindowSizeError """ if len(dat)<windowSz: raise WindowSizeError("Data length ({0}) must be longer than specified window size ({1}).".format(len(dat),windowSz)) # define the weights for the convolution weights=numpy.repeat(1.0,windowSz)/windowSz return list(numpy.convolve(dat,weights,'valid'))
[docs]def partition(dat, size): """ Partition a list into sub-lists, each of length size. If the number of elements in dat does not partition evenly, the last sub-list will have fewer elements. """ return (lambda dat, size: [dat[i:i+size] for i in range(0, len(dat), size)])(dat,size)
[docs]def decimate(dat, size): """ Decimate dat for a specified window size. """ return [ avg(decim) for decim in partition(dat,size) ]
[docs]def commonest(dat): """ Return the most common element in a list. """ return max( set(dat), key=dat.count )
[docs]def selectS(dat, nSigma, mu, sd): """ Select and return data from a list that lie within nSigma * SD of the mean. """ #sigma=min( numpy.std(dat), sd ) return [ d for d in dat if abs(d)>(mu-sd) and abs(d)<(mu+sd) ]
[docs]def flat2(dat): """ Flatten a 2D array to a list """ return reduce(lambda x,y: x+y,dat)
def eval_(expr): try: tree=ast.parse(expr, mode='eval') return eval(compile(tree, '<ast>', 'eval')) except: raise def exec_(expr): try: tree=ast.parse(expr, mode='exec') return eval(compile(tree, '<ast>', 'exec')) except: raise