"""Class for standardizing Data Products of Vera C. Rubin Science Pipelines
via the Rubin Data Butler.
"""
import importlib
import uuid
from astropy.nddata import bitmask
from astropy.wcs import WCS
import astropy.units as u
import numpy as np
from .standardizer import Standardizer, StandardizerConfig
from kbmod.core.psf import PSF
from kbmod.search import LayeredImage

__all__ = [
    "ButlerStandardizer",
    "ButlerStandardizerConfig",
]

def deferred_import(module, name=None):
    """Imports module/class/function/name as ``name`` into global modules.

    If the module/class/function/name already exists, does nothing.

    Intended for importing a large module or a library only when needed, so
    as to avoid the cost of the import if the functionality depending on the
    imported library is not being used.

    Used to defer the import of the Vera C. Rubin Middleware component until
    `ButlerStandardizer` initialization, so that we may be able to import
    KBMOD before the heat death of the universe.

    Parameters
    ----------
    module : `str`
        Fully qualified name of the module/submodule/class/function/name.
    name : `str` or `None`, optional
        Name the target is imported as.

    Raises
    ------
    ImportError :
        Target is not reachable. Ensure the package is
        installed and visible in the environment.
    """
    if name is None:
        name = module
    if globals().get(name, False):
        return
    try:
        globals()[name] = importlib.import_module(module)
    except ImportError as e:
        raise ImportError(f"Module {module} or name {name} not found.") from e
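
# Example (a minimal sketch, assuming the Rubin stack is installed): after the
# call below, ``dafButler`` resolves in this module's global namespace exactly
# as if ``import lsst.daf.butler as dafButler`` had run at the top of the file.
#
#     deferred_import("lsst.daf.butler", "dafButler")
#     butler = dafButler.Butler("/repo/main")  # hypothetical repository path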

class ButlerStandardizerConfig(StandardizerConfig):
    do_mask = True
    """Perform masking if ``True``, otherwise return an empty mask."""

    do_bitmask = True
    """Mask ``mask_flags`` from the mask plane in the FITS file."""

    do_threshold = False
    """Mask all pixels above the given count threshold."""

    grow_mask = True
    """Grow mask footprint by ``grow_kernel_shape``."""

    brightness_threshold = 10
    """Pixels with value greater than this threshold will be masked."""

    grow_kernel_shape = (10, 10)
    """Size of the symmetric square kernel by which mask footprints will be
    grown."""

    mask_flags = ["BAD", "CLIPPED", "CR", "CROSSTALK", "EDGE", "NO_DATA", "SAT", "SENSOR_EDGE", "SUSPECT"]
    """List of flags that will be masked."""

    psf_std = 1
    """Standard deviation of the Point Spread Function."""

    standardize_metadata = True
    """Fetch and include values from Rubin's Exposure.metadata
    PropertyList, such as 'OBSID', 'AIRMASS', 'DIMM2SEE' etc. These
    typically correspond to ingested header values, ergo the capitalized
    column names.
    """

    standardize_effective_summary_stats = False
    """Include the "effective" fit metrics from SummaryStats."""

    standardize_uri = False
    """Include a URL-like path to the file."""

    zero_point = 31
    """Photometric zero point to which all science and variance values will be scaled."""

    greedy_export = False
    """If ``True``, keep the ``Exposure`` object in memory after the
    `LayeredImage` is created; otherwise release it, reducing the memory
    footprint when processing many datasets."""
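
# Example (a minimal sketch): config values can be overridden with a `dict` at
# construction time; `dataset_id` and `butler` below are hypothetical.
#
#     std = ButlerStandardizer(
#         dataset_id,
#         butler,
#         config={"zero_point": 27, "grow_kernel_shape": (5, 5)},
#     )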

class ButlerStandardizer(Standardizer):
    """Standardizer for Vera C. Rubin Data Products, namely the underlying
    ``Exposure`` objects of ``calexp``, ``difim`` and various ``*coadd``
    dataset types.

    This standardizer will volunteer to process ``DatasetRef`` or ``DatasetId``
    objects. The Rubin Data Butler is expected to be provided at
    instantiation time, in addition to the target we want to standardize.

    Parameters
    ----------
    tgt : `lsst.daf.butler.core.DatasetId`, `lsst.daf.butler.core.DatasetRef`, `uuid.UUID` or `str`
        Target to standardize.
    butler : `lsst.daf.butler.Butler`
        Vera C. Rubin Data Butler.
    config : `StandardizerConfig`, `dict` or `None`, optional
        Configuration key-values used when standardizing the file.

    Attributes
    ----------
    butler : `lsst.daf.butler.Butler`
        Vera C. Rubin Data Butler.
    ref : `lsst.daf.butler.core.DatasetRef`
        Dataset reference to the given target.
    exp : `lsst.afw.image.exposure.Exposure`
        The `Exposure` object targeted by the ``ref``.
    processable : `list[lsst.afw.image.exposure.Exposure]`
        Items marked as processable by the standardizer. See `Standardizer`.
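
    Examples
    --------
    A minimal sketch; the repository path, collection and data ID below are
    hypothetical:

    >>> import lsst.daf.butler as dafButler  # doctest: +SKIP
    >>> butler = dafButler.Butler("/repo/main")  # doctest: +SKIP
    >>> ref = butler.registry.findDataset(  # doctest: +SKIP
    ...     "calexp", visit=12345, detector=10, collections=["imaginary/run"]
    ... )
    >>> std = ButlerStandardizer(ref, butler)  # doctest: +SKIP
    >>> layered = std.toLayeredImage()  # doctest: +SKIP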
"""
name = "ButlerStandardizer"
priority = 2
configClass = ButlerStandardizerConfig

    @classmethod
    def resolveTarget(cls, tgt):
        # DatasetId is a type alias for UUIDs, so it'll be like a large int
        # or hex or string of int/hex etc. We try to cast to UUID to check
        # whether the string is compliant.
        # https://github.com/lsst/daf_butler/blob/main/python/lsst/daf/butler/_dataset_ref.py#L265
        if isinstance(tgt, uuid.UUID):
            return True

        if isinstance(tgt, str):
            try:
                uuid.UUID(tgt)
            except ValueError:
                return False
            else:
                return True

        # kinda hacky, but I don't want to import the entire Stack before I
        # absolutely know I need/have it.
        tgttype = str(type(tgt)).lower()
        if "datasetref" in tgttype or "datasetid" in tgttype:
            return True

        return False
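
    # Example (a minimal sketch): `resolveTarget` recognizes UUID-like
    # targets without importing the Rubin stack:
    #
    #     ButlerStandardizer.resolveTarget(uuid.uuid4())  # True
    #     ButlerStandardizer.resolveTarget("12345678-1234-5678-1234-567812345678")  # True
    #     ButlerStandardizer.resolveTarget("not-a-uuid")  # False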

    def __init__(self, dataId, butler, config=None, **kwargs):
        deferred_import("lsst.daf.butler", "dafButler")

        # Somewhere around the w_2024_* builds datastore.root was removed
        # as an attribute of the datastore; not sure it was ever replaced
        # with anything back-compatible.
        try:
            super().__init__(str(butler._datastore.root), config=config)
        except AttributeError:
            super().__init__(butler.datastore.root, config=config)
        self.butler = butler

        # Including records expands the dataId to include key pieces of
        # information such as filter and band. Loading datastore_records
        # could be a shortcut to the relative path inside the repository.
        if isinstance(dataId, dafButler.DatasetRef):
            ref = dataId
        elif isinstance(dataId, dafButler.DatasetId):
            ref = butler.get_dataset(dataId, dimension_records=True)
        elif isinstance(dataId, (uuid.UUID, str)):
            did = dafButler.DatasetId(dataId)
            ref = butler.get_dataset(did, dimension_records=True)
        else:
            raise TypeError(
                f"Expected DatasetRef, DatasetId or a UUID-like string, got {dataId} instead."
            )

        self.ref = ref
        self.exp = None
        self.processable = [self.exp]

        # These are for lazy loading
        self._bbox = None
        self._wcs = None

        # These are set for internal use because often we can't share values
        # between methods and they all need them. Centralizing metadata
        # fetching via the Butler also helps performance.
        self._metadata = None
        self._naxis1 = None
        self._naxis2 = None

    def _computeSkyBBox(self, wcs, dimX, dimY):
        """Given a Rubin SkyWCS object and the dimensions of an image,
        calculates the world coordinates of the image center and the
        image corners.

        The corners are given by the following indices::

            topleft                    topright
            (0, dimX) ---------------- (dimY, dimX)
                |                           |
                |            x              |
                |     (dimY/2, dimX/2)      |
                |         center            |
                |                           |
            (0, 0) ------------------- (dimY, 0)
            botleft                    botright

        Parameters
        ----------
        wcs : `object`
            World coordinate system object, must support standard WCS API.
        dimX : `int`
            Maximal index in the NumPy convention x-axis, a "height".
        dimY : `int`
            Maximal index in the NumPy convention y-axis, a "width".

        Returns
        -------
        standardizedBBox : `numpy.ndarray`
            An array of shape ``(5, 2)`` of ``(ra, dec)`` coordinates in
            degrees, starting with the center coordinate, then the bottom
            left corner, and progressing clockwise around the detector.

        Notes
        -----
        The center point is assumed to be at the (dimX/2, dimY/2) pixel
        coordinates, rounded down. The bottom left corner is taken to be the
        (0, 0)-th pixel and the image lies in the first quadrant of a unit
        circle to match Astropy's convention.
        """
        center = wcs.pixelToSky(int(dimY // 2), int(dimX // 2))
        botleft = wcs.pixelToSky(0, 0)
        topleft = wcs.pixelToSky(0, dimX)
        topright = wcs.pixelToSky(dimY, dimX)
        botright = wcs.pixelToSky(dimY, 0)

        pts = np.array(
            [
                [center.getRa().asDegrees(), center.getDec().asDegrees()],
                [botleft.getRa().asDegrees(), botleft.getDec().asDegrees()],
                [topleft.getRa().asDegrees(), topleft.getDec().asDegrees()],
                [topright.getRa().asDegrees(), topright.getDec().asDegrees()],
                [botright.getRa().asDegrees(), botright.getDec().asDegrees()],
            ]
        )
        return pts

    def _fetch_meta(self):
        """Fetch metadata and any dataset components that do not
        load the image or a large amount of data.

        This resolves the majority of the metadata, temporal and
        spatial information required by KBMOD, which is stored
        in the attributes and then returned in the standardize
        methods. This prevents having to evaluate queries to the
        Butler Registry and Datastore repeatedly, and enables the
        evaluation of information that spans multiple datasets without
        forcing the Butler.get of a larger dataset (f.e. the Exposure
        object).
        """
        self._metadata = {}

        # First we standardize the required metadata. Most of this can
        # be extracted from the dataset reference and a visitInfo.
        # This includes i) data required to roundtrip the standardizer
        # ii) timestamp and iii) pointing information
        self._metadata["dataId"] = str(self.ref.id)
        self._metadata["collection"] = self.ref.run
        self._metadata["datasetType"] = self.ref.datasetType.name
        self._metadata["visit"] = self.ref.dataId["visit"]
        self._metadata["detector"] = self.ref.dataId["detector"]
        self._metadata["band"] = self.ref.dataId["band"]
        self._metadata["filter"] = self.ref.dataId["physical_filter"]

        visit_ref = self.ref.makeComponentRef("visitInfo")
        visit = self.butler.get(visit_ref)
        expt = visit.exposureTime
        mjd_start = visit.date.toAstropy()
        half_way = mjd_start + (expt / 2) * u.s + 0.5 * u.s
        self._metadata["exposureTime"] = expt
        # Note the timescales for MJD. The Butler uses TAI, but we convert
        # time stamps to UTC for consistency.
        # Name it mjd_mid to make it obvious it's the middle of the exposure.
        self._metadata["mjd_start"] = mjd_start.utc.mjd
        self._metadata["mjd_mid"] = half_way.utc.mjd
        self._metadata["object"] = visit.object
        self._metadata["pointing_ra"] = visit.boresightRaDec.getRa().asDegrees()
        self._metadata["pointing_dec"] = visit.boresightRaDec.getDec().asDegrees()
        self._metadata["airmass"] = visit.boresightAirmass

        obs = visit.getObservatory()
        self._metadata["obs_lon"] = obs.getLongitude().asDegrees()
        self._metadata["obs_lat"] = obs.getLatitude().asDegrees()
        self._metadata["obs_elev"] = obs.getElevation()

        # Pointing information is hard to standardize because the
        # dimensions of the detector are not easily available. We get
        # those from the BBox (in-pixel bounding box). NAXIS values are
        # required if we reproject, so we must extract them if we can.
        bbox_ref = self.ref.makeComponentRef("bbox")
        bbox = self.butler.get(bbox_ref)
        self._naxis1 = bbox.getWidth()
        self._naxis2 = bbox.getHeight()

        # If the standardizer is re-used, many generators will be
        # depleted, returning None as values. Cast to dict to make
        # a copy.
        wcs_ref = self.ref.makeComponentRef("wcs")
        wcs = self.butler.get(wcs_ref)
        meta = dict(wcs.getFitsMetadata())
        meta["NAXIS1"] = self._naxis1
        meta["NAXIS2"] = self._naxis2
        self._wcs = WCS(meta)

        # Calculate the WCS "error" (max difference between edge coordinates
        # from Rubin's more powerful SkyWCS and Astropy's FITS-WCS)
        skyBBox = self._computeSkyBBox(wcs, self._naxis2, self._naxis1)
        apyBBox = self._computeBBoxArray(self._wcs, self._naxis2, self._naxis1)
        self._metadata["wcs_err"] = (skyBBox - apyBBox).max()

        # TODO: see issue #666
        # this will unroll the entire bbox into columns
        # make sky bbox the default, since that one is guaranteed to be correct
        self._bbox = self._bboxArrayToDict(skyBBox)
        self._metadata.update(self._bbox)

        # We need to fetch summary stats for the zero-point, so we
        # might as well extract the rest out of it; the exception is
        # only the effective metrics, which may not exist for all filters.
        summary_ref = self.ref.makeComponentRef("summaryStats")
        summary = self.butler.get(summary_ref)
        self._metadata["psfSigma"] = summary.psfSigma
        self._metadata["psfArea"] = summary.psfArea
        self._metadata["nPsfStar"] = summary.nPsfStar
        self._metadata["zeroPoint"] = summary.zeroPoint
        self._metadata["skyBg"] = summary.skyBg
        self._metadata["skyNoise"] = summary.skyNoise
        self._metadata["meanVar"] = summary.meanVar
        self._metadata["astromOffsetMean"] = summary.astromOffsetMean
        self._metadata["astromOffsetStd"] = summary.astromOffsetStd

        # The rest of the data here is optional. Generally, metadata
        # is nice to standardize, but keys may change between
        # different instruments; summary stats are useful for
        # photometric analysis of the results, while the effective
        # values are too often NaN. The URI location itself is
        # ultimately not very useful, but helpful for data inspection.
        if self.config.standardize_metadata:
            meta_ref = self.ref.makeComponentRef("metadata")
            meta = self.butler.get(meta_ref)
            # dataId sometimes doesn't have a filter or a band,
            # depending on the way the initial ref is resolved? Why
            # is middleware so complicated! Best-effort attempt,
            # 90% cases?
            self._metadata["OBSID"] = meta["OBSID"]
            # Note that the following metadata keys may not be present in all
            # Rubin butlers, and we only extract them if available.
            if "DTNSANAM" in meta:
                self._metadata["DTNSANAM"] = meta["DTNSANAM"]
            if "AIRMASS" in meta:
                self._metadata["AIRMASS"] = meta["AIRMASS"]
            if "DIMM2SEE" in meta and meta["DIMM2SEE"] != "NaN":
                self._metadata["DIMM2SEE"] = meta["DIMM2SEE"]
            if "GAINA" in meta:
                self._metadata["GAINA"] = meta["GAINA"]
            if "GAINB" in meta:
                self._metadata["GAINB"] = meta["GAINB"]

        # Will be NaN for the VR filter, so it's optional
        if self.config.standardize_effective_summary_stats:
            self._metadata["effTime"] = summary.effTime
            self._metadata["effTimePsfSigmaScale"] = summary.effTimePsfSigmaScale
            self._metadata["effTimeSkyBgScale"] = summary.effTimeSkyBgScale
            self._metadata["effTimeZeroPointScale"] = summary.effTimeZeroPointScale

        if self.config.standardize_uri:
            self._metadata["location"] = self.butler.getURI(
                self.ref,
                collections=[
                    self.ref.run,
                ],
            ).geturl()
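
    # A sketch of what `_fetch_meta` leaves behind (values are made up; keys
    # follow the assignments above):
    #
    #     std._metadata  # {"visit": 12345, "mjd_mid": 60123.27,
    #                    #  "pointing_ra": 215.3, "zeroPoint": 30.9, ...}
    #     std._naxis1, std._naxis2  # detector width and height in pixels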

    @property
    def wcs(self):
        if self._wcs is None:
            self._fetch_meta()
        return [
            self._wcs,
        ]

    @property
    def bbox(self):
        if self._bbox is None:
            self._fetch_meta()
        return [
            self._bbox,
        ]

    def standardizeScienceImage(self):
        if self._metadata is None:
            self._fetch_meta()
        self.exp = self.butler.get(self.ref) if self.exp is None else self.exp
        zp_correct = 10 ** ((self._metadata["zeroPoint"] - self.config.zero_point) / 2.5)
        return [
            self.exp.image.array / zp_correct,
        ]

    def standardizeVarianceImage(self):
        if self._metadata is None:
            self._fetch_meta()
        self.exp = self.butler.get(self.ref) if self.exp is None else self.exp
        zp_correct = 10 ** ((self._metadata["zeroPoint"] - self.config.zero_point) / 2.5)
        return [
            self.exp.variance.array / zp_correct**2,
        ]
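
    # A worked example of the zero-point scaling above (values made up): with
    # a measured zeroPoint of 28.5 and the configured zero_point of 31,
    # zp_correct = 10 ** ((28.5 - 31) / 2.5) = 10 ** (-1.0) = 0.1, so the
    # science counts are multiplied by 10 and, since flux enters the variance
    # squared, the variance is multiplied by 100.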

    def standardizeMaskImage(self):
        self.exp = self.butler.get(self.ref) if self.exp is None else self.exp
        if self._naxis1 is None or self._naxis2 is None:
            self._fetch_meta()

        # Return an empty mask if no masking is done
        if not self.config["do_mask"]:
            return [
                np.zeros((self._naxis2, self._naxis1)),
            ]

        # Otherwise load the mask extension and process it
        mask = self.exp.mask.array.astype(int)

        if self.config["do_bitmask"]:
            # flip_bits makes ignore_flags into mask_these_flags
            bit_flag_map = self.exp.mask.getMaskPlaneDict()
            bit_flag_map = {key: int(2**val) for key, val in bit_flag_map.items()}
            mask = bitmask.bitfield_to_boolean_mask(
                bitfield=mask,
                ignore_flags=self.config["mask_flags"],
                flag_name_map=bit_flag_map,
                flip_bits=True,
            )

        if self.config["do_threshold"]:
            bmask = self.exp.image.array > self.config["brightness_threshold"]
            mask = mask | bmask

        if self.config["grow_mask"]:
            # Only import the scipy module if we actually need it.
            from scipy.signal import convolve2d

            grow_kernel = np.ones(self.config["grow_kernel_shape"])
            mask = convolve2d(mask, grow_kernel, mode="same").astype(bool)

        return [
            mask,
        ]
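
    # A sketch of how `bitfield_to_boolean_mask` behaves above; the plane
    # map here is made up:
    #
    #     from astropy.nddata import bitmask
    #     import numpy as np
    #
    #     bf = np.array([[0, 1], [2, 3]])
    #     m = bitmask.bitfield_to_boolean_mask(
    #         bitfield=bf,
    #         ignore_flags=["SAT"],
    #         flag_name_map={"BAD": 1, "SAT": 2},
    #         flip_bits=True,
    #     )
    #     # m == [[False, False], [True, True]]: only the pixels with the
    #     # SAT bit (value 2) set are masked.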

    def standardizePSF(self):
        # TODO: Update when we formalize the PSF. Any of these are available
        # from the stack:
        #   self.exp.psf.computeImage
        #   self.exp.psf.computeKernelImage
        #   self.exp.psf.getKernel
        #   self.exp.psf.getLocalKernel
        std = self.config["psf_std"]
        return [PSF.make_gaussian_kernel(std)]

    # These exist because standardizers promise to return lists
    # for compatibility with single-data and multi-data sources
    def standardizeWCS(self):
        if self._wcs is None:
            self._fetch_meta()
        return [
            self._wcs,
        ]

    def standardizeBBox(self):
        if self._bbox is None:
            self._fetch_meta()
        return [
            self._bbox,
        ]

    def toLayeredImage(self):
        masks = self.standardizeMaskImage()

        # This is required at the moment because RawImage cannot
        # support different types; TODO: update when fixed
        mask = masks[0].astype(np.float32)

        imgs = [
            LayeredImage(
                self.standardizeScienceImage()[0],
                self.standardizeVarianceImage()[0],
                mask,
                self.standardizePSF()[0],
                self._metadata["mjd_mid"],
            ),
        ]

        # Unless configured to hold on to it, release the Exposure to
        # free memory once the LayeredImage has been constructed.
        if not self.config["greedy_export"]:
            self.exp = None

        return imgs
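
# Example (a minimal sketch): when the same standardizer needs to produce
# images repeatedly, keeping the Exposure cached avoids repeated Butler I/O;
# `did` and `butler` here are hypothetical.
#
#     std = ButlerStandardizer(did, butler, config={"greedy_export": True})
#     imgs = std.toLayeredImage()            # self.exp stays cached
#     sci = std.standardizeScienceImage()    # no new Butler.get round-trip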