import numpy as np
import matplotlib.pylab as plt
import numpy.lib.recfunctions as rf
from scipy import interpolate
import lsst.sims.maf.metrics as metrics
from lsst.sims.maf.utils.snUtils import GenerateFakeObservations
from import Iterable
import time

__all__ = ['SNSNRMetric']

[docs]class SNSNRMetric(metrics.BaseMetric): """ Metric to estimate the detection rate for faint supernovae (x1,color) = (-2.0,0.2) Parameters ---------- list : str, opt Name of the columns used to estimate the metric Default : 'observationStartMJD', 'fieldRA', 'fieldDec','filter','fiveSigmaDepth', 'visitExposureTime','night','observationId', 'numExposures','visitTime' coadd : bool, opt to make "coaddition" per night (uses snStacker) Default : True lim_sn : class, opt Reference data used to simulate LC points (interpolation) names_ref : str,opt names of the simulator used to produce reference data season : flota,opt season num Default : 1. z : float,opt redshift for this study Default : 0.01 """ def __init__(self, metricName='SNSNRMetric', mjdCol='observationStartMJD', RaCol='fieldRA', DecCol='fieldDec', filterCol='filter', m5Col='fiveSigmaDepth', exptimeCol='visitExposureTime', nightCol='night', obsidCol='observationId', nexpCol='numExposures', vistimeCol='visitTime', coadd=True, lim_sn=None, names_ref=None, season=1, z=0.01, **kwargs): self.mjdCol = mjdCol self.m5Col = m5Col self.filterCol = filterCol self.RaCol = RaCol self.DecCol = DecCol self.exptimeCol = exptimeCol self.seasonCol = 'season' self.nightCol = nightCol self.obsidCol = obsidCol self.nexpCol = nexpCol self.vistimeCol = vistimeCol cols = [self.nightCol, self.m5Col, self.filterCol, self.mjdCol, self.obsidCol, self.nexpCol, self.vistimeCol, self.exptimeCol, self.seasonCol] if coadd: cols += ['coadd'] super(SNSNRMetric, self).__init__( col=cols, metricName=metricName, **kwargs) self.filterNames = np.array(['u', 'g', 'r', 'i', 'z', 'y']) self.blue_cutoff = 300. self.red_cutoff = 800. self.min_rf_phase = -20. self.max_rf_phase = 40. self.z = z self.names_ref = names_ref self.season = season # SN DayMax: current date - shift days self.shift = 10. # These are reference LC self.lim_sn = lim_sn self.display = False
[docs] def run(self, dataSlice, slicePoint=None): """ run the metric Parameters ---------- dataSlice : array simulation data under study Returns ------- detection rate : float """ time_ref = time.time() goodFilters = np.in1d(dataSlice['filter'], self.filterNames) dataSlice = dataSlice[goodFilters] if dataSlice.size == 0: return None dataSlice.sort(order=self.mjdCol) if self.season != -1: seasons = self.season else: seasons = np.unique(dataSlice['season']) if not isinstance(seasons, Iterable): seasons = [seasons] self.info_season = None for seas in seasons: info = self.season_info(dataSlice, seas) if info is not None and info['season_length'] >= self.shift: if self.info_season is None: self.info_season = info else: self.info_season = np.concatenate((self.info_season, info)) self.info_season = self.check_seasons(self.info_season) if self.info_season is None: return 0. sel = dataSlice[np.in1d(dataSlice['season'], np.array(seasons))] detect_frac = None if len(sel) >= 5: detect_frac = self.process(sel) if detect_frac is not None: return np.median(detect_frac['frac_obs_{}'.format(self.names_ref[0])]) else: return 0.
[docs] def process(self, sel): """Process one season Parameters --------- sel : array array of observations season : int season number Returns ----- record array with the following fields: fieldRA (float) fieldDec (float) season (float) band (str) frac_obs_name_ref (float) """ = np.unique(sel[self.filterCol])[0] time_ref = time.time() snr_obs = self.snr_slice(sel) # SNR for observations snr_fakes = self.snr_fakes(sel) # SNR for fakes detect_frac = self.detection_rate( snr_obs, snr_fakes) # Detection rate snr_obs = np.asarray(snr_obs) snr_fakes = np.asarray(snr_fakes) #self.plot(snr_obs, snr_fakes) # detect_frac = np.asarray(detect_frac) return detect_frac
[docs] def snr_slice(self, dataSlice, j=-1, output_q=None): """ Estimate SNR for a given dataSlice Parameters --------- Input: dataSlice Returns ----- array with the following fields (all are of f8 type, except band which is of U1) SNR_name_ref: Signal-To-Noise Ratio estimator season : season cadence: cadence of the season season_length: length of the season MJD_min: min MJD of the season DayMax: SN max luminosity MJD (aka T0) MJD: m5_eff: mean m5 of obs passing the min_phase, max_phase cut fieldRA: mean field RA fieldDec: mean field Dec band: band m5: mean m5 (over the season) Nvisits: median number of visits (per observation) (over the season) ExposureTime: median exposure time (per observation) (over the season) """ # Get few infos: RA, Dec, Nvisits, m5, exptime fieldRA = np.mean(dataSlice[self.RaCol]) fieldDec = np.mean(dataSlice[self.DecCol]) # one visit = 2 exposures Nvisits = np.median(dataSlice[self.nexpCol]/2.) m5 = np.mean(dataSlice[self.m5Col]) exptime = np.median(dataSlice[self.exptimeCol]) dataSlice.sort(order=self.mjdCol) mjds = dataSlice[self.mjdCol] band = np.unique(dataSlice[self.filterCol])[0] # Define MJDs to consider for metric estimation # basically: step of one day between MJDmin and MJDmax dates = None for val in self.info_season: if dates is None: dates = np.arange( val['MJD_min']+self.shift, val['MJD_max']+1., 1.) else: dates = np.concatenate( (dates, np.arange(val['MJD_min']+self.shift, val['MJD_max']+1., 1.))) # SN DayMax: dates-shift where shift is chosen in the input yaml file T0_lc = dates-self.shift # for these DayMax, estimate the phases of LC points corresponding to the current dataSlice MJDs time_for_lc = -T0_lc[:, None]+mjds phase = time_for_lc/(1.+self.z) # phases of LC points # flag: select LC points only in between min_rf_phase and max_phase phase_max = self.shift/(1.+self.z) flag = (phase >= self.min_rf_phase) & (phase <= phase_max) # tile m5, MJDs, and seasons to estimate all fluxes and SNR at once m5_vals = np.tile(dataSlice[self.m5Col], (len(time_for_lc), 1)) season_vals = np.tile(dataSlice[self.seasonCol], (len(time_for_lc), 1)) # estimate fluxes and snr in SNR function fluxes_tot, snr = self.snr( time_for_lc, m5_vals, flag, season_vals, T0_lc) # now save the results in a record array _, idx = np.unique(snr['season'], return_inverse=True) infos = self.info_season[idx] vars_info = ['cadence', 'season_length', 'MJD_min'] snr = rf.append_fields( snr, vars_info, [infos[name] for name in vars_info]) snr = rf.append_fields(snr, 'DayMax', T0_lc) snr = rf.append_fields(snr, 'MJD', dates) snr = rf.append_fields(snr, 'm5_eff', np.mean(, mask=~flag), axis=1)) global_info = [(fieldRA, fieldDec, band, m5, Nvisits, exptime)]*len(snr) names = ['fieldRA', 'fieldDec', 'band', 'm5', 'Nvisits', 'ExposureTime'] global_info = np.rec.fromrecords(global_info, names=names) snr = rf.append_fields( snr, names, [global_info[name] for name in names]) if output_q is not None: output_q.put({j: snr}) else: return snr
[docs] def season_info(self, dataSlice, season): """ Get info on seasons for each dataSlice Parameters ----- dataSlice : array array of observations Returns ----- recordarray with the following fields: season, cadence, season_length, MJDmin, MJDmax """ rv = [] idx = (dataSlice[self.seasonCol] == season) slice_sel = dataSlice[idx] if len(slice_sel) < 5: return None slice_sel.sort(order=self.mjdCol) mjds_season = slice_sel[self.mjdCol] cadence = np.mean(mjds_season[1:]-mjds_season[:-1]) mjd_min = np.min(mjds_season) mjd_max = np.max(mjds_season) season_length = mjd_max-mjd_min Nvisits = np.median(slice_sel[self.nexpCol]) m5 = np.median(slice_sel[self.m5Col]) rv.append((float(season), cadence, season_length, mjd_min, mjd_max, Nvisits, m5)) info_season = np.rec.fromrecords( rv, names=['season', 'cadence', 'season_length', 'MJD_min', 'MJD_max', 'Nvisits', 'm5']) return info_season
[docs] def snr(self, time_lc, m5_vals, flag, season_vals, T0_lc): """ Estimate SNR vs time Parameters ----------- time_lc : m5_vals : list(float) five-sigme depth values flag : array(bool) flag to be applied (example: selection from phase cut) season_vals : array(float) season values T0_lc : array(float) array of T0 for supernovae Returns ----- fluxes_tot : list(float) list of (interpolated) fluxes snr_tab : array with the following fields: snr_name_ref (float) : Signal-to-Noise values season (float) : season num. """ seasons =, mask=~flag) fluxes_tot = {} snr_tab = None for ib, name in enumerate(self.names_ref): fluxes = self.lim_sn.fluxes[ib](time_lc) if name not in fluxes_tot.keys(): fluxes_tot[name] = fluxes else: fluxes_tot[name] = np.concatenate((fluxes_tot[name], fluxes)) flux_5sigma = self.lim_sn.mag_to_flux[ib](m5_vals) snr = fluxes**2/flux_5sigma**2 snr_season = 5.*np.sqrt(np.sum(snr*flag, axis=1)) if snr_tab is None: snr_tab = np.asarray(np.copy(snr_season), dtype=[ ('SNR_'+name, 'f8')]) else: snr_tab = rf.append_fields( snr_tab, 'SNR_'+name, np.copy(snr_season)) """ snr_tab = rf.append_fields( snr_tab, 'season', np.mean(seasons, axis=1)) """ snr_tab = rf.append_fields( snr_tab, 'season', self.get_season(T0_lc)) # check if any masked value remaining # this would correspond to case where no obs point has been selected # ie no points with phase in [phase_min,phase_max] # this happens when internight gaps are large (typically larger than shift) idmask = np.where(snr_tab.mask) if len(idmask) > 0: tofill = np.copy(snr_tab['season']) season_recover = self.get_season( T0_lc[np.where(snr_tab.mask)]) tofill[idmask] = season_recover snr_tab =, fill_value=tofill) return fluxes_tot, snr_tab
[docs] def get_season(self, T0): """ Estimate the seasons corresponding to T0 values Parameters ------- T0 : list(float) set of T0 values Returns ----- list (float) of corresponding seasons """ diff_min = T0[:, None]-self.info_season['MJD_min'] diff_max = -T0[:, None]+self.info_season['MJD_max'] seasons = np.tile(self.info_season['season'], (len(diff_min), 1)) flag = (diff_min >= 0) & (diff_max >= 0) seasons =, mask=~flag) return np.mean(seasons, axis=1)
[docs] def snr_fakes(self, dataSlice): """ Estimate SNR for fake observations in the same way as for observations (using SNR_Season) Parameters: ------- dataSlice : array array of observations Returns ----- snr_tab : array with the following fields: snr_name_ref (float) : Signal-to-Noise values season (float) : season num. """ # generate fake observations fake_obs = None # idx = (dataSlice[self.seasonCol] == season) band = np.unique(dataSlice[self.filterCol])[0] fake_obs = self.gen_fakes(dataSlice, band) # estimate SNR vs MJD snr_fakes = self.snr_slice( fake_obs[fake_obs['filter'] == band]) return snr_fakes
[docs] def gen_fakes(self, slice_sel, band): """ Generate fake observations according to observing values extracted from simulations Parameters ----- slice_sel : array array of observations band : str band to consider Returns ----- fake_obs_season : array array of observations with the following fields observationStartMJD (float) fieldRA (float) fieldDec (float) filter (U1) fiveSigmaDepth (float) numExposures (float) visitExposureTime (float) season (int) """ fieldRA = np.mean(slice_sel[self.RaCol]) fieldDec = np.mean(slice_sel[self.DecCol]) Tvisit = 30. fake_obs = None for val in self.info_season: cadence = val['cadence'] mjd_min = val['MJD_min'] mjd_max = val['MJD_max'] season_length = val['season_length'] Nvisits = val['Nvisits'] m5 = val['m5'] # build the configuration file config_fake = {} config_fake['Ra'] = fieldRA config_fake['Dec'] = fieldDec config_fake['bands'] = [band] config_fake['Cadence'] = [cadence] config_fake['MJD_min'] = [mjd_min] config_fake['season_length'] = season_length config_fake['Nvisits'] = [Nvisits] m5_nocoadd = m5-1.25*np.log10(float(Nvisits)*Tvisit/30.) config_fake['m5'] = [m5_nocoadd] config_fake['seasons'] = [val['season']] config_fake['Exposure_Time'] = [30.] config_fake['shift_days'] = 0. fake_obs_season = GenerateFakeObservations( config_fake).Observations if fake_obs is None: fake_obs = fake_obs_season else: fake_obs = np.concatenate((fake_obs, fake_obs_season)) return fake_obs
[docs] def plot(self, snr_obs, snr_fakes): """ Plot SNR vs time Parameters ----- snr_obs : array array estimated using snr_slice(observations) snr_obs : array array estimated using snr_slice(fakes) """ fig, ax = plt.subplots(figsize=(10, 7)) title = 'season {} - {} band - z={}'.format( self.season,, self.z) fig.suptitle(title) ax.plot(snr_obs['MJD'], snr_obs['SNR_{}'.format( self.names_ref[0])], label='Simulation') ax.plot(snr_fakes['MJD'], snr_fakes['SNR_{}'.format( self.names_ref[0])], ls='--', label='Fakes')
[docs] def PlotHistory(self, fluxes, mjd, flag, snr, T0_lc, dates): """ Plot history of Plot For each MJD, fluxes and snr are plotted Each plot may be saved as a png to make a video afterwards Parameters ------ fluxes : list(float) LC fluxes mjd : list(float) mjds of the fluxes flag : array flag for selection of fluxes snr : list signal-to-noise ratio T0_lc : list(float) list of T0 supernovae dates : list(float) date of the display (mjd) """ dir_save = '/home/philippe/LSST/sn_metric_new/Plots' import pylab as plt plt.ion() fig, ax = plt.subplots(ncols=1, nrows=2) fig.canvas.draw() colors = ['b', 'r'] myls = ['-', '--'] mfc = ['b', 'None'] tot_label = [] fontsize = 12 mjd_ma =, mask=~flag) fluxes_ma = {} for key, val in fluxes.items(): fluxes_ma[key] =, mask=~flag) key = list(fluxes.keys())[0] jmax = len(fluxes_ma[key]) tot_label = [] tot_label_snr = [] min_flux = [] max_flux = [] for j in range(jmax): for ib, name in enumerate(fluxes_ma.keys()): tot_label.append(ax[0].errorbar( mjd_ma[j], fluxes_ma[name][j], marker='s', color=colors[ib], ls=myls[ib], label=name)) tot_label_snr.append(ax[1].errorbar( snr['MJD'][:j], snr['SNR_'+name][:j], color=colors[ib], label=name)) fluxx = fluxes_ma[name][j] fluxx = fluxx[~fluxx.mask] if len(fluxx) >= 2: min_flux.append(np.min(fluxx)) max_flux.append(np.max(fluxx)) else: min_flux.append(0.) max_flux.append(200.) min_fluxes = np.min(min_flux) max_fluxes = np.max(max_flux) tot_label.append(ax[0].errorbar([T0_lc[j], T0_lc[j]], [ min_fluxes, max_fluxes], color='k', label='DayMax')) tot_label.append(ax[0].errorbar([dates[j], dates[j]], [ min_fluxes, max_fluxes], color='k', ls='--', label='Current MJD')) fig.canvas.flush_events() # plt.savefig('{}/{}_{}.png'.format(dir_save, 'snr', 1000 + j)) if j != jmax-1: ax[0].clear() tot_label = [] tot_label_snr = [] labs = [l.get_label() for l in tot_label] ax[0].legend(tot_label, labs, ncol=1, loc='best', prop={'size': fontsize}, frameon=False) ax[0].set_ylabel('Flux [e.sec$^{-1}$]', fontsize=fontsize) ax[1].set_xlabel('MJD', fontsize=fontsize) ax[1].set_ylabel('SNR', fontsize=fontsize) ax[1].legend() labs = [l.get_label() for l in tot_label_snr] ax[1].legend(tot_label_snr, labs, ncol=1, loc='best', prop={'size': fontsize}, frameon=False) for i in range(2): ax[i].tick_params(axis='x', labelsize=fontsize) ax[i].tick_params(axis='y', labelsize=fontsize)
[docs] def detection_rate(self, snr_obs, snr_fakes): """ Estimate the time fraction(per season) for which snr_obs > snr_fakes = detection rate For regular cadences one should get a result close to 1 Parameters ------- snr_obs : array array estimated using snr_slice(observations) snr_fakes: array array estimated using snr_slice(fakes) Returns ----- record array with the following fields: fieldRA (float) fieldDec (float) season (float) band (str) frac_obs_name_ref (float) """ ra = np.mean(snr_obs['fieldRA']) dec = np.mean(snr_obs['fieldDec']) band = np.unique(snr_obs['band'])[0] rtot = [] for season in np.unique(snr_obs['season']): idx = snr_obs['season'] == season sel_obs = snr_obs[idx] idxb = snr_fakes['season'] == season sel_fakes = snr_fakes[idxb] sel_obs.sort(order='MJD') sel_fakes.sort(order='MJD') r = [ra, dec, season, band] names = [self.RaCol, self.DecCol, 'season', 'band'] for sim in self.names_ref: fakes = interpolate.interp1d( sel_fakes['MJD'], sel_fakes['SNR_'+sim]) obs = interpolate.interp1d(sel_obs['MJD'], sel_obs['SNR_'+sim]) mjd_min = np.max( [np.min(sel_obs['MJD']), np.min(sel_fakes['MJD'])]) mjd_max = np.min( [np.max(sel_obs['MJD']), np.max(sel_fakes['MJD'])]) mjd = np.arange(mjd_min, mjd_max, 1.) diff_res = obs(mjd)-fakes(mjd) idx = diff_res >= 0 r += [len(diff_res[idx])/len(diff_res)] names += ['frac_obs_'+sim] rtot.append(tuple(r)) return np.rec.fromrecords(rtot, names=names)
[docs] def check_seasons(self, tab): """ Check wether seasons have no overlap if it is the case: modify MJD_min and season length of the corresponding season return only seasons with season_length > 30 days Parameters -------------- tab : array with the following fields: Returns --------- tab : array with the following fields: """ if tab is None or len(tab) == 1: return tab if len(tab) > 1: diff = tab['MJD_min'][1:]-tab['MJD_max'][:-1] idb = np.argwhere(diff < 20.) if len(idb) >= 1: tab['MJD_min'][idb+1] = tab['MJD_max'][idb]+20. tab['season_length'][idb+1] = tab['MJD_max'][idb+1] - \ tab['MJD_min'][idb+1] return tab[tab['season_length'] > 30.]