Source code for lsst.sims.maf.metrics.moSummaryMetrics

import numpy as np
import warnings

from .moMetrics import BaseMoMetric

__all__ = ['integrateOverH', 'ValueAtHMetric', 'MeanValueAtHMetric',
           'MoCompletenessMetric', 'MoCompletenessAtTimeMetric']


[docs]def integrateOverH(Mvalues, Hvalues, Hindex = 0.33): """Function to calculate a metric value integrated over an Hrange, assuming a power-law distribution. Parameters ---------- Mvalues : numpy.ndarray The metric values at each H value. Hvalues : numpy.ndarray The H values corresponding to each Mvalue (must be the same length). Hindex : float, opt The power-law index expected for the H value distribution. Default is 0.33 (dN/dH = 10^(Hindex * H) ). Returns -------- numpy.ndarray The integrated or cumulative metric values. """ # Set expected H distribution. # dndh = differential size distribution (number in this bin) dndh = np.power(10., Hindex*(Hvalues-Hvalues.min())) # dn = cumulative size distribution (number in this bin and brighter) intVals = np.cumsum(Mvalues*dndh)/np.cumsum(dndh) return intVals
[docs]class ValueAtHMetric(BaseMoMetric): """Return the metric value at a given H value. Requires the metric values to be one-dimensional (typically, completeness values). Parameters ---------- Hmark : float, opt The H value at which to look up the metric value. Default = 22. """ def __init__(self, Hmark=22, **kwargs): metricName = 'Value At H=%.1f' %(Hmark) super(ValueAtHMetric, self).__init__(metricName=metricName, **kwargs) self.Hmark = Hmark
[docs] def run(self, metricVals, Hvals): # Check if desired H value is within range of H values. if (self.Hmark < Hvals.min()) or (self.Hmark > Hvals.max()): warnings.warn('Desired H value of metric outside range of provided H values.') return None if metricVals.shape[0] != 1: warnings.warn('This is not an appropriate summary statistic for this data - need 1d values.') return None value = np.interp(self.Hmark, Hvals, metricVals[0]) return value
[docs]class MeanValueAtHMetric(BaseMoMetric): """Return the mean value of a metric at a given H. Allows the metric values to be multi-dimensional (i.e. use a cloned H distribution). Parameters ---------- Hmark : float, opt The H value at which to look up the metric value. Default = 22. """ def __init__(self, Hmark=22, reduceFunc=np.mean, metricName=None, **kwargs): if metricName is None: metricName = 'Mean Value At H=%.1f' %(Hmark) super(MeanValueAtHMetric, self).__init__(metricName=metricName, **kwargs) self.Hmark = Hmark self.reduceFunc = reduceFunc
[docs] def run(self, metricVals, Hvals): # Check if desired H value is within range of H values. if (self.Hmark < Hvals.min()) or (self.Hmark > Hvals.max()): warnings.warn('Desired H value of metric outside range of provided H values.') return None value = np.interp(self.Hmark, Hvals, self.reduceFunc(metricVals.swapaxes(0, 1), axis=1)) return value
[docs]class MoCompletenessMetric(BaseMoMetric): """Calculate the fraction of the population that meets `threshold` value or higher. This is equivalent to calculating the completeness (relative to the entire population) given the output of a Discovery_N_Chances metric, or the fraction of the population that meets a given cutoff value for Color determination metrics. Any moving object metric that outputs a float value can thus have the 'fraction of the population' with greater than X value calculated here, as a summary statistic. Parameters ---------- threshold : int, opt Count the fraction of the population that exceeds this value. Default = 1. nbins : int, opt If the H values for the metric are not a cloned distribution, then split up H into this many bins. Default 20. minHrange : float, opt If the H values for the metric are not a cloned distribution, then split up H into at least this range (otherwise just use the min/max of the H values). Default 1.0 cumulative : bool, opt If False, simply report the differential fractional value (or differential completeness). If True, integrate over the H distribution (using IntegrateOverH) to report a cumulative fraction. Default None which becomes True; if metricName is set and starts with 'Differential' this will then set to False. Hindex : float, opt Use Hindex as the power law to integrate over H, if cumulative is True. Default 0.3. """ def __init__(self, threshold=1, nbins=20, minHrange=1.0, cumulative=None, Hindex=0.33, **kwargs): if cumulative is None: # if metricName does not start with 'differential', then cumulative->True if 'metricName' not in kwargs: self.cumulative = True metricName = 'CumulativeCompleteness' else: # 'metricName' in kwargs: metricName = kwargs.pop('metricName') if metricName.lower().startswith('differential'): self.cumulative=False else: self.cumulative=True else: # cumulative was set self.cumulative = cumulative if 'metricName' in kwargs: metricName = kwargs.pop('metricName') if metricName.lower().startswith('differential') and self.cumulative: warnings.warn(f'Completeness metricName is {metricName} but cumulative is True') else: if self.cumulative: metricName = 'CumulativeCompleteness' else: metricName = 'DifferentialCompleteness' if self.cumulative: units = "<=H" else: units = "@H" super().__init__(metricName=metricName, units=units, **kwargs) self.threshold = threshold # If H is not a cloned distribution, then we need to specify how to bin these values. self.nbins = nbins self.minHrange = minHrange self.Hindex = Hindex
[docs] def run(self, metricValues, Hvals): nSsos = metricValues.shape[0] nHval = len(Hvals) metricValH = metricValues.swapaxes(0, 1) if nHval == metricValues.shape[1]: # Hvals array is probably the same as the cloned H array. completeness = np.zeros(len(Hvals), float) for i, H in enumerate(Hvals): completeness[i] = np.where(metricValH[i].filled(0) >= self.threshold)[0].size completeness = completeness / float(nSsos) else: # The Hvals are spread more randomly among the objects (we probably used one per object). hrange = Hvals.max() - Hvals.min() minH = Hvals.min() if hrange < self.minHrange: hrange = self.minHrange minH = Hvals.min() - hrange/2.0 stepsize = hrange / float(self.nbins) bins = np.arange(minH, minH + hrange + stepsize/2.0, stepsize) Hvals = bins[:-1] n_all, b = np.histogram(metricValH[0], bins) condition = np.where(metricValH[0] >= self.threshold)[0] n_found, b = np.histogram(metricValH[0][condition], bins) completeness = n_found.astype(float) / n_all.astype(float) completeness = np.where(n_all==0, 0, completeness) if self.cumulative: completenessInt = integrateOverH(completeness, Hvals, self.Hindex) summaryVal = np.empty(len(completenessInt), dtype=[('name', np.str_, 20), ('value', float)]) summaryVal['value'] = completenessInt for i, Hval in enumerate(Hvals): summaryVal['name'][i] = 'H <= %f' % (Hval) else: summaryVal = np.empty(len(completeness), dtype=[('name', np.str_, 20), ('value', float)]) summaryVal['value'] = completeness for i, Hval in enumerate(Hvals): summaryVal['name'][i] = 'H = %f' % (Hval) return summaryVal
[docs]class MoCompletenessAtTimeMetric(BaseMoMetric): """Calculate the completeness (relative to the entire population) <= a given H as a function of time, given the times of each discovery. Input values of the discovery times can come from the Discovery_Time (child) metric or the KnownObjects metric. Parameters ---------- times : numpy.ndarray like The bins to distribute the discovery times into. Same units as the discovery time (typically MJD). Hval : float, opt The value of H to count completeness at (or cumulative completeness to). Default None, in which case a value halfway through Hvals (the slicer H range) will be chosen. cumulative : bool, opt If True, calculate the cumulative completeness (completeness <= H). If False, calculate the differential completeness (completeness @ H). Default None which becomes 'True' unless metricName starts with 'differential'. Hindex : float, opt Use Hindex as the power law to integrate over H, if cumulative is True. Default 0.3. """ def __init__(self, times, Hval=None, cumulative=None, Hindex=0.33, **kwargs): self.Hval = Hval self.times = times self.Hindex = Hindex if cumulative is None: # if metricName does not start with 'differential', then cumulative->True if 'metricName' not in kwargs: self.cumulative = True metricName = 'CumulativeCompleteness@Time@H=%.2f' % self.Hval else: # 'metricName' in kwargs: metricName = kwargs.pop('metricName') if metricName.lower().startswith('differential'): self.cumulative=False else: self.cumulative=True else: # cumulative was set self.cumulative = cumulative if 'metricName' in kwargs: metricName = kwargs.pop('metricName') if metricName.lower().startswith('differential') and self.cumulative: warnings.warn(f'Completeness metricName is {metricName} but cumulative is True') else: if self.cumulative: metricName = 'CumulativeCompleteness@Time@H=%.2f' % self.Hval else: metricName = 'DifferentialCompleteness@Time@H=%.2f' % self.Hval self._setLabels() super().__init__(metricName=metricName, units=self.units, **kwargs) def _setLabels(self): if self.Hval is not None: if self.cumulative: self.units = 'H <=%.1f' % (self.Hval) else: self.units = 'H = %.1f' % (self.Hval) else: self.units = 'H'
[docs] def run(self, discoveryTimes, Hvals): if len(Hvals) != discoveryTimes.shape[1]: warnings.warn("This summary metric expects cloned H distribution. Cannot calculate summary.") return nSsos = discoveryTimes.shape[0] timesinH = discoveryTimes.swapaxes(0, 1) completenessH = np.empty([len(Hvals), len(self.times)], float) for i, H in enumerate(Hvals): n, b = np.histogram(timesinH[i].compressed(), bins=self.times) completenessH[i][0] = 0 completenessH[i][1:] = n.cumsum() completenessH = completenessH / float(nSsos) completeness = completenessH.swapaxes(0, 1) if self.cumulative: for i, t in enumerate(self.times): completeness[i] = integrateOverH(completeness[i], Hvals) # To save the summary statistic, we must pick out a given H value. if self.Hval is None: Hidx = len(Hvals) // 2 self.Hval = Hvals[Hidx] self._setLabels() else: Hidx = np.where(np.abs(Hvals - self.Hval) == np.abs(Hvals - self.Hval).min())[0][0] self.Hval = Hvals[Hidx] self._setLabels() summaryVal = np.empty(len(self.times), dtype=[('name', np.str_, 20), ('value', float)]) summaryVal['value'] = completeness[:, Hidx] for i, time in enumerate(self.times): summaryVal['name'][i] = '%s @ %.2f' % (self.units, time) return summaryVal