Source code for kbmod.fake_data.fake_data_creator

"""A class for creating fake data sets.

The FakeDataSet class allows the user to create fake data sets
for testing, including generating images with random noise and
adding artificial objects. The fake data can be saved to files
or used directly.
"""

import numpy as np
import warnings

from kbmod.configuration import SearchConfiguration
from kbmod.core.image_stack_py import ImageStackPy
from kbmod.core.psf import PSF
from kbmod.core.shift_and_stack import generate_psi_phi_from_image_stack
from kbmod.core.stamp_utils import extract_curve_values
from kbmod.filters.stamp_filters import append_all_stamps, append_coadds
from kbmod.results import Results
from kbmod.search import Trajectory
from kbmod.work_unit import WorkUnit


[docs]def create_fake_times(num_times, t0=0.0, obs_per_day=1, intra_night_gap=0.01, inter_night_gap=1): """Create a list of times based on a cluster of ``obs_per_day`` observations each night spaced out ``intra_night_gap`` within a night and ``inter_night_gap`` data between observing nights. Parameters ---------- num_times : `int` The number of time steps (number of images). t0 : `float` The time stamp of the first observation [default=0.0] obs_per_day : `int` The number of observations on the same night [default=1] intra_night_gap : `float` The time (in days) between observations in the same night [default=0.01] inter_night_gap : `int` The number of days between observing runs [default=1] Returns ------- result_times : `list` A list of ``num_times`` floats indicating the different time stamps. """ if num_times <= 0: raise ValueError(f"Invalid number of times {num_times}") result_times = [] seen_on_day = 0 # Number seen so far on the current day day_num = 0 for i in range(num_times): result_times.append(t0 + day_num + seen_on_day * intra_night_gap) seen_on_day += 1 if seen_on_day == obs_per_day: seen_on_day = 0 day_num += inter_night_gap return result_times
[docs]def make_fake_image_stack(height, width, times, noise_level=2.0, psf_val=0.5, psfs=None, rng=None): """Create a fake ImageStackPy for testing. Parameters ---------- width : int The width of the images in pixels. height : int The height of the images in pixels. times : list A list of time stamps. noise_level : float The level of the background noise. Default: 2.0 psf_val : float The value of the default PSF. Used if individual psfs are not specified. Default: 0.5 psfs : `list` of `numpy.ndarray`, optional A list of PSF kernels. If none, Gaussian PSFs from with std=psf_val are used. rng : np.random.Generator The random number generator to use. If None creates a new random generator. Default: None """ if rng is None: rng = np.random.default_rng() times = np.asarray(times) # Create the science and variance images. sci = [rng.normal(0.0, noise_level, (height, width)).astype(np.float32) for i in range(len(times))] var = [np.full((height, width), noise_level**2).astype(np.float32) for i in range(len(times))] # Create the PSF information. if psfs is None: psf_kernel = PSF.make_gaussian_kernel(psf_val) psfs = [psf_kernel for i in range(len(times))] elif len(psfs) != len(times): raise ValueError(f"The number of PSFs ({len(psfs)}) must be the same as times ({len(times)}).") return ImageStackPy(times, sci, var, psfs=psfs)
[docs]def image_stack_add_random_masks(stack, mask_fraction, rng=None): """Add random masks to the image stack. Parameters ---------- stack : ImageStackPy The image stack to modify. mask_fraction : float The fraction of pixels to mask in each image [0.0, 1.0]. rng : np.random.Generator, optional The random number generator to use. If None creates a new random generator. Default: None """ if not (0.0 <= mask_fraction <= 1.0): raise ValueError(f"Invalid mask fraction {mask_fraction}. Must be between 0 and 1.") if rng is None: rng = np.random.default_rng() for idx in range(len(stack.sci)): mask = rng.random(stack.sci[idx].shape) < mask_fraction stack.sci[idx][mask] = np.nan stack.var[idx][mask] = np.nan
[docs]def image_stack_add_fake_object(stack, x, y, vx, vy, *, ax=0.0, ay=0.0, flux=100.0): """Insert a fake object given the trajectory. Parameters ---------- stack : ImageStackPy The image stack to modify. x : int The x-coordinate of the object at the first time (in pixels). y : int The y-coordinate of the object at the first time (in pixels). vx : float The x-velocity of the object (in pixels per day). vy : float The y-velocity of the object (in pixels per day). ax : float, optional The x-acceleration of the object (in pixels per day^2). This is only used to test non-linear trajectories and defaults to 0.0. ay : float, optional The y-acceleration of the object (in pixels per day^2). This is only used to test non-linear trajectories and defaults to 0.0. flux : float, optional The flux of the object. Default: 100.0 """ for idx, t in enumerate(stack.zeroed_times): psf_kernel = stack.psfs[idx] psf_dim = psf_kernel.shape[0] psf_radius = psf_dim // 2 px = int(x + vx * t + 0.5 * ax * t * t + 0.5) py = int(y + vy * t + 0.5 * ay * t * t + 0.5) for psf_y in range(psf_dim): for psf_x in range(psf_dim): img_x = px + psf_x - psf_radius img_y = py + psf_y - psf_radius # Only add flux to pixels inside the image with non-masked values. if ( img_x >= 0 and img_x < stack.width and img_y >= 0 and img_y < stack.height and np.isfinite(stack.sci[idx][img_y, img_x]) ): stack.sci[idx][img_y, img_x] += flux * psf_kernel[psf_y, psf_x]
[docs]class FakeDataSet: """This class creates fake data sets for testing and demo notebooks.""" def __init__( self, width, height, times, *, mask_fraction=0.0, noise_level=2.0, psf_val=0.5, psfs=None, artifacts_fraction=0.0, artifacts_mean=0.0, artifacts_std=2.0, use_seed=-1, ): """The constructor. Parameters ---------- width : `int` The width of the images in pixels. height : `int` The height of the images in pixels. times : `list` A list of time stamps, such as produced by ``create_fake_times``. noise_level : `float` The level of the background noise. Default: 2.0 psf_val : `float` The value of the default PSF std. Used if individual psfs are not specified. Default: 0.5 psfs : `list` of `numpy.ndarray`, optional A list of PSF kernels. If none, Gaussian PSFs from with std=psf_val are used. mask_fraction : `float`, optional The fraction of pixels to mask in each image [0.0, 1.0]. Default: 0.0 (no masks). artifacts_fraction : `float`, optional The fraction of pixels to modify with noise artifacts that are brighter than the background noise [0.0, 1.0]. Default: 0.0 (no artifacts). artifacts_mean : `float`, optional The mean value of the artifacts in units of flux. Default: 0.0 (no artifacts). artifacts_std : `float`, optional The standard deviation of the artifacts. Default: 2.0 (same as noise level). use_seed : `int` Use a deterministic seed to avoid flaky tests. Default: -1 (no seed, random behavior). """ self.times = times self.num_times = len(times) if self.num_times == 0: raise ValueError("The list of times must not be empty.") # Base image information. self.width = width self.height = height self.noise_level = noise_level self.mask_fraction = mask_fraction if width <= 0 or height <= 0: raise ValueError(f"Invalid image dimensions: width={width}, height={height}") if not (0.0 <= mask_fraction <= 1.0): raise ValueError(f"Invalid mask fraction {mask_fraction}. Must be between 0 and 1.") # Artifacts information. self.artifacts_fraction = artifacts_fraction self.artifacts_mean = artifacts_mean self.artifacts_std = artifacts_std if not (0.0 <= artifacts_fraction <= 1.0): raise ValueError(f"Invalid artifacts fraction {artifacts_fraction}. Must be between 0 and 1.") # PSF information. self.psf_val = psf_val self.psfs = psfs # Set up the random number generator. self.use_seed = use_seed if use_seed < 0: self.rng = np.random.default_rng() else: self.rng = np.random.default_rng(use_seed) # Set the auxiliary data to empty. self.trajectories = [] self.fake_wcs = None # Make the image stack, mask out the pixels, and add the artifacts. self.reset()
[docs] def reset(self): """Regenerate the image stack and clear the inserted objects.""" self.stack_py = make_fake_image_stack( self.height, self.width, self.times, noise_level=self.noise_level, psf_val=self.psf_val, psfs=self.psfs, rng=self.rng, ) if self.mask_fraction > 0.0: image_stack_add_random_masks(self.stack_py, self.mask_fraction, rng=self.rng) if self.artifacts_fraction > 0.0: self.insert_random_artifacts( self.artifacts_fraction, self.artifacts_mean, self.artifacts_std, ) # Clear the list of inserted objects and WCS. self.trajectories = []
[docs] def set_wcs(self, new_wcs): """Set a new fake WCS to be used for this data. Parameters ---------- new_wcs : `astropy.wcs.WCS` The WCS to use. """ self.fake_wcs = new_wcs
[docs] def insert_object(self, trj): """Insert a fake object given the trajectory. Parameters ---------- trj : `Trajectory` The trajectory of the fake object to insert. """ # Insert the object into the images. image_stack_add_fake_object( self.stack_py, trj.x, trj.y, trj.vx, trj.vy, flux=trj.flux, ) # Save the trajectory into the internal list. self.trajectories.append(trj)
[docs] def trajectory_is_within_bounds(self, trj): """Check if the trajectory is within the image bounds for all times.""" dt = self.times[-1] - self.times[0] xe = trj.x + trj.vx * dt ye = trj.y + trj.vy * dt return ( 0 <= trj.x < self.width and 0 <= trj.y < self.height and 0 <= xe < self.width and 0 <= ye < self.height )
[docs] def insert_random_object(self, flux, vx=None, vy=None): """Create a fake object that will be within the image bounds in all images. Parameters ---------- flux : `float` The flux of the object. vx : `float` or `list` of `float`, optional The x-velocity or list of allowable x-velocities (in pixels per day). If None, a random x-velocity is generated. vy : `float` or `list` of `float`, optional The y-velocity or list of allowable y-velocities (in pixels per day). If None, a random y-velocity is generated. Returns ------- trj : `Trajectory` The trajectory of the inserted object. """ dt = self.times[-1] - self.times[0] trj = Trajectory(flux=flux) is_valid = False itr = 0 while not is_valid and itr < 1000: # We use rejection sampling to ensure the object is visible in all images. trj.x = int(self.rng.random() * self.width) trj.y = int(self.rng.random() * self.height) if vx is None: # If no x-velocity is specified, create one by picking a random end point. xe = int(self.rng.random() * self.width) trj.vx = (xe - trj.x) / dt elif np.isscalar(vx): # If a scalar is given, use it as the x-velocity. trj.vx = vx else: # If a vector is given, pick a random one. trj.vx = self.rng.choice(vx) if vy is None: # If no y-velocity is specified, create one by picking a random end point. ye = int(self.rng.random() * self.height) trj.vy = (ye - trj.y) / dt elif np.isscalar(vy): # If a scalar is given, use it as the y-velocity. trj.vy = vy else: # If a vector is given, pick a random one. trj.vy = self.rng.choice(vy) # Check if the object is visible in all images. If not, try again. is_valid = self.trajectory_is_within_bounds(trj) itr += 1 if not is_valid: warnings.warn( "Failed to create a valid random object after 1000 attempts. " "The object may not be visible in all images." ) # Insert the object. self.insert_object(trj) return trj
[docs] def insert_random_objects_from_generator(self, num_trj, generator, flux): """Insert a number of random objects based on a given TrajectorGenerator. This ensures that the inserted objects can be recovered by the search. Parameters ---------- num_trj : `int` The number of trajectories to insert. generator : `TrajectoryGenerator` The generator to use for creating the trajectories. flux : `float` The flux of the object. Returns ------- trjs : `List` of `Trajectory` The list of all the trajectories inserted. """ # Extract the list of possible velocities from the generator. candidate_trjs = [trj for trj in generator] if len(candidate_trjs) == 0: raise ValueError("The generator did not produce any trajectories.") # Insert the objects one at a time, using rejection sampling to ensure # they appear in all images. trjs = [] itr = 0 while len(trjs) < num_trj and itr < 10 * num_trj: # Pick a random trajectory from the generator to use for its velocity. trj_v = self.rng.choice(candidate_trjs) trj = Trajectory( x = int(self.rng.random() * self.width), y = int(self.rng.random() * self.height), vx = trj_v.vx, vy = trj_v.vy, flux=flux, ) if self.trajectory_is_within_bounds(trj): # If the trajectory is valid, insert it. self.insert_object(trj) trjs.append(trj) itr += 1 if len(trjs) < num_trj: warnings.warn(f"Only inserted {len(trjs)} out of {num_trj} requested trajectories.") return trjs
[docs] def insert_random_artifacts(self, fraction, mean, std): """Insert noise artifacts into the images that are brighter than the background noise. Parameters ---------- fraction : `float` The fraction of pixels to modify [0.0, 1.0] mean : `float` The mean value of the artifacts in units of flux. std : `float` The standard deviation of the artifacts. """ if not (0.0 <= fraction <= 1.0): raise ValueError(f"Invalid fraction {fraction}. Must be between 0 and 1.") for idx in range(len(self.stack_py.sci)): to_add = self.rng.random(self.stack_py.sci[idx].shape) < fraction # Only add to unmasked pixels. mask = self.stack_py.get_mask(idx) to_add &= ~mask self.stack_py.sci[idx][to_add] += self.rng.normal(mean, std, size=np.sum(to_add))
[docs] def get_work_unit(self, config=None): """Create a WorkUnit from the fake data. Parameters ---------- filename : `str` The name of the resulting WorkUnit file. config : `SearchConfiguration`, optional The configuration parameters to use. If None then uses default values. """ if config is None: config = SearchConfiguration() # Create the WorkUnit, but disable warnings about no WCS since this if fake data. with warnings.catch_warnings(): warnings.simplefilter("ignore") work = WorkUnit(im_stack=self.stack_py, config=config, wcs=self.fake_wcs) return work
[docs] def save_fake_data_to_work_unit(self, filename, config=None): """Create the fake data in a WorkUnit file. Parameters ---------- filename : `str` The name of the resulting WorkUnit file. config : `SearchConfiguration`, optional The configuration parameters to use. If None then uses default values. """ work = self.get_work_unit(config) work.to_fits(filename)
[docs] def make_results( self, generate_psi_phi=True, generate_all_stamps=True, stamp_radius=10, coadds=["sum", "mean", "median"], ): """Create a Results object that corresponds to the trajectories inserted into this fake data set. All the results will be based on true fakes. Parameters ---------- generate_psi_phi : `bool`, optional If True, generates the PSI and PHI images from the image stack. Default: True generate_all_stamps : `bool`, optional If True, generates all the stamps for the trajectories. Default: True stamp_radius : `int`, optional The radius of the stamps to generate. Default: 10. coadds : `list` of `str`, optional A list of coadd types to generate. Can be "sum", "mean", "median", and "weighted". Default: ["sum", "mean", "median"] Returns ------- results : `Results` The results object containing the trajectories, psi/phi, and coadds. """ if len(self.trajectories) == 0: raise ValueError("No trajectories in the fake data set.") # Create a results data set that has the inserted trajectories. results = Results.from_trajectories(self.trajectories, track_filtered=False) if generate_psi_phi: # Generate the psi and phi images from the image stack. psi_imgs, phi_imgs = generate_psi_phi_from_image_stack(self.stack_py) # Generate the psi and phi curves for each trajectory. psi_curves = [] phi_curves = [] for trj in self.trajectories: xvals = (trj.x + self.stack_py.zeroed_times * trj.vx + 0.5).astype(int) yvals = (trj.y + self.stack_py.zeroed_times * trj.vy + 0.5).astype(int) psi_curves.append(extract_curve_values(psi_imgs, xvals, yvals)) phi_curves.append(extract_curve_values(phi_imgs, xvals, yvals)) # Append the curves to the results. results.add_psi_phi_data(psi_curves, phi_curves) # Add the stamp data. if generate_all_stamps: append_all_stamps(results, self.stack_py, stamp_radius=stamp_radius) append_coadds(results, self.stack_py, coadds, stamp_radius) return results