Source code for lsst.sims.maf.stackers.generalStackers

import warnings
import numpy as np
import palpy
from lsst.sims.utils import Site, m5_flat_sed, xyz_from_ra_dec, xyz_angular_radius, \
    _buildTree, _xyz_from_ra_dec
from lsst.sims.survey.fields import FieldsDatabase
from .baseStacker import BaseStacker

__all__ = ['NormAirmassStacker', 'ParallaxFactorStacker', 'HourAngleStacker',
           'FilterColorStacker', 'ZenithDistStacker', 'ParallacticAngleStacker',
           'DcrStacker', 'FiveSigmaStacker', 'OpSimFieldStacker',
           'SaturationStacker']

# Original stackers by Peter Yoachim (yoachim@uw.edu)
# Filter color stacker by Lynne Jones (lynnej@uw.edu)

    
[docs]class SaturationStacker(BaseStacker): """Calculate the saturation limit of a point source. Assumes Guassian PSF. Parameters ---------- pixscale : float, opt (0.2) Arcsec per pixel gain : float, opt (2.3) electrons per adu saturation_e : float, opt (150e3) The saturation level in electrons zeropoints : dict-like, opt (None) The zeropoints for the telescope. Keys should be str with filter names, values in mags. If None, will use Rubin-like zeropoints. km : dict-like, opt (None) Atmospheric extinction values. Keys should be str with filter names. If None, will use Rubin-like zeropoints. """ colsAdded = ['saturation_mag'] def __init__(self, seeingCol='seeingFwhmEff', skybrightnessCol='skyBrightness', exptimeCol='visitExposureTime', nexpCol='numExposures', filterCol='filter', airmassCol='airmass', saturation_e=150e3, zeropoints=None, km=None, pixscale=0.2, gain=1.0): self.units = ['mag'] self.colsReq = [seeingCol, skybrightnessCol, exptimeCol, nexpCol, filterCol, airmassCol] self.seeingCol = seeingCol self.skybrightnessCol = skybrightnessCol self.exptimeCol = exptimeCol self.nexpCol = nexpCol self.filterCol = filterCol self.airmassCol = airmassCol self.saturation_adu = saturation_e/gain self.pixscale = 0.2 names = ['u', 'g', 'r', 'i', 'z', 'y'] types = [float]*6 if zeropoints is None: # Note these zeropoints are calculating the number of *electrons* per second (thus gain=1) # https://github.com/lsst-pst/syseng_throughputs/blob/master/notebooks/Syseng%20Throughputs%20Repo%20Demo.ipynb self.zeropoints = np.array([27.03, 28.38, 28.15, 27.86, 27.46, 26.68]).view(list(zip(names, types))) self.saturation_adu = saturation_e else: self.zeropoints = zeropoints if km is None: # Also from notebook above self.km = np.array([0.491, 0.213, 0.126, 0.096, 0.069, 0.170]).view(list(zip(names, types))) else: self.km = km def _run(self, simData, cols_present=False): for filtername in np.unique(simData[self.filterCol]): in_filt = np.where(simData[self.filterCol] == filtername)[0] # Calculate the length of the on-sky time per EXPOSURE exptime = simData[self.exptimeCol][in_filt] / simData[self.nexpCol][in_filt] # Calculate sky counts per pixel per second from skybrightness + zeropoint (e/1s) sky_counts = 10.**(0.4*(self.zeropoints[filtername] - simData[self.skybrightnessCol][in_filt])) * self.pixscale**2 # Total sky counts in each exposure sky_counts = sky_counts * exptime # The counts available to the source (at peak) in each exposure is the difference between saturation and sky remaining_counts_peak = (self.saturation_adu - sky_counts) # Now to figure out how many counts there would be total, if there are that many in the peak sigma = simData[self.seeingCol][in_filt]/2.354 source_counts = remaining_counts_peak * 2.*np.pi*(sigma/self.pixscale)**2 # source counts = counts per exposure (expTimeCol / nexp) # Translate to counts per second, to apply zeropoint count_rate = source_counts / exptime simData['saturation_mag'][in_filt] = -2.5*np.log10(count_rate) + self.zeropoints[filtername] # Airmass correction simData['saturation_mag'][in_filt] -= self.km[filtername]*(simData[self.airmassCol][in_filt] - 1.) return simData
[docs]class FiveSigmaStacker(BaseStacker): """ Calculate the 5-sigma limiting depth for a point source in the given conditions. This is generally not needed, unless the m5 parameters have been updated or m5 was not previously calculated. """ colsAdded = ['m5_simsUtils'] def __init__(self, airmassCol='airmass', seeingCol='seeingFwhmEff', skybrightnessCol='skyBrightness', filterCol='filter', exptimeCol='visitExposureTime'): self.units = ['mag'] self.colsReq = [airmassCol, seeingCol, skybrightnessCol, filterCol, exptimeCol] self.airmassCol = airmassCol self.seeingCol = seeingCol self.skybrightnessCol = skybrightnessCol self.filterCol = filterCol self.exptimeCol = exptimeCol def _run(self, simData, cols_present=False): if cols_present: # Column already present in data; assume it needs updating and recalculate. return simData filts = np.unique(simData[self.filterCol]) for filtername in filts: infilt = np.where(simData[self.filterCol] == filtername) simData['m5_simsUtils'][infilt] = m5_flat_sed(filtername, simData[infilt][self.skybrightnessCol], simData[infilt][self.seeingCol], simData[infilt][self.exptimeCol], simData[infilt][self.airmassCol]) return simData
[docs]class NormAirmassStacker(BaseStacker): """Calculate the normalized airmass for each opsim pointing. """ colsAdded = ['normairmass'] def __init__(self, airmassCol='airmass', decCol='fieldDec', degrees=True, telescope_lat = -30.2446388): self.units = ['X / Xmin'] self.colsReq = [airmassCol, decCol] self.airmassCol = airmassCol self.decCol = decCol self.telescope_lat = telescope_lat self.degrees = degrees def _run(self, simData, cols_present=False): """Calculate new column for normalized airmass.""" # Run method is required to calculate column. # Driver runs getColInfo to know what columns are needed from db & which are calculated, # then gets data from db and then calculates additional columns (via run methods here). if cols_present: # Column already present in data; assume it is correct and does not need recalculating. return simData dec = simData[self.decCol] if self.degrees: dec = np.radians(dec) min_z_possible = np.abs(dec - np.radians(self.telescope_lat)) min_airmass_possible = 1./np.cos(min_z_possible) simData['normairmass'] = simData[self.airmassCol] / min_airmass_possible return simData
[docs]class ZenithDistStacker(BaseStacker): """Calculate the zenith distance for each pointing. If 'degrees' is True, then assumes altCol is in degrees and returns degrees. If 'degrees' is False, assumes altCol is in radians and returns radians. """ colsAdded = ['zenithDistance'] def __init__(self, altCol='altitude', degrees=True): self.altCol = altCol self.degrees = degrees if self.degrees: self.units = ['degrees'] else: self.unit = ['radians'] self.colsReq = [self.altCol] def _run(self, simData, cols_present=False): """Calculate new column for zenith distance.""" if cols_present: # Column already present in data; assume it is correct and does not need recalculating. return simData if self.degrees: simData['zenithDistance'] = 90.0 - simData[self.altCol] else: simData['zenithDistance'] = np.pi/2.0 - simData[self.altCol] return simData
[docs]class ParallaxFactorStacker(BaseStacker): """Calculate the parallax factors for each opsim pointing. Output parallax factor in arcseconds. """ colsAdded = ['ra_pi_amp', 'dec_pi_amp'] def __init__(self, raCol='fieldRA', decCol='fieldDec', dateCol='observationStartMJD', degrees=True): self.raCol = raCol self.decCol = decCol self.dateCol = dateCol self.units = ['arcsec', 'arcsec'] self.colsReq = [raCol, decCol, dateCol] self.degrees = degrees def _gnomonic_project_toxy(self, RA1, Dec1, RAcen, Deccen): """Calculate x/y projection of RA1/Dec1 in system with center at RAcen, Deccenp. Input radians. """ # also used in Global Telescope Network website cosc = np.sin(Deccen) * np.sin(Dec1) + np.cos(Deccen) * np.cos(Dec1) * np.cos(RA1-RAcen) x = np.cos(Dec1) * np.sin(RA1-RAcen) / cosc y = (np.cos(Deccen)*np.sin(Dec1) - np.sin(Deccen)*np.cos(Dec1)*np.cos(RA1-RAcen)) / cosc return x, y def _run(self, simData, cols_present=False): if cols_present: # Column already present in data; assume it is correct and does not need recalculating. return simData ra_pi_amp = np.zeros(np.size(simData), dtype=[('ra_pi_amp', 'float')]) dec_pi_amp = np.zeros(np.size(simData), dtype=[('dec_pi_amp', 'float')]) ra_geo1 = np.zeros(np.size(simData), dtype='float') dec_geo1 = np.zeros(np.size(simData), dtype='float') ra_geo = np.zeros(np.size(simData), dtype='float') dec_geo = np.zeros(np.size(simData), dtype='float') ra = simData[self.raCol] dec = simData[self.decCol] if self.degrees: ra = np.radians(ra) dec = np.radians(dec) for i, ack in enumerate(simData): mtoa_params = palpy.mappa(2000., simData[self.dateCol][i]) # Object with a 1 arcsec parallax ra_geo1[i], dec_geo1[i] = palpy.mapqk(ra[i], dec[i], 0., 0., 1., 0., mtoa_params) # Object with no parallax ra_geo[i], dec_geo[i] = palpy.mapqk(ra[i], dec[i], 0., 0., 0., 0., mtoa_params) x_geo1, y_geo1 = self._gnomonic_project_toxy(ra_geo1, dec_geo1, ra, dec) x_geo, y_geo = self._gnomonic_project_toxy(ra_geo, dec_geo, ra, dec) # Return ra_pi_amp and dec_pi_amp in arcseconds. ra_pi_amp[:] = np.degrees(x_geo1-x_geo)*3600. dec_pi_amp[:] = np.degrees(y_geo1-y_geo)*3600. simData['ra_pi_amp'] = ra_pi_amp simData['dec_pi_amp'] = dec_pi_amp return simData
[docs]class DcrStacker(BaseStacker): """Calculate the RA,Dec offset expected for an object due to differential chromatic refraction. For DCR calculation, we also need zenithDistance, HA, and PA -- but these will be explicitly handled within this stacker so that setup is consistent and they run in order. If those values have already been calculated elsewhere, they will not be overwritten. Parameters ---------- filterCol : str The name of the column with filter names. Default 'fitler'. altCol : str Name of the column with altitude info. Default 'altitude'. raCol : str Name of the column with RA. Default 'fieldRA'. decCol : str Name of the column with Dec. Default 'fieldDec'. lstCol : str Name of the column with local sidereal time. Default 'observationStartLST'. site : str or lsst.sims.utils.Site Name of the observory or a lsst.sims.utils.Site object. Default 'LSST'. mjdCol : str Name of column with modified julian date. Default 'observationStartMJD' dcr_magnitudes : dict Magitude of the DCR offset for each filter at altitude/zenith distance of 45 degrees. Defaults u=0.07, g=0.07, r=0.50, i=0.045, z=0.042, y=0.04 (all arcseconds). Returns ------- numpy.array Returns array with additional columns 'ra_dcr_amp' and 'dec_dcr_amp' with the DCR offsets for each observation. Also runs ZenithDistStacker and ParallacticAngleStacker. """ colsAdded = ['ra_dcr_amp', 'dec_dcr_amp'] # zenithDist, HA, PA def __init__(self, filterCol='filter', altCol='altitude', degrees=True, raCol='fieldRA', decCol='fieldDec', lstCol='observationStartLST', site='LSST', mjdCol='observationStartMJD', dcr_magnitudes=None): self.units = ['arcsec', 'arcsec'] if dcr_magnitudes is None: # DCR amplitudes are in arcseconds. self.dcr_magnitudes = {'u': 0.07, 'g': 0.07, 'r': 0.050, 'i': 0.045, 'z': 0.042, 'y': 0.04} else: self.dcr_magnitudes = dcr_magnitudes self.zdCol = 'zenithDistance' self.paCol = 'PA' self.filterCol = filterCol self.raCol = raCol self.decCol = decCol self.degrees = degrees self.colsReq = [filterCol, raCol, decCol, altCol, lstCol] # 'zenithDist', 'PA', 'HA' are additional columns required, coming from other stackers which must # also be configured -- so we handle this explicitly here. self.zstacker = ZenithDistStacker(altCol=altCol, degrees=self.degrees) self.pastacker = ParallacticAngleStacker(raCol=raCol, decCol=decCol, mjdCol=mjdCol, degrees=self.degrees, lstCol=lstCol, site=site) # Note that RA/Dec could be coming from a dither stacker! # But we will assume that coord stackers will be handled separately. def _run(self, simData, cols_present=False): if cols_present: # Column already present in data; assume it is correct and does not need recalculating. return simData # Need to make sure the Zenith stacker gets run first # Call _run method because already added these columns due to 'colsAdded' line. simData = self.zstacker.run(simData) simData = self.pastacker.run(simData) if self.degrees: zenithTan = np.tan(np.radians(simData[self.zdCol])) parallacticAngle = np.radians(simData[self.paCol]) else: zenithTan = np.tan(simData[self.zdCol]) parallacticAngle = simData[self.paCol] dcr_in_ra = zenithTan * np.sin(parallacticAngle) dcr_in_dec = zenithTan * np.cos(parallacticAngle) for filtername in np.unique(simData[self.filterCol]): fmatch = np.where(simData[self.filterCol] == filtername) dcr_in_ra[fmatch] = self.dcr_magnitudes[filtername] * dcr_in_ra[fmatch] dcr_in_dec[fmatch] = self.dcr_magnitudes[filtername] * dcr_in_dec[fmatch] simData['ra_dcr_amp'] = dcr_in_ra simData['dec_dcr_amp'] = dcr_in_dec return simData
[docs]class HourAngleStacker(BaseStacker): """Add the Hour Angle for each observation. Always in HOURS. """ colsAdded = ['HA'] def __init__(self, lstCol='observationStartLST', raCol='fieldRA', degrees=True): self.units = ['Hours'] self.colsReq = [lstCol, raCol] self.lstCol = lstCol self.raCol = raCol self.degrees = degrees def _run(self, simData, cols_present=False): """HA = LST - RA """ if cols_present: # Column already present in data; assume it is correct and does not need recalculating. return simData if len(simData) == 0: return simData if self.degrees: ra = np.radians(simData[self.raCol]) lst = np.radians(simData[self.lstCol]) else: ra = simData[self.raCol] lst = simData[self.lstCol] # Check that LST is reasonable if (np.min(lst) < 0) | (np.max(lst) > 2.*np.pi): warnings.warn('LST values are not between 0 and 2 pi') # Check that RA is reasonable if (np.min(ra) < 0) | (np.max(ra) > 2.*np.pi): warnings.warn('RA values are not between 0 and 2 pi') ha = lst - ra # Wrap the results so HA between -pi and pi ha = np.where(ha < -np.pi, ha + 2. * np.pi, ha) ha = np.where(ha > np.pi, ha - 2. * np.pi, ha) # Convert radians to hours simData['HA'] = ha*12/np.pi return simData
[docs]class ParallacticAngleStacker(BaseStacker): """Add the parallactic angle to each visit. If 'degrees' is True, this will be in degrees (as are all other angles). If False, then in radians. """ colsAdded = ['PA'] def __init__(self, raCol='fieldRA', decCol='fieldDec', degrees=True, mjdCol='observationStartMJD', lstCol='observationStartLST', site='LSST'): self.lstCol = lstCol self.raCol = raCol self.decCol = decCol self.degrees = degrees self.mjdCol = mjdCol self.site = Site(name=site) self.units = ['radians'] self.colsReq = [self.raCol, self.decCol, self.mjdCol, self.lstCol] self.haStacker = HourAngleStacker(lstCol=lstCol, raCol=raCol, degrees=self.degrees) def _run(self, simData, cols_present=False): # Equation from: # http://www.gb.nrao.edu/~rcreager/GBTMetrology/140ft/l0058/gbtmemo52/memo52.html # or # http://www.gb.nrao.edu/GBT/DA/gbtidl/release2pt9/contrib/contrib/parangle.pro if cols_present: # Column already present in data; assume it is correct and does not need recalculating. return simData # Using the run method (not _run) means that if HA is present, it will not be recalculated. simData = self.haStacker.run(simData) if self.degrees: dec = np.radians(simData[self.decCol]) else: dec = simData[self.decCol] simData['PA'] = np.arctan2(np.sin(simData['HA']*np.pi/12.), (np.cos(dec) * np.tan(self.site.latitude_rad) - np.sin(dec) * np.cos(simData['HA']*np.pi/12.))) if self.degrees: simData['PA'] = np.degrees(simData['PA']) return simData
[docs]class FilterColorStacker(BaseStacker): """Translate filters ('u', 'g', 'r' ..) into RGB tuples. This is useful for making movies if you want to make the pointing have a related color-tuple for a plot. """ colsAdded = ['rRGB', 'gRGB', 'bRGB'] def __init__(self, filterCol='filter'): self.filter_rgb_map = {'u': (0, 0, 1), # dark blue 'g': (0, 1, 1), # cyan 'r': (0, 1, 0), # green 'i': (1, 0.5, 0.3), # orange 'z': (1, 0, 0), # red 'y': (1, 0, 1)} # magenta self.filterCol = filterCol # self.units used for plot labels self.units = ['rChan', 'gChan', 'bChan'] # Values required for framework operation: this specifies the data columns required from the database. self.colsReq = [self.filterCol] def _run(self, simData, cols_present=False): # Translate filter names into numbers. if cols_present: # Column already present in data; assume it is correct and does not need recalculating. return simData filtersUsed = np.unique(simData[self.filterCol]) for f in filtersUsed: if f not in self.filter_rgb_map: raise IndexError('Filter %s not in filter_rgb_map' % (f)) match = np.where(simData[self.filterCol] == f)[0] simData['rRGB'][match] = self.filter_rgb_map[f][0] simData['gRGB'][match] = self.filter_rgb_map[f][1] simData['bRGB'][match] = self.filter_rgb_map[f][2] return simData
[docs]class OpSimFieldStacker(BaseStacker): """Add the fieldId of the closest OpSim field for each RA/Dec pointing. Parameters ---------- raCol : str, opt Name of the RA column. Default fieldRA. decCol : str, opt Name of the Dec column. Default fieldDec. """ colsAdded = ['opsimFieldId'] def __init__(self, raCol='fieldRA', decCol='fieldDec', degrees=True): self.colsReq = [raCol, decCol] self.units = ['#'] self.raCol = raCol self.decCol = decCol self.degrees = degrees fields_db = FieldsDatabase() # Returned RA/Dec coordinates in degrees fieldid, ra, dec = fields_db.get_id_ra_dec_arrays("select * from Field;") asort = np.argsort(fieldid) self.tree = _buildTree(np.radians(ra[asort]), np.radians(dec[asort])) def _run(self, simData, cols_present=False): if cols_present: # Column already present in data; assume it is correct and does not need recalculating. return simData if self.degrees: coord_x, coord_y, coord_z = xyz_from_ra_dec(simData[self.raCol], simData[self.decCol]) field_ids = self.tree.query_ball_point(list(zip(coord_x, coord_y, coord_z)), xyz_angular_radius()) else: # use _xyz private method (sending radians) coord_x, coord_y, coord_z = _xyz_from_ra_dec(simData[self.raCol], simData[self.decCol]) field_ids = self.tree.query_ball_point(list(zip(coord_x, coord_y, coord_z)), xyz_angular_radius()) simData['opsimFieldId'] = np.array([ids[0] for ids in field_ids]) + 1 return simData