import os
import numpy as np
import pandas as pd
from .chebyshevUtils import chebeval
__all__ = ['ChebyValues']
[docs]class ChebyValues(object):
"""Calculates positions, velocities, deltas, vmags and elongations,
given a series of coefficients generated by ChebyFits.
"""
def __init__(self):
self.coeffs = {}
self.coeffKeys = ['objId', 'tStart', 'tEnd', 'ra', 'dec', 'geo_dist', 'vmag', 'elongation']
self.ephemerisKeys = ['ra', 'dradt', 'dec', 'ddecdt', 'geo_dist', 'vmag', 'elongation']
[docs] def setCoefficients(self, chebyFits):
"""Set coefficients using a ChebyFits object.
(which contains a dictionary of objId, tStart, tEnd, ra, dec, delta, vmag, and elongation lists).
Parameters
----------
chebyFits : chebyFits
ChebyFits object, with attribute 'coeffs' - a dictionary of lists of coefficients.
"""
self.coeffs = chebyFits.coeffs
# Convert list of coefficients into numpy arrays.
for k in self.coeffs:
self.coeffs[k] = np.array(self.coeffs[k])
# Check that expected values were received.
missing_keys = set(self.coeffKeys) - set(self.coeffs)
if len(missing_keys) > 0:
raise ValueError("Expected to find key(s) %s in coefficients." % ' '.join(list[missing_keys]))
self.coeffs['meanRA'] = self.coeffs['ra'].swapaxes(0, 1)[0]
self.coeffs['meanDec'] = self.coeffs['dec'].swapaxes(0, 1)[0]
[docs] def readCoefficients(self, chebyFitsFile):
"""Read coefficients from output file written by ChebyFits.
Parameters
----------
chebyFitsFile : str
The filename of the coefficients file.
"""
if not os.path.isfile(chebyFitsFile):
raise IOError('Could not find chebyFitsFile at %s' % (chebyFitsFile))
# Read the coefficients file.
coeffs = pd.read_table(chebyFitsFile, delim_whitespace=True)
# The header line provides information on the number of coefficients for each parameter.
datacols = coeffs.columns.values
cols = {}
coeff_cols = ['ra', 'dec', 'geo_dist', 'vmag', 'elongation']
for k in coeff_cols:
cols[k] = [x for x in datacols if x.startswith(k)]
# Translate dataframe to dictionary of numpy arrays
# while consolidating RA/Dec/Delta/Vmag/Elongation coeffs.
self.coeffs['objId'] = coeffs.objId.values
self.coeffs['tStart'] = coeffs.tStart.values
self.coeffs['tEnd'] = coeffs.tEnd.values
for k in coeff_cols:
self.coeffs[k] = np.empty([len(cols[k]), len(coeffs)], float)
for i in range(len(cols[k])):
self.coeffs[k][i] = coeffs['%s_%d' % (k, i)].values
# Add the mean RA and Dec columns (before swapping the coefficients axes).
self.coeffs['meanRA'] = self.coeffs['ra'][0]
self.coeffs['meanDec'] = self.coeffs['dec'][0]
# Swap the coefficient axes so that they are [segment, coeff].
for k in coeff_cols:
self.coeffs[k] = self.coeffs[k].swapaxes(0, 1)
def _evalSegment(self, segmentIdx, times, subsetSegments=None, mask=True):
"""Evaluate the ra/dec/delta/vmag/elongation values for a given segment at a series of times.
Parameters
----------
segmentIdx : int
The index in (each of) self.coeffs for the segment.
e.g. the first segment, for each object.
times : np.ndarray
The times at which to evaluate the segment.
subsetSegments : numpy.ndarray, optional
Optionally specify a subset of the total segment indexes.
This lets you pick out particular objIds.
mask : bool, optional
If True, returns NaNs for values outside the range of times in the segment.
If False, extrapolates segment for times outside the segment time range.
Returns
-------
dict
Dictionary of RA, Dec, delta, vmag, and elongation values for the segment indicated,
at the time indicated.
"""
if subsetSegments is None:
subsetSegments = np.ones(len(self.coeffs['objId']), dtype=bool)
tStart = self.coeffs['tStart'][subsetSegments][segmentIdx]
tEnd = self.coeffs['tEnd'][subsetSegments][segmentIdx]
tScaled = times - tStart
tInterval = np.array([tStart, tEnd]) - tStart
# Evaluate RA/Dec/Delta/Vmag/elongation.
ephemeris = {}
ephemeris['ra'], ephemeris['dradt'] = chebeval(tScaled,
self.coeffs['ra'][subsetSegments][segmentIdx],
interval=tInterval, doVelocity=True, mask=mask)
ephemeris['dec'], ephemeris['ddecdt'] = chebeval(tScaled,
self.coeffs['dec'][subsetSegments][segmentIdx],
interval=tInterval, doVelocity=True, mask=mask)
ephemeris['dradt'] = ephemeris['dradt'] * np.cos(np.radians(ephemeris['dec']))
for k in ('geo_dist', 'vmag', 'elongation'):
ephemeris[k], _ = chebeval(tScaled, self.coeffs[k][subsetSegments][segmentIdx],
interval=tInterval, doVelocity=False, mask=mask)
return ephemeris
[docs] def getEphemerides(self, times, objIds=None, extrapolate=False):
"""Find the ephemeris information for 'objIds' at 'time'.
Implicit in how this is currently written is that the segments are all expected to cover the
same start/end time range across all objects.
They do not have to have the same segment length for all objects.
Parameters
----------
times : float or np.ndarray
The time to calculate ephemeris positions.
objIds : numpy.ndarray, optional
The object ids for which to generate ephemerides. If None, then just uses all objects.
extrapolate : bool
If True, extrapolate beyond ends of segments if time outside of segment range.
If False, return ValueError if time is beyond range of segments.
Returns
-------
numpy.ndarray
The ephemeris positions for all objects.
Note that these may not be sorted in the same order as objIds.
"""
if isinstance(times, float) or isinstance(times, int):
times = np.array([times], float)
ntimes = len(times)
ephemerides = {}
# Find subset of segments which match objId, if specified.
if objIds is None:
objMatch = np.ones(len(self.coeffs['objId']), dtype=bool)
ephemerides['objId'] = np.unique(self.coeffs['objId'])
else:
if isinstance(objIds, str) or isinstance(objIds, int):
objIds = np.array([objIds])
objMatch = np.in1d(self.coeffs['objId'], objIds)
ephemerides['objId'] = objIds
# Now find ephemeris values.
ephemerides['time'] = np.zeros((len(ephemerides['objId']), ntimes), float) + times
for k in self.ephemerisKeys:
ephemerides[k] = np.zeros((len(ephemerides['objId']), ntimes), float)
for it, t in enumerate(times):
# Find subset of segments which contain the appropriate time.
# Look for simplest subset first.
segments = np.where((self.coeffs['tStart'][objMatch] <= t) &
(self.coeffs['tEnd'][objMatch] > t))[0]
if len(segments) == 0:
segStart = self.coeffs['tStart'][objMatch].min()
segEnd = self.coeffs['tEnd'][objMatch].max()
if (segStart > t or segEnd < t):
if not extrapolate:
for k in self.ephemerisKeys:
ephemerides[k][:,it] = np.nan
else:
# Find the segments to use to extrapolate the times.
if segStart > t:
segments = np.where(self.coeffs['tStart'][objMatch] == segStart)[0]
if segEnd < t:
segments = np.where(self.coeffs['tEnd'][objMatch] == segEnd)[0]
elif segEnd == t:
# Not extrapolating, but outside the simple match case above.
segments = np.where(self.coeffs['tEnd'][objMatch] == segEnd)[0]
for i, segmentIdx in enumerate(segments):
ephemeris = self._evalSegment(segmentIdx, t, objMatch, mask=False)
for k in self.ephemerisKeys:
ephemerides[k][i][it] = ephemeris[k]
ephemerides['objId'][i] = self.coeffs['objId'][objMatch][segmentIdx]
if objIds is not None:
if set(ephemerides['objId']) != set(objIds):
raise ValueError('Did not find expected match between objIds provided and ephemeride objIds.')
return ephemerides