import numpy as np
from .baseMetric import BaseMetric
import lsst.sims.maf.utils as mafUtils
import lsst.sims.utils as utils
from scipy.optimize import curve_fit
from builtins import str
# Names exported by a star-import of this module: the five metric classes
# defined in this file (the helper calcDist_cosines is not listed).
__all__ = ['ParallaxMetric', 'ProperMotionMetric', 'RadiusObsMetric',
'ParallaxCoverageMetric', 'ParallaxDcrDegenMetric']
class ParallaxMetric(BaseMetric):
    """Calculate the uncertainty in a parallax measurement given a series of observations.

    Uses columns ra_pi_amp and dec_pi_amp, calculated by the ParallaxFactorStacker.

    Parameters
    ----------
    metricName : str, opt
        Default 'parallax'.
    m5Col : str, opt
        The default column name for m5 information in the input data. Default fiveSigmaDepth.
    filterCol : str, opt
        The column name for the filter information. Default filter.
    seeingCol : str, opt
        The column name for the seeing information. Since the astrometry errors are based on the physical
        size of the PSF, this should be the FWHM of the physical psf. Default seeingFwhmGeom.
    rmag : float, opt
        The r magnitude of the fiducial star in r band. Other filters are scaled using the
        SedTemplate keyword. Default 20.0
    SedTemplate : str, opt
        The template to use. This can be 'flat' or 'O','B','A','F','G','K','M'. Default flat.
    atm_err : float, opt
        The expected centroiding error due to the atmosphere, in arcseconds. Default 0.01.
    normalize : boolean, opt
        If True, return the uncertainty that would result if every visit had an RA parallax
        factor of 1 (and a Dec parallax factor of 0), divided by the actual uncertainty.
        Values closer to 1 indicate more optimal scheduling for parallax measurement.
        Default False.
    badval : float, opt
        The value to return when the metric value cannot be calculated. Default -666.
    """

    def __init__(self, metricName='parallax', m5Col='fiveSigmaDepth',
                 filterCol='filter', seeingCol='seeingFwhmGeom', rmag=20.,
                 SedTemplate='flat', badval=-666,
                 atm_err=0.01, normalize=False, **kwargs):
        cols = [m5Col, filterCol, seeingCol, 'ra_pi_amp', 'dec_pi_amp']
        # The normalized metric is a dimensionless ratio; otherwise the result is in mas.
        units = 'ratio' if normalize else 'mas'
        super(ParallaxMetric, self).__init__(cols, metricName=metricName, units=units,
                                             badval=badval, **kwargs)
        self.m5Col = m5Col
        self.seeingCol = seeingCol
        self.filterCol = filterCol
        # Magnitude of the fiducial star in each filter.
        if SedTemplate == 'flat':
            self.mags = {f: rmag for f in ['u', 'g', 'r', 'i', 'z', 'y']}
        else:
            self.mags = utils.stellarMags(SedTemplate, rmag=rmag)
        self.atm_err = atm_err
        self.normalize = normalize
        self.comment = 'Estimated uncertainty in parallax measurement ' \
                       '(assuming no proper motion or that proper motion '
        self.comment += 'is well fit). Uses measurements in all bandpasses, ' \
                        'and estimates astrometric error based on SNR '
        self.comment += 'in each visit. '
        if SedTemplate == 'flat':
            self.comment += 'Assumes a flat SED. '
        if self.normalize:
            self.comment += 'This normalized version of the metric displays the ' \
                            'estimated uncertainty in the parallax measurement, '
            self.comment += 'divided by the minimum parallax uncertainty possible ' \
                            '(if all visits were six '
            self.comment += 'months apart). Values closer to 1 indicate more optimal ' \
                            'scheduling for parallax measurement.'

    def _final_sigma(self, position_errors, ra_pi_amp, dec_pi_amp):
        """Assume parallax in RA and DEC are fit independently, then combined.

        All inputs assumed to be arcsec; the combined uncertainty is returned in mas.
        """
        # A zero parallax factor gives an infinite per-visit uncertainty, i.e. zero
        # weight. That is intended (see the normalized branch of run), so silence the
        # divide-by-zero warnings it would otherwise produce.
        with np.errstate(divide='ignore'):
            sigma_A = position_errors/ra_pi_amp
            sigma_B = position_errors/dec_pi_amp
            sigma_ra = np.sqrt(1./np.sum(1./sigma_A**2))
            sigma_dec = np.sqrt(1./np.sum(1./sigma_B**2))
            # Combine RA and Dec uncertainties, convert to mas
            sigma = np.sqrt(1./(1./sigma_ra**2+1./sigma_dec**2))*1e3
        return sigma

    def run(self, dataslice, slicePoint=None):
        """Return the parallax uncertainty (mas), or the normalized ratio, for dataslice."""
        filter_values = dataslice[self.filterCol]
        snr = np.zeros(len(dataslice), dtype='float')
        # compute SNR for all observations
        for filt in np.unique(filter_values):
            # Select rows with the raw column value; a decoded str would never compare
            # equal to a bytes-typed column. Decode only for the magnitude lookup.
            in_filt = np.where(filter_values == filt)
            name = filt.decode('utf-8') if hasattr(filt, 'decode') else filt
            snr[in_filt] = mafUtils.m52snr(self.mags[str(name)], dataslice[self.m5Col][in_filt])
        # Per-visit centroid error: SNR-based precision and atmospheric floor in quadrature.
        position_errors = np.sqrt(mafUtils.astrom_precision(dataslice[self.seeingCol],
                                                            snr)**2+self.atm_err**2)
        sigma = self._final_sigma(position_errors, dataslice['ra_pi_amp'], dataslice['dec_pi_amp'])
        if self.normalize:
            # Leave the dec parallax as zero since one can't have ra and dec maximized at the same time.
            sigma = self._final_sigma(position_errors,
                                      dataslice['ra_pi_amp']*0+1., dataslice['dec_pi_amp']*0)/sigma
        return sigma
class ProperMotionMetric(BaseMetric):
    """Calculate the uncertainty in the returned proper motion.

    This metric assumes gaussian errors in the astrometry measurements.

    Parameters
    ----------
    metricName : str, opt
        Default 'properMotion'.
    m5Col : str, opt
        The default column name for m5 information in the input data. Default fiveSigmaDepth.
    mjdCol : str, opt
        The column name for the exposure time. Default observationStartMJD.
    filterCol : str, opt
        The column name for the filter information. Default filter.
    seeingCol : str, opt
        The column name for the seeing information. Since the astrometry errors are based on the physical
        size of the PSF, this should be the FWHM of the physical psf. Default seeingFwhmGeom.
    rmag : float, opt
        The r magnitude of the fiducial star in r band. Other filters are scaled using the
        SedTemplate keyword. Default 20.0
    SedTemplate : str, opt
        The template to use. This can be 'flat' or 'O','B','A','F','G','K','M'. Default flat.
    atm_err : float, opt
        The expected centroiding error due to the atmosphere, in arcseconds. Default 0.01.
    normalize : boolean, opt
        Compare the astrometric uncertainty to the uncertainty that would result if half the observations
        were taken at the start and half at the end. A perfect survey will have a value close to 1, while
        a poorly scheduled survey will be close to 0. Default False.
    baseline : float, opt
        The length of the survey used for the normalization, in years. Default 10.
    badval : float, opt
        The value to return when the metric value cannot be calculated. Default -666.
    """

    def __init__(self, metricName='properMotion',
                 m5Col='fiveSigmaDepth', mjdCol='observationStartMJD',
                 filterCol='filter', seeingCol='seeingFwhmGeom', rmag=20.,
                 SedTemplate='flat', badval=-666,
                 atm_err=0.01, normalize=False,
                 baseline=10., **kwargs):
        cols = [m5Col, mjdCol, filterCol, seeingCol]
        # The normalized metric is a dimensionless ratio; otherwise the result is in mas/yr.
        units = 'ratio' if normalize else 'mas/yr'
        super(ProperMotionMetric, self).__init__(col=cols, metricName=metricName, units=units,
                                                 badval=badval, **kwargs)
        self.mjdCol = mjdCol
        self.seeingCol = seeingCol
        self.m5Col = m5Col
        # Remember the filter column so run() honours a non-default name.
        self.filterCol = filterCol
        # Magnitude of the fiducial star in each filter.
        if SedTemplate == 'flat':
            self.mags = {f: rmag for f in ['u', 'g', 'r', 'i', 'z', 'y']}
        else:
            self.mags = utils.stellarMags(SedTemplate, rmag=rmag)
        self.atm_err = atm_err
        self.normalize = normalize
        self.baseline = baseline
        self.comment = 'Estimated uncertainty of the proper motion fit ' \
                       '(assuming no parallax or that parallax is well fit). '
        self.comment += 'Uses visits in all bands, and generates approximate ' \
                        'astrometric errors using the SNR in each visit. '
        if SedTemplate == 'flat':
            self.comment += 'Assumes a flat SED. '
        if self.normalize:
            self.comment += 'This normalized version of the metric represents ' \
                            'the estimated uncertainty in the proper '
            self.comment += 'motion divided by the minimum uncertainty possible ' \
                            '(if all visits were '
            self.comment += 'obtained on the first and last days of the survey). '
            self.comment += 'Values closer to 1 indicate more optimal scheduling.'

    def run(self, dataslice, slicePoint=None):
        """Return the proper motion uncertainty (mas/yr), or the normalized ratio.

        Returns self.badval when no filter has at least two visits or when the
        slope uncertainty evaluates to NaN.
        """
        filter_values = dataslice[self.filterCol]
        precis = np.zeros(dataslice.size, dtype='float')
        # Explicit mask of visits that contribute, rather than using badval as an
        # in-band sentinel inside the precision array.
        usable = np.zeros(dataslice.size, dtype=bool)
        for filt in np.unique(filter_values):
            in_filt = np.where(filter_values == filt)
            # Visits in a filter with fewer than two observations are not used.
            if np.size(in_filt[0]) < 2:
                continue
            # Decode bytes filter names only for the magnitude lookup.
            name = filt.decode('utf-8') if hasattr(filt, 'decode') else filt
            snr = mafUtils.m52snr(self.mags[str(name)], dataslice[self.m5Col][in_filt])
            astrom = mafUtils.astrom_precision(dataslice[self.seeingCol][in_filt], snr)
            # SNR-based precision and atmospheric floor added in quadrature.
            precis[in_filt] = np.sqrt(astrom**2 + self.atm_err**2)
            usable[in_filt] = True
        if not usable.any():
            return self.badval
        dates = dataslice[self.mjdCol][usable]
        errors = precis[usable]
        result = mafUtils.sigma_slope(dates, errors)
        result = result*365.25*1e3  # Convert to mas/yr
        if self.normalize:
            # Idealised schedule: first half of the visits at day 0, the rest at the
            # end of the baseline (given in years, converted to days).
            ideal_dates = np.zeros(dates.size, dtype=float)
            ideal_dates[dates.size//2:] = self.baseline*365.25
            result = (mafUtils.sigma_slope(ideal_dates, errors)*365.25*1e3)/result
        # Observations that are very close together can still fail
        if np.isnan(result):
            result = self.badval
        return result
class ParallaxCoverageMetric(BaseMetric):
    """
    Check how well the parallax factor is distributed. Subtracts the weighted mean position of the
    parallax offsets, then computes the weighted mean radius of the points.
    If points are well distributed, the mean radius will be near 1. If phase coverage is bad,
    radius will be close to zero.

    For points on the Ecliptic, uniform sampling should result in a metric value of ~0.5.
    At the poles, uniform sampling would result in a metric value of ~1.
    Conceptually, it is helpful to remember that the parallax motion of a star at the pole is
    a (nearly circular) ellipse while the motion of a star on the ecliptic is a straight line. Thus, any
    pair of observations separated by 6 months will give the full parallax range for a star on the pole
    but only observations on very specific dates will give the full range for a star on the ecliptic.

    Optionally also demand that there are observations above the snrLimit kwarg spanning thetaRange radians.

    Parameters
    ----------
    m5Col: str, opt
        Column name for individual visit m5. Default fiveSigmaDepth.
    mjdCol: str, opt
        Column name for exposure time dates. Default observationStartMJD.
    filterCol: str, opt
        Column name for filter. Default filter.
    seeingCol: str, opt
        Column name for seeing (assumed FWHM). Default seeingFwhmGeom.
    rmag: float, opt
        Magnitude of fiducial star in r filter. Other filters are scaled using SedTemplate keyword.
        Default 20.0
    SedTemplate: str, opt
        Template to use (can be 'flat' or 'O','B','A','F','G','K','M'). Default 'flat'.
    atm_err: float, opt
        Centroiding error due to atmosphere in arcsec. Default 0.01 (arcseconds).
    thetaRange: float, opt
        Range of parallax offset angles to demand (in radians). Default=0 (means no range requirement).
    snrLimit: float, opt
        Only include points above the snrLimit when computing thetaRange. Default 5.

    Returns
    --------
    metricValue: float
        Returns a weighted mean of the length of the parallax factor vectors.
        Values near 1 imply that the points are well distributed.
        Values near 0 imply that the parallax phase coverage is bad.
        Near the ecliptic, uniform sampling results in metric values of about 0.5.

    Notes
    -----
    Uses the ParallaxFactor stacker to calculate ra_pi_amp and dec_pi_amp.
    """

    def __init__(self, metricName='ParallaxCoverageMetric', m5Col='fiveSigmaDepth',
                 mjdCol='observationStartMJD', filterCol='filter', seeingCol='seeingFwhmGeom',
                 rmag=20., SedTemplate='flat',
                 atm_err=0.01, thetaRange=0., snrLimit=5, **kwargs):
        cols = ['ra_pi_amp', 'dec_pi_amp', m5Col, mjdCol, filterCol, seeingCol]
        units = 'ratio'
        super(ParallaxCoverageMetric, self).__init__(cols,
                                                     metricName=metricName, units=units,
                                                     **kwargs)
        self.m5Col = m5Col
        self.seeingCol = seeingCol
        self.filterCol = filterCol
        self.mjdCol = mjdCol
        # Demand the range of theta values
        self.thetaRange = thetaRange
        self.snrLimit = snrLimit
        # Magnitude of the fiducial star in each filter.
        if SedTemplate == 'flat':
            self.mags = {f: rmag for f in ['u', 'g', 'r', 'i', 'z', 'y']}
        else:
            self.mags = utils.stellarMags(SedTemplate, rmag=rmag)
        self.atm_err = atm_err
        caption = "Parallax factor coverage for an r=%.2f star (0 is bad, 0.5-1 is good). " % (rmag)
        caption += "One expects the parallax factor coverage to vary because stars on the ecliptic "
        caption += "can be observed when they have no parallax offset while stars at the pole are always "
        caption += "offset by the full parallax offset."
        self.comment = caption

    def _thetaCheck(self, ra_pi_amp, dec_pi_amp, snr):
        """Return 1 if visits with snr >= snrLimit span at least thetaRange radians, else 0."""
        good = np.where(snr >= self.snrLimit)
        # With no visit above the SNR limit the range requirement cannot be met
        # (and np.min on an empty array would raise).
        if np.size(good[0]) == 0:
            return 0.
        theta = np.arctan2(dec_pi_amp[good], ra_pi_amp[good])
        # Make values between 0 and 2pi
        theta = theta-np.min(theta)
        result = 0.
        if np.max(theta) >= self.thetaRange:
            # Check that things are in different quadrants: rotate by pi, wrap into
            # [0, 2pi) and repeat the test. The parentheses matter: `% 2.*np.pi`
            # would evaluate as `(x % 2.) * np.pi`.
            theta = (theta+np.pi) % (2.*np.pi)
            theta = theta-np.min(theta)
            if np.max(theta) >= self.thetaRange:
                result = 1
        return result

    def _computeWeights(self, dataSlice, snr):
        """Return inverse-variance weights from the per-visit centroid uncertainty."""
        # Compute centroid uncertainty in each visit
        position_errors = np.sqrt(mafUtils.astrom_precision(dataSlice[self.seeingCol],
                                                            snr)**2+self.atm_err**2)
        weights = 1./position_errors**2
        return weights

    def _weightedR(self, dec_pi_amp, ra_pi_amp, weights):
        """Return the weighted mean distance of the points from their weighted centroid."""
        ycoord = dec_pi_amp-np.average(dec_pi_amp, weights=weights)
        xcoord = ra_pi_amp-np.average(ra_pi_amp, weights=weights)
        radius = np.sqrt(xcoord**2+ycoord**2)
        aveRad = np.average(radius, weights=weights)
        return aveRad

    def run(self, dataSlice, slicePoint=None):
        """Return the weighted mean radius, zeroed if the thetaRange requirement fails.

        Returns self.badval when dataSlice holds fewer than two visits.
        """
        if np.size(dataSlice) < 2:
            return self.badval
        filter_values = dataSlice[self.filterCol]
        snr = np.zeros(len(dataSlice), dtype='float')
        # compute SNR for all observations
        for filt in np.unique(filter_values):
            inFilt = np.where(filter_values == filt)
            # Decode bytes filter names only for the magnitude lookup.
            name = filt.decode('utf-8') if hasattr(filt, 'decode') else filt
            snr[inFilt] = mafUtils.m52snr(self.mags[str(name)], dataSlice[self.m5Col][inFilt])
        weights = self._computeWeights(dataSlice, snr)
        # Argument order follows the _weightedR signature (dec first, then ra).
        aveR = self._weightedR(dataSlice['dec_pi_amp'], dataSlice['ra_pi_amp'], weights)
        if self.thetaRange > 0:
            thetaCheck = self._thetaCheck(dataSlice['ra_pi_amp'], dataSlice['dec_pi_amp'], snr)
        else:
            thetaCheck = 1.
        result = aveR*thetaCheck
        return result
class ParallaxDcrDegenMetric(BaseMetric):
    """Use the full parallax and DCR displacement vectors to find if they are degenerate.

    Parameters
    ----------
    metricName : str, opt
        Default 'ParallaxDcrDegenMetric'.
    seeingCol : str, opt
        Default 'seeingFwhmGeom'
    m5Col : str, opt
        Default 'fiveSigmaDepth'
    filterCol : str
        Default 'filter'
    atm_err : float
        Minimum error in photometry centroids introduced by the atmosphere (arcseconds). Default 0.01.
    rmag : float
        r-band magnitude of the fiducial star that is being used (mag). Default 20.
    SedTemplate : str
        The SED template to use for fiducial star colors, passed to lsst.sims.utils.stellarMags.
        Default 'flat'
    tol : float
        Tolerance for how well curve_fit needs to work before believing the covariance result.
        Default 0.05.

    Returns
    -------
    metricValue : float
        Returns the correlation coefficient between the best-fit parallax amplitude and DCR amplitude.
        The RA and Dec offsets are fit simultaneously. Values close to zero are good, values close to +/- 1
        are bad. Experience with fitting Monte Carlo simulations suggests the astrometric fits start
        becoming poor around a correlation of 0.7.
    """

    def __init__(self, metricName='ParallaxDcrDegenMetric', seeingCol='seeingFwhmGeom',
                 m5Col='fiveSigmaDepth', atm_err=0.01, rmag=20., SedTemplate='flat',
                 filterCol='filter', tol=0.05, **kwargs):
        self.m5Col = m5Col
        self.seeingCol = seeingCol
        self.filterCol = filterCol
        self.tol = tol
        units = 'Correlation'
        # Every column run() reads must be requested here, including the filter column.
        cols = ['ra_pi_amp', 'dec_pi_amp', 'ra_dcr_amp', 'dec_dcr_amp',
                seeingCol, m5Col, filterCol]
        super(ParallaxDcrDegenMetric, self).__init__(cols, metricName=metricName, units=units,
                                                     **kwargs)
        self.filters = ['u', 'g', 'r', 'i', 'z', 'y']
        # Magnitude of the fiducial star in each filter.
        if SedTemplate == 'flat':
            self.mags = {f: rmag for f in self.filters}
        else:
            self.mags = utils.stellarMags(SedTemplate, rmag=rmag)
        self.atm_err = atm_err

    def _positions(self, x, a, b):
        """
        Function to find parallax and dcr amplitudes

        x should be a vector with [[parallax_x1, parallax_x2..., parallax_y1, parallax_y2...],
        [dcr_x1, dcr_x2..., dcr_y1, dcr_y2...]]
        """
        result = a*x[0, :] + b*x[1, :]
        return result

    def run(self, dataSlice, slicePoint=None):
        """Return the parallax/DCR amplitude correlation, or self.badval if the fit is unusable."""
        # The idea here is that we calculate position errors (in RA and Dec) for all observations.
        # Then we generate arrays of the parallax offsets (delta RA parallax = ra_pi_amp, etc)
        # and the DCR offsets (delta RA DCR = ra_dcr_amp, etc), and just add them together into one
        # RA (and Dec) offset. Then, we try to fit for how we combined these offsets, but while
        # considering the astrometric noise. If we can figure out that we just added them together
        # (i.e. the curve_fit result is [a=1, b=1] for the function _positions above)
        # then we should be able to disentangle the parallax and DCR offsets when fitting 'for real'.
        # compute SNR for all observations
        snr = np.zeros(len(dataSlice), dtype='float')
        for filt in self.filters:
            inFilt = np.where(dataSlice[self.filterCol] == filt)
            snr[inFilt] = mafUtils.m52snr(self.mags[filt], dataSlice[self.m5Col][inFilt])
        # Compute the centroiding uncertainties
        # Note that these centroiding uncertainties depend on the physical size of the PSF, thus
        # we are using seeingFwhmGeom for these metrics, not seeingFwhmEff.
        position_errors = np.sqrt(mafUtils.astrom_precision(dataSlice[self.seeingCol], snr)**2 +
                                  self.atm_err**2)
        # Construct the vectors of RA/Dec offsets. xdata is the "input data". ydata is the "output".
        xdata = np.empty((2, dataSlice.size * 2), dtype=float)
        xdata[0, :] = np.concatenate((dataSlice['ra_pi_amp'], dataSlice['dec_pi_amp']))
        xdata[1, :] = np.concatenate((dataSlice['ra_dcr_amp'], dataSlice['dec_dcr_amp']))
        ydata = np.sum(xdata, axis=0)
        # Use curve_fit to compute covariance between parallax and dcr amplitudes
        # Set the initial guess slightly off from the correct [1,1] to make sure it iterates.
        popt, pcov = curve_fit(self._positions, xdata, ydata, p0=[1.1, 0.9],
                               sigma=np.concatenate((position_errors, position_errors)),
                               absolute_sigma=True)
        # Catch if the fit failed to converge on the correct solution.
        if np.max(np.abs(popt - np.array([1., 1.]))) > self.tol:
            return self.badval
        # Covariance between best fit parallax amplitude and DCR amplitude.
        cov = pcov[1, 0]
        # Convert covariance between parallax and DCR amplitudes to normalized correlation
        perr = np.sqrt(np.diag(pcov))
        correlation = cov/(perr[0]*perr[1])
        result = correlation
        # This can yield inf, and NaN as well (e.g. inf/inf when pcov is all-inf),
        # so reject anything that is not finite.
        if not np.isfinite(result):
            result = self.badval
        return result
def calcDist_cosines(RA1, Dec1, RA2, Dec2):
    # Taken from simSelfCalib.py
    """Calculates distance on a sphere using spherical law of cosines.

    Give this function RA/Dec values in radians. Returns angular distance(s), in radians.
    Note that since this is all numpy, you could input arrays of RA/Decs."""
    # This formula can have rounding errors for case where distances are small.
    # Oh, the joys of wikipedia - http://en.wikipedia.org/wiki/Great-circle_distance
    # For the purposes of these calculations, this is probably accurate enough.
    D = np.sin(Dec2)*np.sin(Dec1) + np.cos(Dec1)*np.cos(Dec2)*np.cos(RA2-RA1)
    # Rounding can leave the cosine marginally outside [-1, 1] for coincident or
    # antipodal points, which would make arccos return NaN; clamp it first.
    D = np.arccos(np.clip(D, -1., 1.))
    return D
class RadiusObsMetric(BaseMetric):
    """Angular distance between the slice point and each visit's field position.

    run() passes slicePoint['ra'] / slicePoint['dec'] directly, and the raCol / decCol
    values after np.radians, to calcDist_cosines and converts the result with np.degrees.

    NOTE(review): the default ``units`` label is 'radians' although run() returns the
    output of np.degrees -- confirm which is intended.
    """

    def __init__(self, metricName='radiusObs', raCol='fieldRA', decCol='fieldDec',
                 units='radians', **kwargs):
        self.raCol = raCol
        self.decCol = decCol
        needed_cols = [self.raCol, self.decCol]
        super(RadiusObsMetric, self).__init__(col=needed_cols, metricName=metricName,
                                              units=units, **kwargs)

    def run(self, dataSlice, slicePoint):
        """Return the array of distances from the slice point to every visit in dataSlice."""
        field_ra = np.radians(dataSlice[self.raCol])
        field_dec = np.radians(dataSlice[self.decCol])
        separation = calcDist_cosines(slicePoint['ra'], slicePoint['dec'], field_ra, field_dec)
        return np.degrees(separation)

    def reduceMean(self, distances):
        """Return the mean of the distances."""
        mean_distance = np.mean(distances)
        return mean_distance

    def reduceRMS(self, distances):
        """Return the standard deviation of the distances."""
        spread = np.std(distances)
        return spread

    def reduceFullRange(self, distances):
        """Return the difference between the largest and smallest distance."""
        farthest = np.max(distances)
        nearest = np.min(distances)
        return farthest-nearest