diff --git a/docs/changes/2916.feature.rst b/docs/changes/2916.feature.rst new file mode 100644 index 00000000000..357c1c6a648 --- /dev/null +++ b/docs/changes/2916.feature.rst @@ -0,0 +1 @@ +Add a new merge_strategy option 'combine-telescope-events' to HDF5Merger component to support the merging of telescope-wise data from different files for the same observation block. diff --git a/src/ctapipe/instrument/subarray.py b/src/ctapipe/instrument/subarray.py index 9e22fe97964..3b858b01b68 100644 --- a/src/ctapipe/instrument/subarray.py +++ b/src/ctapipe/instrument/subarray.py @@ -614,12 +614,18 @@ def to_hdf(self, h5file, overwrite=False, mode="a"): """ # here to prevent circular import from ..io import write_table + from ..io.hdf5dataformat import ( + CONFIG_INSTRUMENT_SUBARRAY, + CONFIG_INSTRUMENT_SUBARRAY_LAYOUT, + CONFIG_INSTRUMENT_TEL_CAMERA, + CONFIG_INSTRUMENT_TEL_OPTICS, + ) with ExitStack() as stack: if not isinstance(h5file, tables.File): h5file = stack.enter_context(tables.open_file(h5file, mode=mode)) - if "/configuration/instrument/subarray" in h5file.root and not overwrite: + if CONFIG_INSTRUMENT_SUBARRAY in h5file.root and not overwrite: raise OSError( "File already contains a SubarrayDescription and overwrite=False" ) @@ -630,26 +636,26 @@ def to_hdf(self, h5file, overwrite=False, mode="a"): write_table( subarray_table, h5file, - path="/configuration/instrument/subarray/layout", + path=CONFIG_INSTRUMENT_SUBARRAY_LAYOUT, overwrite=overwrite, ) write_table( self.to_table(kind="optics"), h5file, - path="/configuration/instrument/telescope/optics", + path=CONFIG_INSTRUMENT_TEL_OPTICS, overwrite=overwrite, ) for i, camera in enumerate(self.camera_types): write_table( camera.geometry.to_table(), h5file, - path=f"/configuration/instrument/telescope/camera/geometry_{i}", + path=f"{CONFIG_INSTRUMENT_TEL_CAMERA}/geometry_{i}", overwrite=overwrite, ) write_table( camera.readout.to_table(), h5file, - path=f"/configuration/instrument/telescope/camera/readout_{i}", + path=f"{CONFIG_INSTRUMENT_TEL_CAMERA}/readout_{i}", overwrite=overwrite, ) @@ -657,13 +663,16 @@ def to_hdf(self, h5file, overwrite=False, mode="a"): def from_hdf(cls, path, focal_length_choice=FocalLengthKind.EFFECTIVE): # here to prevent circular import from ..io import read_table + from ..io.hdf5dataformat import ( + CONFIG_INSTRUMENT_SUBARRAY_LAYOUT, + CONFIG_INSTRUMENT_TEL_CAMERA, + CONFIG_INSTRUMENT_TEL_OPTICS, + ) if isinstance(focal_length_choice, str): focal_length_choice = FocalLengthKind[focal_length_choice.upper()] - layout = read_table( - path, "/configuration/instrument/subarray/layout", table_cls=QTable - ) + layout = read_table(path, CONFIG_INSTRUMENT_SUBARRAY_LAYOUT, table_cls=QTable) version = layout.meta.get("TAB_VER") if version not in cls.COMPATIBLE_VERSIONS: @@ -677,22 +686,16 @@ def from_hdf(cls, path, focal_length_choice=FocalLengthKind.EFFECTIVE): for idx in set(layout["camera_index"]): geometry = CameraGeometry.from_table( - read_table( - path, f"/configuration/instrument/telescope/camera/geometry_{idx}" - ) + read_table(path, f"{CONFIG_INSTRUMENT_TEL_CAMERA}/geometry_{idx}") ) readout = CameraReadout.from_table( - read_table( - path, f"/configuration/instrument/telescope/camera/readout_{idx}" - ) + read_table(path, f"{CONFIG_INSTRUMENT_TEL_CAMERA}/readout_{idx}") ) cameras[idx] = CameraDescription( name=geometry.name, readout=readout, geometry=geometry ) - optics_table = read_table( - path, "/configuration/instrument/telescope/optics", table_cls=QTable - ) + optics_table = read_table(path, CONFIG_INSTRUMENT_TEL_OPTICS, table_cls=QTable) optics_version = optics_table.meta.get("TAB_VER") if optics_version not in OpticsDescription.COMPATIBLE_VERSIONS: @@ -808,3 +811,81 @@ def check_matching_subarrays(subarray_list: list) -> bool: set(subarray.tel_ids) == set(subarray_list[0].tel_ids) for subarray in subarray_list ) + + @staticmethod + def merge_subarrays( + subarray_list: list, + name=None, + overwrite_tel_ids: bool = False, + ) -> "SubarrayDescription": + """Merge multiple subarrays into one + + Parameters + ---------- + subarray_list: list(SubarrayDescription) + list of subarrays to merge + name: str + name of new merged subarray + overwrite_tel_ids: bool + if True, telescope entries from later subarrays replace earlier ones; + if False (default), encountering a duplicate telescope id raises a + ValueError. + Returns + ------- + SubarrayDescription + + """ + + tel_positions, tel_descriptions = {}, {} + tel_ids, tid_to_subarray = set(), {} + reference_location = subarray_list[ + 0 + ].reference_location # Get the reference location from the first subarray + for s, subarray in enumerate(subarray_list): + if not isinstance(subarray, SubarrayDescription): + raise TypeError( + "All elements of subarray_list must be 'SubarrayDescription' " + f"instances, got '{type(subarray)}' for element '{s}'." + ) + for tid in subarray.tel_ids: + if tid in tel_ids: + if overwrite_tel_ids: + # Warn about overwriting telescope entry + msg = ( + f"Overwriting telescope id '{tid}' from subarray " + f"'{subarray.name}' into merged subarray." + ) + warnings.warn(msg, UserWarning) + else: + raise ValueError( + "Duplicate telescope id encountered while merging subarrays. " + f"Telescope '{tid}' already defined; set overwrite_tel_ids=True to " + "allow later subarrays to replace earlier entries." + ) + tid_to_subarray[tid] = subarray + tel_ids.add(tid) + if subarray.reference_location != reference_location: + raise ValueError( + "All subarrays must have the same reference_location to be merged. " + f"Subarray '{subarray.name}' ({subarray.reference_location}) does not match " + f"the reference location of the first subarray '{subarray_list[0].name}' " + f"({reference_location})." + ) + + # Merge telescope positions and descriptions, optionally allowing later entries to overwrite + for tid in sorted(tel_ids): + # Copy/overwrite telescope position and description from the current subarray + subarray = tid_to_subarray[tid] + tel_positions[tid] = subarray.positions[tid] + tel_descriptions[tid] = subarray.tels[tid] + + if name is None: + name = "Merged_" + _range_extraction(sorted(tel_ids)) + + newsub = SubarrayDescription( + name, + tel_positions=tel_positions, + tel_descriptions=tel_descriptions, + reference_location=reference_location, + ) + return newsub diff --git a/src/ctapipe/instrument/tests/test_subarray.py b/src/ctapipe/instrument/tests/test_subarray.py index 95df7f1fdee..c01f8b36c42 100644 --- a/src/ctapipe/instrument/tests/test_subarray.py +++ b/src/ctapipe/instrument/tests/test_subarray.py @@ -313,3 +313,71 @@ def test_check_matchings_subarray(example_subarray, subarray_prod5_paranal): assert not SubarrayDescription.check_matching_subarrays( [example_subarray, subarray_prod5_paranal] ) + + +def test_merge_subarrays(example_subarray): + """Test SubarrayDescription.merge_subarrays static method""" + + sub1 = example_subarray.select_subarray([1, 2], name="s1") + sub2 = example_subarray.select_subarray([3, 4], name="s2") + expected_sub = example_subarray.select_subarray([1, 2, 3, 4], name="Merged_1-4") + merged_sub = SubarrayDescription.merge_subarrays([sub1, sub2]) + + assert expected_sub == merged_sub + + +def test_merge_subarrays_exceptions(example_subarray): + """Merging subarrays with invalid parameters should raise exceptions.""" + + sub1 = example_subarray.select_subarray([1, 2], name="s1") + sub2 = example_subarray.select_subarray([3, 4], name="s2") + + # Check that invalid inputs raise exceptions + with pytest.raises( + TypeError, match="All elements of subarray_list must be 'SubarrayDescription'" + ): + SubarrayDescription.merge_subarrays([sub1, int(67)]) + + # Check that duplicate telescope ids without overwrite raises exception + with pytest.raises(ValueError, match="Duplicate telescope id encountered"): + SubarrayDescription.merge_subarrays([sub1, sub1], overwrite_tel_ids=False) + + # Check that different reference locations raises exception + shifted_location = EarthLocation( + lon=sub2.reference_location.lon, + lat=sub2.reference_location.lat, + height=sub2.reference_location.height + 1 * u.m, + ) + sub2_shifted = SubarrayDescription( + name="shifted", + tel_positions=sub2.positions, + tel_descriptions=sub2.tel, + reference_location=shifted_location, + ) + with pytest.raises( + ValueError, match="All subarrays must have the same reference_location" + ): + SubarrayDescription.merge_subarrays( + [sub1, sub2_shifted], overwrite_tel_ids=True + ) + + +def test_merge_subarrays_overwrite_tel_ids(example_subarray): + """Later subarrays overwrite earlier telescope entries when enabled.""" + + sub1 = example_subarray.select_subarray([1, 2], name="s1") + sub2 = example_subarray.select_subarray([1, 2], name="s2") + + # Overwrite positions of the telescopes in sub2 + sub2.positions[1] = np.array([10, 0, 0]) * u.m + sub2.positions[2] = np.array([-10, 0, 0]) * u.m + + with pytest.warns(UserWarning, match="Overwriting telescope id"): + merged_sub = SubarrayDescription.merge_subarrays( + [sub1, sub2], overwrite_tel_ids=True + ) + + assert merged_sub.name == "Merged_1,2" + assert merged_sub.n_tels == 2 + np.testing.assert_allclose(merged_sub.positions[1], [10, 0, 0] * u.m) + np.testing.assert_allclose(merged_sub.positions[2], [-10, 0, 0] * u.m) diff --git a/src/ctapipe/io/hdf5dataformat.py b/src/ctapipe/io/hdf5dataformat.py index 0e24fc35372..29bd2d58b95 100644 --- a/src/ctapipe/io/hdf5dataformat.py +++ b/src/ctapipe/io/hdf5dataformat.py @@ -9,6 +9,11 @@ "SIMULATION_GROUP", "SIMULATION_TEL_TABLE", "CONFIG_GROUP", + "CONFIG_INSTRUMENT_SUBARRAY", + "CONFIG_INSTRUMENT_SUBARRAY_LAYOUT", + "CONFIG_INSTRUMENT_TEL", + "CONFIG_INSTRUMENT_TEL_OPTICS", + "CONFIG_INSTRUMENT_TEL_CAMERA", "SCHEDULING_BLOCK_TABLE", "OBSERVATION_BLOCK_TABLE", "SIMULATION_RUN_TABLE", @@ -38,6 +43,7 @@ "DL2_TEL_GEOMETRY_GROUP", "DL2_TEL_ENERGY_GROUP", "DL2_TEL_PARTICLETYPE_GROUP", + "DL2_TEL_IMPACT_GROUP", "DL0_TEL_POINTING_GROUP", "DL1_SUBARRAY_POINTING_GROUP", "DL1_TEL_POINTING_GROUP", @@ -60,10 +66,16 @@ # Configuration, service, and simulation group CONFIG_GROUP = "/configuration" +CONFIG_INSTRUMENT_SUBARRAY = "/configuration/instrument/subarray" +CONFIG_INSTRUMENT_SUBARRAY_LAYOUT = "/configuration/instrument/subarray/layout" +CONFIG_INSTRUMENT_TEL = "/configuration/instrument/telescope" +CONFIG_INSTRUMENT_TEL_OPTICS = "/configuration/instrument/telescope/optics" +CONFIG_INSTRUMENT_TEL_CAMERA = "/configuration/instrument/telescope/camera" SCHEDULING_BLOCK_TABLE = "/configuration/observation/scheduling_block" OBSERVATION_BLOCK_TABLE = "/configuration/observation/observation_block" SIMULATION_RUN_TABLE = "/configuration/simulation/run" FIXED_POINTING_GROUP = "/configuration/telescope/pointing" + DL1_IMAGE_STATISTICS_TABLE = "/dl1/service/image_statistics" DL2_EVENT_STATISTICS_GROUP = "/dl2/service/tel_event_statistics" SIMULATION_GROUP = "/simulation" @@ -93,6 +105,7 @@ DL2_TEL_GEOMETRY_GROUP = "/dl2/event/telescope/geometry" DL2_TEL_ENERGY_GROUP = "/dl2/event/telescope/energy" DL2_TEL_PARTICLETYPE_GROUP = "/dl2/event/telescope/particle_type" +DL2_TEL_IMPACT_GROUP = "/dl2/event/telescope/impact" DL2_GROUP = "/dl2" DL2_SUBARRAY_GROUP = "/dl2/event/subarray" diff --git a/src/ctapipe/io/hdf5merger.py b/src/ctapipe/io/hdf5merger.py index 85d96f23eed..cd60515aef2 100644 --- a/src/ctapipe/io/hdf5merger.py +++ b/src/ctapipe/io/hdf5merger.py @@ -5,6 +5,7 @@ from pathlib import Path import tables +from astropy.table import join, unique, vstack from astropy.time import Time from ..containers import EventType @@ -12,7 +13,7 @@ from ..instrument.optics import FocalLengthKind from ..instrument.subarray import SubarrayDescription from ..utils.arrays import recarray_drop_columns -from . import metadata +from . import metadata, read_table, write_table from .hdf5dataformat import ( DL0_TEL_POINTING_GROUP, DL1_CAMERA_COEFFICIENTS_GROUP, @@ -54,6 +55,8 @@ "v7.2.0", "v7.3.0", ] +SUBARRAY_EVENT_KEYS = ["obs_id", "event_id"] +TEL_EVENT_KEYS = ["obs_id", "event_id", "tel_id"] class NodeType(enum.Enum): @@ -181,13 +184,19 @@ class HDF5Merger(Component): ).tag(config=True) merge_strategy = traits.CaselessStrEnum( - ["events-multiple-obs", "events-single-ob", "monitoring-only"], + [ + "events-multiple-obs", + "events-single-ob", + "combine-telescope-data", + "monitoring-only", + ], default_value="events-multiple-obs", help=( "Strategy to handle different use cases when merging HDF5 files. " "'events-multiple-obs': allows merging event files (w and w/o monitoring data) from different observation blocks; " - "'events-single-ob': for merging events in consecutive chunks of the same OB." - "'monitoring-only': attaches horizontally monitoring data from the same observation block (requires monitoring=True)." + "'events-single-ob': for merging events in consecutive chunks of the same OB; " + "'combine-telescope-data': merges telescope-wise data from different files for the same OB (requires telescope_events=True); " + "'monitoring-only': attaches horizontally monitoring data from the same OB (requires monitoring=True)." ), ).tag(config=True) @@ -205,12 +214,21 @@ def __init__(self, output_path=None, **kwargs): self.single_ob = ( self.merge_strategy == "events-single-ob" or self.merge_strategy == "monitoring-only" + or self.merge_strategy == "combine-telescope-data" ) self.attach_monitoring = self.merge_strategy == "monitoring-only" if self.attach_monitoring and not self.monitoring: raise traits.TraitError( "Merge strategy 'monitoring-only' requires monitoring=True" ) + self.combine_telescope_data = self.merge_strategy == "combine-telescope-data" + if self.combine_telescope_data and ( + not self.telescope_events or self.dl2_telescope or self.dl2_subarray + ): + raise traits.TraitError( + "Merge strategy 'combine-telescope-data' requires telescope_events=True " + "and dl2_telescope=False and dl2_subarray=False" + ) output_exists = self.output_path.exists() appending = False @@ -232,6 +250,9 @@ def __init__(self, output_path=None, **kwargs): self.data_model_version = None self.data_category = None self.subarray = None + self.subarray_list = [] + self.tel_trigger_tables, self.shower_tables = [], [] + self.tel_ids_set = set() self.meta = None self._merged_obs_ids = set() self._n_merged = 0 @@ -249,12 +270,27 @@ def __init__(self, output_path=None, **kwargs): self.h5file, focal_length_choice=FocalLengthKind.EQUIVALENT, ) + self.subarray_list.append(self.subarray) + + # Update tel_ids_set with existing telescope IDs when appending + if self.combine_telescope_data: + self.tel_ids_set.update(self.subarray.tel_ids) # Get required nodes from existing output file self.required_nodes = self._get_required_nodes(self.h5file) # this will update _merged_obs_ids from existing input file self._check_obs_ids(self.h5file) + + # Append existing tel trigger tables and shower tables + # if the merge strategy is 'combine-telescope-data' + if self.combine_telescope_data: + self.tel_trigger_tables.append( + read_table(self.h5file, DL1_TEL_TRIGGER_TABLE) + ) + self.shower_tables.append( + read_table(self.h5file, SIMULATION_SHOWER_TABLE) + ) self._n_merged += 1 def __call__(self, other: str | Path | tables.File): @@ -395,14 +431,21 @@ def _get_required_nodes(self, h5file): def _append_simulation_data(self, other): """Append simulation-related data (run, shower, impact, images, parameters).""" + + if SIMULATION_SHOWER_TABLE in other.root: + if self.combine_telescope_data: + self.shower_tables.append(read_table(other, SIMULATION_SHOWER_TABLE)) + else: + self._append_table(other, other.root[SIMULATION_SHOWER_TABLE]) simulation_table_keys = [ SIMULATION_RUN_TABLE, SHOWER_DISTRIBUTION_TABLE, - SIMULATION_SHOWER_TABLE, ] for key in simulation_table_keys: if key in other.root: - self._append_table(other, other.root[key]) + self._append_table( + other, other.root[key], once=self.combine_telescope_data + ) if FIXED_POINTING_GROUP in other.root: self._append_table_group( @@ -437,15 +480,17 @@ def _append_waveform_data(self, other): def _append_dl1_data(self, other): """Append DL1 data (triggers, images, parameters, muon).""" - # DL1 subarray trigger table (always check) - if DL1_SUBARRAY_TRIGGER_TABLE in other.root: + if DL1_SUBARRAY_TRIGGER_TABLE in other.root and not self.combine_telescope_data: self._append_table(other, other.root[DL1_SUBARRAY_TRIGGER_TABLE]) if not self.telescope_events: return if DL1_TEL_TRIGGER_TABLE in other.root: - self._append_table(other, other.root[DL1_TEL_TRIGGER_TABLE]) + if self.combine_telescope_data: + self.tel_trigger_tables.append(read_table(other, DL1_TEL_TRIGGER_TABLE)) + else: + self._append_table(other, other.root[DL1_TEL_TRIGGER_TABLE]) if self.dl1_images and DL1_TEL_IMAGES_GROUP in other.root: self._append_table_group(other, other.root[DL1_TEL_IMAGES_GROUP]) @@ -509,7 +554,7 @@ def _append_monitoring_dl2_groups(self, other): DL2_SUBARRAY_CROSS_CALIBRATION_GROUP, ] for key in monitoring_dl2_subarray_groups: - if key in other.root: + if self.dl2_subarray and key in other.root: self._append_table(other, other.root[key], once=self.single_ob) def _append_pixel_statistics(self, other): @@ -554,6 +599,81 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, traceback): self.close() + self._flush() + + def _flush(self): + """Flush any remaining data to the output file. + + This is relevant for the 'combine-telescope-data' merge strategy, + where subarray and tel_trigger tables are only written at the end + after all files have been processed. + """ + + if not self.combine_telescope_data or not self.tel_trigger_tables: + return + + # Merge all subarrays into one + merged_subarray = SubarrayDescription.merge_subarrays(self.subarray_list) + # Write merged subarray to HDF5 (overwrite if existing) + merged_subarray.to_hdf(self.output_path, overwrite=True) + # Stack the tel_trigger tables vertically and sort by telescope event keys + combined_tel_triggers = vstack(self.tel_trigger_tables) + combined_tel_triggers.sort(TEL_EVENT_KEYS) + # Write combined telescope trigger table to HDF5 (overwrite if existing) + write_table( + combined_tel_triggers, + self.output_path, + path=DL1_TEL_TRIGGER_TABLE, + overwrite=True, + ) + # Create the subarray trigger table from the combined telescope triggers + subarray_trigger_table = combined_tel_triggers.copy() + subarray_trigger_table.keep_columns( + SUBARRAY_EVENT_KEYS + ["time", "event_type"] + ) + subarray_trigger_table = unique( + subarray_trigger_table, keys=SUBARRAY_EVENT_KEYS + ) + # Add tels_with_trigger column indicating which telescopes had a trigger for each event + tel_trigger_groups = combined_tel_triggers.group_by(SUBARRAY_EVENT_KEYS) + tel_with_trigger = [] + for tel_trigger in tel_trigger_groups.groups: + tel_with_trigger.append( + merged_subarray.tel_ids_to_mask(tel_trigger["tel_id"]) + ) + # Add the new column to the table indicating which telescopes had a trigger for each event + subarray_trigger_table.add_column( + tel_with_trigger, index=-2, name="tels_with_trigger" + ) + # Write subarray trigger table to HDF5 (overwrite if existing) + write_table( + subarray_trigger_table, + self.output_path, + DL1_SUBARRAY_TRIGGER_TABLE, + overwrite=True, + ) + # Create and write the merged shower table with only unique events + # that are also in the subarray trigger table + if self.shower_tables: + # Stack the shower tables vertically and keep only unique events + shower_table_stacked = vstack(self.shower_tables) + shower_table_stacked = unique( + shower_table_stacked, keys=SUBARRAY_EVENT_KEYS + ) + # Join with subarray trigger table to keep only events that had a trigger + shower_table = join( + shower_table_stacked, + subarray_trigger_table[SUBARRAY_EVENT_KEYS], + join_type="inner", + ) + shower_table.sort(SUBARRAY_EVENT_KEYS) + # Write shower table to HDF5 (overwrite if existing) + write_table( + shower_table, + self.output_path, + SIMULATION_SHOWER_TABLE, + overwrite=True, + ) def close(self): if hasattr(self, "h5file"): @@ -576,22 +696,40 @@ def _append_subarray(self, other): other, focal_length_choice=FocalLengthKind.EQUIVALENT ) + # Check for duplicate telescope IDs when combining telescope events + if self.combine_telescope_data: + new_tel_ids = set(subarray.tel_ids) + duplicates = self.tel_ids_set.intersection(new_tel_ids) + if duplicates: + raise ValueError( + f"Duplicate telescope IDs found when merging file {other.filename}: {sorted(duplicates)}. " + "Each telescope ID must be unique across all input files when using " + "the merge strategy 'combine-telescope-data'." + ) + self.tel_ids_set.update(new_tel_ids) + + self.subarray_list.append(subarray) + if self.subarray is None: self.subarray = subarray - self.subarray.to_hdf(self.h5file) - - # Relax subarray matching requirements for attaching - # monitoring data of the same observation block. - if not self.single_ob or not self.attach_monitoring: - if self.subarray != subarray: - raise CannotMerge(f"Subarrays do not match for file: {other.filename}") - else: - if not SubarrayDescription.check_matching_subarrays( - [self.subarray, subarray] - ): - raise CannotMerge( - f"Subarrays are not compatible for file: {other.filename}" - ) + if not self.combine_telescope_data: + self.subarray.to_hdf(self.h5file) + + if not self.combine_telescope_data: + # Relax subarray matching requirements for attaching + # monitoring data of the same observation block. + if not self.single_ob or not self.attach_monitoring: + if self.subarray != subarray: + raise CannotMerge( + f"Subarrays do not match for file: {other.filename}" + ) + else: + if not SubarrayDescription.check_matching_subarrays( + [self.subarray, subarray] + ): + raise CannotMerge( + f"Subarrays are not compatible for file: {other.filename}" + ) def _append_table_group(self, file, input_group, filter_columns=None, once=False): """Add a group that has a number of child tables to outputfile""" diff --git a/src/ctapipe/tools/merge.py b/src/ctapipe/tools/merge.py index 7c08a3622ae..4e119eaafe0 100644 --- a/src/ctapipe/tools/merge.py +++ b/src/ctapipe/tools/merge.py @@ -94,6 +94,10 @@ class MergeTool(Tool): {"HDF5Merger": {"merge_strategy": "monitoring-only"}}, ("Attach monitoring data from the same observation block."), ), + "combine-telescope-data": ( + {"HDF5Merger": {"merge_strategy": "combine-telescope-data"}}, + ("Combine telescope-wise data from the same observation block."), + ), "progress": ( {"MergeTool": {"progress_bar": True}}, "Show a progress bar for all given input files", diff --git a/src/ctapipe/tools/tests/test_merge.py b/src/ctapipe/tools/tests/test_merge.py index 429ef121289..d72c6f39c68 100644 --- a/src/ctapipe/tools/tests/test_merge.py +++ b/src/ctapipe/tools/tests/test_merge.py @@ -14,6 +14,14 @@ from ctapipe.core import ToolConfigurationError, run_tool from ctapipe.io import DataWriter, EventSource, TableLoader from ctapipe.io.astropy_helpers import read_table +from ctapipe.io.hdf5dataformat import ( + DL1_TEL_MUON_GROUP, + DL2_EVENT_STATISTICS_GROUP, + DL2_SUBARRAY_GEOMETRY_GROUP, + OBSERVATION_BLOCK_TABLE, + SCHEDULING_BLOCK_TABLE, + SIMULATION_IMAGES_GROUP, +) from ctapipe.io.tests.test_astropy_helpers import assert_table_equal from ctapipe.tools.process import ProcessorTool @@ -108,7 +116,7 @@ def test_skip_images(tmp_path, dl1_file, dl1_proton_file): assert "images" in f.root.simulation.event.telescope assert "parameters" in f.root.dl1.event.telescope - t = read_table(output, "/simulation/event/telescope/images/tel_001") + t = read_table(output, f"{SIMULATION_IMAGES_GROUP}/tel_001") assert "true_image" not in t.colnames assert "true_image_sum" in t.colnames @@ -128,13 +136,13 @@ def test_dl2(tmp_path, dl2_shower_geometry_file, dl2_proton_geometry_file): ) table1 = read_table( - dl2_shower_geometry_file, "/dl2/event/subarray/geometry/HillasReconstructor" + dl2_shower_geometry_file, f"{DL2_SUBARRAY_GEOMETRY_GROUP}/HillasReconstructor" ) table2 = read_table( - dl2_proton_geometry_file, "/dl2/event/subarray/geometry/HillasReconstructor" + dl2_proton_geometry_file, f"{DL2_SUBARRAY_GEOMETRY_GROUP}/HillasReconstructor" ) table_merged = read_table( - output, "/dl2/event/subarray/geometry/HillasReconstructor" + output, f"{DL2_SUBARRAY_GEOMETRY_GROUP}/HillasReconstructor" ) diff = StringIO() @@ -143,7 +151,7 @@ def test_dl2(tmp_path, dl2_shower_geometry_file, dl2_proton_geometry_file): f"Merged table not equal to individual tables. Diff:\n {diff.getvalue()}" ) - stats_key = "/dl2/service/tel_event_statistics/HillasReconstructor" + stats_key = f"{DL2_EVENT_STATISTICS_GROUP}/HillasReconstructor" merged_stats = read_table(output, stats_key) stats1 = read_table(dl2_shower_geometry_file, stats_key) stats2 = read_table(dl2_proton_geometry_file, stats_key) @@ -152,8 +160,8 @@ def test_dl2(tmp_path, dl2_shower_geometry_file, dl2_proton_geometry_file): assert np.all(merged_stats[col] == (stats1[col] + stats2[col])) # test reading configurations as well: - obs = read_table(output, "/configuration/observation/observation_block") - sbs = read_table(output, "/configuration/observation/scheduling_block") + obs = read_table(output, OBSERVATION_BLOCK_TABLE) + sbs = read_table(output, SCHEDULING_BLOCK_TABLE) assert len(obs) == 2, "should have two OB entries" assert len(sbs) == 2, "should have two SB entries" @@ -182,8 +190,8 @@ def test_muon(tmp_path, dl1_muon_output_file): raises=True, ) - table = read_table(output, "/dl1/event/telescope/muon/tel_001") - input_table = read_table(dl1_muon_output_file, "/dl1/event/telescope/muon/tel_001") + table = read_table(output, f"{DL1_TEL_MUON_GROUP}/tel_001") + input_table = read_table(dl1_muon_output_file, f"{DL1_TEL_MUON_GROUP}/tel_001") n_input = len(input_table) assert len(table) == n_input @@ -295,6 +303,195 @@ def test_merge_single_ob_append(tmp_path, dl1_file, dl1_chunks): assert_table_equal(merged_tel_events, initial_tel_events) +def test_merge_telescope_data(tmp_path, prod6_gamma_simtel_path): + """ + Test merging telescope events from different files produces same result + as processing all telescopes together. + """ + + from ctapipe.core import traits + from ctapipe.io.hdf5merger import CannotMerge + from ctapipe.tools.merge import MergeTool + from ctapipe.tools.process import ProcessorTool + + # To be dropped from comparison + TIMING_COLUMNS = [ + "timing_intercept", + "timing_deviation", + "timing_slope", + ] + common_argv = [ + f"--input={prod6_gamma_simtel_path}", + "--write-images", + ] + outputs = { + "ref": tmp_path / "gamma_ref.dl1.h5", + "sub1": tmp_path / "gamma_sub1.dl1.h5", + "sub2": tmp_path / "gamma_sub2.dl1.h5", + "sub2_dl1b": tmp_path / "gamma_sub2_noimages.dl1b.h5", + "merged": tmp_path / "gamma_merged.dl1.h5", + "merged_appendmode": tmp_path / "gamma_merged_appendmode.dl1.h5", + "invalid": tmp_path / "invalid.dl1.h5", + "required_node_invalid": tmp_path / "required_node_invalid.dl1.h5", + } + # Select a few telescopes that cover different telescope types + # and have at least one triggered event in the simulated file. + allowed_tels = [1, 4, 5, 9, 13, 17, 25] + allowed_tels_strings = [ + f"--EventSource.allowed_tels={tel_id}" for tel_id in allowed_tels + ] + tel_sets = [ + ("ref", allowed_tels_strings), + ("sub1", allowed_tels_strings[:4]), + ("sub2", allowed_tels_strings[4:]), + ] + # Run ProcessorTool for each subset + for name, tel_args in tel_sets: + run_tool( + ProcessorTool(), + argv=[ + *common_argv, + *tel_args, + f"--output={outputs[name]}", + ], + cwd=tmp_path, + ) + + # For append mode test, copy one of the subset files to start with + shutil.copy(outputs["sub1"], outputs["merged_appendmode"]) + # Merge subset files into single file which should match reference + # Test both normal merge and append mode + merger_mode_argv = { + "merged": [str(outputs["sub1"])], + "merged_appendmode": ["--append"], + } + for merged_mode_name in ["merged", "merged_appendmode"]: + run_tool( + MergeTool(), + argv=merger_mode_argv[merged_mode_name] + + [ + str(outputs["sub2"]), + f"--output={outputs[merged_mode_name]}", + "--telescope-events", + "--no-dl2-telescope", + "--no-dl2-subarray", + "--combine-telescope-data", + ], + cwd=tmp_path, + raises=True, + ) + + # Compare merged result with reference + with ( + TableLoader(outputs[merged_mode_name]) as merged_loader, + TableLoader(outputs["ref"]) as ref_loader, + ): + # Compare telescope data for each telescope + for tel_id in allowed_tels: + merged_telescope_data = merged_loader.read_telescope_events( + telescopes=[tel_id], dl1_images=True + ) + reference_telescope_data = ref_loader.read_telescope_events( + telescopes=[tel_id], dl1_images=True + ) + # Assert equality of the two tables after removing timing columns + merged_telescope_data.remove_columns(TIMING_COLUMNS) + reference_telescope_data.remove_columns(TIMING_COLUMNS) + assert_table_equal(merged_telescope_data, reference_telescope_data) + # Compare subarray data + merged_subarray_data = merged_loader.read_subarray_events() + reference_subarray_data = ref_loader.read_subarray_events() + assert_table_equal(merged_subarray_data, reference_subarray_data) + + # Check that merging files with overlapping telescope IDs raises an error + # When combining telescope data, telescope IDs must be unique. + argv_options = [ + str(outputs["sub1"]), + str(outputs[merged_mode_name]), + f"--output={outputs['invalid']}", + "--combine-telescope-data", + ] + with pytest.raises( + ValueError, match="Duplicate telescope IDs found when merging file" + ): + run_tool( + MergeTool(), + argv=[ + *argv_options, + "--telescope-events", + "--no-dl2-telescope", + "--no-dl2-subarray", + ], + cwd=tmp_path, + raises=True, + ) + + # Check that merging files with incompatible options raises a TraitError + traits_error_msg = "Merge strategy 'combine-telescope-data' requires" + with pytest.raises(traits.TraitError, match=traits_error_msg): + run_tool( + MergeTool(), + argv=[ + *argv_options, + "--no-telescope-events", + "--no-dl2-subarray", + ], + cwd=tmp_path, + raises=True, + ) + with pytest.raises(traits.TraitError, match=traits_error_msg): + run_tool( + MergeTool(), + argv=[ + *argv_options, + "--telescope-events", + "--no-dl2-telescope", + "--dl2-subarray", + ], + cwd=tmp_path, + raises=True, + ) + with pytest.raises(traits.TraitError, match=traits_error_msg): + run_tool( + MergeTool(), + argv=[ + *argv_options, + "--telescope-events", + "--dl2-telescope", + "--no-dl2-subarray", + ], + cwd=tmp_path, + raises=True, + ) + # Check that merging files. with different data levels raises an error + # When combining telescope data, data levels must match. + run_tool( + ProcessorTool(), + argv=[ + f"--input={prod6_gamma_simtel_path}", + "--no-write-images", + *allowed_tels_strings[4:], + f"--output={outputs['sub2_dl1b']}", + ], + cwd=tmp_path, + ) + with pytest.raises(CannotMerge, match="Required node"): + run_tool( + MergeTool(), + argv=[ + str(outputs["sub1"]), + str(outputs["sub2_dl1b"]), + "--telescope-events", + "--combine-telescope-data", + "--no-dl2-subarray", + "--no-dl2-telescope", + f"--output={outputs['required_node_invalid']}", + ], + cwd=tmp_path, + raises=True, + ) + + def test_merge_exceptions( tmp_path, calibpipe_camcalib_sims_single_chunk, dl1_mon_pointing_file ):