Source code for kbmod.trajectory_generator

import abc
import copy
import logging
import math
import numpy as np
import random

import astropy.units as u

from astropy.table import Table

from kbmod.configuration import SearchConfiguration
from kbmod.search import Trajectory


logger = logging.getLogger(__name__)


[docs]def create_trajectory_generator(config, work_unit=None, **kwargs): """Create a TrajectoryGenerator object given a dictionary of configuration options. The generator class is specified by the 'name' entry, which must exist and match the class name of one of the subclasses of ``TrajectoryGenerator``. Parameters ---------- config : `dict` or `SearchConfiguration` The dictionary of generator parameters. work_unit : `WorkUnit`, optional A WorkUnit to provide additional information about the data that can be used to derive parameters that depend on the input. Returns ------- gen : `TrajectoryGenerator` A TrajectoryGenerator object. Raises ------ Raises a ``KeyError`` if the name entry is missing or the correct parameters do not exist. """ # Check if we are dealing with a top level configuration. if type(config) is SearchConfiguration: if config["generator_config"] is None: raise ValueError("Missing generator_config parameter.") config = config["generator_config"] if "name" not in config: raise KeyError("The trajectory generator configuration must contain a name field.") name = config["name"] if name not in TrajectoryGenerator.generators: raise KeyError(f"Trajectory generator {name} is undefined.") logger.info(f"Creating trajectory generator of type {name}") # Add any keyword arguments to the params, overriding current values. params = copy.deepcopy(config) params.update(kwargs) logger.debug(str(params)) return TrajectoryGenerator.generators[name](**params, work_unit=work_unit)
[docs]class TrajectoryGenerator(abc.ABC): """A base class for defining search strategies by generating candidate trajectories for each pixel. Implementations of TrajectoryGenerator must: 1) override generate() to provide new samples, 2) cannot be infinite """ generators = {} # A mapping of class name to class object for subclasses. def __init__(self, work_unit=None, **kwargs): pass def __init_subclass__(cls, **kwargs): # Register all subclasses in a dictionary mapping class name to the # class object, so we can programmatically create objects from the names. super().__init_subclass__(**kwargs) cls.generators[cls.__name__] = cls def __enter__(self): self.initialize() return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() if exc_type is None: return True def __next__(self, *args, **kwargs): return next(self.generate(*args, **kwargs)) def __iter__(self, *args, **kwargs): return self.generate()
[docs] def initialize(self, *args, **kwargs): """Performs any setup needed for this generator""" pass
[docs] def close(self, *args, **kwargs): """Performs any cleanup needed for this generator""" pass
[docs] @abc.abstractmethod def generate(self, *args, **kwargs): """Produces a single candidate trajectory to test. Returns ------- candidate : `Trajectory` a ``Trajectory`` to test at each pixel. """ raise NotImplementedError()
[docs] def to_table(self): """Generate the trajectories and put them into an astropy table for analysis. Returns ------- results : `astropy.table.Table` The different trajectories to try. """ data_dict = {"x": [], "y": [], "vx": [], "vy": []} for trj in self: data_dict["x"].append(trj.x) data_dict["y"].append(trj.y) data_dict["vx"].append(trj.vx) data_dict["vy"].append(trj.vy) return Table(data_dict)
[docs]class SingleVelocitySearch(TrajectoryGenerator): """Search a single velocity from each pixel.""" def __init__(self, vx, vy, **kwargs): """Create a class SingleVelocitySearch. Parameters ---------- vx : `float` The velocity in x pixels (pixels per day). vy : `float` The velocity in y pixels (pixels per day). """ super().__init__(**kwargs) self.vx = vx self.vy = vy def __repr__(self): return f"SingleVelocitySearch: vx={self.vx}, vy={self.vy}" def __str__(self): return f"SingleVelocitySearch: vx={self.vx}, vy={self.vy}" def __len__(self): return 1
[docs] def generate(self, *args, **kwargs): """Produces a single candidate trajectory to test. Returns ------- candidate : `Trajectory` A ``Trajectory`` to test at each pixel. """ yield Trajectory(vx=self.vx, vy=self.vy)
[docs]class VelocityGridSearch(TrajectoryGenerator): """Search a grid defined by steps in velocity space.""" def __init__(self, vx_steps, min_vx, max_vx, vy_steps, min_vy, max_vy, **kwargs): """Create a class VelocityGridSearch. Parameters ---------- vx_steps : `int` The number of velocity steps in the x direction. min_vx : `float` The minimum velocity in the x-dimension (pixels per day). max_vx : `float` The maximum velocity in the x-dimension (pixels per day). vy_steps : `int` The number of velocity steps in the y direction. min_vy : `float` The minimum velocity in the y-dimension (pixels per day). max_vy : `float` The maximum velocity in the y-dimension (pixels per day). """ super().__init__(**kwargs) if vx_steps < 2 or vy_steps < 2: raise ValueError("VelocityGridSearch requires at least 2 steps in each dimension") if max_vx < min_vx or max_vy < min_vy: raise ValueError("Invalid VelocityGridSearch bounds.") self.vx_steps = vx_steps self.min_vx = min_vx self.max_vx = max_vx self.vx_stepsize = (self.max_vx - self.min_vx) / float(self.vx_steps - 1) self.vy_steps = vy_steps self.min_vy = min_vy self.max_vy = max_vy self.vy_stepsize = (self.max_vy - self.min_vy) / float(self.vy_steps - 1) def __repr__(self): return ( "VelocityGridSearch:" f" vx=[{self.min_vx}, {self.max_vx}], {self.vx_steps}" f" vy=[{self.min_vy}, {self.max_vy}], {self.vy_steps}" ) def __str__(self): return ( "VelocityGridSearch:\n" f" Vel X: [{self.min_vx}, {self.max_vx}] in {self.vx_steps} steps.\n" f" Vel Y: [{self.min_vy}, {self.max_vy}] in {self.vy_steps} steps." ) def __len__(self): return self.vy_steps * self.vx_steps
[docs] def generate(self, *args, **kwargs): """Produces a single candidate trajectory to test. Returns ------- candidate : `Trajectory` A ``Trajectory`` to test at each pixel. """ for vy_i in range(self.vy_steps): for vx_i in range(self.vx_steps): vx = self.min_vx + vx_i * self.vx_stepsize vy = self.min_vy + vy_i * self.vy_stepsize yield Trajectory(vx=vx, vy=vy)
[docs]class PencilSearch(TrajectoryGenerator): """A search in a small cone around a given velocity. The search varies the given velocity's angle and magnitude. The angle includes the range: original angle +/- max_ang_offset. The velocity magnitude includes the range: original magnitude +/- max_vel_offset Parameters ---------- vx : `float` The center velocity in the x dimension (in pixels per day) vy : `float` The center velocity in the y dimension (in pixels per day) max_ang_offset : `float` The maximum offset of a candidate trajectory from the original (in radians) ang_step : `float` The step size to explore for each angle (in radians) max_vel_offset : `float` The maximum offset of the velocity's magnitude from the original (in pixels per day) vel_step : `float` The step size to explore for each velocity magnitude (in pixels per day) """ def __init__( self, vx, vy, max_ang_offset=0.2618, ang_step=0.035, max_vel_offset=10.0, vel_step=0.5, **kwargs, ): super().__init__(**kwargs) if ang_step <= 0.0 or vel_step <= 0.0 or max_ang_offset <= 0.0 or max_vel_offset <= 0.0: raise ValueError("Invalid parameters. All ranges and step sizes must be > 0.0.") self.center_vx = vx self.center_vy = vy self.center_ang = np.arctan2(vy, vx) self.min_ang = self.center_ang - max_ang_offset self.max_ang = self.center_ang + max_ang_offset self.ang_step = ang_step self.ang_array = np.arange(self.min_ang, self.max_ang + 1e-8, self.ang_step) self.center_vel = np.sqrt(vx * vx + vy * vy) self.min_vel = np.max([self.center_vel - max_vel_offset, 0.0]) self.max_vel = self.center_vel + max_vel_offset self.vel_step = vel_step self.vel_array = np.arange(self.min_vel, self.max_vel + 1e-8, self.vel_step) def __repr__(self): return ( "PencilSearch:" f" v=[{self.min_vel}, {self.max_vel}), {self.vel_step}" f" a=[{self.min_ang}, {self.max_ang}), {self.ang_step}" ) def __str__(self): return ( "PencilSearch:\n" f" Vel: [{self.min_vel}, {self.max_vel}) in {self.vel_step} sized steps.\n" f" Ang: [{self.min_ang}, {self.max_ang}) in {self.ang_step} sized steps." ) def __len__(self): return len(self.ang_array) * len(self.vel_array)
[docs] def generate(self, *args, **kwargs): """Produces a single candidate trajectory to test. Returns ------- candidate : `Trajectory` A ``Trajectory`` to test at each pixel. """ for ang in self.ang_array: for vel in self.vel_array: vx = np.cos(ang) * vel vy = np.sin(ang) * vel yield Trajectory(vx=vx, vy=vy)
[docs]class KBMODV1Search(TrajectoryGenerator): """Search a grid defined by velocities and angles.""" def __init__(self, vel_steps, min_vel, max_vel, ang_steps, min_ang, max_ang, **kwargs): """Create a class KBMODV1Search. Parameters ---------- vel_steps : `int` The number of velocity steps. min_vel : `float` The minimum velocity magnitude (in pixels per day) max_vel : `float` The maximum velocity magnitude (in pixels per day) ang_steps : `int` The number of angle steps. min_ang : `float` The minimum angle (in radians) max_ang : `float` The maximum angle (in radians) """ super().__init__(**kwargs) if vel_steps < 1 or ang_steps < 1: raise ValueError("KBMODV1Search requires at least 1 step in each dimension") if max_vel < min_vel or max_ang < min_ang: raise ValueError("Invalid KBMODV1Search bounds.") self.vel_steps = vel_steps self.min_vel = min_vel self.max_vel = max_vel self.vel_stepsize = (self.max_vel - self.min_vel) / float(self.vel_steps) self.ang_steps = ang_steps self.min_ang = min_ang self.max_ang = max_ang self.ang_stepsize = (self.max_ang - self.min_ang) / float(self.ang_steps) def __repr__(self): return ( "KBMODV1Search:" f" v=[{self.min_vel}, {self.max_vel}), {self.vel_steps}" f" a=[{self.min_ang}, {self.max_ang}), {self.ang_steps}" ) def __str__(self): return ( "KBMODV1Search:\n" f" Vel: [{self.min_vel}, {self.max_vel}) in {self.vel_steps} steps.\n" f" Ang: [{self.min_ang}, {self.max_ang}) in {self.ang_steps} steps." ) def __len__(self): return self.ang_steps * self.vel_steps
[docs] def generate(self, *args, **kwargs): """Produces a single candidate trajectory to test. Returns ------- candidate : `Trajectory` A ``Trajectory`` to test at each pixel. """ for ang_i in range(self.ang_steps): for vel_i in range(self.vel_steps): curr_ang = self.min_ang + ang_i * self.ang_stepsize curr_vel = self.min_vel + vel_i * self.vel_stepsize vx = math.cos(curr_ang) * curr_vel vy = math.sin(curr_ang) * curr_vel yield Trajectory(vx=vx, vy=vy)
[docs]class KBMODV1SearchConfig(KBMODV1Search): """Search a grid defined by velocities and angles in the format of the legacy configuration file.""" def __init__(self, v_arr, ang_arr, average_angle=None, work_unit=None, **kwargs): """Create a class KBMODV1SearchConfig. Parameters ---------- v_arr : `list` A triplet of the minimum velocity to use (in pixels per day), and the maximum velocity magnitude (in pixels per day), and the number of velocity steps to try. ang_arr : `list` A triplet of the minimum angle offset (in radians), and the maximum angle offset (in radians), and the number of angles to try. average_angle : `float`, optional The central angle to search around. Should align with the ecliptic in most cases. work_unit : `WorkUnit`, optional A WorkUnit to provide additional information about the data that can be used to derive parameters that depend on the input. """ if len(v_arr) != 3: raise ValueError("KBMODV1SearchConfig requires v_arr to be length 3") if len(ang_arr) != 3: raise ValueError("KBMODV1SearchConfig requires ang_arr to be length 3") if average_angle is None: if work_unit is None: raise ValueError( "KBMODV1SearchConfig requires a valid average_angle or a WorkUnit with a WCS." ) average_angle = work_unit.compute_ecliptic_angle() ang_min = average_angle - ang_arr[0] ang_max = average_angle + ang_arr[1] super().__init__(v_arr[2], v_arr[0], v_arr[1], ang_arr[2], ang_min, ang_max, **kwargs)
[docs]class EclipticCenteredSearch(TrajectoryGenerator): """Search a grid defined by velocities and angles relative to the ecliptic. Attributes ---------- ecliptic_angle : `float` The angle to use for the ecliptic in the image (in the units defined in ``angle_units``). velocities : `list` A triplet of the minimum velocity to use (in pixels per day), and the maximum velocity magnitude (in pixels per day), and the number of velocity steps to try. angles : `list` A triplet of the minimum angle offset (in radians), and the maximum angle offset (in radians), and the number of angles to try. min_ang : `float` The minimum search angle in the image (ecliptic_angle + min_ang_offset) in radians. max_ang : `float` The maximum search angle in the image (ecliptic_angle + max_ang_offset) in radians. """ def __init__( self, velocities=[0.0, 0.0, 0], angles=[0.0, 0.0, 0], angle_units="radian", velocity_units="pix / d", given_ecliptic=None, work_unit=None, **kwargs, ): """Create a class EclipticCenteredSearch. Parameters ---------- velocities : `list` A triplet of the minimum velocity to use (in pixels per day), and the maximum velocity magnitude (in pixels per day), and the number of velocity steps to try. angles : `list` A triplet of the minimum angle offset (in the units defined in ``angle_units``), the maximum angle offset (in the units defined in ``angle_units``), and the number of angles to try. angle_units : `str` The units for the angle. Default: 'radian' velocity_units : `str` The units for the angle. Default: 'pix / d' given_ecliptic : `float`, optional An override for the ecliptic as given in the config (in the units defined in ``angle_units``). This angle takes precedence over ``computed_ecliptic``. work_unit : `WorkUnit`, optional A WorkUnit to provide additional information about the data that can be used to derive parameters that depend on the input. """ super().__init__(**kwargs) ang_units = u.Unit(angle_units) vel_units = u.Unit(velocity_units) if given_ecliptic is not None: self.ecliptic_angle = (given_ecliptic * ang_units).to(u.rad).value elif work_unit is not None: # compute_ecliptic_angle() always produces radians. self.ecliptic_angle = work_unit.compute_ecliptic_angle() logger.info(f"Computing ecliptic angle from wcs: {self.ecliptic_angle}") else: logger.warning("No ecliptic angle provided. Using 0.0.") self.ecliptic_angle = 0.0 if len(angles) != 3: raise ValueError("Invalid angles parameter. Expected a length 3 list.") if len(velocities) != 3: raise ValueError("Invalid velocity parameter. Expected a length 3 list.") if angles[2] < 1: raise ValueError("EclipticCenteredSearch requires at least 1 step in angles.") if velocities[1] < velocities[0]: raise ValueError(f"Invalid EclipticCenteredSearch: {velocities[1]} < {velocities[0]}") if velocities[2] < 1: raise ValueError("EclipticCenteredSearch requires at least 1 step in velocities.") self.velocities = [ (velocities[0] * vel_units).to(u.pixel / u.day).value, (velocities[1] * vel_units).to(u.pixel / u.day).value, velocities[2], ] self.vel_stepsize = (velocities[1] - velocities[0]) / float(velocities[2] - 1) # Compute the angle bounds and step size in radians. self.angles = [ (angles[0] * ang_units).to(u.rad).value, (angles[1] * ang_units).to(u.rad).value, angles[2], ] self.min_ang = self.ecliptic_angle + self.angles[0] self.max_ang = self.ecliptic_angle + self.angles[1] self.ang_stepsize = (self.max_ang - self.min_ang) / float(self.angles[2] - 1) def __repr__(self): return ( "EclipticSearch:" f" v=[{self.velocities[0]}, {self.velocities[1]}], {self.velocities[2]}" f" a=[{self.min_ang}, {self.max_ang}], {self.ang_steps}" ) def __str__(self): return f"""EclipticSearch: Vel: [{self.velocities[0]}, {self.velocities[1]}] in {self.velocities[2]} steps. Ang: Ecliptic = {self.ecliptic_angle} Offsets = {self.angles[0]} to {self.angles[1]} [{self.min_ang}, {self.max_ang}] in {self.angles[2]} steps.""" def __len__(self): return self.angles[2] * self.velocities[2]
[docs] def generate(self, *args, **kwargs): """Produces a single candidate trajectory to test. Returns ------- candidate : `Trajectory` A ``Trajectory`` to test at each pixel. """ for ang_i in range(self.angles[2]): for vel_i in range(self.velocities[2]): curr_ang = self.min_ang + ang_i * self.ang_stepsize curr_vel = self.velocities[0] + vel_i * self.vel_stepsize vx = math.cos(curr_ang) * curr_vel vy = math.sin(curr_ang) * curr_vel yield Trajectory(vx=vx, vy=vy)
[docs]class RandomVelocitySearch(TrajectoryGenerator): """Search a grid defined by min/max bounds on pixel velocities.""" def __init__(self, min_vx, max_vx, min_vy, max_vy, max_samples=1_000_000, **kwargs): """Create a class KBMODV1Search. Parameters ---------- min_vx : `float` The minimum velocity magnitude (in pixels per day) max_vx : `float` The maximum velocity magnitude (in pixels per day) min_vy : `float` The minimum velocity magnitude (in pixels per day) max_vy : `float` The maximum velocity magnitude (in pixels per day) max_samples : `int` The maximum number of samples to generate. Used to avoid infinite loops in KBMOD code. """ super().__init__(**kwargs) if max_vx < min_vx or max_vy < min_vy: raise ValueError("Invalid RandomVelocitySearch bounds.") if max_samples <= 0: raise ValueError(f"Invalid maximum samples.") self.min_vx = min_vx self.max_vx = max_vx self.min_vy = min_vy self.max_vy = max_vy self.samples_left = max_samples def __repr__(self): return ( "RandomVelocitySearch:" f" vx=[{self.min_vx}, {self.max_vx}]" f" vy=[{self.min_vy}, {self.max_vy}]" ) def __str__(self): return self.__repr__() def __len__(self): return self.samples_left
[docs] def reset_sample_count(self, max_samples): """Reset the counter of samples left. Parameters ---------- max_samples : `int` The maximum number of samples to generate. """ if max_samples <= 0: raise ValueError(f"Invalid maximum samples.") self.samples_left = max_samples
[docs] def generate(self, *args, **kwargs): """Produces a single candidate trajectory to test. Returns ------- candidate : `Trajectory` a ``Trajectory`` to test at each pixel. """ while self.samples_left > 0: self.samples_left -= 1 vx = self.min_vx + random.random() * (self.max_vx - self.min_vx) vy = self.min_vy + random.random() * (self.max_vy - self.min_vy) yield Trajectory(vx=vx, vy=vy)