Source code for kbmod.fake_data_creator

"""A class for creating fake data sets.

The FakeDataSet class allows the user to create fake data sets
for testing, including generating images with random noise and
adding artificial objects. The fake data can be saved to files
or used directly.
"""
import os
import sys
import random

from astropy.io import fits
from pathlib import Path

from kbmod.file_utils import *
from kbmod.search import *


[docs]class FakeDataSet: """This class creates fake data sets for testing and demo notebooks.""" def __init__(self, width, height, num_times, noise_level=2.0, psf_val=0.5, obs_per_day=3, use_seed=False): """The constructor. Parameters ---------- width : int The width of the images in pixels. height : int The height of the images in pixels. num_times : int The number of time steps (number of images). noise_level : float The level of the background noise. psf_val : float The value of the default PSF. obs_per_day : int The number of observations on the same night. use_seed : bool Use a deterministic seed to avoid flaky tests. """ self.width = width self.height = height self.psf_val = psf_val self.noise_level = noise_level self.num_times = num_times self.use_seed = use_seed self.trajectories = [] # Generate times with multiple observations per night # separated by ~15 minutes. self.times = [] seen_on_day = 0 day_num = 0 for i in range(num_times): t = 57130.2 + day_num + seen_on_day * 0.01 self.times.append(t) seen_on_day += 1 if seen_on_day == obs_per_day: seen_on_day = 0 day_num += 1 # Make the image stack. self.stack = self.make_fake_image_stack()
[docs] def make_fake_image_stack(self): """Make a stack of fake layered images. Returns ------- stack : image_stack """ p = psf(self.psf_val) image_list = [] for i in range(self.num_times): img = layered_image( ("%06i" % i), self.width, self.height, self.noise_level, self.noise_level**2, self.times[i], p, i if self.use_seed else -1, ) image_list.append(img) stack = image_stack(image_list) return stack
[docs] def insert_object(self, trj): """Insert a fake object given the trajectory. Parameters ---------- trj : trajectory The trajectory of the fake object to insert. """ t0 = self.times[0] for i in range(self.num_times): dt = self.times[i] - t0 px = trj.x + dt * trj.x_v + 0.5 py = trj.y + dt * trj.y_v + 0.5 # Get the image for the timestep, add the object, and # re-set the image. This last step needs to be done # explicitly because of how pybind handles references. current_layered_image = self.stack.get_single_image(i) current_layered_image.add_object(px, py, trj.flux) self.stack.set_single_image(i, current_layered_image) # Save the trajectory into the internal list. self.trajectories.append(trj)
[docs] def insert_random_object(self, flux): """Create a fake object and insert it into the image. Parameters ---------- flux : float The flux of the object. Returns ------- t : trajectory The trajectory of the inserted object. """ dt = self.times[-1] - self.times[0] # Create the random trajectory. t = trajectory() t.x = int(random.random() * self.width) xe = int(random.random() * self.width) t.x_v = (xe - t.x) / dt t.y = int(random.random() * self.height) ye = int(random.random() * self.height) t.y_v = (ye - t.y) / dt t.flux = flux # Insert the object. self.insert_object(t) return t
[docs] def save_fake_data(self, data_dir): """Create the fake data in a given directory. Parameters ---------- data_dir : str The path of the directory for the fake data. """ # Make the subdirectory if needed. dir_path = Path(data_dir) if not dir_path.is_dir(): print("Directory '%s' does not exist. Creating." % data_dir) os.mkdir(data_dir) # Save each of the image files. for i in range(self.stack.img_count()): img = self.stack.get_single_image(i) filename = f"{dir_path}/{img.get_name()}.fits" # If the file already exists, delete it. if Path(filename).exists(): os.remove(filename) # Save the file. img.save_layers(data_dir + "/") # Open the file and insert fake WCS data. hdul = fits.open(filename) hdul[1].header["WCSAXES"] = 2 hdul[1].header["CTYPE1"] = "RA---TAN-SIP" hdul[1].header["CTYPE2"] = "DEC--TAN-SIP" hdul[1].header["CRVAL1"] = 200.614997245422 hdul[1].header["CRVAL2"] = -7.78878863332778 hdul[1].header["CRPIX1"] = 1033.934327 hdul[1].header["CRPIX2"] = 2043.548284 hdul[1].header["CD1_1"] = -1.13926485986789e-07 hdul[1].header["CD1_2"] = 7.31839748843125e-05 hdul[1].header["CD2_1"] = -7.30064978350695e-05 hdul[1].header["CD2_2"] = -1.27520156332774e-07 hdul[1].header["CTYPE1A"] = "LINEAR " hdul[1].header["CTYPE2A"] = "LINEAR " hdul[1].header["CUNIT1A"] = "PIXEL " hdul[1].header["CUNIT2A"] = "PIXEL " hdul.writeto(filename, overwrite=True) hdul.close()
[docs] def save_time_file(self, file_name): """Save the mapping of visit ID -> timestamp to a file. Parameters ---------- file_name : str The file name for the timestamp file. """ mapping = {} for i in range(self.num_times): id_str = self.stack.get_single_image(i).get_name() mapping[id_str] = self.times[i] FileUtils.save_time_dictionary(file_name, mapping)
[docs] def delete_fake_data(self, data_dir): """Remove the fake data in a given directory. Parameters ---------- data_dir : str The path of the directory for the fake data. """ for i in range(self.stack.img_count()): img = self.stack.get_single_image(i) filename = f"{data_dir}/{img.get_name()}.fits" if Path(filename).exists(): os.remove(filename)