Source code for lsst.sims.maf.utils.snUtils

import numpy as np
import matplotlib.pyplot as plt
from scipy import interpolate
import numpy.lib.recfunctions as rf


[docs]class Lims: """ class to handle light curve of SN Parameters --------------- Li_files : str light curve reference file mag_to_flux_files : str files of magnitude to flux band : str band considered SNR : float Signal-To-Noise Ratio cut mag_range : pair(float),opt mag range considered Default : (23., 27.5) dt_range : pair(float) difference time range considered (cadence) Default : (0.5, 25.) """ def __init__(self, Li_files, mag_to_flux_files, band, SNR, mag_range=(23., 27.5), dt_range=(0.5, 25.)): self.band = band self.SNR = SNR self.lims = [] self.mag_to_flux = [] self.mag_range = mag_range self.dt_range = dt_range for val in Li_files: self.lims.append(self.get_lims(self.band, np.load(val), SNR)) for val in mag_to_flux_files: self.mag_to_flux.append(np.load(val)) self.interp()
[docs] def get_lims(self, band, tab, SNR): """ Estimations of the limits Parameters --------------- band : str band to consider tab : numpy array table of data SNR : float Signal-to-Noise Ratio cut Returns: ----------- dict of limits with redshift and band as keys. """ lims = {} for z in np.unique(tab['z']): idx = (tab['z'] == z) & (tab['band'] == 'LSST::'+band) idx &= (tab['flux_e'] > 0.) sel = tab[idx] if len(sel) > 0: li2 = np.sqrt(np.sum(sel['flux_e']**2)) lim = 5. * li2 / SNR if z not in lims.keys(): lims[z] = {} lims[z][band] = lim return lims
[docs] def mesh(self, mag_to_flux): """ Mesh grid to estimate five-sigma depth values (m5) from mags input. Parameters --------------- mag_to_flux : magnitude to flux values Returns ----------- m5 values time difference dt (cadence) metric=sqrt(dt)*F5 where F5 is the 5-sigma flux """ dt = np.linspace(self.dt_range[0], self.dt_range[1], 100) m5 = np.linspace(self.mag_range[0], self.mag_range[1], 50) ida = mag_to_flux['band'] == self.band fa = interpolate.interp1d( mag_to_flux[ida]['m5'], mag_to_flux[ida]['flux_e']) f5 = fa(m5) F5, DT = np.meshgrid(f5, dt) M5, DT = np.meshgrid(m5, dt) metric = np.sqrt(DT) * F5 return M5, DT, metric
[docs] def interp(self): """ Estimate a grid of interpolated values in the plane (m5, cadence, metric) Parameters --------------- None """ M5_all = [] DT_all = [] metric_all = [] for val in self.mag_to_flux: M5, DT, metric = self.mesh(val) M5_all.append(M5) DT_all.append(DT) metric_all.append(metric) sorted_keys = [] for i in range(len(self.lims)): sorted_keys.append(np.sort([k for k in self.lims[i].keys()])[::-1]) figa, axa = plt.subplots() for kk, lim in enumerate(self.lims): fmt = {} ll = [lim[zz][self.band] for zz in sorted_keys[kk]] cs = axa.contour(M5_all[kk], DT_all[kk], metric_all[kk], ll) points_values = None for io, col in enumerate(cs.collections): if col.get_segments(): myarray = col.get_segments()[0] res = np.array(myarray[:, 0], dtype=[('m5', 'f8')]) res = rf.append_fields(res, 'cadence', myarray[:, 1]) res = rf.append_fields( res, 'z', [sorted_keys[kk][io]]*len(res)) if points_values is None: points_values = res else: points_values = np.concatenate((points_values, res)) self.points_ref = points_values plt.close(figa) # do not display
[docs] def interp_griddata(self, data): """ Estimate metric interpolation for data (m5,cadence) Parameters --------------- data : data where interpolation has to be done (m5,cadence) Returns ----------- griddata interpolation (m5,cadence,metric) """ ref_points = self.points_ref res = interpolate.griddata((ref_points['m5'], ref_points['cadence']), ref_points['z'], ( data['m5_mean'], data['cadence_mean']), method='cubic') return res
[docs]class GenerateFakeObservations: """ Class to generate Fake observations Parameters --------- config: yaml-like configuration file (parameter choice: filter, cadence, m5,Nseasons, ...) list : str,opt Name of the columns used. Default : 'observationStartMJD', 'fieldRA', 'fieldDec','filter','fiveSigmaDepth','visitExposureTime','numExposures','visitTime','season' Returns --------- recordarray of observations with the fields: MJD, Ra, Dec, band,m5,Nexp, ExpTime, Season """ def __init__(self, config, mjdCol='observationStartMJD', RaCol='fieldRA', DecCol='fieldDec', filterCol='filter', m5Col='fiveSigmaDepth', exptimeCol='visitExposureTime', nexpCol='numExposures', seasonCol='season'): self.mjdCol = mjdCol self.m5Col = m5Col self.filterCol = filterCol self.RaCol = RaCol self.DecCol = DecCol self.exptimeCol = exptimeCol self.seasonCol = seasonCol self.nexpCol = nexpCol # now make fake obs self.make_fake(config)
[docs] def make_fake(self, config): """ Generate Fake observations Parameters --------- config: yaml-like configuration file (parameter choice: filter, cadence, m5,Nseasons, ...) """ bands = config['bands'] cadence = dict(zip(bands, config['Cadence'])) shift_days = dict( zip(bands, [config['shift_days']*io for io in range(len(bands))])) m5 = dict(zip(bands, config['m5'])) Nvisits = dict(zip(bands, config['Nvisits'])) Exposure_Time = dict(zip(bands, config['Exposure_Time'])) inter_season_gap = 300. Ra = config['Ra'] Dec = config['Dec'] rtot = [] # for season in range(1, config['nseasons']+1): for il, season in enumerate(config['seasons']): # mjd_min = config['MJD_min'] + float(season-1)*inter_season_gap mjd_min = config['MJD_min'][il] mjd_max = mjd_min+config['season_length'] for i, band in enumerate(bands): mjd = np.arange(mjd_min, mjd_max+cadence[band], cadence[band]) mjd += shift_days[band] m5_coadded = self.m5_coadd(m5[band], Nvisits[band], Exposure_Time[band]) myarr = np.array(mjd, dtype=[(self.mjdCol, 'f8')]) myarr = rf.append_fields(myarr, [self.RaCol, self.DecCol, self.filterCol], [ [Ra]*len(myarr), [Dec]*len(myarr), [band]*len(myarr)]) myarr = rf.append_fields(myarr, [self.m5Col, self.nexpCol, self.exptimeCol, self.seasonCol], [ [m5_coadded]*len(myarr), [Nvisits[band]]*len(myarr), [Nvisits[band]*Exposure_Time[band]]*len(myarr), [season]*len(myarr)]) rtot.append(myarr) res = np.copy(np.concatenate(rtot)) res.sort(order=self.mjdCol) self.Observations = res
[docs] def m5_coadd(self, m5, Nvisits, Tvisit): """ Coadded m5 estimation Parameters --------- m5 : list(float) list of five-sigma depth values Nvisits : list(float) list of the number of visits Tvisit : list(float) list of the visit times Returns --------- m5_coadd : list(float) list of m5 coadded values """ m5_coadd = m5+1.25*np.log10(float(Nvisits)*Tvisit/30.) return m5_coadd
[docs]class ReferenceData: """ class to handle light curve of SN Parameters --------------- Li_files : str light curve reference file mag_to_flux_files : str files of magnitude to flux band : str band considered z : float redshift considered """ def __init__(self, Li_files, mag_to_flux_files, band, z): self.band = band self.z = z self.fluxes = [] self.mag_to_flux = [] for val in Li_files: self.fluxes.append(self.interp_fluxes( self.band, np.load(val), self.z)) for val in mag_to_flux_files: self.mag_to_flux.append( self.interp_mag(self.band, np.load(val)))
[docs] def interp_fluxes(self, band, tab, z): """ Flux interpolator Parameters --------------- band : str band considered tab : array reference data with (at least) fields z,band,time,DayMax z : float redshift considered Returns ----- list (float) of interpolated fluxes (in e/sec) """ lims = {} idx = (np.abs(tab['z'] - z) < 1.e-5) & (tab['band'] == 'LSST::'+band) sel = tab[idx] selc = np.copy(sel) difftime = (sel['time']-sel['DayMax']) selc = rf.append_fields(selc, 'deltaT', difftime) return interpolate.interp1d(selc['deltaT'], selc['flux_e'], bounds_error=False, fill_value=0.)
[docs] def interp_mag(self, band, tab): """ magnitude (m5) to flux (e/sec) interpolator Parameters --------------- band : str band considered tab : array reference data with (at least) fields band,m5,flux_e, z : float redshift considered Returns ----- list (float) of interpolated fluxes (in e/sec) """ idx = tab['band'] == band sel = tab[idx] return interpolate.interp1d(sel['m5'], sel['flux_e'], bounds_error=False, fill_value=0.)