"""A collection of utility functions for working with files in KBMOD."""
from astropy.coordinates import *
from astropy.time import Time
import astropy.units as u
from collections import OrderedDict
import csv
from math import copysign
import numpy as np
from pathlib import Path
import re
import kbmod.search as kb
[docs]class FileUtils:
"""A class of static methods for working with KBMOD files.
Examples
--------
* Load an external file of visit ID to timestamp mappings.
``time_dict = FileUtils.load_time_dictionary("kbmod/data/demo_times.dat")``
* Load the results of a KBMOD run as trajectory objects.
``FileUtils.load_results_file_as_trajectories("results_DEMO.txt")``
* Make a filename safe.
``FileUtils.make_safe_filename("my string, is here")``
"""
[docs] @staticmethod
def make_safe_filename(s):
"""Makes a safe file name out of an arbitrary string.
Preserves the separators (spaces, commas, tabs, etc.) with underscores
and removes all other non-alphanumeric characters.
Parameters
----------
s : string
The input string
Returns
-------
res : string
The output string
"""
separators = set([" ", ".", ",", ";", "\t", "\n", ":", "-", "|", "/"])
# If the character is a letter or number, keep it.
# If it is a separator, replace with "_".
# Otherwise discard it.
pick_char = lambda x: x if (x.isalnum() or x == "_") else ("_" if x in separators else "")
res = "".join(pick_char(x) for x in s)
return res
[docs] @staticmethod
def visit_from_file_name(filename):
"""Automatically extract the visit ID from the file name.
Uses the heuristic that the visit ID is the first numeric
string of at least length 5 digits in the file name.
Parameters
----------
filename : str
The file name
Returns
-------
result : str
The visit ID string or None if there is no match.
"""
expr = re.compile(r"\d{4}(?:\d+)")
res = expr.search(filename)
if res is None:
return None
return res.group()
[docs] @staticmethod
def save_csv_from_list(file_name, data, overwrite=False):
"""Save a CSV file from a list of lists.
Parameters
----------
file_name : str
The full path and file name for the result.
data : list
The data to save.
overwrite : bool
A Boolean indicating whether to overwrite the files
or raise an exception on file existance.
"""
if Path(file_name).is_file() and not overwrite:
raise ValueError(f"{file_name} already exists")
with open(file_name, "w") as f:
writer = csv.writer(f)
writer.writerows([x for x in data])
[docs] @staticmethod
def load_csv_to_list(file_name, use_dtype=None, none_if_missing=False):
"""Load a CSV file to a list of numpy arrays.
Parameters
----------
file_name : str
The full path and file name of the data.
use_dtype : type
The numpy array dtype to use.
none_if_missing : bool
Return None if the file is missing. The default is to
raise an exception if the file is missing.
Returns
-------
data : list of numpy arrays
The data loaded.
"""
if not Path(file_name).is_file():
if none_if_missing:
return None
else:
raise FileNotFoundError
data = []
with open(file_name, "r") as f:
reader = csv.reader(f)
for row in reader:
data.append(np.array(row, dtype=use_dtype))
return data
[docs] @staticmethod
def load_time_dictionary(time_file):
"""Load a OrderedDict mapping ``visit_id`` to time stamp.
Parameters
----------
time_file : str
The path and name of the time file.
Returns
-------
image_time_dict : OrderedDict
A mapping of visit ID to time stamp.
"""
# Load a mapping from visit numbers to the visit times. This dictionary stays
# empty if no time file is specified.
image_time_dict = OrderedDict()
if time_file is None or len(time_file) == 0:
return image_time_dict
with open(time_file, "r") as csvfile:
reader = csv.reader(csvfile, delimiter=" ")
for row in reader:
if len(row[0]) < 2 or row[0][0] == "#":
continue
image_time_dict[row[0]] = float(row[1])
return image_time_dict
[docs] @staticmethod
def save_time_dictionary(time_file_name, time_mapping):
"""Save the mapping of visit_id -> time stamp to a file.
Parameters
----------
time_file_name : str
The path and name of the time file.
time_mapping : dict or OrderedDict
The mapping of visit ID to time stamp.
"""
with open(time_file_name, "w") as file:
file.write("# visit_id mean_julian_date\n")
for k in time_mapping.keys():
file.write(f"{k} {time_mapping[k]}\n")
[docs] @staticmethod
def load_psf_dictionary(psf_file):
"""Load a OrderedDict mapping ``visit_id`` to PSF.
Parameters
----------
psf_file : str
The path and name of the PSF file.
Returns
-------
psf_dict : OrderedDict
A mapping of visit ID to psf value.
"""
# Load a mapping from visit numbers to the visit times. This dictionary stays
# empty if no time file is specified.
psf_dict = OrderedDict()
if psf_file is None or len(psf_file) == 0:
return psf_dict
with open(psf_file, "r") as csvfile:
reader = csv.reader(csvfile, delimiter=" ")
for row in reader:
if len(row[0]) < 2 or row[0][0] == "#":
continue
psf_dict[row[0]] = float(row[1])
return psf_dict
[docs] @staticmethod
def trajectory_from_np_object(result):
"""Transform a numpy object holding trajectory information
into a trajectory object.
Parameters
----------
result : np object
The result object loaded by numpy.
Returns
-------
trj : trajectory
The corresponding trajectory object.
"""
trj = kb.trajectory()
trj.x = int(result["x"])
trj.y = int(result["y"])
trj.x_v = float(result["vx"])
trj.y_v = float(result["vy"])
trj.flux = float(result["flux"])
trj.lh = float(result["lh"])
trj.obs_count = int(result["num_obs"])
return trj
[docs] @staticmethod
def save_results_file(filename, results):
"""Save the result trajectories to a file.
Parameters
----------
filename : str
The filename of the results.
results : list
list of trajectory objects.
"""
np.savetxt(filename, results, fmt="%s")
[docs] @staticmethod
def load_results_file(filename):
"""Load the result trajectories.
Parameters
----------
filename : str
The filename of the results.
Returns
-------
results : np array
A np array with the result trajectories.
"""
results = np.genfromtxt(
filename,
usecols=(1, 3, 5, 7, 9, 11, 13),
names=["lh", "flux", "x", "y", "vx", "vy", "num_obs"],
ndmin=2,
)
return results
[docs] @staticmethod
def load_results_file_as_trajectories(filename):
"""Load the result trajectories.
Parameters
----------
filename : str
The full path and filename of the results.
Returns
-------
results : list
A list of trajectory objects
"""
np_results = FileUtils.load_results_file(filename)
results = [FileUtils.trajectory_from_np_object(x) for x in np_results]
return results
[docs] @staticmethod
def mpc_reader(filename):
"""Read in a file with observations in MPC format and return the coordinates.
Parameters
----------
filename: str
The name of the file with the MPC-formatted observations.
Returns
-------
coords: astropy SkyCoord object
A SkyCoord object with the ra, dec of the observations.
times: astropy Time object
Times of the observations
"""
iso_times = []
time_frac = []
ra = []
dec = []
with open(filename, "r") as f:
for line in f:
year = str(line[15:19])
month = str(line[20:22])
day = str(line[23:25])
iso_times.append(str("%s-%s-%s" % (year, month, day)))
time_frac.append(str(line[25:31]))
ra.append(str(line[32:44]))
dec.append(str(line[44:56]))
coords = SkyCoord(ra, dec, unit=(u.hourangle, u.deg))
t = Time(iso_times)
t_obs = []
for t_i, frac in zip(t, time_frac):
t_obs.append(t_i.mjd + float(frac))
obs_times = Time(t_obs, format="mjd")
return coords, obs_times
[docs] @staticmethod
def save_results_mpc(file_out, coords, times, observatory="X05"):
"""
Save the MPC-formatted observations to file.
Parameters
----------
file_out: str
The output filename with the MPC-formatted observations
of the KBMOD search result.
coords : list of SkyCoord
A list of sky coordinates (SkyCoord objects) of the observation.
t : list of Time
A list of times for each observation.
observatory : string
The three digit observatory code to use.
"""
if len(times) != len(coords):
raise ValueError(f"Unequal lists {len(times)} != {len(coords)}")
with open(file_out, "w") as f:
for i in range(len(times)):
mpc_line = FileUtils.format_result_mpc(coords[i], times[i], observatory)
f.write(mpc_line + "\n")