Source code for lsst.sims.movingObjects.chebyValues

import os
import numpy as np
import pandas as pd
from .chebyshevUtils import chebeval

__all__ = ['ChebyValues']


[docs]class ChebyValues(object): """Calculates positions, velocities, deltas, vmags and elongations, given a series of coefficients generated by ChebyFits. """ def __init__(self): self.coeffs = {} self.coeffKeys = ['objId', 'tStart', 'tEnd', 'ra', 'dec', 'geo_dist', 'vmag', 'elongation'] self.ephemerisKeys = ['ra', 'dradt', 'dec', 'ddecdt', 'geo_dist', 'vmag', 'elongation']
[docs] def setCoefficients(self, chebyFits): """Set coefficients using a ChebyFits object. (which contains a dictionary of objId, tStart, tEnd, ra, dec, delta, vmag, and elongation lists). Parameters ---------- chebyFits : chebyFits ChebyFits object, with attribute 'coeffs' - a dictionary of lists of coefficients. """ self.coeffs = chebyFits.coeffs # Convert list of coefficients into numpy arrays. for k in self.coeffs: self.coeffs[k] = np.array(self.coeffs[k]) # Check that expected values were received. missing_keys = set(self.coeffKeys) - set(self.coeffs) if len(missing_keys) > 0: raise ValueError("Expected to find key(s) %s in coefficients." % ' '.join(list[missing_keys])) self.coeffs['meanRA'] = self.coeffs['ra'].swapaxes(0, 1)[0] self.coeffs['meanDec'] = self.coeffs['dec'].swapaxes(0, 1)[0]
[docs] def readCoefficients(self, chebyFitsFile): """Read coefficients from output file written by ChebyFits. Parameters ---------- chebyFitsFile : str The filename of the coefficients file. """ if not os.path.isfile(chebyFitsFile): raise IOError('Could not find chebyFitsFile at %s' % (chebyFitsFile)) # Read the coefficients file. coeffs = pd.read_table(chebyFitsFile, delim_whitespace=True) # The header line provides information on the number of coefficients for each parameter. datacols = coeffs.columns.values cols = {} coeff_cols = ['ra', 'dec', 'geo_dist', 'vmag', 'elongation'] for k in coeff_cols: cols[k] = [x for x in datacols if x.startswith(k)] # Translate dataframe to dictionary of numpy arrays # while consolidating RA/Dec/Delta/Vmag/Elongation coeffs. self.coeffs['objId'] = coeffs.objId.values self.coeffs['tStart'] = coeffs.tStart.values self.coeffs['tEnd'] = coeffs.tEnd.values for k in coeff_cols: self.coeffs[k] = np.empty([len(cols[k]), len(coeffs)], float) for i in range(len(cols[k])): self.coeffs[k][i] = coeffs['%s_%d' % (k, i)].values # Add the mean RA and Dec columns (before swapping the coefficients axes). self.coeffs['meanRA'] = self.coeffs['ra'][0] self.coeffs['meanDec'] = self.coeffs['dec'][0] # Swap the coefficient axes so that they are [segment, coeff]. for k in coeff_cols: self.coeffs[k] = self.coeffs[k].swapaxes(0, 1)
def _evalSegment(self, segmentIdx, times, subsetSegments=None, mask=True): """Evaluate the ra/dec/delta/vmag/elongation values for a given segment at a series of times. Parameters ---------- segmentIdx : int The index in (each of) self.coeffs for the segment. e.g. the first segment, for each object. times : np.ndarray The times at which to evaluate the segment. subsetSegments : numpy.ndarray, optional Optionally specify a subset of the total segment indexes. This lets you pick out particular objIds. mask : bool, optional If True, returns NaNs for values outside the range of times in the segment. If False, extrapolates segment for times outside the segment time range. Returns ------- dict Dictionary of RA, Dec, delta, vmag, and elongation values for the segment indicated, at the time indicated. """ if subsetSegments is None: subsetSegments = np.ones(len(self.coeffs['objId']), dtype=bool) tStart = self.coeffs['tStart'][subsetSegments][segmentIdx] tEnd = self.coeffs['tEnd'][subsetSegments][segmentIdx] tScaled = times - tStart tInterval = np.array([tStart, tEnd]) - tStart # Evaluate RA/Dec/Delta/Vmag/elongation. ephemeris = {} ephemeris['ra'], ephemeris['dradt'] = chebeval(tScaled, self.coeffs['ra'][subsetSegments][segmentIdx], interval=tInterval, doVelocity=True, mask=mask) ephemeris['dec'], ephemeris['ddecdt'] = chebeval(tScaled, self.coeffs['dec'][subsetSegments][segmentIdx], interval=tInterval, doVelocity=True, mask=mask) ephemeris['dradt'] = ephemeris['dradt'] * np.cos(np.radians(ephemeris['dec'])) for k in ('geo_dist', 'vmag', 'elongation'): ephemeris[k], _ = chebeval(tScaled, self.coeffs[k][subsetSegments][segmentIdx], interval=tInterval, doVelocity=False, mask=mask) return ephemeris
[docs] def getEphemerides(self, times, objIds=None, extrapolate=False): """Find the ephemeris information for 'objIds' at 'time'. Implicit in how this is currently written is that the segments are all expected to cover the same start/end time range across all objects. They do not have to have the same segment length for all objects. Parameters ---------- times : float or np.ndarray The time to calculate ephemeris positions. objIds : numpy.ndarray, optional The object ids for which to generate ephemerides. If None, then just uses all objects. extrapolate : bool If True, extrapolate beyond ends of segments if time outside of segment range. If False, return ValueError if time is beyond range of segments. Returns ------- numpy.ndarray The ephemeris positions for all objects. Note that these may not be sorted in the same order as objIds. """ if isinstance(times, float) or isinstance(times, int): times = np.array([times], float) ntimes = len(times) ephemerides = {} # Find subset of segments which match objId, if specified. if objIds is None: objMatch = np.ones(len(self.coeffs['objId']), dtype=bool) ephemerides['objId'] = np.unique(self.coeffs['objId']) else: if isinstance(objIds, str) or isinstance(objIds, int): objIds = np.array([objIds]) objMatch = np.in1d(self.coeffs['objId'], objIds) ephemerides['objId'] = objIds # Now find ephemeris values. ephemerides['time'] = np.zeros((len(ephemerides['objId']), ntimes), float) + times for k in self.ephemerisKeys: ephemerides[k] = np.zeros((len(ephemerides['objId']), ntimes), float) for it, t in enumerate(times): # Find subset of segments which contain the appropriate time. # Look for simplest subset first. segments = np.where((self.coeffs['tStart'][objMatch] <= t) & (self.coeffs['tEnd'][objMatch] > t))[0] if len(segments) == 0: segStart = self.coeffs['tStart'][objMatch].min() segEnd = self.coeffs['tEnd'][objMatch].max() if (segStart > t or segEnd < t): if not extrapolate: for k in self.ephemerisKeys: ephemerides[k][:,it] = np.nan else: # Find the segments to use to extrapolate the times. if segStart > t: segments = np.where(self.coeffs['tStart'][objMatch] == segStart)[0] if segEnd < t: segments = np.where(self.coeffs['tEnd'][objMatch] == segEnd)[0] elif segEnd == t: # Not extrapolating, but outside the simple match case above. segments = np.where(self.coeffs['tEnd'][objMatch] == segEnd)[0] for i, segmentIdx in enumerate(segments): ephemeris = self._evalSegment(segmentIdx, t, objMatch, mask=False) for k in self.ephemerisKeys: ephemerides[k][i][it] = ephemeris[k] ephemerides['objId'][i] = self.coeffs['objId'][objMatch][segmentIdx] if objIds is not None: if set(ephemerides['objId']) != set(objIds): raise ValueError('Did not find expected match between objIds provided and ephemeride objIds.') return ephemerides