import importlib import os import yaml import astropy.table refcat_name_to_dataset_name = { "ps1": "ps1_pv3_3pi_20170110", "ps1_old": "ps1_pv3_3pi_20170110_old", "panstarrs1": "ps1_pv3_3pi_20170110", "panstarrs": "ps1_pv3_3pi_20170110", "Pan-STARRS1": "ps1_pv3_3pi_20170110", "gaia": "gaia_dr2_20200414", "gaia_dr2": "gaia_dr2_20200414", "gaiadr2": "gaia_dr2_20200414", "Gaia-DR2": "gaia_dr2_20200414", "gaia_dr3": "gaia_dr3_20230707", "gaiadr3": "gaia_dr3_20230707", "Gaia-DR3": "gaia_dr3_20230707", } PROJECT_DIR = os.environ.get("PROJECT_DIR", None) if PROJECT_DIR is None: raise ValueError("You need to define a PROJECT_DIR environment variable. Did you run a provided Bash script or the python code directly?") def load_refcat_yaml(dataset_name): if dataset_name == "ps1_pv3_3pi_20170110": return yaml.load(open(os.path.join(PROJECT_DIR, "exports", "ps1_export_gen3.yaml"), "r"), Loader=yaml.CLoader) elif dataset_name == "gaia_dr2_20200414": return yaml.load(open(os.path.join(PROJECT_DIR, "exports", "gaia_export_gen3.yaml"), "r"), Loader=yaml.CLoader) elif dataset_name == "gaia_dr3_20230707": return yaml.load(open(os.path.join(PROJECT_DIR, "exports", "gaia_dr3_export_gen3.yaml"), "r"), Loader=yaml.CLoader) else: raise ValueError(f"dataset name {dataset_name} is not supported") def make_refcat_import(dataset_name, shards): if dataset_name == "ps1_pv3_3pi_20170110": shards_dir = os.path.join(PROJECT_DIR, "ps1", "ps1_pv3_3pi_20170110") paths = list(map(lambda shard : os.path.join(shards_dir, f"{shard}.fits"), shards)) elif dataset_name == "ps1_pv3_3pi_20170110_old": shards_dir = os.path.join(PROJECT_DIR, "refcats", "ps1_pv3_3pi_20170110") paths = list(map(lambda shard : os.path.join(shards_dir, f"ps1_pv3_3pi_20170110_{shard}.fits"), shards)) elif dataset_name == "gaia_dr2_20200414": shards_dir = os.path.join(PROJECT_DIR, "refcats", "gaia_dr2_20200414") paths = list(map(lambda shard : os.path.join(shards_dir, f"gaia_dr2_20200414_{shard}.fits"), shards)) elif dataset_name == "gaia_dr3_20230707": shards_dir = os.path.join(PROJECT_DIR, "GAIA_DR3", "gaia_dr3") paths = list(map(lambda shard : os.path.join(shards_dir, f"{shard}.fits"), shards)) else: raise ValueError(f"dataset name {dataset_name} is not supported") htm7 = astropy.table.Column(shards, name='htm7') filename = astropy.table.Column(paths, name='filename') mask = list(map(lambda x : os.path.exists(x), filename)) return astropy.table.Table([filename[mask], htm7[mask]]) def deferred_import(module, name=None, ns=globals()): """Defer the import of the stack untill we actually need it to be able to print help message before the heat death of the universe. """ if name is None: name = module if ns.get(name, False): return try: ns[name] = importlib.import_module(module) except ImportError as e: raise ImportError("No Rubin Stack found. Please activate Rubin stack.") from e def get_exp_metadata(exposure): """Return exposure's bounding box, wcs and, if existant, filter and calibration data. Parameters ---------- exposure : `afw.image.ExposureF` Exposure """ deferred_import("lsst.pipe.base", "pipeBase") expInfo = exposure.getInfo() #filterName = expInfo.getFilter().getName() or None filterName = expInfo.getFilter().physicalLabel or None if filterName == "_unknown_": filterName = None return pipeBase.Struct( bbox = exposure.getBBox(), wcs = expInfo.getWcs(), filterName = filterName, ) def get_shard_filename(refcatConf, shardId): """Returns the name of the shard file.""" return f"{refcatConf.ref_dataset_name}_{shardId}_refcats_gen2.fits" def get_shard_filepath(refcatConf, refcatLocation, shardId): """Return the filepath to the shard file.""" return os.path.join(refcatLocation, get_shard_filename(refcatConf, shardId)) def resolve_input_meaning(input_val, default_val): """Resolves the meaning of given input value to the given default value, the input value or to ``None``. When the given input value is ``None`` returns the default value. When the input value is truthy returns the given input value. When the input value is falsy returns ``None`` Parameters ---------- input_val : any Input value. default_val : any Output value when input_value is None. Returns ------- interpreted_val : any Given value, default value or None - depending on the input. """ if input_val is None: return default_val elif input_val: return input_val else: return None ############################################################ # Resolvers ############################################################ def calculate_circle(bbox, wcs, pixelMargin): """Computes on-sky center and radius of search region Parameters ---------- bbox : `lsst.geom.Box2DI` or `lsst.geom.Box2D` Bounding box wcs : `lsst.afw.geom.SkyWcs` WCS pixelMargin : `int` or `float` padding in pixels by which the bbox will be expanded Returns ---------- coord : `lsst.geom.SpherePoint` ICRS center of the search region radius : `lsst.geom.Angle` Radius of the search region bbox : `lsst.geom.Box2D` Bounding box used to compute the circumscribed circle """ deferred_import("lsst.geom", "geom") deferred_import("lsst.pipe.base", "pipeBase") bbox = geom.Box2D(bbox) bbox.grow(pixelMargin) coord = wcs.pixelToSky(bbox.getCenter()) radius = max(coord.separation(wcs.pixelToSky(pp)) for pp in bbox.getCorners()) return pipeBase.Struct(coord=coord, radius=radius, bbox=bbox) def resolve_circle2shard_ids(refCatConf, circle): """Resolves IDs of shards overlapping an on-sky circular region. Parameters ---------- refCatConf : `lsst.meas.algorithms.DatasetConfig` Configuration of the reference catalog circle : `lsst.pipe.base.Struct` Struct containing ICRS center of the search region (`lsst.geom.SpherePoint`) and radius of the search region (`lsst.geom.Angle`). Returns ---------- shard_ids : `list` List of integer IDs of reference catalog shards overlapping the region. """ ref_dataset_name = refCatConf.ref_dataset_name deferred_import("lsst.meas.algorithms", "measAlgs") indexer = measAlgs.IndexerRegistry[refCatConf.indexer.name](refCatConf.indexer.active) shard_ids, boundary_mask = indexer.getShardIds(circle.coord, circle.radius) return shard_ids def resolve_bbox2shard_ids(refCatConf, bbox, wcs, pixelMargin=300): """Resolves IDs of shards overlapping an on-sky bounding box. Parameters ---------- refCatConf : `lsst.meas.algorithms.DatasetConfig object` Reference catalog configuration bbox : `lsst.geom.Box2D` bounding box of the region of interest wcs : `lsst.afw.geom.SkyWcs` WCS defining the bbox coordinate system pixelMargin: `int` Bounding box padding, in pixels. Default: 300. Returns ---------- shard_ids : `list` IDs of reference catalog shards overlapping the region. """ circle = calculate_circle(bbox, wcs, pixelMargin) shard_ids = resolve_circle2shard_ids(refCatConf, circle) return shard_ids def resolve_exposure_shard_ids(refCatConf, exposure, pixelMargin=300, **kwargs): """Resolves IDs of shards overlapping an Exposure. Parameters ---------- refCatConf : `lsst.meas.algorithms.DatasetConfig object` Reference catalog configuration exposure : `lsst.afw.ExposureF` Image pixelMargin: `int` Bounding box padding, in pixels. Default: 300. Returns ---------- shard_ids : `list` IDs of reference catalog shards overlapping the region. """ meta = get_exp_metadata(exposure) return resolve_bbox2shard_ids( refCatConf, bbox=meta.bbox, wcs=meta.wcs, pixelMargin=pixelMargin, **kwargs ) def resolve_decamraw_shard_ids(refCatConf, fitsPath, pixelMargin=300, **kwargs): """Resolves IDs of shards overlapping an fits file. Parameters ---------- refCatConf : `lsst.meas.algorithms.DatasetConfig object` Reference catalog configuration exposure : `lsst.afw.ExposureF` Image pixelMargin: `int` Bounding box padding, in pixels. Default: 300. Returns ---------- shard_ids : `list` IDs of reference catalog shards overlapping the region. """ deferred_import("astropy.io.fits", "fitsio") deferred_import("lsst.geom", "geom") deferred_import("astropy.wcs", "awcs") deferred_import("lsst.afw.geom", "afwGeom") hdul = fitsio.open(fitsPath) shardIds = [] bbox = geom.Box2D(geom.Point2D(0, 0), geom.Extent2D(hdul[1].header["NAXIS1"], hdul[1].header["NAXIS2"])) for hdu in hdul[1:]: wcs = awcs.WCS(hdu.header) crpix = geom.Point2D(wcs.wcs.crpix) crval = geom.SpherePoint(longitude=wcs.wcs.crval[0], latitude=wcs.wcs.crval[1], units=geom.degrees) skyWcs = afwGeom.makeSkyWcs(crpix=crpix, crval=crval, cdMatrix=wcs.wcs.cd, projection="TAN") shards = resolve_bbox2shard_ids(refCatConf, bbox=bbox, wcs=skyWcs, pixelMargin=pixelMargin, **kwargs) shardIds.extend(shards) return list(set(shardIds)) def create_bbox_and_wcs_from_decam_fits(path): bboxes = [] wcss = [] deferred_import("astropy.io.fits", "fitsio") deferred_import("lsst.geom", "geom") deferred_import("astropy.wcs", "awcs") deferred_import("lsst.afw.geom", "afwGeom") with fitsio.open(path) as hdul: bbox = geom.Box2D(geom.Point2D(0, 0), geom.Extent2D(hdul[1].header["NAXIS1"], hdul[1].header["NAXIS2"])) for hdu in hdul[1:]: wcs = awcs.WCS(hdu.header) crpix = geom.Point2D(wcs.wcs.crpix) crval = geom.SpherePoint(longitude=wcs.wcs.crval[0], latitude=wcs.wcs.crval[1], units=geom.degrees) skyWcs = afwGeom.makeSkyWcs(crpix=crpix, crval=crval, cdMatrix=wcs.wcs.cd, projection="TAN") wcss.append(skyWcs) bboxes.append(bbox) return bboxes, wcss