Skip to content

Commit

Permalink
Trapezes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom-Willemsen committed Feb 12, 2025
1 parent a4795d3 commit 1c2e034
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 130 deletions.
62 changes: 35 additions & 27 deletions src/ibex_bluesky_core/callbacks/fitting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,36 +132,44 @@ def update_fit(self) -> None:


def center_of_mass(x: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> float:
"""Compute our own centre of mass.
"""Compute the centre of mass of the area under a curve.
The "area under the curve" is a shape bounded by:
- Straight line segments joining (x, y) data points, along the top edge
- min(y), at the bottom
- min(x), on the left-hand side
- max(x), on the right-hand side
Follow these rules:
Background does not skew CoM
Order of data does not skew CoM
Non constant point spacing does not skew CoM
Assumes that the peak is positive
- Positive or negative background does not skew CoM
- Order of data does not skew CoM
- Non constant point spacing does not skew CoM
- Always calculates CoM of the area *under* the curve, regardless of sign of Y data.
"""
# Offset points for any background
# Sort points in terms of x
arg_sorted = np.argsort(x)
x_sorted = np.take_along_axis(x, arg_sorted, axis=None)
y_sorted = np.take_along_axis(y - np.min(y), arg_sorted, axis=None)

# Each point has its own weight given by its distance to its neighbouring point
# Edge cases are calculated as x_1 - x_0 and x_-1 - x_-2

x_diff = np.diff(x_sorted)

weight = np.array([x_diff[0]])
weight = np.append(weight, (x_diff[1:] + x_diff[:-1]) / 2)
weight = np.append(weight, [x_diff[-1]])

weight /= np.max(weight) # Normalise weights in terms of max(weights)

sum_xyw = np.sum(x_sorted * y_sorted * weight) # Weighted CoM calculation
sum_yw = np.sum(y_sorted * weight)
com_x = sum_xyw / sum_yw

return com_x
# Ensure we do not take CoM of negative masses, sort for ascending X data.
sort_indices = np.argsort(x, kind="stable")
x = np.take_along_axis(x, sort_indices, axis=None)
y = np.take_along_axis(y - np.min(y), sort_indices, axis=None)

widths = np.diff(x)

# Area under the curve for two adjacent points is a right trapezoid.
# Split that trapezoid into a rectangular region, plus a right triangle.
# Find area and effective X CoM for each.
rect_areas = widths * np.minimum(y[:-1], y[1:])
rect_x_com = (x[:-1] + x[1:]) / 2.0
triangle_areas = widths * np.abs(y[:-1] - y[1:]) / 2.0
triangle_x_com = np.where(
y[:-1] > y[1:], x[:-1] + (widths / 3.0), x[:-1] + (2.0 * widths / 3.0)
)

if np.sum(rect_areas + triangle_areas) == 0.0:
# If all data was flat, return central x
return (x[0] + x[-1]) / 2.0

return np.sum(rect_areas * rect_x_com + triangle_areas * triangle_x_com) / np.sum(
rect_areas + triangle_areas
)


@make_class_safe(logger=logger) # pyright: ignore (pyright doesn't understand this decorator)
Expand Down
161 changes: 58 additions & 103 deletions tests/callbacks/fitting/test_centre_of_mass.py
Original file line number Diff line number Diff line change
@@ -1,114 +1,69 @@
import numpy as np
import numpy.typing as npt
import pytest

from ibex_bluesky_core.callbacks.fitting import PeakStats

# Tests:
# Test with normal scan with gaussian data
# Check that asymmetrical data does not skew CoM
# Check that having a background on data does not skew CoM
# Check that order of documents does not skew CoM
# Check that point spacing does not skew CoM


def gaussian(
x: npt.NDArray[np.float64], amp: float, sigma: float, x0: float, bg: float
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
return (x, amp * np.exp(-((x - x0) ** 2) / (2 * sigma**2)) + bg)


def simulate_run_and_return_com(xy: tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]):
ps = PeakStats("x", "y")

ps.start({}) # pyright: ignore

for x, y in np.vstack(xy).T:
ps.event({"data": {"x": x, "y": y}}) # pyright: ignore

ps.stop({}) # pyright: ignore

return ps["com"]


@pytest.mark.parametrize(
("x", "amp", "sigma", "x0", "bg"),
[
(np.arange(-2, 3), 1, 1, 0, 0),
(np.arange(-4, 1), 1, 1, -2, 0),
],
)
def test_normal_scan(x: npt.NDArray[np.float64], amp: float, sigma: float, x0: float, bg: float):
xy = gaussian(x, amp, sigma, x0, bg)
com = simulate_run_and_return_com(xy)
assert com == pytest.approx(x0, abs=1e-4)


@pytest.mark.parametrize(
("x", "amp", "sigma", "x0", "bg"),
("data", "expected_com"),
[
(np.arange(-4, 10), 1, 1, 0, 0),
(np.arange(-6, 20), 1, 1, -2, 0),
# Simplest case:
# - Flat, non-zero Y data
# - Evenly spaced, monotonically increasing X data
([(0, 1), (2, 1), (4, 1), (6, 1), (8, 1), (10, 1)], 5.0),
# Simple triangular peak
([(0, 0), (1, 1), (2, 2), (3, 1), (4, 0), (5, 0)], 2.0),
# Simple triangular peak with non-zero base
([(0, 10), (1, 11), (2, 12), (3, 11), (4, 10), (5, 10)], 2.0),
# No data at all
([], None),
# Only one point, com should be at that one point regardless whether it
# measured zero or some other y value.
([(5, 0)], 5.0),
([(5, 50)], 5.0),
# Two points, flat data, com should be in the middle
([(0, 5), (10, 5)], 5.0),
# Flat, logarithmically spaced data - CoM should be in centre of measured range.
([(1, 3), (10, 3), (100, 3), (1000, 3), (10000, 3)], 5000.5),
# "triangle" defined by area under two points
# (CoM of a right triangle is 1/3 along x from right angle)
([(0, 0), (3, 6)], 2.0),
([(0, 6), (3, 0)], 1.0),
# Cases with the first/last points not having equal spacings with each other
([(0, 1), (0.1, 1), (4, 1), (5, 0), (6, 1), (10, 1)], 5.0),
([(0, 1), (4, 1), (5, 0), (6, 1), (9.9, 1), (10, 1)], 5.0),
# Two triangular peaks next to each other, with different point spacings
# but same shapes, over a base of zero.
([(0, 0), (1, 1), (2, 2), (3, 1), (4, 0), (6, 2), (8, 0), (10, 0)], 4.0),
([(0, 0), (2, 2), (4, 0), (5, 1), (6, 2), (7, 1), (8, 0), (10, 0)], 4.0),
# Two triangular peaks next to each other, with different point spacings
# but same shapes, over a base of 10.
([(0, 10), (1, 11), (2, 12), (3, 11), (4, 10), (6, 12), (8, 10), (10, 10)], 4.0),
([(0, 10), (2, 12), (4, 10), (5, 11), (6, 12), (7, 11), (8, 10), (10, 10)], 4.0),
# "Narrow" peak over a base of 0
([(0, 0), (4.999, 0), (5.0, 10), (5.001, 0)], 5.0),
# "Narrow" peak as above, over a base of 10 (y translation should not
# affect CoM)
([(0, 10), (4.999, 10), (5.0, 20), (5.001, 10)], 5.0),
# Non-monotonically increasing x data (e.g. from adaptive scan)
([(0, 0), (2, 2), (1, 1), (3, 1), (4, 0)], 2.0),
# Overscanned data (all measurements duplicated, e.g. there-and-back scan)
([(0, 0), (1, 1), (2, 0), (2, 0), (1, 1), (0, 0)], 1.0),
# Mixed positive/negative data. This explicitly calculates area *under* curve,
# so CoM should still be the CoM of the positive peak in this data.
([(0, -1), (1, 0), (2, -1), (3, -1)], 1.0),
# Y data with a single positive peak, which happens
# to sum to zero but never contains zero.
([(0, -1), (1, 3), (2, -1), (3, -1)], 1.0),
# Y data which happens to sum to *nearly* zero
([(0, -1), (1, 3.000001), (2, -1), (3, -1)], 1.0),
],
)
def test_asymmetrical_scan(
x: npt.NDArray[np.float64], amp: float, sigma: float, x0: float, bg: float
):
xy = gaussian(x, amp, sigma, x0, bg)
com = simulate_run_and_return_com(xy)
assert com == pytest.approx(x0, abs=1e-4)


@pytest.mark.parametrize(
("x", "amp", "sigma", "x0", "bg"),
[
(np.arange(-2, 3), 1, 1, 0, 3),
(np.arange(-4, 1), 1, 1, -2, -0.5),
(np.arange(-4, 1), 1, 1, -2, -3),
],
)
def test_background_gaussian_scan(
x: npt.NDArray[np.float64], amp: float, sigma: float, x0: float, bg: float
):
xy = gaussian(x, amp, sigma, x0, bg)
com = simulate_run_and_return_com(xy)
assert com == pytest.approx(x0, abs=1e-4)


@pytest.mark.parametrize(
("x", "amp", "sigma", "x0", "bg"),
[
(np.array([0, -2, 2, -1, 1]), 1, 1, 0, 0),
(np.array([-4, 0, -2, -3, -1]), 1, 1, -2, 0),
],
)
def test_non_continuous_scan(
x: npt.NDArray[np.float64], amp: float, sigma: float, x0: float, bg: float
):
xy = gaussian(x, amp, sigma, x0, bg)
com = simulate_run_and_return_com(xy)
assert com == pytest.approx(x0, abs=1e-4)
def test_compute_com(data: list[tuple[float, float]], expected_com):
ps = PeakStats("x", "y")
ps.start({}) # pyright: ignore

for x, y in data:
ps.event({"data": {"x": x, "y": y}}) # pyright: ignore

@pytest.mark.parametrize(
("x", "amp", "sigma", "x0", "bg"),
[
(np.append(np.arange(-10, -2, 0.05), np.arange(-2, 4, 0.5)), 1, 0.5, 0, 0),
(
np.concatenate(
(np.arange(-5, -2.0, 0.5), np.arange(-2.5, -1.45, 0.05), np.arange(-1.5, 1, 0.5)),
axis=0,
),
1,
0.25,
0,
0,
),
],
)
def test_non_constant_point_spacing_scan(
x: npt.NDArray[np.float64], amp: float, sigma: float, x0: float, bg: float
):
xy = gaussian(x, amp, sigma, x0, bg)
com = simulate_run_and_return_com(xy)
assert com == pytest.approx(x0, abs=1e-3)
ps.stop({}) # pyright: ignore
assert ps["com"] == pytest.approx(expected_com)

0 comments on commit 1c2e034

Please sign in to comment.