diff --git a/pytensor/tensor/math.py b/pytensor/tensor/math.py index d1e4dc6195..aa1fdbdf26 100644 --- a/pytensor/tensor/math.py +++ b/pytensor/tensor/math.py @@ -2926,6 +2926,87 @@ def nan_to_num(x, nan=0.0, posinf=None, neginf=None): return x +def quantile(x: TensorLike, q: float | list[float], axis=None) -> TensorVariable: + """ + Computes the q-th quantile along the given axis(es) of a tensor `input`. + + Parameters + ---------- + x: TensorVariable + The input tensor. + q: float or list of float + The quantile(s) to compute, which must be between 0 and 1 inclusive. + 0 corresponds to the minimum, 0.5 to the median, and 1 to the maximum. + axis: None or int or (list of int) (see `Sum`) + Compute the quantile along this axis of the tensor. + None means all axes (like numpy). + """ + + x = as_tensor_variable(x) + q = as_tensor_variable(q) + x_ndim = x.type.ndim + if axis is None: + axis = list(range(x_ndim)) + else: + axis = list(normalize_axis_tuple(axis, x_ndim)) + + non_axis = [i for i in range(x_ndim) if i not in axis] + non_axis_shape = [x.shape[i] for i in non_axis] + + # Put axis at the end and unravel them + x_raveled = x.transpose(*non_axis, *axis) + if len(axis) > 1: + x_raveled = x_raveled.reshape((*non_axis_shape, -1)) + raveled_size = x_raveled.shape[-1] + + # Ensure q is between 0 and 1 + q = clip(q, 0.0, 1.0) + + # Compute quantile indices + k = (q * (raveled_size - 1)).astype("int64") + k_float = q * (raveled_size - 1) + + # Sort the input tensor along the specified axis + x_sorted = x_raveled.sort(axis=-1) + + # Get the values at index k and k + 1 for linear interpolation + k_values = x_sorted[..., k] + kp1_values = x_sorted[..., minimum(k + 1, raveled_size - 1)] + + # Interpolation between the two values if needed + frac = k_float - k.astype(k_float.dtype) + quantile_value = (1 - frac) * k_values + frac * kp1_values + + return quantile_value + + +def percentile(x: TensorLike, p: float | list[float], axis=None) -> TensorVariable: + """ + Computes the p-th percentile along the given axis(es) of a tensor `input`. + + Parameters + ---------- + x: TensorVariable + The input tensor. + p: float or list of float + The percentile(s) to compute, which must be between 0 and 100 inclusive. + 0 corresponds to the minimum, 50 to the median, and 100 to the maximum. + axis: None or int or (list of int) (see `Sum`) + Compute the percentile along this axis of the tensor. + None means all axes (like numpy). + + Returns + ------- + TensorVariable + The computed percentile values. + """ + # Convert percentiles (0-100) to quantiles (0-1) + q = as_tensor_variable(p) / 100.0 + + # Call the quantile function + return quantile(x, q, axis=axis) + + # NumPy logical aliases square = sqr @@ -3080,6 +3161,8 @@ def nan_to_num(x, nan=0.0, posinf=None, neginf=None): "outer", "any", "all", + "percentile", + "quantile", "ptp", "power", "logaddexp", diff --git a/tests/tensor/test_math.py b/tests/tensor/test_math.py index 14bc2614e3..11a54a6685 100644 --- a/tests/tensor/test_math.py +++ b/tests/tensor/test_math.py @@ -102,9 +102,11 @@ neg, neq, outer, + percentile, polygamma, power, ptp, + quantile, rad2deg, reciprocal, round_half_away_from_zero, @@ -3766,3 +3768,70 @@ def test_median(ndim, axis): assert np.allclose(result_odd, expected_odd) assert np.allclose(result_even, expected_even) + + +@pytest.mark.parametrize( + "ndim, axis, q", + [ + (2, None, 50), + (2, 1, 33), + (2, (0, 1), 50), + (3, (1, 2), 50), + (4, (1, 3, 0), 25), + (2, None, [25, 50, 75]), + (3, (1, 2), [10, 90]), + (3, 1, 75), + (3, 0, 50), + ], +) +def test_percentile(ndim, axis, q): + shape = tuple(np.arange(1, ndim + 1)) + data = np.random.rand(*shape) + x = tensor(shape=np.array(data).shape) + f = function([x], percentile(x, q, axis=axis)) + result = f(data.astype(x.dtype)) + expected = np.percentile(data.astype(x.dtype), q, axis=axis) + assert np.allclose(result, expected) + + +@pytest.mark.parametrize( + "ndim, axis, q", + [ + (2, None, 0.5), + (2, None, [0.25, 0.75]), + (2, 0, 0.5), + (2, (0, 1), 0.5), + (3, None, 0.5), + (3, None, [0.25, 0.75]), + (3, 0, 0.5), + (3, (1, 2), 0.5), + (3, (1, 2), [0.1, 0.9]), + ], +) +def test_quantile(ndim, axis, q): + shape = tuple(np.random.randint(2, 6) for _ in range(ndim)) + data = np.random.rand(*shape) + + x = tensor(dtype="float64", shape=(None,) * ndim) + f = function([x], quantile(x, q, axis=axis)) + + result = f(data.astype(x.dtype)) + expected = np.quantile(data.astype(x.dtype), q, axis=axis) + + assert np.allclose(result, expected) + + +@pytest.mark.parametrize( + "ndim, axis, q, is_percentile", + [ + (2, None, [50, 120], True), + (2, 1, -0.5, False), + ], +) +def test_invalid_percentile_quantile(ndim, axis, q, is_percentile): + x = tensor(dtype="float64", shape=(None,) * ndim) + with pytest.raises(ValueError): + if is_percentile: + percentile(x, q, axis) + else: + quantile(x, q, axis)