|
| 1 | +from copy import deepcopy |
| 2 | + |
1 | 3 | import astropy.units as u |
2 | 4 | import numpy as np |
3 | 5 | import pytest |
|
6 | 8 |
|
7 | 9 | from ctapipe.image.reducer import NullDataVolumeReducer, TailCutsDataVolumeReducer |
8 | 10 | from ctapipe.instrument import SubarrayDescription |
| 11 | +from ctapipe.io import EventSource |
9 | 12 |
|
10 | 13 |
|
11 | 14 | @pytest.fixture(scope="module") |
@@ -92,3 +95,72 @@ def test_tailcuts_data_volume_reducer(subarray_lst): |
92 | 95 |
|
93 | 96 | assert (reduced_waveforms != 0).sum() == (1 + 4 + 14) * n_samples |
94 | 97 | assert_array_equal(expected_waveforms, reduced_waveforms) |
| 98 | + |
| 99 | + |
| 100 | +def test_readout_window_reducer(prod5_gamma_simtel_path): |
| 101 | + from ctapipe.image.reducer import ReadoutWindowReducer |
| 102 | + |
| 103 | + def check(subarray, event, event_reduced, event_no_op): |
| 104 | + checked = 0 |
| 105 | + for dl in ("r0", "r1", "dl0"): |
| 106 | + for tel_id in getattr(event, dl).tel.keys(): |
| 107 | + if str(subarray.tel[tel_id]).startswith("LST"): |
| 108 | + start = 10 |
| 109 | + end = 20 |
| 110 | + n_samples = end - start |
| 111 | + elif "Nectar" in str(subarray.tel[tel_id]): |
| 112 | + start = 15 |
| 113 | + end = 40 |
| 114 | + n_samples = end - start |
| 115 | + else: |
| 116 | + start = None |
| 117 | + end = None |
| 118 | + n_samples = subarray.tel[tel_id].camera.readout.n_samples |
| 119 | + |
| 120 | + original_waveform = getattr(event, dl).tel[tel_id].waveform |
| 121 | + reduced_waveform = getattr(event_reduced, dl).tel[tel_id].waveform |
| 122 | + no_op_waveform = getattr(event_no_op, dl).tel[tel_id].waveform |
| 123 | + |
| 124 | + assert reduced_waveform.ndim == 3 |
| 125 | + assert reduced_waveform.shape[-1] == n_samples |
| 126 | + np.testing.assert_array_equal(original_waveform, no_op_waveform) |
| 127 | + np.testing.assert_array_equal( |
| 128 | + original_waveform[..., start:end], reduced_waveform |
| 129 | + ) |
| 130 | + |
| 131 | + checked += 1 |
| 132 | + return checked |
| 133 | + |
| 134 | + with EventSource(prod5_gamma_simtel_path) as source: |
| 135 | + reducer_no_op = ReadoutWindowReducer(source.subarray) |
| 136 | + reducer = ReadoutWindowReducer( |
| 137 | + source.subarray, |
| 138 | + window_start=[ |
| 139 | + ("type", "*", None), |
| 140 | + ("type", "LST*", 10), |
| 141 | + ("type", "*Nectar*", 15), |
| 142 | + ], |
| 143 | + window_end=[ |
| 144 | + ("type", "*", None), |
| 145 | + ("type", "LST*", 20), |
| 146 | + ("type", "*Nectar*", 40), |
| 147 | + ], |
| 148 | + ) |
| 149 | + |
| 150 | + # make sure we didn't modify the original subarray |
| 151 | + assert source.subarray.tel[1].camera.readout.n_samples == 40 |
| 152 | + assert reducer_no_op.subarray.tel[1].camera.readout.n_samples == 40 |
| 153 | + # new subarray should have reduced window |
| 154 | + assert reducer.subarray.tel[1].camera.readout.n_samples == 10 |
| 155 | + |
| 156 | + n_checked = 0 |
| 157 | + for event in source: |
| 158 | + event_no_op = deepcopy(event) |
| 159 | + event_reduced = deepcopy(event) |
| 160 | + |
| 161 | + reducer(event_reduced) |
| 162 | + reducer_no_op(event_no_op) |
| 163 | + |
| 164 | + n_checked += check(source.subarray, event, event_reduced, event_no_op) |
| 165 | + |
| 166 | + assert n_checked > 0 |
0 commit comments