import os
import numpy as np
import pandas as pd
from .chebyshevUtils import chebeval
__all__ = ['ChebyValues']
[docs]class ChebyValues(object):
"""Calculates positions, velocities, deltas, vmags and elongations,
given a series of coefficients generated by ChebyFits.
def __init__(self):
self.coeffs = {}
self.coeffKeys = ['objId', 'tStart', 'tEnd', 'ra', 'dec', 'geo_dist', 'vmag', 'elongation']
self.ephemerisKeys = ['ra', 'dradt', 'dec', 'ddecdt', 'geo_dist', 'vmag', 'elongation']
[docs] def setCoefficients(self, chebyFits):
"""Set coefficients using a ChebyFits object.
(which contains a dictionary of objId, tStart, tEnd, ra, dec, delta, vmag, and elongation lists).
chebyFits : chebyFits
ChebyFits object, with attribute 'coeffs' - a dictionary of lists of coefficients.
self.coeffs = chebyFits.coeffs
# Convert list of coefficients into numpy arrays.
for k in self.coeffs:
self.coeffs[k] = np.array(self.coeffs[k])
# Check that expected values were received.
missing_keys = set(self.coeffKeys) - set(self.coeffs)
if len(missing_keys) > 0:
raise ValueError("Expected to find key(s) %s in coefficients." % ' '.join(list[missing_keys]))
self.coeffs['meanRA'] = self.coeffs['ra'].swapaxes(0, 1)[0]
self.coeffs['meanDec'] = self.coeffs['dec'].swapaxes(0, 1)[0]
[docs] def readCoefficients(self, chebyFitsFile):
"""Read coefficients from output file written by ChebyFits.
chebyFitsFile : str
The filename of the coefficients file.
if not os.path.isfile(chebyFitsFile):
raise IOError('Could not find chebyFitsFile at %s' % (chebyFitsFile))
# Read the coefficients file.
coeffs = pd.read_table(chebyFitsFile, delim_whitespace=True)
# The header line provides information on the number of coefficients for each parameter.
datacols = coeffs.columns.values
cols = {}
coeff_cols = ['ra', 'dec', 'geo_dist', 'vmag', 'elongation']
for k in coeff_cols:
cols[k] = [x for x in datacols if x.startswith(k)]
# Translate dataframe to dictionary of numpy arrays
# while consolidating RA/Dec/Delta/Vmag/Elongation coeffs.
self.coeffs['objId'] = coeffs.objId.values
self.coeffs['tStart'] = coeffs.tStart.values
self.coeffs['tEnd'] = coeffs.tEnd.values
for k in coeff_cols:
self.coeffs[k] = np.empty([len(cols[k]), len(coeffs)], float)
for i in range(len(cols[k])):
self.coeffs[k][i] = coeffs['%s_%d' % (k, i)].values
# Add the mean RA and Dec columns (before swapping the coefficients axes).
self.coeffs['meanRA'] = self.coeffs['ra'][0]
self.coeffs['meanDec'] = self.coeffs['dec'][0]
# Swap the coefficient axes so that they are [segment, coeff].
for k in coeff_cols:
self.coeffs[k] = self.coeffs[k].swapaxes(0, 1)
def _evalSegment(self, segmentIdx, times, subsetSegments=None, mask=True):
"""Evaluate the ra/dec/delta/vmag/elongation values for a given segment at a series of times.
segmentIdx : int
The index in (each of) self.coeffs for the segment.
e.g. the first segment, for each object.
times : np.ndarray
The times at which to evaluate the segment.
subsetSegments : numpy.ndarray, optional
Optionally specify a subset of the total segment indexes.
This lets you pick out particular objIds.
mask : bool, optional
If True, returns NaNs for values outside the range of times in the segment.
If False, extrapolates segment for times outside the segment time range.
Dictionary of RA, Dec, delta, vmag, and elongation values for the segment indicated,
at the time indicated.
if subsetSegments is None:
subsetSegments = np.ones(len(self.coeffs['objId']), dtype=bool)
tStart = self.coeffs['tStart'][subsetSegments][segmentIdx]
tEnd = self.coeffs['tEnd'][subsetSegments][segmentIdx]
tScaled = times - tStart
tInterval = np.array([tStart, tEnd]) - tStart
# Evaluate RA/Dec/Delta/Vmag/elongation.
ephemeris = {}
ephemeris['ra'], ephemeris['dradt'] = chebeval(tScaled,
interval=tInterval, doVelocity=True, mask=mask)
ephemeris['dec'], ephemeris['ddecdt'] = chebeval(tScaled,
interval=tInterval, doVelocity=True, mask=mask)
ephemeris['dradt'] = ephemeris['dradt'] * np.cos(np.radians(ephemeris['dec']))
for k in ('geo_dist', 'vmag', 'elongation'):
ephemeris[k], _ = chebeval(tScaled, self.coeffs[k][subsetSegments][segmentIdx],
interval=tInterval, doVelocity=False, mask=mask)
return ephemeris
[docs] def getEphemerides(self, times, objIds=None, extrapolate=False):
"""Find the ephemeris information for 'objIds' at 'time'.
Implicit in how this is currently written is that the segments are all expected to cover the
same start/end time range across all objects.
They do not have to have the same segment length for all objects.
times : float or np.ndarray
The time to calculate ephemeris positions.
objIds : numpy.ndarray, optional
The object ids for which to generate ephemerides. If None, then just uses all objects.
extrapolate : bool
If True, extrapolate beyond ends of segments if time outside of segment range.
If False, return ValueError if time is beyond range of segments.
The ephemeris positions for all objects.
Note that these may not be sorted in the same order as objIds.
if isinstance(times, float) or isinstance(times, int):
times = np.array([times], float)
ntimes = len(times)
ephemerides = {}
# Find subset of segments which match objId, if specified.
if objIds is None:
objMatch = np.ones(len(self.coeffs['objId']), dtype=bool)
ephemerides['objId'] = np.unique(self.coeffs['objId'])
if isinstance(objIds, str) or isinstance(objIds, int):
objIds = np.array([objIds])
objMatch = np.in1d(self.coeffs['objId'], objIds)
ephemerides['objId'] = objIds
# Now find ephemeris values.
ephemerides['time'] = np.zeros((len(ephemerides['objId']), ntimes), float) + times
for k in self.ephemerisKeys:
ephemerides[k] = np.zeros((len(ephemerides['objId']), ntimes), float)
for it, t in enumerate(times):
# Find subset of segments which contain the appropriate time.
# Look for simplest subset first.
segments = np.where((self.coeffs['tStart'][objMatch] <= t) &
(self.coeffs['tEnd'][objMatch] > t))[0]
if len(segments) == 0:
segStart = self.coeffs['tStart'][objMatch].min()
segEnd = self.coeffs['tEnd'][objMatch].max()
if (segStart > t or segEnd < t):
if not extrapolate:
for k in self.ephemerisKeys:
ephemerides[k][:,it] = np.nan
# Find the segments to use to extrapolate the times.
if segStart > t:
segments = np.where(self.coeffs['tStart'][objMatch] == segStart)[0]
if segEnd < t:
segments = np.where(self.coeffs['tEnd'][objMatch] == segEnd)[0]
elif segEnd == t:
# Not extrapolating, but outside the simple match case above.
segments = np.where(self.coeffs['tEnd'][objMatch] == segEnd)[0]
for i, segmentIdx in enumerate(segments):
ephemeris = self._evalSegment(segmentIdx, t, objMatch, mask=False)
for k in self.ephemerisKeys:
ephemerides[k][i][it] = ephemeris[k]
ephemerides['objId'][i] = self.coeffs['objId'][objMatch][segmentIdx]
if objIds is not None:
if set(ephemerides['objId']) != set(objIds):
raise ValueError('Did not find expected match between objIds provided and ephemeride objIds.')
return ephemerides