Source code for lsst.sims.maf.metrics.snSNRMetric

import numpy as np
import matplotlib.pylab as plt
import numpy.lib.recfunctions as rf
from scipy import interpolate
import lsst.sims.maf.metrics as metrics
from lsst.sims.maf.utils.snUtils import GenerateFakeObservations
from collections.abc import Iterable
import time

__all__ = ['SNSNRMetric']

class SNSNRMetric(metrics.BaseMetric):
    """
    Metric to estimate the detection rate of faint supernovae with
    (x1, color) = (-2.0, 0.2).

    Parameters
    ----------
    metricName : str, opt
        Name of the metric.
        Default : 'SNSNRMetric'
    mjdCol, RaCol, DecCol, filterCol, m5Col, exptimeCol, nightCol, obsidCol, nexpCol, vistimeCol : str, opt
        Names of the columns used to estimate the metric.
        Defaults : 'observationStartMJD', 'fieldRA', 'fieldDec', 'filter',
        'fiveSigmaDepth', 'visitExposureTime', 'night', 'observationId',
        'numExposures', 'visitTime'
    coadd : bool, opt
        Make "coadditions" per night (uses snStacker).
        Default : True
    lim_sn : class, opt
        Reference data used to simulate LC points (interpolation).
    names_ref : str, opt
        Names of the simulators used to produce the reference data.
    season : float, opt
        Season number.
        Default : 1.
    z : float, opt
        Redshift for this study.
        Default : 0.01
    """

    def __init__(self, metricName='SNSNRMetric',
                 mjdCol='observationStartMJD', RaCol='fieldRA', DecCol='fieldDec',
                 filterCol='filter', m5Col='fiveSigmaDepth',
                 exptimeCol='visitExposureTime', nightCol='night',
                 obsidCol='observationId', nexpCol='numExposures',
                 vistimeCol='visitTime', coadd=True, lim_sn=None, names_ref=None,
                 season=1, z=0.01, **kwargs):

        self.mjdCol = mjdCol
        self.m5Col = m5Col
        self.filterCol = filterCol
        self.RaCol = RaCol
        self.DecCol = DecCol
        self.exptimeCol = exptimeCol
        self.seasonCol = 'season'
        self.nightCol = nightCol
        self.obsidCol = obsidCol
        self.nexpCol = nexpCol
        self.vistimeCol = vistimeCol

        cols = [self.nightCol, self.m5Col, self.filterCol, self.mjdCol,
                self.obsidCol, self.nexpCol, self.vistimeCol, self.exptimeCol,
                self.seasonCol]
        if coadd:
            cols += ['coadd']
        super(SNSNRMetric, self).__init__(
            col=cols, metricName=metricName, **kwargs)

        self.filterNames = np.array(['u', 'g', 'r', 'i', 'z', 'y'])
        self.blue_cutoff = 300.
        self.red_cutoff = 800.
        self.min_rf_phase = -20.
        self.max_rf_phase = 40.
        self.z = z
        self.names_ref = names_ref
        self.season = season
        # SN DayMax: current date - shift days
        self.shift = 10.
        # These are the reference LC
        self.lim_sn = lim_sn
        self.display = False
    def run(self, dataSlice, slicePoint=None):
        """
        Run the metric.

        Parameters
        ----------
        dataSlice : array
            simulation data under study

        Returns
        -------
        detection rate : float
        """
        time_ref = time.time()
        goodFilters = np.in1d(dataSlice['filter'], self.filterNames)
        dataSlice = dataSlice[goodFilters]
        if dataSlice.size == 0:
            return None
        dataSlice.sort(order=self.mjdCol)

        if self.season != -1:
            seasons = self.season
        else:
            seasons = np.unique(dataSlice['season'])
        if not isinstance(seasons, Iterable):
            seasons = [seasons]

        self.info_season = None
        for seas in seasons:
            info = self.season_info(dataSlice, seas)
            if info is not None and info['season_length'] >= self.shift:
                if self.info_season is None:
                    self.info_season = info
                else:
                    self.info_season = np.concatenate((self.info_season, info))

        self.info_season = self.check_seasons(self.info_season)
        if self.info_season is None:
            return 0.

        sel = dataSlice[np.in1d(dataSlice['season'], np.array(seasons))]

        detect_frac = None
        if len(sel) >= 5:
            detect_frac = self.process(sel)

        if detect_frac is not None:
            return np.median(detect_frac['frac_obs_{}'.format(self.names_ref[0])])
        else:
            return 0.
    def process(self, sel):
        """
        Process one season.

        Parameters
        ----------
        sel : array
            array of observations

        Returns
        -------
        record array with the following fields:
            fieldRA (float)
            fieldDec (float)
            season (float)
            band (str)
            frac_obs_name_ref (float)
        """
        self.band = np.unique(sel[self.filterCol])[0]
        time_ref = time.time()
        snr_obs = self.snr_slice(sel)  # SNR for observations
        snr_fakes = self.snr_fakes(sel)  # SNR for fakes
        detect_frac = self.detection_rate(snr_obs, snr_fakes)  # detection rate
        snr_obs = np.asarray(snr_obs)
        snr_fakes = np.asarray(snr_fakes)
        # self.plot(snr_obs, snr_fakes)
        # plt.show()
        detect_frac = np.asarray(detect_frac)

        return detect_frac
    def snr_slice(self, dataSlice, j=-1, output_q=None):
        """
        Estimate SNR for a given dataSlice.

        Parameters
        ----------
        dataSlice : array
            array of observations
        j : int, opt
            index used to tag the result when output_q is used
        output_q : multiprocessing queue, opt
            if not None, the result is put in this queue instead of returned

        Returns
        -------
        array with the following fields (all of f8 type, except band which is U1):
            SNR_name_ref : Signal-To-Noise Ratio estimator
            season : season
            cadence : cadence of the season
            season_length : length of the season
            MJD_min : min MJD of the season
            DayMax : SN max luminosity MJD (aka T0)
            MJD : MJD considered (one-day grid over the season)
            m5_eff : mean m5 of obs passing the min_phase, max_phase cut
            fieldRA : mean field RA
            fieldDec : mean field Dec
            band : band
            m5 : mean m5 (over the season)
            Nvisits : median number of visits (per observation) (over the season)
            ExposureTime : median exposure time (per observation) (over the season)
        """
        # Get a few pieces of information: RA, Dec, Nvisits, m5, exptime
        fieldRA = np.mean(dataSlice[self.RaCol])
        fieldDec = np.mean(dataSlice[self.DecCol])
        # one visit = 2 exposures
        Nvisits = np.median(dataSlice[self.nexpCol]/2.)
        m5 = np.mean(dataSlice[self.m5Col])
        exptime = np.median(dataSlice[self.exptimeCol])
        dataSlice.sort(order=self.mjdCol)
        mjds = dataSlice[self.mjdCol]
        band = np.unique(dataSlice[self.filterCol])[0]

        # Define the MJDs to consider for metric estimation:
        # basically a step of one day between MJD_min and MJD_max
        dates = None
        for val in self.info_season:
            if dates is None:
                dates = np.arange(val['MJD_min']+self.shift, val['MJD_max']+1., 1.)
            else:
                dates = np.concatenate(
                    (dates, np.arange(val['MJD_min']+self.shift,
                                      val['MJD_max']+1., 1.)))

        # SN DayMax: dates-shift where shift is chosen in the input yaml file
        T0_lc = dates-self.shift

        # for these DayMax, estimate the phases of LC points corresponding
        # to the current dataSlice MJDs
        time_for_lc = -T0_lc[:, None]+mjds
        phase = time_for_lc/(1.+self.z)  # phases of LC points
        # flag: select LC points only in between min_rf_phase and max_phase
        phase_max = self.shift/(1.+self.z)
        flag = (phase >= self.min_rf_phase) & (phase <= phase_max)

        # tile m5, MJDs, and seasons to estimate all fluxes and SNR at once
        m5_vals = np.tile(dataSlice[self.m5Col], (len(time_for_lc), 1))
        season_vals = np.tile(dataSlice[self.seasonCol], (len(time_for_lc), 1))

        # estimate fluxes and SNR in the snr function
        fluxes_tot, snr = self.snr(time_for_lc, m5_vals, flag, season_vals, T0_lc)

        # now save the results in a record array
        _, idx = np.unique(snr['season'], return_inverse=True)
        infos = self.info_season[idx]
        vars_info = ['cadence', 'season_length', 'MJD_min']
        snr = rf.append_fields(
            snr, vars_info, [infos[name] for name in vars_info])
        snr = rf.append_fields(snr, 'DayMax', T0_lc)
        snr = rf.append_fields(snr, 'MJD', dates)
        snr = rf.append_fields(snr, 'm5_eff', np.mean(
            np.ma.array(m5_vals, mask=~flag), axis=1))
        global_info = [(fieldRA, fieldDec, band, m5, Nvisits, exptime)]*len(snr)
        names = ['fieldRA', 'fieldDec', 'band', 'm5', 'Nvisits', 'ExposureTime']
        global_info = np.rec.fromrecords(global_info, names=names)
        snr = rf.append_fields(
            snr, names, [global_info[name] for name in names])

        if output_q is not None:
            output_q.put({j: snr})
        else:
            return snr
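    # Reading of the grid above: for each grid date MJD, the candidate SN is
    # assigned DayMax = MJD - shift (shift = 10 observer-frame days), so only LC
    # points with rest-frame phase in [min_rf_phase, shift/(1+z)] -- i.e. from
    # 20 rest-frame days before maximum up to the current date -- contribute to
    # the SNR estimated at that date.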
    def season_info(self, dataSlice, season):
        """
        Get info on a season for a dataSlice.

        Parameters
        ----------
        dataSlice : array
            array of observations
        season : int
            season number

        Returns
        -------
        recordarray with the following fields:
            season, cadence, season_length, MJD_min, MJD_max, Nvisits, m5
        """
        rv = []
        idx = (dataSlice[self.seasonCol] == season)
        slice_sel = dataSlice[idx]
        if len(slice_sel) < 5:
            return None
        slice_sel.sort(order=self.mjdCol)
        mjds_season = slice_sel[self.mjdCol]
        cadence = np.mean(mjds_season[1:]-mjds_season[:-1])
        mjd_min = np.min(mjds_season)
        mjd_max = np.max(mjds_season)
        season_length = mjd_max-mjd_min
        Nvisits = np.median(slice_sel[self.nexpCol])
        m5 = np.median(slice_sel[self.m5Col])
        rv.append((float(season), cadence, season_length,
                   mjd_min, mjd_max, Nvisits, m5))

        info_season = np.rec.fromrecords(
            rv, names=['season', 'cadence', 'season_length',
                       'MJD_min', 'MJD_max', 'Nvisits', 'm5'])

        return info_season
    def snr(self, time_lc, m5_vals, flag, season_vals, T0_lc):
        """
        Estimate SNR vs time.

        Parameters
        ----------
        time_lc : array(float)
            times of the LC points with respect to T0 (observer frame)
        m5_vals : list(float)
            five-sigma depth values
        flag : array(bool)
            flag to be applied (example: selection from phase cut)
        season_vals : array(float)
            season values
        T0_lc : array(float)
            array of T0 for supernovae

        Returns
        -------
        fluxes_tot : list(float)
            list of (interpolated) fluxes
        snr_tab : array with the following fields:
            snr_name_ref (float) : Signal-to-Noise values
            season (float) : season num.
        """
        seasons = np.ma.array(season_vals, mask=~flag)
        fluxes_tot = {}
        snr_tab = None

        for ib, name in enumerate(self.names_ref):
            fluxes = self.lim_sn.fluxes[ib](time_lc)
            if name not in fluxes_tot.keys():
                fluxes_tot[name] = fluxes
            else:
                fluxes_tot[name] = np.concatenate((fluxes_tot[name], fluxes))

            flux_5sigma = self.lim_sn.mag_to_flux[ib](m5_vals)
            snr = fluxes**2/flux_5sigma**2
            snr_season = 5.*np.sqrt(np.sum(snr*flag, axis=1))
            if snr_tab is None:
                snr_tab = np.asarray(np.copy(snr_season),
                                     dtype=[('SNR_'+name, 'f8')])
            else:
                snr_tab = rf.append_fields(
                    snr_tab, 'SNR_'+name, np.copy(snr_season))

        # alternative season estimate, disabled in the original:
        # snr_tab = rf.append_fields(
        #     snr_tab, 'season', np.mean(seasons, axis=1))
        snr_tab = rf.append_fields(snr_tab, 'season', self.get_season(T0_lc))

        # check whether any masked value remains:
        # this corresponds to cases where no obs point has been selected,
        # i.e. no point with phase in [phase_min, phase_max].
        # This happens when internight gaps are large (typically larger than shift).
        idmask = np.where(snr_tab.mask)
        if len(idmask) > 0:
            tofill = np.copy(snr_tab['season'])
            season_recover = self.get_season(T0_lc[np.where(snr_tab.mask)])
            tofill[idmask] = season_recover
            snr_tab = np.ma.filled(snr_tab, fill_value=tofill)

        return fluxes_tot, snr_tab
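    # The per-T0 SNR computed above is the quadrature sum of the individual
    # LC-point SNRs: since flux_5sigma is the flux detected at 5 sigma, each
    # point contributes 5*flux/flux_5sigma, so
    #     SNR_season = 5 * sqrt( sum_i (flux_i / flux_5sigma_i)**2 )
    # with the sum restricted to points passing the phase cut (flag).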
    def get_season(self, T0):
        """
        Estimate the seasons corresponding to T0 values.

        Parameters
        ----------
        T0 : list(float)
            set of T0 values

        Returns
        -------
        list (float) of corresponding seasons
        """
        diff_min = T0[:, None]-self.info_season['MJD_min']
        diff_max = -T0[:, None]+self.info_season['MJD_max']

        seasons = np.tile(self.info_season['season'], (len(diff_min), 1))
        flag = (diff_min >= 0) & (diff_max >= 0)
        seasons = np.ma.array(seasons, mask=~flag)

        return np.mean(seasons, axis=1)
    def snr_fakes(self, dataSlice):
        """
        Estimate SNR for fake observations,
        in the same way as for observations (using snr_slice).

        Parameters
        ----------
        dataSlice : array
            array of observations

        Returns
        -------
        snr_tab : array with the following fields:
            snr_name_ref (float) : Signal-to-Noise values
            season (float) : season num.
        """
        # generate fake observations
        fake_obs = None
        # idx = (dataSlice[self.seasonCol] == season)
        band = np.unique(dataSlice[self.filterCol])[0]
        fake_obs = self.gen_fakes(dataSlice, band)

        # estimate SNR vs MJD
        snr_fakes = self.snr_slice(fake_obs[fake_obs['filter'] == band])

        return snr_fakes
    def gen_fakes(self, slice_sel, band):
        """
        Generate fake observations
        according to observing values extracted from simulations.

        Parameters
        ----------
        slice_sel : array
            array of observations
        band : str
            band to consider

        Returns
        -------
        fake_obs_season : array
            array of observations with the following fields:
            observationStartMJD (float), fieldRA (float), fieldDec (float),
            filter (U1), fiveSigmaDepth (float), numExposures (float),
            visitExposureTime (float), season (int)
        """
        fieldRA = np.mean(slice_sel[self.RaCol])
        fieldDec = np.mean(slice_sel[self.DecCol])
        Tvisit = 30.

        fake_obs = None
        for val in self.info_season:
            cadence = val['cadence']
            mjd_min = val['MJD_min']
            mjd_max = val['MJD_max']
            season_length = val['season_length']
            Nvisits = val['Nvisits']
            m5 = val['m5']

            # build the configuration for the fake-observation generator
            config_fake = {}
            config_fake['Ra'] = fieldRA
            config_fake['Dec'] = fieldDec
            config_fake['bands'] = [band]
            config_fake['Cadence'] = [cadence]
            config_fake['MJD_min'] = [mjd_min]
            config_fake['season_length'] = season_length
            config_fake['Nvisits'] = [Nvisits]
            m5_nocoadd = m5-1.25*np.log10(float(Nvisits)*Tvisit/30.)
            config_fake['m5'] = [m5_nocoadd]
            config_fake['seasons'] = [val['season']]
            config_fake['Exposure_Time'] = [30.]
            config_fake['shift_days'] = 0.
            fake_obs_season = GenerateFakeObservations(config_fake).Observations
            if fake_obs is None:
                fake_obs = fake_obs_season
            else:
                fake_obs = np.concatenate((fake_obs, fake_obs_season))
        return fake_obs
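    # The m5 correction above removes the per-night coaddition gain before the
    # fakes are generated with single 30 s exposures: coadding Nvisits exposures
    # of length Tvisit improves the depth by 2.5*log10(sqrt(Nvisits*Tvisit/30))
    # = 1.25*log10(Nvisits*Tvisit/30) mag, e.g. about 0.38 mag for two 30 s visits.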
    def plot(self, snr_obs, snr_fakes):
        """
        Plot SNR vs time.

        Parameters
        ----------
        snr_obs : array
            array estimated using snr_slice(observations)
        snr_fakes : array
            array estimated using snr_slice(fakes)
        """
        fig, ax = plt.subplots(figsize=(10, 7))
        title = 'season {} - {} band - z={}'.format(
            self.season, self.band, self.z)
        fig.suptitle(title)
        ax.plot(snr_obs['MJD'], snr_obs['SNR_{}'.format(self.names_ref[0])],
                label='Simulation')
        ax.plot(snr_fakes['MJD'], snr_fakes['SNR_{}'.format(self.names_ref[0])],
                ls='--', label='Fakes')
    def PlotHistory(self, fluxes, mjd, flag, snr, T0_lc, dates):
        """
        Plot the history of LC fluxes and SNR.

        For each MJD, fluxes and SNR are plotted.
        Each plot may be saved as a png to make a video afterwards.

        Parameters
        ----------
        fluxes : list(float)
            LC fluxes
        mjd : list(float)
            mjds of the fluxes
        flag : array
            flag for selection of fluxes
        snr : list
            signal-to-noise ratio
        T0_lc : list(float)
            list of T0 supernovae
        dates : list(float)
            date of the display (mjd)
        """
        dir_save = '/home/philippe/LSST/sn_metric_new/Plots'
        import pylab as plt
        plt.ion()
        fig, ax = plt.subplots(ncols=1, nrows=2)
        fig.canvas.draw()
        colors = ['b', 'r']
        myls = ['-', '--']
        mfc = ['b', 'None']
        tot_label = []
        fontsize = 12
        mjd_ma = np.ma.array(mjd, mask=~flag)
        fluxes_ma = {}
        for key, val in fluxes.items():
            fluxes_ma[key] = np.ma.array(val, mask=~flag)
        key = list(fluxes.keys())[0]
        jmax = len(fluxes_ma[key])
        tot_label = []
        tot_label_snr = []
        min_flux = []
        max_flux = []

        for j in range(jmax):

            for ib, name in enumerate(fluxes_ma.keys()):
                tot_label.append(ax[0].errorbar(
                    mjd_ma[j], fluxes_ma[name][j], marker='s',
                    color=colors[ib], ls=myls[ib], label=name))
                tot_label_snr.append(ax[1].errorbar(
                    snr['MJD'][:j], snr['SNR_'+name][:j],
                    color=colors[ib], label=name))
                fluxx = fluxes_ma[name][j]
                fluxx = fluxx[~fluxx.mask]
                if len(fluxx) >= 2:
                    min_flux.append(np.min(fluxx))
                    max_flux.append(np.max(fluxx))
                else:
                    min_flux.append(0.)
                    max_flux.append(200.)

            min_fluxes = np.min(min_flux)
            max_fluxes = np.max(max_flux)

            tot_label.append(ax[0].errorbar(
                [T0_lc[j], T0_lc[j]], [min_fluxes, max_fluxes],
                color='k', label='DayMax'))
            tot_label.append(ax[0].errorbar(
                [dates[j], dates[j]], [min_fluxes, max_fluxes],
                color='k', ls='--', label='Current MJD'))
            fig.canvas.flush_events()
            # plt.savefig('{}/{}_{}.png'.format(dir_save, 'snr', 1000 + j))
            if j != jmax-1:
                ax[0].clear()
                tot_label = []
                tot_label_snr = []

        labs = [l.get_label() for l in tot_label]
        ax[0].legend(tot_label, labs, ncol=1, loc='best',
                     prop={'size': fontsize}, frameon=False)
        ax[0].set_ylabel('Flux [e.sec$^{-1}$]', fontsize=fontsize)

        ax[1].set_xlabel('MJD', fontsize=fontsize)
        ax[1].set_ylabel('SNR', fontsize=fontsize)
        ax[1].legend()
        labs = [l.get_label() for l in tot_label_snr]
        ax[1].legend(tot_label_snr, labs, ncol=1, loc='best',
                     prop={'size': fontsize}, frameon=False)
        for i in range(2):
            ax[i].tick_params(axis='x', labelsize=fontsize)
            ax[i].tick_params(axis='y', labelsize=fontsize)
    def detection_rate(self, snr_obs, snr_fakes):
        """
        Estimate the time fraction (per season) for which
        snr_obs >= snr_fakes, i.e. the detection rate.
        For regular cadences one should get a result close to 1.

        Parameters
        ----------
        snr_obs : array
            array estimated using snr_slice(observations)
        snr_fakes : array
            array estimated using snr_slice(fakes)

        Returns
        -------
        record array with the following fields:
            fieldRA (float)
            fieldDec (float)
            season (float)
            band (str)
            frac_obs_name_ref (float)
        """
        ra = np.mean(snr_obs['fieldRA'])
        dec = np.mean(snr_obs['fieldDec'])
        band = np.unique(snr_obs['band'])[0]

        rtot = []
        for season in np.unique(snr_obs['season']):
            idx = snr_obs['season'] == season
            sel_obs = snr_obs[idx]
            idxb = snr_fakes['season'] == season
            sel_fakes = snr_fakes[idxb]

            sel_obs.sort(order='MJD')
            sel_fakes.sort(order='MJD')
            r = [ra, dec, season, band]
            names = [self.RaCol, self.DecCol, 'season', 'band']
            for sim in self.names_ref:
                fakes = interpolate.interp1d(
                    sel_fakes['MJD'], sel_fakes['SNR_'+sim])
                obs = interpolate.interp1d(sel_obs['MJD'], sel_obs['SNR_'+sim])
                mjd_min = np.max(
                    [np.min(sel_obs['MJD']), np.min(sel_fakes['MJD'])])
                mjd_max = np.min(
                    [np.max(sel_obs['MJD']), np.max(sel_fakes['MJD'])])
                mjd = np.arange(mjd_min, mjd_max, 1.)

                diff_res = obs(mjd)-fakes(mjd)
                idx = diff_res >= 0
                r += [len(diff_res[idx])/len(diff_res)]
                names += ['frac_obs_'+sim]
            rtot.append(tuple(r))

        return np.rec.fromrecords(rtot, names=names)
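    # In detection_rate above, both SNR curves are linearly interpolated onto a
    # common one-day MJD grid; the returned fraction is simply the fraction of
    # grid days on which the observed SNR is at least as large as the SNR of the
    # regular fake cadence, so a well-sampled season yields a value close to 1.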
    def check_seasons(self, tab):
        """
        Check whether consecutive seasons overlap (or are separated by less
        than 20 days). If so, modify MJD_min and season_length of the
        following season. Return only seasons with season_length > 30 days.

        Parameters
        ----------
        tab : array
            season info with fields: season, cadence, season_length,
            MJD_min, MJD_max, Nvisits, m5

        Returns
        -------
        tab : array
            corrected season info, same fields as the input
        """
        if tab is None or len(tab) == 1:
            return tab

        if len(tab) > 1:
            diff = tab['MJD_min'][1:]-tab['MJD_max'][:-1]
            idb = np.argwhere(diff < 20.)
            if len(idb) >= 1:
                tab['MJD_min'][idb+1] = tab['MJD_max'][idb]+20.
                tab['season_length'][idb+1] = tab['MJD_max'][idb+1] - \
                    tab['MJD_min'][idb+1]

        return tab[tab['season_length'] > 30.]
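# Minimal usage sketch (illustrative only, not part of the original module).
# It assumes a standard MAF setup; 'baseline.db', the reference-LC object
# ``ref_lc`` (anything exposing the per-simulator ``fluxes`` and ``mag_to_flux``
# interpolators used above) and the simulator name 'SNCosmo' are placeholder
# assumptions. The 'season' and 'coadd' columns are expected to be supplied by
# the SN stackers, as noted in the class docstring.
#
#     import lsst.sims.maf.db as db
#     import lsst.sims.maf.slicers as slicers
#     import lsst.sims.maf.metricBundles as metricBundles
#
#     opsdb = db.OpsimDatabase('baseline.db')
#     metric = SNSNRMetric(lim_sn=ref_lc, names_ref=['SNCosmo'], season=1, z=0.01)
#     slicer = slicers.HealpixSlicer(nside=64)
#     bundle = metricBundles.MetricBundle(metric, slicer, "filter = 'r'")
#     mbg = metricBundles.MetricBundleGroup({'sn_snr': bundle}, opsdb,
#                                           outDir='sn_snr')
#     mbg.runAll()


if __name__ == '__main__':
    # Standalone numerical check of the SNR combination used in SNSNRMetric.snr
    # (toy values, for illustration only): three LC points, the third one
    # rejected by the phase cut.
    fluxes = np.array([[10., 20., 30.]])        # e/s
    flux_5sigma = np.array([[15., 15., 15.]])   # 5-sigma flux limits, e/s
    flag = np.array([[True, True, False]])      # phase-cut selection
    snr = 5.*np.sqrt(np.sum((fluxes**2/flux_5sigma**2)*flag, axis=1))
    print(snr)  # ~ [7.45]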