diff --git a/docs/changes/2996.feature.rst b/docs/changes/2996.feature.rst new file mode 100644 index 00000000000..9472dcf9bba --- /dev/null +++ b/docs/changes/2996.feature.rst @@ -0,0 +1 @@ +Added a new class HistogramAggregator to compute histograms along a specified axis, and updated the documentation to reflect this new functionality. The documentation includes examples of how to use the HistogramsAggregator in practice. diff --git a/examples/tutorials/histogram_aggregation.py b/examples/tutorials/histogram_aggregation.py new file mode 100644 index 00000000000..ed24c02a000 --- /dev/null +++ b/examples/tutorials/histogram_aggregation.py @@ -0,0 +1,248 @@ +""" +Histogram aggregation with HistogramAggregator +============================================== + +This tutorial shows how to: + +1. Build an event table with camera-like data (images and peak times) and some invalid values. +2. Configure and run HistogramAggregator in chunks. +3. Access histogram counts, bin edges, summary statistics, and valid-event counts (n_events). +4. Plot one pixel histogram from the selected chunks and both gain channels for both image and peak_time columns. +5. Overlay mean, median, and std on top of the histogram curves. +6. Plot the same histogram using Hist's built-in plotting functionality after filling a Hist object with the aggregated histogram counts and variances. +""" + +import matplotlib.pyplot as plt +import numpy as np +import hist +from astropy.table import Table +from astropy.time import Time +from traitlets.config import Config + +from ctapipe.monitoring.aggregator import HistogramAggregator +from hist import Hist + + +# ------------------------------------------------------------------- +# Create synthetic event-wise camera data +# ------------------------------------------------------------------- +rng = np.random.default_rng(42) + +n_events = 2000 +n_channels = 2 +n_pixels = 100 + +times = Time( + np.linspace(60117.911, 60117.9258, num=n_events), + scale="tai", + format="mjd", +) +event_ids = np.arange(n_events) +images = rng.normal(loc=85.0, scale=10.0, size=(n_events, n_channels, n_pixels)) +images[:, 1, :] -= 15 # Simulate lower gain channel by shifting the mean down by 15 +peak_time = rng.normal(loc=20.0, scale=2.0, size=(n_events, n_channels, n_pixels)) + +# Add a few invalid values to demonstrate n_events behavior. +images[3, 0, 10] = np.nan +images[15, 0, 10] = np.nan +peak_time[5, 0, 10] = np.nan +peak_time[35, 1, 10] = np.nan + +# Optional static mask over sample dimensions (channel, pixel). +# Here we exclude channel 1, pixel 99 for all events. +masked_elements_of_sample = np.zeros((n_channels, n_pixels), dtype=bool) +masked_elements_of_sample[1, 99] = True + +table = Table( + [times, event_ids, images, peak_time], + names=("time", "event_id", "image", "peak_time"), +) + + +# ------------------------------------------------------------------- +# Configure and run histogram aggregation +# ------------------------------------------------------------------- +config_image = Config( + { + "HistogramAggregator": { + "chunking_type": "SizeChunking", + "axis_definition": { + "class_name": "Regular", + "bins": 50, + "start": 40.0, + "stop": 110.0, + }, + }, + "SizeChunking": {"chunk_size": 1000}, + } +) + +aggregator_image = HistogramAggregator(config=config_image) +result = aggregator_image( + table=table, + col_name="image", + masked_elements_of_sample=masked_elements_of_sample, +) + +config_peak_time = Config( + { + "HistogramAggregator": { + "chunking_type": "SizeChunking", + "axis_definition": { + "class_name": "Regular", + "bins": 50, + "start": 2.0, + "stop": 38.0, + }, + }, + "SizeChunking": {"chunk_size": 1000}, + } +) + +aggregator_peak_time = HistogramAggregator(config=config_peak_time) +result_peak_time = aggregator_peak_time( + table=table, + col_name="peak_time", + masked_elements_of_sample=masked_elements_of_sample, +) + +print(f"Number of chunks: {len(result)}") +print(f"histogram shape per chunk: {result[0]['histogram'].shape}") +print(f"bin edges shape per chunk: {result[0].meta['bin_edges'].shape}") +print(f"bin centers shape per chunk: {result[0].meta['bin_centers'].shape}") +print(f"n_events shape per chunk: {result[0]['n_events'].shape}") + + +# ------------------------------------------------------------------- +# Plot the histograms for one pixel with two gain channels +# ------------------------------------------------------------------- +# We aggreagted the histograms in two chunks of 1000 events each, so we have two histograms per gain channel +# for the selected pixel. We will plot both chunks for the selected pixel and gain channels +# on the same axes for comparison, and then do the same for the peak_time column in a separate figure. + +pixel_index = 10 +gain_label = {0: "High Gain", 1: "Low Gain"} + +fig, axes = plt.subplots(1, 2, figsize=(12, 4), sharey=True) +for chunk_index, ax in enumerate(axes): + bin_edges = result[chunk_index].meta["bin_edges"] + bin_centers = result[chunk_index].meta["bin_centers"] + channel_handles = [] + + for channel_index in range(n_channels): + counts = result[chunk_index]["histogram"][:, channel_index, pixel_index] + valid_events = result[chunk_index]["n_events"][channel_index, pixel_index] + + line = ax.step( + bin_edges[:-1], + counts, + where="post", + label=f"{gain_label[channel_index]} (n_events={valid_events})", + )[0] + channel_handles.append(line) + color = line.get_color() + + # Plot bin variances as error bars (use sqrt of variance for error) at bin centers + bin_errors = np.sqrt(counts) + ax.errorbar( + bin_centers, + counts, + yerr=bin_errors, + fmt="none", + color=color, + elinewidth=1.0, + capsize=3, + alpha=0.6, + ) + + ax.set_title(f"Chunk {chunk_index}, pixel {pixel_index}") + ax.set_xlabel("image value") + ax.set_ylabel("Counts") + + ax.legend( + handles=channel_handles, + loc="upper left", + fontsize=8, + ) + +plt.show() + + +# ------------------------------------------------------------------- +# Plot peak_time histograms in a separate figure +# ------------------------------------------------------------------- +fig, axes = plt.subplots(1, 2, figsize=(12, 4), sharey=True) +for chunk_index, ax in enumerate(axes): + bin_edges = result_peak_time[chunk_index].meta["bin_edges"] + bin_centers = result_peak_time[chunk_index].meta["bin_centers"] + + channel_handles = [] + + for channel_index in range(n_channels): + counts = result_peak_time[chunk_index]["histogram"][ + :, channel_index, pixel_index + ] + valid_events = result_peak_time[chunk_index]["n_events"][ + channel_index, pixel_index + ] + + line = ax.step( + bin_edges[:-1], + counts, + where="post", + label=f"{gain_label[channel_index]} (n_events={valid_events})", + )[0] + channel_handles.append(line) + color = line.get_color() + + # Plot bin variances as error bars (use sqrt of variance for error) at bin centers + bin_errors = np.sqrt(counts) + ax.errorbar( + bin_centers, + counts, + yerr=bin_errors, + fmt="none", + color=color, + elinewidth=1.0, + capsize=3, + alpha=0.6, + ) + + ax.set_title(f"Peak Time - Chunk {chunk_index}, pixel {pixel_index}") + ax.set_xlabel("peak_time value") + ax.set_ylabel("Counts") + ax.legend( + handles=channel_handles, + loc="upper left", + fontsize=8, + ) + +plt.show() + + +# ------------------------------------------------------------------- +# Initialize hist, fill it and plot via Hist functionality +# ------------------------------------------------------------------- + +# Create a Hist object with the same binning as the aggregator +bin_edges = result[0].meta["bin_edges"] +h = Hist( + hist.axis.Regular(len(bin_edges) - 1, bin_edges[0], bin_edges[-1], name="value") +) + +# Get the histogram counts and variances for the selected pixel and channel +chunk_index = 0 +counts = result[0]["histogram"][:, chunk_index, pixel_index] + +# Set the histogram values using the view interface +h.view(flow=False)[:] = counts + +# Plot the histogram with error bars using Hist's built-in plotting functionality +# Requires 'hist[plot]' to be installed in the environment. +h.plot(yerr=True) +plt.title( + f"Chunk {chunk_index}, Pixel {pixel_index} (High Gain) histogram from Hist object" +) +plt.xlabel("image value") +plt.ylabel("Counts") +plt.show() diff --git a/pyproject.toml b/pyproject.toml index 6e7d5a93c21..04792f77599 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "joblib", "numba >=0.59.0", "numpy >=1.26,<3.0.0", + "hist", "packaging", "psutil", "pyyaml >=5.1", @@ -79,6 +80,7 @@ docs = [ "ctapipe[all]", "ffmpeg-python", "graphviz", + "hist[plot]", "ipython", "jupyter", "nbsphinx", diff --git a/src/ctapipe/containers.py b/src/ctapipe/containers.py index 51d57822c25..641b2786c11 100644 --- a/src/ctapipe/containers.py +++ b/src/ctapipe/containers.py @@ -63,6 +63,7 @@ "StatisticsContainer", "ChunkContainer", "ChunkStatisticsContainer", + "ChunkHistogramsContainer", "ImageStatisticsContainer", "IntensityStatisticsContainer", "PeakTimeStatisticsContainer", @@ -1267,6 +1268,12 @@ class ChunkStatisticsContainer(ChunkContainer): std = Field(None, "standard deviation of the chunk distribution") +class ChunkHistogramsContainer(ChunkContainer): + """Container for histograms of the chunk distribution""" + + histogram = Field(None, "histogram of the chunk distribution") + + class PixelStatisticsContainer(Container): """ Container for pixel statistics from flat-field and sky pedestal events diff --git a/src/ctapipe/io/datawriter.py b/src/ctapipe/io/datawriter.py index 920cafa4177..a08267134bb 100644 --- a/src/ctapipe/io/datawriter.py +++ b/src/ctapipe/io/datawriter.py @@ -44,8 +44,9 @@ def _get_tel_index(event, tel_id): # (meaning readers need to update scripts) # - increase the minor number if new columns or datasets are added # - increase the patch number if there is a small bugfix to the model. -DATA_MODEL_VERSION = "v7.5.0" +DATA_MODEL_VERSION = "v7.6.0" DATA_MODEL_CHANGE_HISTORY = """ +- v7.6.0: - Add new monitoring group for pixel histograms: DL1_PIXEL_HISTOGRAMS_GROUP. - v7.5.0: - Add new field pixel_time_shift in R1CameraContainer and DL0CameraContainer - v7.4.0: - Add new data quality and top-level monitoring groups for DL0 and DL1 data. - v7.3.0: - Add possibility to attach monitoring data to the event HDF5 file. diff --git a/src/ctapipe/io/hdf5dataformat.py b/src/ctapipe/io/hdf5dataformat.py index c1542c8a928..fa1f73f5429 100644 --- a/src/ctapipe/io/hdf5dataformat.py +++ b/src/ctapipe/io/hdf5dataformat.py @@ -147,6 +147,9 @@ DL1_FLATFIELD_PEAK_TIME_GROUP = ( "/dl1/monitoring/telescope/calibration/camera/pixel_statistics/flatfield_peak_time" ) +DL1_PIXEL_HISTOGRAMS_GROUP = ( + "/dl1/monitoring/telescope/calibration/camera/pixel_histograms" +) DL2_SUBARRAY_MONITORING_GROUP = "/dl2/monitoring/subarray" DL2_SUBARRAY_INTER_CALIBRATION_GROUP = "/dl2/monitoring/subarray/inter_calibration" DL2_SUBARRAY_CROSS_CALIBRATION_GROUP = "/dl2/monitoring/subarray/cross_calibration" diff --git a/src/ctapipe/io/hdf5eventsource.py b/src/ctapipe/io/hdf5eventsource.py index 9674c87db70..c23d9d4ed5e 100644 --- a/src/ctapipe/io/hdf5eventsource.py +++ b/src/ctapipe/io/hdf5eventsource.py @@ -108,6 +108,7 @@ "v7.3.0", "v7.4.0", "v7.5.0", + "v7.6.0", ] diff --git a/src/ctapipe/io/hdf5merger.py b/src/ctapipe/io/hdf5merger.py index 7adb1489688..9c5ee41da2d 100644 --- a/src/ctapipe/io/hdf5merger.py +++ b/src/ctapipe/io/hdf5merger.py @@ -20,6 +20,7 @@ DL1_CAMERA_COEFFICIENTS_GROUP, DL1_COLUMN_NAMES, DL1_IMAGE_STATISTICS_TABLE, + DL1_PIXEL_HISTOGRAMS_GROUP, DL1_PIXEL_STATISTICS_GROUP, DL1_SUBARRAY_POINTING_GROUP, DL1_SUBARRAY_QUALITY_GROUP, @@ -59,6 +60,7 @@ "v7.3.0", "v7.4.0", "v7.5.0", + "v7.6.0", ] @@ -95,6 +97,7 @@ class NodeType(enum.Enum): DL1_SUBARRAY_POINTING_GROUP: NodeType.TABLE, DL1_TEL_POINTING_GROUP: NodeType.TEL_GROUP, DL1_PIXEL_STATISTICS_GROUP: NodeType.ITER_TEL_GROUP, + DL1_PIXEL_HISTOGRAMS_GROUP: NodeType.ITER_TEL_GROUP, DL1_CAMERA_COEFFICIENTS_GROUP: NodeType.TEL_GROUP, DL1_TEL_MUON_THROUGHPUT_GROUP: NodeType.TEL_GROUP, DL2_TEL_GROUP: NodeType.ITER_TEL_GROUP, @@ -485,6 +488,7 @@ def _append_monitoring_data(self, other): if self.telescope_events: self._append_monitoring_telescope_groups(other) self._append_pixel_statistics(other) + self._append_pixel_histograms(other) self._append_quality_telescope_groups(other) def _append_monitoring_subarray_groups(self, other): @@ -531,6 +535,16 @@ def _append_pixel_statistics(self, other): other, other.root[key], once=self.single_ob ) + def _append_pixel_histograms(self, other): + """Append pixel histogram monitoring data.""" + for dl1_colname in DL1_COLUMN_NAMES: + for event_type in EventType: + key = f"{DL1_PIXEL_HISTOGRAMS_GROUP}/{event_type.name.lower()}_{dl1_colname}" + if key in other.root: + self._append_table_group( + other, other.root[key], once=self.single_ob + ) + def _append_quality_telescope_groups(self, other): """Append quality telescope groups. diff --git a/src/ctapipe/io/tests/test_hdf5eventsource.py b/src/ctapipe/io/tests/test_hdf5eventsource.py index 4d49902a745..02958ebdeed 100644 --- a/src/ctapipe/io/tests/test_hdf5eventsource.py +++ b/src/ctapipe/io/tests/test_hdf5eventsource.py @@ -31,7 +31,7 @@ def test_is_compatible(compatible_file, request): def test_metadata(dl1_file): with HDF5EventSource(input_url=dl1_file) as source: assert source.is_simulation - assert source.datamodel_version == (7, 5, 0) + assert source.datamodel_version == (7, 6, 0) assert set(source.datalevels) == { DataLevel.DL1_IMAGES, DataLevel.DL1_PARAMETERS, diff --git a/src/ctapipe/monitoring/aggregator.py b/src/ctapipe/monitoring/aggregator.py index 458b492581e..5e682d924c8 100644 --- a/src/ctapipe/monitoring/aggregator.py +++ b/src/ctapipe/monitoring/aggregator.py @@ -1,12 +1,12 @@ """ -Algorithms to compute aggregated time-series statistics from columns of an astropy table. +Algorithms to compute aggregated time-series statistics and histograms from columns of an astropy table. These classes take as input an events table containing any event-wise quantities (e.g., images, scalars, vectors), divide it into time chunks, which may optionally -overlap, and compute various aggregated statistics for each chunk. The statistics -include the count, mean, median, and standard deviation. The result is a monitoring -table with columns describing the start and stop time of the chunk and the -aggregated statistic values. +overlap, and compute various aggregated statistics and histograms for each chunk. +The statistics include the mean, median, and standard deviation. The result +is a monitoring table with columns describing the start and stop time of the chunk and the +aggregated statistic values or histograms. The aggregation is always performed along axis=0 (the event dimension), making these classes suitable for any N-dimensional event-wise data. @@ -17,6 +17,7 @@ "SizeChunking", "TimeChunking", "BaseAggregator", + "HistogramAggregator", "StatisticsAggregator", "PlainAggregator", "SigmaClippingAggregator", @@ -27,13 +28,16 @@ from collections.abc import Generator import astropy.units as u +import hist import numpy as np from astropy.stats import sigma_clip from astropy.table import Table +from hist import Hist +from traitlets import TraitError -from ..containers import ChunkStatisticsContainer +from ..containers import ChunkHistogramsContainer, ChunkStatisticsContainer from ..core import Component -from ..core.traits import AstroQuantity, Bool, ComponentName, Enum, Int +from ..core.traits import AstroQuantity, Bool, ComponentName, Dict, Enum, Int class BaseChunking(Component, metaclass=ABCMeta): @@ -272,10 +276,10 @@ def _generate_chunks(self, table) -> Generator[Table, None, None]: class BaseAggregator(Component, metaclass=ABCMeta): """ - Base class for aggregators that compute statistics over chunks of data. + Base class for aggregators that compute statistics and histograms over chunks of data. Aggregators use a chunking strategy to divide input tables and compute - aggregated statistics for each chunk. + aggregated statistics and histograms for each chunk. """ chunking_type = ComponentName( @@ -305,7 +309,7 @@ def __call__( col_name="image", ) -> Table: r""" - Divide table into chunks and compute aggregated statistic values. + Divide table into chunks and compute aggregated statistic values or histograms. Parameters ---------- @@ -322,7 +326,7 @@ def __call__( ------- astropy.table.Table table containing the start and end values as timestamps and event IDs - as well as the aggregated statistic values for each chunk + as well as the aggregated statistic values or histograms for each chunk """ # Get chunks using the chunking strategy chunks = self.chunking(table) @@ -347,11 +351,20 @@ def __call__( # Compute aggregator-specific statistics self._add_result_columns( - chunk[col_name].data, masked_elements_of_sample, results + chunk[col_name].data, + masked_elements_of_sample, + results, ) + # Deal with metadata if present in results + metadata = {} + if "meta" in results: + metadata["meta"] = results.pop("meta") + # Create and return table result_table = Table(results) + if "meta" in metadata: + result_table.meta = metadata["meta"] # Preserve units if present if hasattr(table[col_name], "unit") and table[col_name].unit is not None: @@ -360,9 +373,14 @@ def __call__( return result_table @abstractmethod - def _add_result_columns(self, data, masked_elements_of_sample, results_dict): + def _add_result_columns( + self, + data, + masked_elements_of_sample, + results_dict, + ): r""" - Compute statistics and add columns to results dictionary. + Compute statistics and histograms. Add columns to results dictionary. Parameters ---------- @@ -371,7 +389,7 @@ def _add_result_columns(self, data, masked_elements_of_sample, results_dict): masked_elements_of_sample : ndarray, optional Boolean mask of shape (\*data_dimensions) for elements to exclude results_dict : dict - Dictionary to which statistic columns should be added. + Dictionary to which statistic or histogram columns should be added. """ pass @@ -381,6 +399,111 @@ def _set_result_units(self, table, unit): pass +class HistogramAggregator(BaseAggregator): + """ + Compute aggregated histograms from a chunk of event-wise data using Hist. + + Works with any N-dimensional event-wise data by aggregating along axis=0 (event dimension). + """ + + axis_definition = Dict( + allow_none=False, + help=( + "Dictionary that contains ``class_name`` and the corresponding kwargs " + "to construct a ``hist.axis.(**kwargs)`` instance. " + "E.g. ``{'class_name': 'Regular', 'bins': 40, 'start': 20.0, 'stop': 80.0}``." + ), + ).tag(config=True) + + def __init__(self, config=None, parent=None, **kwargs): + """ + Parameters + ---------- + config : traitlets.loader.Config + Configuration specified by config file or cmdline arguments + parent : ctapipe.core.Component or ctapipe.core.Tool + Parent of this component in the configuration hierarchy + """ + super().__init__(config=config, parent=parent, **kwargs) + kwargs = self.axis_definition.copy() + if "class_name" not in kwargs.keys(): + raise TraitError( + "The ``axis_definition`` trait is missing required key 'class_name'." + ) + cls = kwargs.pop("class_name") + kwargs["name"] = "value" + self.hist_axis = getattr(hist.axis, cls)(**kwargs) + + def _add_result_columns( + self, + data, + masked_elements_of_sample, + results_dict, + ): + histograms = self.compute_histograms(data, masked_elements_of_sample) + results_dict["n_events"].append(histograms.n_events) + results_dict["histogram"].append(histograms.histogram) + if "meta" not in results_dict and histograms.meta: + results_dict["meta"] = histograms.meta + + def _set_result_units(self, table, unit): + """ + Set units for histogram columns that inherit from the input data. + + For HistogramAggregator, the histogram columns should have the same units as the input data. + """ + for col in ("bin_edges", "bin_centers"): + table.meta[col].unit = unit + + def compute_histograms( + self, data, masked_elements_of_sample + ) -> ChunkHistogramsContainer: + # Build the histograms over the event dimension (axis=0) for each element of the data dimensions + event_dim = data.shape[0] + spatial_shape = data.shape[1:] + n_elements = int(np.prod(spatial_shape)) + # Broadcast mask to full shape + if masked_elements_of_sample is not None: + mask = np.broadcast_to(masked_elements_of_sample, data.shape) + else: + mask = np.zeros_like(data, dtype=bool) + # Mask invalid values (NaN, inf) + invalid = ~np.isfinite(data) + mask = mask | invalid + # The histogram is computed for each element of the data dimensions, so we need to flatten + flat_data = data.reshape(event_dim, n_elements) + flat_mask = mask.reshape(event_dim, n_elements) + # Build histogram object over the event dimension for each element of the data dimensions + hist_object = Hist( + self.hist_axis, + hist.axis.Integer(0, n_elements, name="element"), + storage=hist.storage.Int64(), + ) + # Vectorized filling - all valid values and their dimension indices at once + valid_mask = ~flat_mask + values = flat_data[valid_mask] + dimension_indices = np.nonzero(valid_mask)[ + 1 + ] # column indices (which dimension) + if len(values) > 0: + hist_object.fill(value=values, element=dimension_indices) + # Extract histogram counts and reshape to original data dimensions (with bin dimension first) + n_bins = hist_object.axes[0].size + hist_counts = hist_object.values() # shape: (bins, n_elements) + hist_counts = hist_counts.reshape((n_bins,) + spatial_shape) + # Count valid entries per element (excludes masked and invalid values) + n_events_valid = np.sum(~flat_mask, axis=0).reshape(spatial_shape) + # Build and return the ChunkHistogramsContainer + return ChunkHistogramsContainer( + n_events=n_events_valid, + histogram=hist_counts, + meta={ + "bin_edges": hist_object.axes[0].edges, + "bin_centers": hist_object.axes[0].centers, + }, + ) + + class StatisticsAggregator(BaseAggregator): """ Base component to handle the computation of aggregated statistic values from a table @@ -389,7 +512,12 @@ class StatisticsAggregator(BaseAggregator): Aggregation is performed along axis=0 (the event dimension) for any N-dimensional data. """ - def _add_result_columns(self, data, masked_elements_of_sample, results_dict): + def _add_result_columns( + self, + data, + masked_elements_of_sample, + results_dict, + ): stats = self.compute_stats(data, masked_elements_of_sample) results_dict["n_events"].append(stats.n_events) results_dict["mean"].append(stats.mean) @@ -400,7 +528,7 @@ def _set_result_units(self, table, unit): """ Set units for statistics columns that inherit from the input data. - For StatisticsAggregator, the mean, median, and std columns + For StatisticsAggregator, the mean, median, std, and histogram columns should have the same units as the input data. """ for col in ("mean", "median", "std"): @@ -422,8 +550,8 @@ def compute_stats( Returns ------- - StatisticsContainer - Container with computed statistics + ChunkStatisticsContainer + Container with computed statistics for the chunk """ pass diff --git a/src/ctapipe/monitoring/calculator.py b/src/ctapipe/monitoring/calculator.py index 1f4a5de32dd..1a4f511cc0d 100644 --- a/src/ctapipe/monitoring/calculator.py +++ b/src/ctapipe/monitoring/calculator.py @@ -15,7 +15,7 @@ TelescopeParameter, TraitError, ) -from .aggregator import StatisticsAggregator +from .aggregator import BaseAggregator from .outlier import OutlierDetector __all__ = [ @@ -38,11 +38,9 @@ class PixelStatisticsCalculator(TelescopeComponent): """ stats_aggregator_type = TelescopeParameter( - trait=ComponentName( - StatisticsAggregator, default_value="SigmaClippingAggregator" - ), + trait=ComponentName(BaseAggregator, default_value="SigmaClippingAggregator"), default_value="SigmaClippingAggregator", - help="Name of the StatisticsAggregator subclass to be used.", + help="Name of the BaseAggregator subclass to be used.", ).tag(config=True) outlier_detector_list = List( @@ -87,12 +85,10 @@ def __init__( """ super().__init__(subarray=subarray, config=config, parent=parent, **kwargs) - # Initialize the instances of StatisticsAggregator + # Initialize the instances of BaseAggregator self.stats_aggregators = {} for _, _, name in self.stats_aggregator_type: - self.stats_aggregators[name] = StatisticsAggregator.from_name( - name, parent=self - ) + self.stats_aggregators[name] = BaseAggregator.from_name(name, parent=self) # Initialize the instances of OutlierDetector from the configuration self.outlier_detectors, self.apply_to_list = [], [] @@ -188,7 +184,7 @@ def second_pass( col_name="image", ) -> Table: """ - Conduct a second pass over the data to refine the statistics in regions with a high percentage of faulty pixels. + Conduct a second pass over the data to refine the statistics or histograms in regions with a high percentage of faulty pixels. This method performs a second pass over the data with a refined shift of the chunk in regions where a high percentage of faulty pixels were detected during the first pass. Note: Multiple first passes of different calibration events are @@ -212,7 +208,7 @@ def second_pass( Returns ------- astropy.table.Table - Table containing the aggregated statistics after the second pass, their outlier masks, and the validity of the chunks. + Table containing the aggregated statistics or histograms after the second pass, their outlier masks, and the validity of the chunks. """ # Check if at least one chunk is faulty if np.all(valid_chunks): @@ -278,20 +274,20 @@ def second_pass( def _find_and_append_outliers(self, aggregated_stats): """ - Find outliers and append the masks in the aggregated statistics. + Find outliers and append the masks in the aggregated statistics or histograms. - This method detects outliers in the aggregated statistics using the + This method detects outliers in the aggregated statistics or histograms using the outlier detectors defined in the configuration. Table containing the - aggregated statistics will be appended with the outlier masks for each + aggregated statistics or histograms will be appended with the outlier masks for each detector and a combined outlier mask. Parameters ---------- aggregated_stats : astropy.table.Table - Table containing the aggregated statistics. + Table containing the aggregated statistics or histograms. """ - outlier_mask = np.zeros_like(aggregated_stats["mean"], dtype=bool) + outlier_mask = np.zeros_like(aggregated_stats["n_events"], dtype=bool) for d, (column_name, outlier_detector) in enumerate( zip(self.apply_to_list, self.outlier_detectors) ): @@ -316,7 +312,7 @@ def _get_valid_chunks(self, outlier_mask): ---------- outlier_mask : numpy.ndarray Boolean array indicating outlier pixels. The shape of the array should - match the shape of the aggregated statistics. + match the shape of the aggregated statistics or histograms. Returns ------- diff --git a/src/ctapipe/monitoring/outlier.py b/src/ctapipe/monitoring/outlier.py index 940d779470f..de7c3f34129 100644 --- a/src/ctapipe/monitoring/outlier.py +++ b/src/ctapipe/monitoring/outlier.py @@ -36,12 +36,12 @@ def __call__(self, column) -> bool: ---------- column : astropy.table.Column column with chunk-wise aggregated statistic values (mean, median, or std) - of shape (n_entries, n_channels, n_pixels) + of shape (n_entries, n_channels, n_pixels) or histograms of shape (n_bins, n_channels, n_pixels). Returns ------- boolean mask - mask of outliers of shape (n_entries, n_channels, n_pixels) + mask of outliers of shape (n_entries or n_bins, n_channels, n_pixels) """ pass diff --git a/src/ctapipe/monitoring/tests/test_aggregator.py b/src/ctapipe/monitoring/tests/test_aggregator.py index 78f124dd553..0491a7a7fd8 100644 --- a/src/ctapipe/monitoring/tests/test_aggregator.py +++ b/src/ctapipe/monitoring/tests/test_aggregator.py @@ -1,5 +1,5 @@ """ -Tests for StatisticsAggregator and related functions +Tests for aggregators and related functions """ import astropy.units as u @@ -10,6 +10,7 @@ from traitlets.config import Config from ctapipe.monitoring.aggregator import ( + HistogramAggregator, PlainAggregator, SigmaClippingAggregator, ) @@ -106,6 +107,183 @@ def test_aggregators(): np.testing.assert_allclose(time_stats[0]["std"], 5.0, atol=1.5) +def test_histograms_aggregator(): + """Test histogram computation shape, values and entry counts for N-D event data.""" + + rng = np.random.default_rng(10) + data = rng.normal(77.0, 10.0, size=(200, 2, 8)) + low_gain_shift = 12.0 + data[:, 1, :] += low_gain_shift + + config = Config( + { + "HistogramAggregator": { + "axis_definition": { + "class_name": "Regular", + "bins": 40, + "start": 0.0, + "stop": 200.0, + } + } + } + ) + aggregator = HistogramAggregator(config=config) + + histo_chunk = aggregator.compute_histograms(data, masked_elements_of_sample=None) + + assert histo_chunk.histogram.shape == (40, 2, 8) + assert histo_chunk.meta["bin_edges"].shape == (41,) + assert histo_chunk.n_events.shape == (2, 8) + np.testing.assert_array_equal(histo_chunk.n_events, np.full((2, 8), 200)) + + # Configured Regular axis is [0, 200] with 40 bins. + np.testing.assert_allclose(histo_chunk.meta["bin_edges"], np.linspace(0, 200, 41)) + + # With a wide range all events should be counted for each pixel. + np.testing.assert_array_equal( + histo_chunk.histogram.sum(axis=0), np.full((2, 8), 200) + ) + + # Recover channel means from the histogram and check the introduced low-gain shift. + bin_centers = histo_chunk.meta["bin_centers"] + weighted_sum = np.sum( + histo_chunk.histogram * bin_centers[:, np.newaxis, np.newaxis], axis=0 + ) + counts = np.sum(histo_chunk.histogram, axis=0) + weighted_mean = weighted_sum / counts + channel_mean = weighted_mean.mean(axis=1) + np.testing.assert_allclose( + channel_mean[1] - channel_mean[0], low_gain_shift, atol=2.0 + ) + + +def test_histograms_aggregator_chunked_call(): + """Test chunked table aggregation path and metadata for HistogramAggregator.""" + + times = Time( + np.linspace(60117.911, 60117.912, num=120), + scale="tai", + format="mjd", + ) + event_ids = np.arange(120) + rng = np.random.default_rng(11) + data = rng.normal(10.0, 2.0, size=(120, 2, 5)) + + table = Table([times, event_ids, data], names=("time", "event_id", "image")) + + config = Config( + { + "HistogramAggregator": { + "chunking_type": "SizeChunking", + "axis_definition": { + "class_name": "Regular", + "bins": 50, + "start": 0.0, + "stop": 20.0, + }, + }, + "SizeChunking": {"chunk_size": 60}, + } + ) + aggregator = HistogramAggregator(config=config) + result = aggregator(table=table) + + assert len(result) == 2 + + assert result[0]["time_start"] == times[0] + assert result[1]["time_end"] == times[-1] + assert result[0]["event_id_start"] == event_ids[0] + assert result[1]["event_id_end"] == event_ids[-1] + assert result[0]["histogram"].shape == (50, 2, 5) + assert "meta" not in result[0].keys() + assert result[0].meta["bin_edges"].shape == (51,) + assert result[0].meta["bin_centers"].shape == (50,) + assert result[0]["n_events"].shape == (2, 5) + np.testing.assert_array_equal(result[0]["n_events"], np.full((2, 5), 60)) + assert np.all(np.diff(result[0].meta["bin_edges"]) >= 0) + + # Histogram edges are config-defined and must be identical for all chunks. + np.testing.assert_array_equal( + result[0].meta["bin_edges"], result[1].meta["bin_edges"] + ) + + +def test_histograms_aggregator_masks_and_nan_handling(): + """Test masking and NaN handling in histogram computation.""" + + rng = np.random.default_rng(12) + n_events = 100 + data = rng.normal(5.0, 1.0, size=(n_events, 2, 4)) + data[3, 0, 0] = np.nan + + mask = np.zeros((2, 4), dtype=bool) + mask[0, 1] = True + + config = Config( + { + "HistogramAggregator": { + "axis_definition": { + "class_name": "Regular", + "bins": 25, + "start": 0.0, + "stop": 10.0, + } + } + } + ) + aggregator = HistogramAggregator(config=config) + histo_chunk = aggregator.compute_histograms( + data, + masked_elements_of_sample=mask, + ) + + # Fully masked pixel should receive no entries. + assert histo_chunk["histogram"][:, 0, 1].sum() == 0 + assert histo_chunk["n_events"][0, 1] == 0 + # One NaN should be dropped for this pixel. + assert histo_chunk["histogram"][:, 0, 0].sum() == histo_chunk["n_events"][0, 0] + # Unaffected pixels should retain the full number of events. + assert histo_chunk["n_events"][1, 3] == n_events + + +@pytest.mark.parametrize( + ("shape", "spatial_shape"), + [ + ((200,), ()), + ((200, 10), (10,)), + ((200, 2, 8, 3), (2, 8, 3)), + ], +) +def test_histograms_aggregator_input_shapes(shape, spatial_shape): + """Test histogram computation for 1D, 2D and higher-dimensional inputs.""" + + rng = np.random.default_rng(13) + data = rng.normal(50.0, 5.0, size=shape) + n_events = shape[0] + + config = Config( + { + "HistogramAggregator": { + "axis_definition": { + "class_name": "Regular", + "bins": 20, + "start": 0.0, + "stop": 100.0, + } + } + } + ) + aggregator = HistogramAggregator(config=config) + histo_chunk = aggregator.compute_histograms(data, masked_elements_of_sample=None) + + assert histo_chunk.histogram.shape == (20,) + spatial_shape + assert np.shape(histo_chunk.n_events) == spatial_shape + + expected_counts = np.full(spatial_shape, n_events) + np.testing.assert_array_equal(histo_chunk.n_events, expected_counts) + np.testing.assert_array_equal(histo_chunk.histogram.sum(axis=0), expected_counts) + + def test_chunk_shift(): """test the chunk shift option and the boundary case for the last chunk""" diff --git a/src/ctapipe/monitoring/tests/test_calculator.py b/src/ctapipe/monitoring/tests/test_calculator.py index 747dfd777a0..0b5ebeab962 100644 --- a/src/ctapipe/monitoring/tests/test_calculator.py +++ b/src/ctapipe/monitoring/tests/test_calculator.py @@ -88,6 +88,53 @@ def test_statistics_calculator(example_subarray): assert len(stats_stacked) == expected_len_firstpass + expected_len_secondpass +def test_statistics_calculator_with_histogram_aggregator(example_subarray): + """test that PixelStatisticsCalculator works with HistogramAggregator""" + + n_images = 120 + times = Time( + np.linspace(60117.911, 60117.913, num=n_images), scale="tai", format="mjd" + ) + event_ids = np.arange(n_images) + rng = np.random.default_rng(7) + charge_data = rng.normal(77.0, 10.0, size=(n_images, 2, 16)) + + charge_table = Table( + [times, event_ids, charge_data], + names=("time", "event_id", "image"), + ) + + config = Config( + { + "PixelStatisticsCalculator": { + "stats_aggregator_type": [ + ("id", 1, "HistogramAggregator"), + ], + }, + "HistogramAggregator": { + "chunking_type": "SizeChunking", + "axis_definition": { + "class_name": "Regular", + "bins": 30, + "start": 0.0, + "stop": 140.0, + }, + }, + "SizeChunking": { + "chunk_size": 60, + }, + } + ) + + calculator = PixelStatisticsCalculator(subarray=example_subarray, config=config) + stats = calculator.first_pass(table=charge_table, tel_id=1) + + assert len(stats) == 2 + assert stats[0]["histogram"].shape == (30, 2, 16) + assert stats[0]["n_events"].shape == (2, 16) + assert stats[0].meta["bin_edges"].shape == (31,) + + def test_outlier_detector(example_subarray): """test the chunk shift option and the boundary case for the last chunk""" diff --git a/src/ctapipe/tools/calculate_pixel_stats.py b/src/ctapipe/tools/calculate_pixel_stats.py index ad25bf4f502..49c43d33e9e 100644 --- a/src/ctapipe/tools/calculate_pixel_stats.py +++ b/src/ctapipe/tools/calculate_pixel_stats.py @@ -19,7 +19,11 @@ ) from ctapipe.exceptions import InputMissing from ctapipe.io import HDF5Merger, write_table -from ctapipe.io.hdf5dataformat import DL1_COLUMN_NAMES, DL1_PIXEL_STATISTICS_GROUP +from ctapipe.io.hdf5dataformat import ( + DL1_COLUMN_NAMES, + DL1_PIXEL_HISTOGRAMS_GROUP, + DL1_PIXEL_STATISTICS_GROUP, +) from ctapipe.io.tableloader import TableLoader from ctapipe.monitoring.calculator import PixelStatisticsCalculator @@ -205,17 +209,22 @@ def start(self): ) # Construct the output table name based on the event type and the selected column name output_table_name = f"{EventType(dl1_table['event_type'][0]).name.lower()}_{self.input_column_name}" + table_group = ( + DL1_PIXEL_HISTOGRAMS_GROUP + if "histogram" in aggregated_stats.colnames + else DL1_PIXEL_STATISTICS_GROUP + ) # Write the aggregated statistics and their outlier mask to the output file write_table( aggregated_stats, self.output_path, - f"{DL1_PIXEL_STATISTICS_GROUP}/{output_table_name}/tel_{tel_id:03d}", + f"{table_group}/{output_table_name}/tel_{tel_id:03d}", overwrite=self.overwrite, ) self.log.info( "DL1 monitoring data was stored in '%s' under '%s'", self.output_path, - f"{DL1_PIXEL_STATISTICS_GROUP}/{output_table_name}", + f"{table_group}/{output_table_name}", ) def finish(self): diff --git a/src/ctapipe/tools/tests/test_calculate_pixel_stats.py b/src/ctapipe/tools/tests/test_calculate_pixel_stats.py index 96dcf3cacd6..4c22669bd72 100644 --- a/src/ctapipe/tools/tests/test_calculate_pixel_stats.py +++ b/src/ctapipe/tools/tests/test_calculate_pixel_stats.py @@ -9,7 +9,11 @@ from ctapipe.core import run_tool from ctapipe.core.tool import ToolConfigurationError from ctapipe.io import read_table -from ctapipe.io.hdf5dataformat import DL1_COLUMN_NAMES, DL1_PIXEL_STATISTICS_GROUP +from ctapipe.io.hdf5dataformat import ( + DL1_COLUMN_NAMES, + DL1_PIXEL_HISTOGRAMS_GROUP, + DL1_PIXEL_STATISTICS_GROUP, +) from ctapipe.tools.calculate_pixel_stats import PixelStatisticsCalculatorTool from ctapipe.tools.merge import MergeTool @@ -85,6 +89,58 @@ def test_calculate_pixel_stats_tool(tmp_path, dl1_image_file): ) +def test_calculate_pixel_stats_tool_with_histogram_aggregator(tmp_path, dl1_image_file): + """check tool execution with HistogramAggregator""" + + tel_id = 3 + output_file = tmp_path / "subarray_image_hist_monitoring.dl1.h5" + config = Config( + { + "PixelStatisticsCalculatorTool": { + "allowed_tels": [3], + "input_column_name": "image", + }, + "PixelStatisticsCalculator": { + "stats_aggregator_type": [ + ("type", "*", "HistogramAggregator"), + ], + }, + "HistogramAggregator": { + "chunking_type": "SizeChunking", + "axis_definition": { + "class_name": "Regular", + "bins": 20, + "start": 0.0, + "stop": 200.0, + }, + }, + "SizeChunking": { + "chunk_size": 1, + }, + } + ) + + run_tool( + PixelStatisticsCalculatorTool(config=config), + argv=[ + f"--input_url={dl1_image_file}", + f"--output_path={output_file}", + "--overwrite", + ], + cwd=tmp_path, + raises=True, + ) + + stats = read_table( + output_file, + path=f"{DL1_PIXEL_HISTOGRAMS_GROUP}/subarray_image/tel_{tel_id:03d}", + ) + + assert "histogram" in stats.colnames + assert "bin_edges" in stats.meta + assert stats["histogram"].ndim == 4 + + def test_tool_config_error(tmp_path, dl1_image_file): """check tool configuration error"""