Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ Fixed:
- Fixed counting threshold for `CookieboxCalibration` (!498).
- [CookieboxCalibration][extra.applications.CookieboxCalibration] bug fix for
new `extra.applications.base.SerializableMixin` interface (!506).
- [CookieboxCalibration][extra.applications.CookieboxCalibration] transmission
corrected based on pulse energy (!512).

Changed:

Expand Down
94 changes: 46 additions & 48 deletions src/extra/applications/cookiebox.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import xarray as xr
import pandas as pd
from scipy.signal import find_peaks
from scipy.ndimage import gaussian_filter

from .base import SerializableMixin
from .cookiebox_deconvolve import TOFAnalogResponse
Expand Down Expand Up @@ -47,7 +48,7 @@ def search_offset(trace: np.ndarray, sigma: float=20) -> int:
peak_idx = np.argmax(smoothened)
return peak_idx - 200

def search_roi(roi: np.ndarray) -> np.ndarray:
def search_roi(roi: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Find highest peaks in the 1D trace.

Expand All @@ -56,9 +57,21 @@ def search_roi(roi: np.ndarray) -> np.ndarray:

Returns: Peak position.
"""
import scipy
p, _ = scipy.signal.find_peaks(roi, prominence=(0.25*np.max(roi), None))
return p
roi_smooth = gaussian_filter(roi, sigma=5)
p, prop = find_peaks(roi_smooth, prominence=0.25*np.max(roi_smooth), width=1, height=0)
idx = np.argsort(prop["peak_heights"])[::-1]
p = p[idx]
w = prop["widths"][idx]
if len(p) == 0:
logging.info(f"Failed to find peaks.")
return -1, 0
if len(p) == 1:
logging.info(f"Failed to find two peaks. If there is no Auger peak, set start_roi=0")
return -1, 0
idx = np.argsort(p[:2])
p = p[:2][idx]
w = w[:2][idx]
return p[-1], w[-1]

def model(ts: np.ndarray, c: float, e0: float, t0: float) -> np.ndarray:
"""
Expand Down Expand Up @@ -177,16 +190,13 @@ def calc_mean(itr: Tuple[int, int], scan: Scan, xgm_data: xr.DataArray, tof: Dic
tof_data = tof_data.pulse_data(pulse_dim='pulseIndex', parallel=parallel)

# select XGM
if xgm_threshold > 0:
mask = xgm_data.coords["trainId"].isin(good_ids)
sel_xgm_data = xgm_data[mask]
tof_data = tof_data.loc[sel_xgm_data > xgm_threshold, :]
tof_xgm_data = sel_xgm_data.loc[sel_xgm_data > xgm_threshold]
mask = xgm_data.coords["trainId"].isin(good_ids)
sel_xgm_data = xgm_data[mask]
tof_data = tof_data.loc[sel_xgm_data > xgm_threshold, :]
tof_xgm_data = sel_xgm_data.loc[sel_xgm_data > xgm_threshold].to_numpy()

out_data = -tof_data.mean('pulse')
out_xgm = 0.0
if xgm_threshold > 0:
out_xgm = tof_xgm_data.mean('pulse').to_numpy()
out_data = -tof_data.mean("pulse")
out_xgm = np.mean(tof_xgm_data)
else:
# option 2: count photon peaks
# in this case, ignore the XGM, as it is only used for cleaning the data
Expand All @@ -198,9 +208,7 @@ def calc_mean(itr: Tuple[int, int], scan: Scan, xgm_data: xr.DataArray, tof: Dic
out_data, _ = np.histogram(tof_data.edge, bins=bins, weights=-tof_data.amplitude)

out_data = xr.DataArray(out_data, dims=('sample'), coords={'sample': bins[:-1]})
out_xgm = 0.0
if xgm_threshold > 0:
out_xgm = xgm_data.mean('pulse').to_numpy()
out_xgm = np.mean(xgm_data.to_numpy())

if correction_fn is not None:
out_data = correction_fn[tof_id](out_data)
Expand Down Expand Up @@ -691,16 +699,13 @@ def update_roi(self, parallel=None):
logging.info("Reading calibration data ... (this takes a while)")
self.select_calibration_data(parallel)
# find RoI if needed
if (self.auger_start_roi is None
or self.start_roi is None
or self.stop_roi is None):
logging.info("Finding RoI ...")
logging.info("(This may fail. If it does, please provide a `auger_start_roi`, `start_roi` and `stop_roi`.)")
for tof_id in self.kwargs_adq.keys():
for tof_id in self.kwargs_adq.keys():
if (self.auger_start_roi[tof_id] is None
or self.start_roi[tof_id] is None
or self.stop_roi[tof_id] is None):
logging.info(f"Finding RoI for TOF {tof_id}...")
self.find_roi(tof_id)
logging.info(f"Auger start RoIs found: {self.auger_start_roi}")
logging.info(f"Start RoIs found: {self.start_roi}")
logging.info(f"Stop RoIs found: {self.stop_roi}")
logging.info(f"Start RoIs found: {self.start_roi[tof_id]}")

def update_fit_result(self):
"""
Expand Down Expand Up @@ -781,38 +786,32 @@ def find_roi(self, tof_id: int):
Args:
tof_id: The eTOF ID.
"""
auger = list()
roi = list()
p = list()
w = list()
N = self.calibration_data[tof_id].shape[-1]
for e in range(self.calibration_data[tof_id].shape[0]):
d = self.calibration_data[tof_id][e, :]
peaks = search_roi(d)
peaks = sorted(peaks)
if len(peaks) < 2:
logging.info(f"Failed to find peaks for eTOF {tof_id}, energy index {e}. "
f"Check the data quality.")
peak, width = search_roi(d)
if peak < 0:
continue
auger += [peaks[0]]
roi += [peaks[1]]
if len(auger) == 0 or len(roi) == 0:
p += [peak]
w += [width]
if len(p) == 0:
logging.info(f"No peaks found in eTOF {tof_id}. "
f"Check the data quality. "
f"I will set the RoI to collect non-sense,"
f" so this TOF data will be meaningless. "
f"It will also be masked.")
self.auger_start_roi[tof_id] = 0
self.start_roi[tof_id] = 100
self.stop_roi[tof_id] = 200
self.stop_roi[tof_id] = N
self.mask[tof_id] = False
return
a = min(auger)
b = min(roi)
dab = abs(b - a)
stop_roi = max(roi) + int(dab/2)
auger_start_roi = int(a - dab/2)
start_roi = int(b - dab/2)
self.auger_start_roi[tof_id] = auger_start_roi
self.start_roi[tof_id] = start_roi
self.stop_roi[tof_id] = stop_roi
idx = np.argmin(p)
pos = p[idx] - 2*w[idx]
self.auger_start_roi[tof_id] = 0
self.start_roi[tof_id] = int(pos)
self.stop_roi[tof_id] = N

def plot_calibration_data(self):
"""Plot data for checks.
Expand Down Expand Up @@ -966,9 +965,8 @@ def calculate_calibration_and_transmission(self, tof_id: int):
detected = self.tof_fit_result[tof_id].A
#produced = self.calibration_mean_xgm[tof_id] * self.tof_fit_result[tof_id].Aa * dsig_dth
#produced = self.tof_fit_result[tof_id].Aa * dsig_dth
produced = dsig_dth
if produced == 0:
produced += 1e-1
produced = dsig_dth*self.calibration_mean_xgm[tof_id][mask][eidx]
produced[produced == 0] = 0.1 # avoid division by zero
en = detected[mask][eidx]/produced
# interpolate normalization
self.normalization[tof_id] = np.interp(self.energy_axis,
Expand Down
46 changes: 46 additions & 0 deletions tests/test_applications_cookiebox.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,52 @@ def test_fit(tmp_path):
for tof_id, v in correct.items():
assert np.allclose(cal_read.model_params[tof_id], v, rtol=1e-2, atol=1e-2)

def test_detect_roi_and_fit_single_channel(mock_sqs_etof_calibration_run, tmp_path, mock_etof_mono_energies, mock_etof_calibration_constants):
# use mock data
pulse_timing = 'SQS_RR_UTC/TSYS/TIMESERVER'
monochromator_energy = 'SA3_XTD10_MONO/MDL/PHOTON_ENERGY'
digitizer = 'SQS_DIGITIZER_UTC4/ADC/1:network'
digitizer_control = 'SQS_DIGITIZER_UTC4/ADC/1'
pulse_energy = 'SQS_DIAG1_XGMD/XGM/DOOCS'
mock_sqs_etof_calibration_run = mock_sqs_etof_calibration_run.select([pulse_timing,
digitizer, digitizer_control,
pulse_energy, f"{pulse_energy}:output",
monochromator_energy], require_all=True).select_trains(np.s_[10:])
channel_name = "1_A"
tof_ids = [0]
tof_channel = {}
tof_channel[0] = AdqRawChannel(mock_sqs_etof_calibration_run,
channel_name,
digitizer=digitizer,
first_pulse_offset=1000)
scan = Scan(mock_sqs_etof_calibration_run[monochromator_energy, "actualEnergy"], resolution=2)
energy_axis = np.linspace(965, 1070, 160)
xgm = XGM(mock_sqs_etof_calibration_run, pulse_energy)
cal = CookieboxCalibration(
auger_start_roi=None,
start_roi=None,
stop_roi=None,
)
cal.setup(run=mock_sqs_etof_calibration_run, energy_axis=energy_axis, tof_settings=tof_channel,
xgm=xgm,
scan=scan)
correct_energies = np.unique(mock_etof_mono_energies)
correct_constants = np.array(mock_etof_calibration_constants)
for tof_id in tof_ids:
assert np.allclose(cal.tof_fit_result[tof_id].energy, correct_energies, rtol=1e-2, atol=1e-2)

energy = correct_energies

# get calibration curve
c, e0, t0 = cal.model_params[tof_id]
ts = t0 + np.sqrt(c/(energy - e0))

c_true, e0_true, t0_true = correct_constants
ts_true = t0_true + np.sqrt(c_true/(energy - e0_true))

# check how well it matches
assert np.allclose(ts, ts_true, rtol=1e-2, atol=1e-2)

def test_avg_and_fit_single_channel(mock_sqs_etof_calibration_run, tmp_path, mock_etof_mono_energies, mock_etof_calibration_constants):
# use mock data
pulse_timing = 'SQS_RR_UTC/TSYS/TIMESERVER'
Expand Down
Loading