From 99d2177f668de9c053694c3bfb8c14f095368c98 Mon Sep 17 00:00:00 2001 From: Lee Kelvin Date: Fri, 3 Apr 2026 07:14:09 -0700 Subject: [PATCH 1/3] Modify bright star stamps to output as lsst.images --- .../lsst/meas/algorithms/brightStarStamps.py | 634 +++++++++--------- 1 file changed, 320 insertions(+), 314 deletions(-) diff --git a/python/lsst/meas/algorithms/brightStarStamps.py b/python/lsst/meas/algorithms/brightStarStamps.py index 9701c523b..efdea80f7 100644 --- a/python/lsst/meas/algorithms/brightStarStamps.py +++ b/python/lsst/meas/algorithms/brightStarStamps.py @@ -23,220 +23,286 @@ from __future__ import annotations -__all__ = ["BrightStarStamp", "BrightStarStamps"] - +__all__ = [ + "BrightStarStampInfo", + "BrightStarStampSerializationModel", + "BrightStarStampsSerializationModel", + "BrightStarStamp", + "BrightStarStamps", +] + +import functools from collections.abc import Sequence -from dataclasses import dataclass +from types import EllipsisType +from typing import Any -import numpy as np +from astro_metadata_translator import ObservationInfo +from pydantic import BaseModel, Field -from lsst.afw.detection import Psf -from lsst.afw.fits import Fits, readMetadata -from lsst.afw.geom import SkyWcs -from lsst.afw.image import ImageFitsReader, MaskedImageF, MaskFitsReader -from lsst.afw.table.io import InputArchive, OutputArchive from lsst.daf.base import PropertyList -from lsst.geom import Angle, Point2D, degrees -from lsst.meas.algorithms.stamps import AbstractStamp -from lsst.utils.introspection import get_full_type_name - - -@dataclass -class BrightStarStamp(AbstractStamp): - """A single postage stamp centered on a bright star. +from lsst.images import ( + Box, + Image, + ImageSerializationModel, + Mask, + MaskedImage, + MaskedImageSerializationModel, + MaskSchema, + MaskSerializationModel, + Projection, + ProjectionSerializationModel, + fits, +) +from lsst.images.serialization import ( + ArchiveTree, + InputArchive, + MetadataValue, + OutputArchive, + Quantity, +) +from lsst.images.utils import is_none + + +class BrightStarStampInfo(BaseModel): + """Information about a bright star in a `BrightStarStamp`. Attributes ---------- - stamp_im : `~lsst.afw.image.MaskedImageF` - The pixel data for this stamp. - psf : `~lsst.afw.detection.Psf`, optional - The point-spread function for this star. - wcs : `~lsst.afw.geom.SkyWcs`, optional - World coordinate system associated with the stamp. visit : `int`, optional - Visit number of the observation. + The visit during which the bright star was observed. detector : `int`, optional - Detector ID within the visit. + The detector on which the bright star was observed. ref_id : `int`, optional - Reference catalog ID of the star. + The reference catalog ID for the bright star. ref_mag : `float`, optional - Reference catalog magnitude of the star. - position : `~lsst.geom.Point2D`, optional - Center position of the star on the detector in pixel coordinates. - focal_plane_radius : `float`, optional - Radial distance from the focal plane center in tangent-plane pixels. - focal_plane_angle : `~lsst.geom.Angle`, optional - Azimuthal angle on the focal plane (counterclockwise from +X). - scale : `float`, optional - Flux scaling factor applied to the PSF model. - scale_err : `float`, optional - Error in the flux scale. - pedestal : `float`, optional - Background pedestal level. - pedestal_err : `float`, optional - Error on the pedestal. - pedestal_scale_cov : `float`, optional - Covariance between pedestal and scale. - gradient_x : `float`, optional - Background gradient in the X direction. - gradient_y : `float`, optional - Background gradient in the Y direction. - curvature_x: `float`, optional - Background curvature in the X direction. - curvature_y: `float`, optional - Background curvature in the Y direction. - curvature_xy: `float`, optional - The xy component of the second order fit. - global_reduced_chi_squared : `float`, optional - Reduced chi-squared for the global model fit. - global_degrees_of_freedom : `int`, optional - Degrees of freedom for the global model fit. - psf_reduced_chi_squared : `float`, optional - Reduced chi-squared for the PSF fit. - psf_degrees_of_freedom : `int`, optional - Degrees of freedom for the PSF fit. - psf_masked_flux_fraction : `float`, optional - Fraction of flux masked in the PSF. - - Notes - ----- - This class is designed to be used with `BrightStarStamps`, which manages - collections of these stamps and handles reading/writing them to FITS files. - The `factory` class method provides a standard interface to construct - instances from image data and metadata, while the `_getMetadata` method - extracts metadata for storage in FITS headers. + The reference magnitude for the bright star. + position_x : `float`, optional + The x-coordinate of the bright star in the focal plane. + position_y : `float`, optional + The y-coordinate of the bright star in the focal plane. + focal_plane_radius : `~lsst.images.utils.Quantity`, optional + The radius of the bright star from the center of the focal plane. + focal_plane_angle : `~lsst.images.utils.Quantity`, optional + The angle of the bright star in the focal plane, + measured from the +x axis. """ - stamp_im: MaskedImageF - psf: Psf | None - wcs: SkyWcs | None - visit: int | None - detector: int | None - ref_id: int | None - ref_mag: float | None - position: Point2D | None - focal_plane_radius: float | None - focal_plane_angle: Angle | None - scale: float | None - scale_err: float | None - pedestal: float | None - pedestal_err: float | None - pedestal_scale_cov: float | None - gradient_x: float | None - gradient_y: float | None - curvature_x: float | None - curvature_y: float | None - curvature_xy: float | None - global_reduced_chi_squared: float | None - global_degrees_of_freedom: int | None - psf_reduced_chi_squared: float | None - psf_degrees_of_freedom: int | None - psf_masked_flux_fraction: float | None - - # Mapping of metadata keys to attribute names - _metadata_attribute_map = { - "VISIT": "visit", - "DETECTOR": "detector", - "REF_ID": "ref_id", - "REF_MAG": "ref_mag", - "POSITION_X": "position.x", - "POSITION_Y": "position.y", - "FOCAL_PLANE_RADIUS": "focal_plane_radius", - "FOCAL_PLANE_ANGLE_DEGREES": "focal_plane_angle", - "SCALE": "scale", - "SCALE_ERR": "scale_err", - "PEDESTAL": "pedestal", - "PEDESTAL_ERR": "pedestal_err", - "PEDESTAL_SCALE_COV": "pedestal_scale_cov", - "GRADIENT_X": "gradient_x", - "GRADIENT_Y": "gradient_y", - "CURVATURE_X": "curvature_x", - "CURVATURE_Y": "curvature_y", - "CURVATURE_XY": "curvature_xy", - "GLOBAL_REDUCED_CHI_SQUARED": "global_reduced_chi_squared", - "GLOBAL_DEGREES_OF_FREEDOM": "global_degrees_of_freedom", - "PSF_REDUCED_CHI_SQUARED": "psf_reduced_chi_squared", - "PSF_DEGREES_OF_FREEDOM": "psf_degrees_of_freedom", - "PSF_MASKED_FLUX_FRACTION": "psf_masked_flux_fraction", - } - - def _getMetadata(self) -> PropertyList: - """Extract metadata from the stamp's attributes. - - This method constructs a `PropertyList` containing metadata - extracted from the stamp's attributes. It is used when writing the - stamp to a FITS file to store relevant metadata in the FITS headers. - - Returns - ------- - metadata : `PropertyList` - A `PropertyList` containing the metadata, or `None` if no - metadata attributes are defined. - """ - metadata = PropertyList() - for metadata_key, attribute_name in self._metadata_attribute_map.items(): - if "." in attribute_name: - top_attr, sub_attr = attribute_name.split(".") - value = getattr(getattr(self, top_attr), sub_attr) - elif metadata_key == "FOCAL_PLANE_ANGLE_DEGREES": - value = getattr(self, attribute_name).asDegrees() - else: - value = getattr(self, attribute_name) - metadata[metadata_key] = value - return metadata - - @property - def metadata(self) -> PropertyList: - """Return the stamp's metadata as a PropertyList.""" - return self._getMetadata() - - @classmethod - def factory( - cls, - stamp_im: MaskedImageF, - psf: Psf | None, - wcs: SkyWcs | None, - metadata: PropertyList, - ) -> BrightStarStamp: - """Construct a `BrightStarStamp` from image data and metadata. + visit: int | None = None + detector: int | None = None + ref_id: int | None = None + ref_mag: float | None = None + position_x: float | None = None + position_y: float | None = None + focal_plane_radius: Quantity | None = None + focal_plane_angle: Quantity | None = None + + def __str__(self) -> str: + attrs = ", ".join(f"{k}={v!r}" for k, v in self.__dict__.items()) + return f"BrightStarStampInfo({attrs})" + + __repr__ = __str__ + + +class BrightStarStampSerializationModel[P: BaseModel](MaskedImageSerializationModel[P]): + """A Pydantic model used to represent a serialized `BrightStarStamp`.""" + + image: ImageSerializationModel[P] = Field(description="The main data image.") + mask: MaskSerializationModel[P] = Field(description="Bitmask that annotates the main image's pixels.") + variance: ImageSerializationModel[P] = Field(description="Per-pixel variance estimates for the image.") + projection: ProjectionSerializationModel[P] | None = Field( + default=None, + exclude_if=is_none, + description="Projection to map pixels to the sky.", + ) + psf_kernel_image: ImageSerializationModel[P] | None = Field( + default=None, + exclude_if=is_none, + description="Kernel image of the PSF at the stamp center.", + ) + obs_info: ObservationInfo | None = Field( + default=None, + exclude_if=is_none, + description="Standardized description of visit metadata.", + ) + stamp_info: BrightStarStampInfo = Field(description="Information about the bright star in the stamp.") + + +class BrightStarStampsSerializationModel[P: BaseModel](ArchiveTree): + """A Pydantic model used to represent serialized `BrightStarStamps`.""" + + stamps: list[BrightStarStampSerializationModel[P]] = Field( + default_factory=list, + description="The bright star stamps in this collection.", + ) + + +class BrightStarStamp(MaskedImage): + """A postage stamp centered on a bright star, with associated metadata. - This method provides a standard interface to create a `BrightStarStamp` - from its image data, PSF, WCS, and associated metadata. - It is used by the `BrightStarStamps.readFits` method to construct - individual bright star stamps from FITS files. + Parameters + ---------- + image : `~lsst.images.Image` + The main data image for this bright star stamp. + mask : `~lsst.images.Mask`, optional + Bitmask that annotates the main image's pixels. + variance : `~lsst.images.Image`, optional + Per-pixel variance estimates for the image. + mask_schema : `~lsst.images.MaskSchema`, optional + Schema for the mask, required if a mask is provided. + projection : `~lsst.images.Projection`, optional + Projection to map pixels to the sky. + obs_info : `~astro_metadata_translator.ObservationInfo`, optional + Standardized description of visit metadata. + metadata : `dict` [`str`, `MetadataValue`], optional + Additional metadata to associate with this stamp. + psf : `~lsst.images.Image`, optional + Kernel image of the PSF at the stamp center. + stamp_info : `BrightStarStampInfo`, optional + Information about the bright star in the stamp. - Parameters - ---------- - stamp_im : `~lsst.afw.image.MaskedImageF` - Masked image for the stamp. - psf : `~lsst.afw.detection.Psf`, optional - Point-spread function for the stamp. - wcs : `~lsst.afw.geom.SkyWcs`, optional - World coordinate system for the stamp. - metadata : `PropertyList` - Metadata associated with the stamp, containing keys for all - required attributes. + Attributes + ---------- + psf : `~lsst.images.Image` + Kernel image of the PSF at the stamp center. + stamp_info : `BrightStarStampInfo` + Information about the bright star in this stamp. + """ - Returns - ------- - brightStarStamp : `BrightStarStamp` - The constructed `BrightStarStamp` instance. - """ - kwargs = {} + def __init__( + self, + image: Image, + *, + mask: Mask | None = None, + variance: Image | None = None, + mask_schema: MaskSchema | None = None, + projection: Projection | None = None, + obs_info: ObservationInfo | None = None, + metadata: dict[str, MetadataValue] | None = None, + psf: Image | None = None, + stamp_info: BrightStarStampInfo | None = None, + ): + super().__init__( + image, + mask=mask, + variance=variance, + mask_schema=mask_schema, + projection=projection, + obs_info=obs_info, + metadata=metadata, + ) + + self._psf = psf + self._stamp_info = stamp_info or BrightStarStampInfo() + + def __getitem__(self, bbox: Box | EllipsisType) -> BrightStarStamp: + super().__getitem__(bbox) + if bbox is ...: + return self + return self._transfer_metadata( + BrightStarStamp( + # Projection and obs_info propagate from the image. + self.image[bbox], + mask=self.mask[bbox], + variance=self.variance[bbox], + psf=self.psf, + stamp_info=self.stamp_info, + ), + bbox=bbox, + ) + + def __str__(self) -> str: + return f"BrightStarStamp({self.image!s}, {list(self.mask.schema.names)}, {self.stamp_info})" + + def __repr__(self) -> str: + return ( + f"BrightStarStamp({self.image!r}, mask_schema={self.mask.schema!r}, " + f"stamp_info={self.stamp_info!r})" + ) - for metadata_key, attribute_name in cls._metadata_attribute_map.items(): - if "." in attribute_name: # for nested attributes like position.x - top_attr, sub_attr = attribute_name.split(".") - if top_attr not in kwargs: # avoid overwriting position - if top_attr == "position": # make an initial Point2D - kwargs[top_attr] = Point2D(0, 0) - setattr(kwargs[top_attr], sub_attr, metadata[metadata_key]) - elif attribute_name == "focal_plane_angle": - kwargs[attribute_name] = Angle(metadata[metadata_key], degrees) - else: - kwargs[attribute_name] = metadata[metadata_key] + @property + def psf(self) -> Image: + """Kernel image of the PSF at the stamp center.""" + if self._psf is None: + raise RuntimeError("No PSF kernel image is attached to this BrightStarStamp.") + return self._psf - return cls(stamp_im=stamp_im, psf=psf, wcs=wcs, **kwargs) + @property + def stamp_info(self) -> BrightStarStampInfo: + """Return the BrightStarStampInfo associated with this stamp.""" + return self._stamp_info + + def copy(self) -> BrightStarStamp: + """Deep-copy the bright star stamp, metadata, and stamp info.""" + return self._transfer_metadata( + BrightStarStamp( + image=self._image.copy(), + mask=self._mask.copy(), + variance=self._variance.copy(), + psf=self._psf, + stamp_info=self._stamp_info.model_copy(), + ), + copy=True, + ) + + def serialize(self, archive: OutputArchive[Any]) -> BrightStarStampSerializationModel: + serialized_image = archive.serialize_direct( + "image", functools.partial(self.image.serialize, save_projection=False) + ) + serialized_mask = archive.serialize_direct( + "mask", functools.partial(self.mask.serialize, save_projection=False) + ) + serialized_variance = archive.serialize_direct( + "variance", functools.partial(self.variance.serialize, save_projection=False) + ) + serialized_projection = ( + archive.serialize_direct("projection", self.projection.serialize) + if self.projection is not None + else None + ) + serialized_psf_kernel_image = ( + archive.serialize_direct( + "psf_kernel_image", + functools.partial(self._psf.serialize, save_projection=False), + ) + if self._psf is not None + else None + ) + return BrightStarStampSerializationModel( + image=serialized_image, + mask=serialized_mask, + variance=serialized_variance, + projection=serialized_projection, + psf_kernel_image=serialized_psf_kernel_image, + obs_info=self.obs_info, + metadata=self.metadata, + stamp_info=self.stamp_info, + ) + + @staticmethod + def deserialize( + model: BrightStarStampSerializationModel[Any], + archive: InputArchive[Any], + *, + bbox: Box | None = None, + ) -> BrightStarStamp: + masked_image = MaskedImage.deserialize(model, archive, bbox=bbox) + psf_kernel_image = ( + Image.deserialize(model.psf_kernel_image, archive) + if model.psf_kernel_image is not None + else None + ) + projection = ( + Projection.deserialize(model.projection, archive) if model.projection is not None else None + ) + return BrightStarStamp( + masked_image.image, + mask=masked_image.mask, + variance=masked_image.variance, + psf=psf_kernel_image, + projection=projection, + obs_info=model.obs_info, + stamp_info=model.stamp_info, + )._finish_deserialize(model) class BrightStarStamps(Sequence[BrightStarStamp]): @@ -244,20 +310,30 @@ class BrightStarStamps(Sequence[BrightStarStamp]): Parameters ---------- - brightStarStamps : `Iterable` [`BrightStarStamp`] + stamps : `Iterable` [`BrightStarStamp`] Collection of `BrightStarStamp` instances. - metadata : `~lsst.daf.base.PropertyList`, optional + metadata : `dict` [`str`, `MetadataValue`], optional + Global metadata associated with the collection. + + Attributes + ---------- + metadata : `dict` [`str`, `MetadataValue`] Global metadata associated with the collection. + ref_id_map : `dict` [`int`, `BrightStarStamp`] + A mapping from reference IDs to `BrightStarStamp` objects. + Only includes stamps with valid reference IDs. """ def __init__( self, - brightStarStamps: Sequence[BrightStarStamp], - metadata: PropertyList | None = None, + stamps: Sequence[BrightStarStamp], + metadata: dict[str, MetadataValue] | None = None, ): - self._stamps = list(brightStarStamps) - self._metadata = PropertyList() if metadata is None else metadata.deepCopy() - self.by_ref_id = {stamp.ref_id: stamp for stamp in self} + self._stamps = list(stamps) + self._metadata = {} if metadata is None else dict(metadata) + self._ref_id_map = { + stamp.stamp_info.ref_id: stamp for stamp in self if stamp.stamp_info.ref_id is not None + } def __len__(self): return len(self._stamps) @@ -270,11 +346,46 @@ def __getitem__(self, index): def __iter__(self): return iter(self._stamps) + def __str__(self) -> str: + return f"BrightStarStamps(length={len(self)})" + + __repr__ = __str__ + @property def metadata(self): - """Return the collection's global metadata as a PropertyList.""" + """Return the collection's global metadata as a dict.""" return self._metadata + @property + def ref_id_map(self): + """Map reference IDs to `BrightStarStamp` objects.""" + return self._ref_id_map + + def serialize(self, archive: OutputArchive[Any]) -> BrightStarStampsSerializationModel: + return BrightStarStampsSerializationModel( + stamps=[ + archive.serialize_direct(f"stamp_{index}", stamp.serialize) + for index, stamp in enumerate(self._stamps) + ], + metadata=self._metadata, + ) + + @staticmethod + def deserialize( + model: BrightStarStampsSerializationModel[Any], + archive: InputArchive[Any], + ) -> BrightStarStamps: + return BrightStarStamps( + [BrightStarStamp.deserialize(stamp_model, archive) for stamp_model in model.stamps], + metadata=model.metadata, + ) + + @staticmethod + def _get_archive_tree_type[P: BaseModel]( + pointer_type: type[P], + ) -> type[BrightStarStampsSerializationModel[P]]: + return BrightStarStampsSerializationModel[pointer_type] + @classmethod def readFits(cls, filename: str) -> BrightStarStamps: """Make a `BrightStarStamps` object from a FITS file. @@ -286,10 +397,10 @@ def readFits(cls, filename: str) -> BrightStarStamps: Returns ------- - brightStarStamps : `BrightStarStamps` + bright_star_stamps : `BrightStarStamps` The constructed `BrightStarStamps` instance. """ - return cls.readFitsWithOptions(filename, None) + return fits.read(cls, filename).deserialized @classmethod def readFitsWithOptions(cls, filename: str, options: PropertyList | None) -> BrightStarStamps: @@ -304,72 +415,10 @@ def readFitsWithOptions(cls, filename: str, options: PropertyList | None) -> Bri Returns ------- - brightStarStamps : `BrightStarStamps` + bright_star_stamps : `BrightStarStamps` The constructed `BrightStarStamps` instance. """ - with Fits(filename, "r") as fits_file: - stamp_planes = {} - stamp_psf_ids = {} - stamp_wcs_ids = {} - stamp_metadata = {} - archive = None - - for hdu_num in range(1, fits_file.countHdus()): # Skip primary HDU - metadata = readMetadata(filename, hdu=hdu_num) - extname = metadata["EXTNAME"] - stamp_id: int | None = metadata.get("EXTVER", None) - - # Skip non-image BINTABLEs (except ARCHIVE_INDEX) - if metadata["XTENSION"] == "BINTABLE" and not metadata.get("ZIMAGE", False): - if extname != "ARCHIVE_INDEX": - continue - - # Handle the archive index separately - if extname == "ARCHIVE_INDEX": - fits_file.setHdu(hdu_num) - archive = InputArchive.readFits(fits_file) - continue - elif metadata.get("EXTTYPE") == "ARCHIVE_DATA": - continue - - # Select reader and dtype - if extname == "IMAGE": - reader = ImageFitsReader(filename, hdu=hdu_num) - dtype = np.dtype(MaskedImageF.dtype) - stamp_psf_ids[stamp_id] = metadata.pop("PSF", None) - stamp_wcs_ids[stamp_id] = metadata.pop("WCS", None) - stamp_metadata[stamp_id] = metadata - elif extname == "MASK": - reader = MaskFitsReader(filename, hdu=hdu_num) - dtype = None - elif extname == "VARIANCE": - reader = ImageFitsReader(filename, hdu=hdu_num) - dtype = np.dtype("float32") - else: - raise ValueError(f"Unknown extension type: {extname}") - - if stamp_id is not None: - stamp_planes.setdefault(stamp_id, {})[extname.lower()] = reader.read(dtype=dtype) - - primary_metadata = readMetadata(filename, hdu=0) - num_stamps = primary_metadata["N_STAMPS"] - - if len(stamp_planes) != num_stamps: - raise ValueError( - f"Number of stamps read ({len(stamp_planes)}) does not agree with the " - f"number of stamps recorded in the primary HDU metadata ({num_stamps})." - ) - if archive is None: - raise ValueError("No archive index was found in the FITS file; cannot read PSF or WCS.") - - brightStarStamps = [] - for stamp_id in range(1, num_stamps + 1): # Need to increment by one as EXTVER starts at 1 - stamp = MaskedImageF(**stamp_planes[stamp_id]) - psf = archive.get(stamp_psf_ids[stamp_id]) - wcs = archive.get(stamp_wcs_ids[stamp_id]) - brightStarStamps.append(BrightStarStamp.factory(stamp, psf, wcs, stamp_metadata[stamp_id])) - - return cls(brightStarStamps, primary_metadata) + return cls.readFits(filename) def writeFits(self, filename: str): """Write this `BrightStarStamps` object to a FITS file. @@ -379,47 +428,4 @@ def writeFits(self, filename: str): filename : `str` Name of the FITS file to write. """ - metadata = self._metadata.deepCopy() - - # Store metadata in the primary HDU - metadata["N_STAMPS"] = len(self._stamps) - metadata["VERSION"] = 2 # Record version number in case of future code changes - metadata["STAMPCLS"] = get_full_type_name(self) - - # Create and write to the FITS file within a context manager - with Fits(filename, "w") as fits_file: - fits_file.createEmpty() - - # Store Persistables in an OutputArchive - output_archive = OutputArchive() - stamp_psf_ids = [] - stamp_wcs_ids = [] - for stamp in self._stamps: - stamp_psf_ids.append(output_archive.put(stamp.psf)) - stamp_wcs_ids.append(output_archive.put(stamp.wcs)) - - # Write to the FITS file - fits_file.writeMetadata(metadata) - del metadata - output_archive.writeFits(fits_file) - - # Add all pixel data to extension HDUs; note: EXTVER should be 1-based - for stamp_id, (stamp, stamp_psf_id, stamp_wcs_id) in enumerate( - zip(self._stamps, stamp_psf_ids, stamp_wcs_ids), - start=1, - ): - metadata = PropertyList() - metadata.update({"EXTVER": stamp_id, "EXTNAME": "IMAGE"}) - if stamp_metadata := stamp._getMetadata(): - metadata.update(stamp_metadata) - metadata["PSF"] = stamp_psf_id - metadata["WCS"] = stamp_wcs_id - stamp.stamp_im.getImage().writeFits(filename, metadata=metadata, mode="a") - - metadata = PropertyList() - metadata.update({"EXTVER": stamp_id, "EXTNAME": "MASK"}) - stamp.stamp_im.getMask().writeFits(filename, metadata=metadata, mode="a") - - metadata = PropertyList() - metadata.update({"EXTVER": stamp_id, "EXTNAME": "VARIANCE"}) - stamp.stamp_im.getVariance().writeFits(filename, metadata=metadata, mode="a") + fits.write(self, filename) From 51889c0f07fcc74a45c408e7aae8f97cc57ad390 Mon Sep 17 00:00:00 2001 From: Lee Kelvin Date: Thu, 9 Apr 2026 10:17:47 -0700 Subject: [PATCH 2/3] Update bright star stamp unit tests --- tests/test_brightStarStamps.py | 151 +++++++++++++++++++-------------- 1 file changed, 89 insertions(+), 62 deletions(-) diff --git a/tests/test_brightStarStamps.py b/tests/test_brightStarStamps.py index de3dee798..850075904 100644 --- a/tests/test_brightStarStamps.py +++ b/tests/test_brightStarStamps.py @@ -21,13 +21,13 @@ import unittest from tempfile import NamedTemporaryFile +from typing import cast +import astropy.units as u import lsst.utils.tests import numpy as np -from lsst.afw.image import MaskedImageF -from lsst.daf.base import PropertyList -from lsst.meas.algorithms import BrightStarStamp, BrightStarStamps -from lsst.meas.algorithms.stamps import StampsBase +from lsst.images import Image, Mask, MaskedImage, MaskSchema +from lsst.meas.algorithms import BrightStarStamp, BrightStarStampInfo, BrightStarStamps class BrightStarStampsTestCase(lsst.utils.tests.TestCase): @@ -38,73 +38,100 @@ def setUp(self): stamp_size = (25, 25) # Generate simulated bright star stamps - brightStarStamps = [] - self.metadatas = [] + bright_star_stamps = [] + self.stamp_infos = [] + mask_schema = MaskSchema([]) + for i in range(3): - stamp = MaskedImageF(*stamp_size) - stamp_array = stamp.image.array - stamp_array += rng.random(stamp_size) - psf = None - wcs = None - metadata = PropertyList() - metadata.set("VISIT", i) - metadata.set("DETECTOR", i + 1) - metadata.set("REF_ID", f"ref{i}") - metadata.set("REF_MAG", float(i * 5)) - metadata.set("POSITION_X", rng.random()) - metadata.set("POSITION_Y", rng.random()) - metadata.set("FOCAL_PLANE_RADIUS", rng.random()) - metadata.set("FOCAL_PLANE_ANGLE_DEGREES", rng.random()) - metadata.set("SCALE", rng.random()) - metadata.set("SCALE_ERR", rng.random()) - metadata.set("PEDESTAL", rng.random()) - metadata.set("PEDESTAL_ERR", rng.random()) - metadata.set("PEDESTAL_SCALE_COV", rng.random()) - metadata.set("GRADIENT_X", rng.random()) - metadata.set("GRADIENT_Y", rng.random()) - metadata.set("CURVATURE_X", rng.random()) - metadata.set("CURVATURE_Y", rng.random()) - metadata.set("CURVATURE_XY", rng.random()) - metadata.set("GLOBAL_REDUCED_CHI_SQUARED", rng.random()) - metadata.set("GLOBAL_DEGREES_OF_FREEDOM", rng.integers(1, 100)) - metadata.set("PSF_REDUCED_CHI_SQUARED", rng.random()) - metadata.set("PSF_DEGREES_OF_FREEDOM", rng.integers(1, 100)) - metadata.set("PSF_MASKED_FLUX_FRACTION", rng.random()) - self.metadatas.append(metadata) - brightStarStamp = BrightStarStamp.factory(stamp, psf, wcs, metadata) - brightStarStamps.append(brightStarStamp) - self.primary_metadata = PropertyList() - self.primary_metadata.set("TEST_KEY", "TEST VALUE") - self.brightStarStamps = BrightStarStamps(brightStarStamps, self.primary_metadata) + stamp = MaskedImage( + image=Image(rng.random(stamp_size).astype(np.float32)), + mask=Mask(0, schema=mask_schema, shape=stamp_size), + variance=Image(rng.random(stamp_size).astype(np.float32)), + ) + + stamp_info = BrightStarStampInfo( + visit=100 + i, + detector=200 + i, + ref_id=1000 + i, + ref_mag=10.0 + i, + position_x=float(rng.random()), + position_y=float(rng.random()), + focal_plane_radius=float(rng.random()) * u.mm, + focal_plane_angle=float(rng.random()) * u.rad, + ) + self.stamp_infos.append(stamp_info) + + # Build a normalized 2-D Gaussian kernel image directly. + yy, xx = np.mgrid[: stamp_size[0], : stamp_size[1]] + cy = (stamp_size[0] - 1) / 2.0 + cx = (stamp_size[1] - 1) / 2.0 + sigma = 1.5 + kernel_array = np.exp(-((yy - cy) ** 2 + (xx - cx) ** 2) / (2.0 * sigma**2)) + kernel_array /= np.sum(kernel_array) + psf_kernel_image = Image(kernel_array.astype(np.float64)) + + bright_star_stamps.append( + BrightStarStamp( + image=stamp.image, + mask=stamp.mask, + variance=stamp.variance, + psf=psf_kernel_image, + stamp_info=stamp_info, + ) + ) + + self.global_metadata = {"TEST_KEY": "TEST VALUE"} + self.bright_star_stamps = BrightStarStamps(bright_star_stamps, self.global_metadata) def tearDown(self): - del self.brightStarStamps - del self.metadatas - del self.primary_metadata + del self.bright_star_stamps + del self.stamp_infos + del self.global_metadata def testBrightStarStamps(self): """Test that BrightStarStamps can be serialized and deserialized.""" with NamedTemporaryFile() as file: - self.brightStarStamps.writeFits(file.name) - # Test StampsBase correctly bounces to BrightStarStamps readFits - brightStarStamps = StampsBase.readFits(file.name) - - primary_metadata = brightStarStamps.metadata - self.assertEqual(self.primary_metadata["TEST_KEY"], primary_metadata["TEST_KEY"]) - self.assertEqual(len(self.metadatas), primary_metadata["N_STAMPS"]) - for input_metadata, input_stamp, output_stamp in zip( - self.metadatas, self.brightStarStamps, brightStarStamps + self.bright_star_stamps.writeFits(file.name) + bright_star_stamps = BrightStarStamps.readFits(file.name) + + global_metadata = bright_star_stamps.metadata + self.assertEqual(self.global_metadata["TEST_KEY"], global_metadata["TEST_KEY"]) + self.assertEqual(len(self.bright_star_stamps), len(bright_star_stamps)) + for input_info, input_stamp, output_stamp in zip( + self.stamp_infos, self.bright_star_stamps, bright_star_stamps ): - output_metadata = output_stamp.metadata - for key in output_metadata.names(): - input_value = input_metadata[key] - output_value = output_metadata[key] - if isinstance(input_value, float): - self.assertAlmostEqual(input_value, output_value, places=10) - else: - self.assertEqual(input_value, output_value) - self.assertMaskedImagesAlmostEqual(input_stamp.stamp_im, output_stamp.stamp_im) + self.assertEqual(input_stamp.metadata, {}) + self.assertEqual(output_stamp.metadata, {}) + + output_info = output_stamp.stamp_info + self.assertEqual(input_info.visit, output_info.visit) + self.assertEqual(input_info.detector, output_info.detector) + self.assertEqual(input_info.ref_id, output_info.ref_id) + self.assertAlmostEqual(input_info.ref_mag, output_info.ref_mag, places=10) + self.assertAlmostEqual(input_info.position_x, output_info.position_x, places=10) + self.assertAlmostEqual(input_info.position_y, output_info.position_y, places=10) + + self.assertIsNotNone(input_info.focal_plane_radius) + self.assertIsNotNone(output_info.focal_plane_radius) + input_radius = cast(u.Quantity, input_info.focal_plane_radius) + output_radius = cast(u.Quantity, output_info.focal_plane_radius) + self.assertAlmostEqual(input_radius.to_value(u.mm), output_radius.to_value(u.mm), places=10) + + self.assertIsNotNone(input_info.focal_plane_angle) + self.assertIsNotNone(output_info.focal_plane_angle) + input_angle = cast(u.Quantity, input_info.focal_plane_angle) + output_angle = cast(u.Quantity, output_info.focal_plane_angle) + self.assertAlmostEqual(input_angle.to_value(u.rad), output_angle.to_value(u.rad), places=10) + + np.testing.assert_allclose( + input_stamp.image.array, output_stamp.image.array, rtol=0.0, atol=1e-10 + ) + np.testing.assert_array_equal(input_stamp.mask.array, output_stamp.mask.array) + np.testing.assert_allclose( + input_stamp.variance.array, output_stamp.variance.array, rtol=0.0, atol=1e-10 + ) + np.testing.assert_allclose(input_stamp.psf.array, output_stamp.psf.array, rtol=0.0, atol=1e-10) class MemoryTester(lsst.utils.tests.MemoryTestCase): From 28995e637f8b91c233c00ca3adde0bc32ba0d7a1 Mon Sep 17 00:00:00 2001 From: Lee Kelvin Date: Tue, 14 Apr 2026 11:54:23 -0700 Subject: [PATCH 3/3] Move brightStarStamps.py into pipe_tasks This script was previously located in meas_algorithms. The decision to move it into pipe_tasks was two-fold: 1. Use of lsst.images format inside bright star stamps introduced a cyclical dependency issue. lsst.images depends on meas_extensions_psfex and meas_extensions_piff (optionally) because it has wrappers for those and needs to be able to import the legacy modules to test them. Moving the bright star stamp storage classes into pipe_tasks resolves this issue in the short term. 2. Having the storage classes located alongside the consuming pipeline tasks helps to streamline development, keeping edits contained within a single package. N.B.: this commit essentially undoes the prior two commits. However, they are being persisted here to aid assessment of what edits were made to the original scripts. --- python/lsst/meas/algorithms/__init__.py | 1 - .../lsst/meas/algorithms/brightStarStamps.py | 431 ------------------ tests/test_brightStarStamps.py | 147 ------ 3 files changed, 579 deletions(-) delete mode 100644 python/lsst/meas/algorithms/brightStarStamps.py delete mode 100644 tests/test_brightStarStamps.py diff --git a/python/lsst/meas/algorithms/__init__.py b/python/lsst/meas/algorithms/__init__.py index eff9e02ed..59e9742bb 100644 --- a/python/lsst/meas/algorithms/__init__.py +++ b/python/lsst/meas/algorithms/__init__.py @@ -56,7 +56,6 @@ from .dynamicDetection import * from .makePsfCandidates import * from .stamps import * -from .brightStarStamps import * from .accumulator_mean_stack import * from .scaleVariance import * from .noise_covariance import * diff --git a/python/lsst/meas/algorithms/brightStarStamps.py b/python/lsst/meas/algorithms/brightStarStamps.py deleted file mode 100644 index efdea80f7..000000000 --- a/python/lsst/meas/algorithms/brightStarStamps.py +++ /dev/null @@ -1,431 +0,0 @@ -# This file is part of meas_algorithms. -# -# Developed for the LSST Data Management System. -# This product includes software developed by the LSST Project -# (https://www.lsst.org). -# See the COPYRIGHT file at the top-level directory of this distribution -# for details of code ownership. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -"""Collection of small images (postage stamps) centered on bright stars.""" - -from __future__ import annotations - -__all__ = [ - "BrightStarStampInfo", - "BrightStarStampSerializationModel", - "BrightStarStampsSerializationModel", - "BrightStarStamp", - "BrightStarStamps", -] - -import functools -from collections.abc import Sequence -from types import EllipsisType -from typing import Any - -from astro_metadata_translator import ObservationInfo -from pydantic import BaseModel, Field - -from lsst.daf.base import PropertyList -from lsst.images import ( - Box, - Image, - ImageSerializationModel, - Mask, - MaskedImage, - MaskedImageSerializationModel, - MaskSchema, - MaskSerializationModel, - Projection, - ProjectionSerializationModel, - fits, -) -from lsst.images.serialization import ( - ArchiveTree, - InputArchive, - MetadataValue, - OutputArchive, - Quantity, -) -from lsst.images.utils import is_none - - -class BrightStarStampInfo(BaseModel): - """Information about a bright star in a `BrightStarStamp`. - - Attributes - ---------- - visit : `int`, optional - The visit during which the bright star was observed. - detector : `int`, optional - The detector on which the bright star was observed. - ref_id : `int`, optional - The reference catalog ID for the bright star. - ref_mag : `float`, optional - The reference magnitude for the bright star. - position_x : `float`, optional - The x-coordinate of the bright star in the focal plane. - position_y : `float`, optional - The y-coordinate of the bright star in the focal plane. - focal_plane_radius : `~lsst.images.utils.Quantity`, optional - The radius of the bright star from the center of the focal plane. - focal_plane_angle : `~lsst.images.utils.Quantity`, optional - The angle of the bright star in the focal plane, - measured from the +x axis. - """ - - visit: int | None = None - detector: int | None = None - ref_id: int | None = None - ref_mag: float | None = None - position_x: float | None = None - position_y: float | None = None - focal_plane_radius: Quantity | None = None - focal_plane_angle: Quantity | None = None - - def __str__(self) -> str: - attrs = ", ".join(f"{k}={v!r}" for k, v in self.__dict__.items()) - return f"BrightStarStampInfo({attrs})" - - __repr__ = __str__ - - -class BrightStarStampSerializationModel[P: BaseModel](MaskedImageSerializationModel[P]): - """A Pydantic model used to represent a serialized `BrightStarStamp`.""" - - image: ImageSerializationModel[P] = Field(description="The main data image.") - mask: MaskSerializationModel[P] = Field(description="Bitmask that annotates the main image's pixels.") - variance: ImageSerializationModel[P] = Field(description="Per-pixel variance estimates for the image.") - projection: ProjectionSerializationModel[P] | None = Field( - default=None, - exclude_if=is_none, - description="Projection to map pixels to the sky.", - ) - psf_kernel_image: ImageSerializationModel[P] | None = Field( - default=None, - exclude_if=is_none, - description="Kernel image of the PSF at the stamp center.", - ) - obs_info: ObservationInfo | None = Field( - default=None, - exclude_if=is_none, - description="Standardized description of visit metadata.", - ) - stamp_info: BrightStarStampInfo = Field(description="Information about the bright star in the stamp.") - - -class BrightStarStampsSerializationModel[P: BaseModel](ArchiveTree): - """A Pydantic model used to represent serialized `BrightStarStamps`.""" - - stamps: list[BrightStarStampSerializationModel[P]] = Field( - default_factory=list, - description="The bright star stamps in this collection.", - ) - - -class BrightStarStamp(MaskedImage): - """A postage stamp centered on a bright star, with associated metadata. - - Parameters - ---------- - image : `~lsst.images.Image` - The main data image for this bright star stamp. - mask : `~lsst.images.Mask`, optional - Bitmask that annotates the main image's pixels. - variance : `~lsst.images.Image`, optional - Per-pixel variance estimates for the image. - mask_schema : `~lsst.images.MaskSchema`, optional - Schema for the mask, required if a mask is provided. - projection : `~lsst.images.Projection`, optional - Projection to map pixels to the sky. - obs_info : `~astro_metadata_translator.ObservationInfo`, optional - Standardized description of visit metadata. - metadata : `dict` [`str`, `MetadataValue`], optional - Additional metadata to associate with this stamp. - psf : `~lsst.images.Image`, optional - Kernel image of the PSF at the stamp center. - stamp_info : `BrightStarStampInfo`, optional - Information about the bright star in the stamp. - - Attributes - ---------- - psf : `~lsst.images.Image` - Kernel image of the PSF at the stamp center. - stamp_info : `BrightStarStampInfo` - Information about the bright star in this stamp. - """ - - def __init__( - self, - image: Image, - *, - mask: Mask | None = None, - variance: Image | None = None, - mask_schema: MaskSchema | None = None, - projection: Projection | None = None, - obs_info: ObservationInfo | None = None, - metadata: dict[str, MetadataValue] | None = None, - psf: Image | None = None, - stamp_info: BrightStarStampInfo | None = None, - ): - super().__init__( - image, - mask=mask, - variance=variance, - mask_schema=mask_schema, - projection=projection, - obs_info=obs_info, - metadata=metadata, - ) - - self._psf = psf - self._stamp_info = stamp_info or BrightStarStampInfo() - - def __getitem__(self, bbox: Box | EllipsisType) -> BrightStarStamp: - super().__getitem__(bbox) - if bbox is ...: - return self - return self._transfer_metadata( - BrightStarStamp( - # Projection and obs_info propagate from the image. - self.image[bbox], - mask=self.mask[bbox], - variance=self.variance[bbox], - psf=self.psf, - stamp_info=self.stamp_info, - ), - bbox=bbox, - ) - - def __str__(self) -> str: - return f"BrightStarStamp({self.image!s}, {list(self.mask.schema.names)}, {self.stamp_info})" - - def __repr__(self) -> str: - return ( - f"BrightStarStamp({self.image!r}, mask_schema={self.mask.schema!r}, " - f"stamp_info={self.stamp_info!r})" - ) - - @property - def psf(self) -> Image: - """Kernel image of the PSF at the stamp center.""" - if self._psf is None: - raise RuntimeError("No PSF kernel image is attached to this BrightStarStamp.") - return self._psf - - @property - def stamp_info(self) -> BrightStarStampInfo: - """Return the BrightStarStampInfo associated with this stamp.""" - return self._stamp_info - - def copy(self) -> BrightStarStamp: - """Deep-copy the bright star stamp, metadata, and stamp info.""" - return self._transfer_metadata( - BrightStarStamp( - image=self._image.copy(), - mask=self._mask.copy(), - variance=self._variance.copy(), - psf=self._psf, - stamp_info=self._stamp_info.model_copy(), - ), - copy=True, - ) - - def serialize(self, archive: OutputArchive[Any]) -> BrightStarStampSerializationModel: - serialized_image = archive.serialize_direct( - "image", functools.partial(self.image.serialize, save_projection=False) - ) - serialized_mask = archive.serialize_direct( - "mask", functools.partial(self.mask.serialize, save_projection=False) - ) - serialized_variance = archive.serialize_direct( - "variance", functools.partial(self.variance.serialize, save_projection=False) - ) - serialized_projection = ( - archive.serialize_direct("projection", self.projection.serialize) - if self.projection is not None - else None - ) - serialized_psf_kernel_image = ( - archive.serialize_direct( - "psf_kernel_image", - functools.partial(self._psf.serialize, save_projection=False), - ) - if self._psf is not None - else None - ) - return BrightStarStampSerializationModel( - image=serialized_image, - mask=serialized_mask, - variance=serialized_variance, - projection=serialized_projection, - psf_kernel_image=serialized_psf_kernel_image, - obs_info=self.obs_info, - metadata=self.metadata, - stamp_info=self.stamp_info, - ) - - @staticmethod - def deserialize( - model: BrightStarStampSerializationModel[Any], - archive: InputArchive[Any], - *, - bbox: Box | None = None, - ) -> BrightStarStamp: - masked_image = MaskedImage.deserialize(model, archive, bbox=bbox) - psf_kernel_image = ( - Image.deserialize(model.psf_kernel_image, archive) - if model.psf_kernel_image is not None - else None - ) - projection = ( - Projection.deserialize(model.projection, archive) if model.projection is not None else None - ) - return BrightStarStamp( - masked_image.image, - mask=masked_image.mask, - variance=masked_image.variance, - psf=psf_kernel_image, - projection=projection, - obs_info=model.obs_info, - stamp_info=model.stamp_info, - )._finish_deserialize(model) - - -class BrightStarStamps(Sequence[BrightStarStamp]): - """A collection of bright star stamps. - - Parameters - ---------- - stamps : `Iterable` [`BrightStarStamp`] - Collection of `BrightStarStamp` instances. - metadata : `dict` [`str`, `MetadataValue`], optional - Global metadata associated with the collection. - - Attributes - ---------- - metadata : `dict` [`str`, `MetadataValue`] - Global metadata associated with the collection. - ref_id_map : `dict` [`int`, `BrightStarStamp`] - A mapping from reference IDs to `BrightStarStamp` objects. - Only includes stamps with valid reference IDs. - """ - - def __init__( - self, - stamps: Sequence[BrightStarStamp], - metadata: dict[str, MetadataValue] | None = None, - ): - self._stamps = list(stamps) - self._metadata = {} if metadata is None else dict(metadata) - self._ref_id_map = { - stamp.stamp_info.ref_id: stamp for stamp in self if stamp.stamp_info.ref_id is not None - } - - def __len__(self): - return len(self._stamps) - - def __getitem__(self, index): - if isinstance(index, slice): - return BrightStarStamps(self._stamps[index], metadata=self._metadata) - return self._stamps[index] - - def __iter__(self): - return iter(self._stamps) - - def __str__(self) -> str: - return f"BrightStarStamps(length={len(self)})" - - __repr__ = __str__ - - @property - def metadata(self): - """Return the collection's global metadata as a dict.""" - return self._metadata - - @property - def ref_id_map(self): - """Map reference IDs to `BrightStarStamp` objects.""" - return self._ref_id_map - - def serialize(self, archive: OutputArchive[Any]) -> BrightStarStampsSerializationModel: - return BrightStarStampsSerializationModel( - stamps=[ - archive.serialize_direct(f"stamp_{index}", stamp.serialize) - for index, stamp in enumerate(self._stamps) - ], - metadata=self._metadata, - ) - - @staticmethod - def deserialize( - model: BrightStarStampsSerializationModel[Any], - archive: InputArchive[Any], - ) -> BrightStarStamps: - return BrightStarStamps( - [BrightStarStamp.deserialize(stamp_model, archive) for stamp_model in model.stamps], - metadata=model.metadata, - ) - - @staticmethod - def _get_archive_tree_type[P: BaseModel]( - pointer_type: type[P], - ) -> type[BrightStarStampsSerializationModel[P]]: - return BrightStarStampsSerializationModel[pointer_type] - - @classmethod - def readFits(cls, filename: str) -> BrightStarStamps: - """Make a `BrightStarStamps` object from a FITS file. - - Parameters - ---------- - filename : `str` - Name of the FITS file to read. - - Returns - ------- - bright_star_stamps : `BrightStarStamps` - The constructed `BrightStarStamps` instance. - """ - return fits.read(cls, filename).deserialized - - @classmethod - def readFitsWithOptions(cls, filename: str, options: PropertyList | None) -> BrightStarStamps: - """Make a `BrightStarStamps` object from a FITS file, with options. - - Parameters - ---------- - filename : `str` - Name of the FITS file to read. - options : `~lsst.daf.base.PropertyList`, optional - Options for reading the FITS file. Not currently used. - - Returns - ------- - bright_star_stamps : `BrightStarStamps` - The constructed `BrightStarStamps` instance. - """ - return cls.readFits(filename) - - def writeFits(self, filename: str): - """Write this `BrightStarStamps` object to a FITS file. - - Parameters - ---------- - filename : `str` - Name of the FITS file to write. - """ - fits.write(self, filename) diff --git a/tests/test_brightStarStamps.py b/tests/test_brightStarStamps.py deleted file mode 100644 index 850075904..000000000 --- a/tests/test_brightStarStamps.py +++ /dev/null @@ -1,147 +0,0 @@ -# This file is part of meas_algorithms. -# -# Developed for the LSST Data Management System. -# This product includes software developed by the LSST Project -# (https://www.lsst.org). -# See the COPYRIGHT file at the top-level directory of this distribution -# for details of code ownership. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import unittest -from tempfile import NamedTemporaryFile -from typing import cast - -import astropy.units as u -import lsst.utils.tests -import numpy as np -from lsst.images import Image, Mask, MaskedImage, MaskSchema -from lsst.meas.algorithms import BrightStarStamp, BrightStarStampInfo, BrightStarStamps - - -class BrightStarStampsTestCase(lsst.utils.tests.TestCase): - """Test BrightStarStamps.""" - - def setUp(self): - rng = np.random.Generator(np.random.MT19937(seed=5)) - stamp_size = (25, 25) - - # Generate simulated bright star stamps - bright_star_stamps = [] - self.stamp_infos = [] - mask_schema = MaskSchema([]) - - for i in range(3): - stamp = MaskedImage( - image=Image(rng.random(stamp_size).astype(np.float32)), - mask=Mask(0, schema=mask_schema, shape=stamp_size), - variance=Image(rng.random(stamp_size).astype(np.float32)), - ) - - stamp_info = BrightStarStampInfo( - visit=100 + i, - detector=200 + i, - ref_id=1000 + i, - ref_mag=10.0 + i, - position_x=float(rng.random()), - position_y=float(rng.random()), - focal_plane_radius=float(rng.random()) * u.mm, - focal_plane_angle=float(rng.random()) * u.rad, - ) - self.stamp_infos.append(stamp_info) - - # Build a normalized 2-D Gaussian kernel image directly. - yy, xx = np.mgrid[: stamp_size[0], : stamp_size[1]] - cy = (stamp_size[0] - 1) / 2.0 - cx = (stamp_size[1] - 1) / 2.0 - sigma = 1.5 - kernel_array = np.exp(-((yy - cy) ** 2 + (xx - cx) ** 2) / (2.0 * sigma**2)) - kernel_array /= np.sum(kernel_array) - psf_kernel_image = Image(kernel_array.astype(np.float64)) - - bright_star_stamps.append( - BrightStarStamp( - image=stamp.image, - mask=stamp.mask, - variance=stamp.variance, - psf=psf_kernel_image, - stamp_info=stamp_info, - ) - ) - - self.global_metadata = {"TEST_KEY": "TEST VALUE"} - self.bright_star_stamps = BrightStarStamps(bright_star_stamps, self.global_metadata) - - def tearDown(self): - del self.bright_star_stamps - del self.stamp_infos - del self.global_metadata - - def testBrightStarStamps(self): - """Test that BrightStarStamps can be serialized and deserialized.""" - - with NamedTemporaryFile() as file: - self.bright_star_stamps.writeFits(file.name) - bright_star_stamps = BrightStarStamps.readFits(file.name) - - global_metadata = bright_star_stamps.metadata - self.assertEqual(self.global_metadata["TEST_KEY"], global_metadata["TEST_KEY"]) - self.assertEqual(len(self.bright_star_stamps), len(bright_star_stamps)) - for input_info, input_stamp, output_stamp in zip( - self.stamp_infos, self.bright_star_stamps, bright_star_stamps - ): - self.assertEqual(input_stamp.metadata, {}) - self.assertEqual(output_stamp.metadata, {}) - - output_info = output_stamp.stamp_info - self.assertEqual(input_info.visit, output_info.visit) - self.assertEqual(input_info.detector, output_info.detector) - self.assertEqual(input_info.ref_id, output_info.ref_id) - self.assertAlmostEqual(input_info.ref_mag, output_info.ref_mag, places=10) - self.assertAlmostEqual(input_info.position_x, output_info.position_x, places=10) - self.assertAlmostEqual(input_info.position_y, output_info.position_y, places=10) - - self.assertIsNotNone(input_info.focal_plane_radius) - self.assertIsNotNone(output_info.focal_plane_radius) - input_radius = cast(u.Quantity, input_info.focal_plane_radius) - output_radius = cast(u.Quantity, output_info.focal_plane_radius) - self.assertAlmostEqual(input_radius.to_value(u.mm), output_radius.to_value(u.mm), places=10) - - self.assertIsNotNone(input_info.focal_plane_angle) - self.assertIsNotNone(output_info.focal_plane_angle) - input_angle = cast(u.Quantity, input_info.focal_plane_angle) - output_angle = cast(u.Quantity, output_info.focal_plane_angle) - self.assertAlmostEqual(input_angle.to_value(u.rad), output_angle.to_value(u.rad), places=10) - - np.testing.assert_allclose( - input_stamp.image.array, output_stamp.image.array, rtol=0.0, atol=1e-10 - ) - np.testing.assert_array_equal(input_stamp.mask.array, output_stamp.mask.array) - np.testing.assert_allclose( - input_stamp.variance.array, output_stamp.variance.array, rtol=0.0, atol=1e-10 - ) - np.testing.assert_allclose(input_stamp.psf.array, output_stamp.psf.array, rtol=0.0, atol=1e-10) - - -class MemoryTester(lsst.utils.tests.MemoryTestCase): - pass - - -def setup_module(module): - lsst.utils.tests.init() - - -if __name__ == "__main__": - lsst.utils.tests.init() - unittest.main()