Source code for lsst.sims.maf.stackers.baseStacker

from __future__ import print_function
from builtins import zip
from builtins import object
import inspect
import warnings
import numpy as np
from future.utils import with_metaclass

# Names exported by `from ... import *`: the registry metaclass and the stacker base class.
__all__ = ['StackerRegistry', 'BaseStacker']


class StackerRegistry(type):
    """
    Meta class for Stackers, to build a registry of stacker classes.

    Every class created with this metaclass is recorded in two dictionaries.
    Each dictionary is created on the first class that does not already see one
    through attribute lookup, so subclasses share the dictionaries of their parents:

    * ``registry`` maps the stacker name to the stacker class.
    * ``sourceDict`` maps each column name listed in the class attribute
      ``colsAdded`` to the class that declared it.

    Classes defined in a module whose name starts with 'lsst.sims.maf.stackers'
    are registered under their bare class name. Any other class is registered as
    '<module name without its last component>.<class name>', or as
    '<module name>.<class name>' when the module name has a single component.
    """

    def __init__(cls, name, bases, dict):
        """
        Register the newly created class `cls`.

        Parameters
        ----------
        name : str
            Name of the class being created.
        bases : tuple
            Base classes of the class being created.
        dict : dict
            Namespace of the class being created. (The parameter name shadows the
            builtin; it is kept unchanged so the signature stays the same.)

        Raises
        ------
        Exception
            If a class is already registered under the same stacker name.
        """
        super(StackerRegistry, cls).__init__(name, bases, dict)
        if not hasattr(cls, 'registry'):
            cls.registry = {}
        if not hasattr(cls, 'sourceDict'):
            cls.sourceDict = {}
        # inspect.getmodule returns None when the defining module is not present in
        # sys.modules; fall back on the module name recorded on the class itself.
        module = inspect.getmodule(cls)
        modname = module.__name__ if module is not None else cls.__module__
        if modname.startswith('lsst.sims.maf.stackers'):
            modname = ''
        else:
            if len(modname.split('.')) > 1:
                modname = '.'.join(modname.split('.')[:-1]) + '.'
            else:
                modname = modname + '.'
        stackername = modname + name
        if stackername in cls.registry:
            raise Exception('Redefining stacker %s! (there are >1 stackers with the same name)'
                            % (stackername))
        # The base class itself is not offered as a usable stacker.
        if stackername != 'BaseStacker':
            cls.registry[stackername] = cls
        colsAdded = cls.colsAdded
        for col in colsAdded:
            cls.sourceDict[col] = cls

    def getClass(cls, stackername):
        """
        Return the stacker class registered under `stackername`.

        Raises
        ------
        KeyError
            If no class is registered under that name.
        """
        return cls.registry[stackername]

    def help(cls, doc=False):
        """
        Print the names of the registered stackers, in sorted order.

        Parameters
        ----------
        doc : bool, optional
            If True, also print each stacker's docstring and the columns it adds
            and requires. This instantiates every registered stacker with no arguments.
        """
        for stackername in sorted(cls.registry):
            if not doc:
                print(stackername)
            else:
                print('---- ', stackername, ' ----')
                print(cls.registry[stackername].__doc__)
                stacker = cls.registry[stackername]()
                print(' Columns added to SimData: ', ','.join(stacker.colsAdded))
                print(' Default columns required: ', ','.join(stacker.colsReq))
class BaseStacker(with_metaclass(StackerRegistry, object)):
    """Base MAF Stacker: add columns generated at run-time to the simdata array.

    Subclasses list the columns they generate in the class attribute ``colsAdded``
    and implement ``_run`` to fill those columns in.
    """
    # List of the names of the columns generated by the Stacker.
    colsAdded = []

    # Stackers define __eq__, so they are explicitly marked unhashable. Assigning None is
    # the documented way to do that: hash() raises TypeError and isinstance checks against
    # collections.abc.Hashable correctly report False.
    __hash__ = None

    def __init__(self):
        """
        Instantiate the stacker.
        This method should be overridden by the user. This serves as an example of
        the variables required by the framework.
        """
        # Add the list of new columns generated by the stacker as class attributes (colsAdded - above).
        # List of the names of the columns required from the database (to generate the Stacker columns).
        self.colsReq = []
        # Optional: specify the new column types.
        self.colsAddedDtypes = None
        # Optional: provide a list of units for the columns defined in colsAdded.
        self.units = [None]

    def __eq__(self, otherStacker):
        """
        Evaluate if two stackers are equivalent.

        Two stackers are equivalent when their class names match and every public
        attribute of `self` (other than 'registry', 'run', 'next' and bound methods)
        exists on `otherStacker` with an equal value.
        """
        # If the class names are different, they are not 'the same'.
        if self.__class__.__name__ != otherStacker.__class__.__name__:
            return False
        # Otherwise, this is the same stacker class, but may be instantiated differently.
        # We have to delve a little further, and compare the kwargs & attributes for each stacker.
        for key in dir(self):
            if key.startswith('_') or key in ('registry', 'run', 'next'):
                continue
            if not hasattr(otherStacker, key):
                return False
            mine = getattr(self, key)
            theirs = getattr(otherStacker, key)
            # Bound methods are behaviour, not state. Comparing them across two instances
            # is either always unequal or re-enters this __eq__ (depending on the Python
            # version), so they are left out of the comparison.
            if inspect.ismethod(mine):
                continue
            # If either attribute is from numpy, assume it's an array and test it as such;
            # a plain != on an array does not produce a single truth value.
            if type(mine).__module__ == np.__name__ or type(theirs).__module__ == np.__name__:
                if not np.array_equal(mine, theirs):
                    return False
            elif mine != theirs:
                return False
        return True

    def __ne__(self, otherStacker):
        """
        Evaluate if two stackers are not equal.
        """
        return not (self == otherStacker)

    def _addStackerCols(self, simData):
        """
        Add the new Stacker columns to the simData array.
        If columns already present in simData, just allows 'run' method to overwrite.

        Returns a tuple (newData, cols_present): newData is a new structured array holding
        the values of simData plus any missing stacker columns (uninitialised, so the 'run'
        method can set their values); cols_present is True only if every column in
        colsAdded was already in simData with a first value that is not None.
        """
        if not hasattr(self, 'colsAddedDtypes') or self.colsAddedDtypes is None:
            self.colsAddedDtypes = [float for col in self.colsAdded]
        # Create description of new recarray from the field names and field dtypes.
        # (dtype.descr is not used: numpy documents it as unsuitable for rebuilding a dtype.)
        newdtype = [(name, simData.dtype[name]) for name in simData.dtype.names]
        cols_present = [False] * len(self.colsAdded)
        for i, (col, dtype) in enumerate(zip(self.colsAdded, self.colsAddedDtypes)):
            if col in simData.dtype.names:
                # Only look at the first value when there is one.
                if len(simData) > 0 and simData[col][0] is not None:
                    cols_present[i] = True
                    warnings.warn('Warning - column %s already present in simData, may be overwritten '
                                  '(depending on stacker).' % (col))
            else:
                newdtype.append((col, dtype))
        newData = np.empty(simData.shape, dtype=newdtype)
        # Copy the existing column values into the new array.
        for col in simData.dtype.names:
            newData[col] = simData[col]
        # Were all columns present and populated with something not None? If so, then consider 'all there'.
        return newData, all(cols_present)

    def run(self, simData, override=False):
        """
        Example: Generate the new stacker columns, given the simdata columns from the database.
        Returns the new simdata structured array that includes the new stacker columns.

        If simData is empty it is returned unchanged. If override is True, _run is told
        that the columns are not present, i.e. to recalculate the stacker values.
        """
        # Add new columns
        if len(simData) == 0:
            return simData
        simData, cols_present = self._addStackerCols(simData)
        # If override is set, it means go ahead and recalculate stacker values.
        if override:
            cols_present = False
        # Run the method to calculate/add new data.
        # NOTE: any TypeError raised by _run (not only a signature mismatch) triggers the
        # fallback below, which warns and calls _run a second time with the old signature.
        try:
            return self._run(simData, cols_present)
        except TypeError:
            warnings.warn('Please update the stacker %s so that the _run method matches the current API. '
                          'This will give you the option to skip re-running stackers if the columns are '
                          'already present.' % (self.__class__.__name__))
            return self._run(simData)

    def _run(self, simData, cols_present=False):
        """
        Fill in the stacker columns of simData; must be implemented by child stackers.
        """
        # By moving the calculation of these columns to a separate method, we add the possibility of using
        # stackers with pandas dataframes. The _addStackerCols method won't work with dataframes, but the
        # _run methods are quite likely to (depending on their details), as they are just populating columns.
        raise NotImplementedError('Not Implemented: '
                                  'the child stackers should implement their own _run methods')