Skip to content

BUG: Fix #46726; wrong result with varying window size min/max rolling calc. #61288

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 100 additions & 77 deletions pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ from libc.math cimport (
sqrt,
)
from libcpp.deque cimport deque
from libcpp.stack cimport stack
from libcpp.unordered_map cimport unordered_map

from pandas._libs.algos cimport TiebreakEnumType
Expand Down Expand Up @@ -991,36 +992,24 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
# Moving maximum / minimum code taken from Bottleneck
# Licence at LICENSES/BOTTLENECK_LICENCE

cdef int64_t bisect_left(
deque[int64_t]& a,
int64_t x,
int64_t lo=0,
int64_t hi=-1
) nogil:
cdef int64_t mid
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, can you add a similar docstring.

if hi == -1:
hi = a.size()
while lo < hi:
mid = (lo + hi) // 2
if a.at(mid) < x:
lo = mid + 1
else:
hi = mid
return lo

cdef float64_t init_mm(float64_t ai, Py_ssize_t *nobs, bint is_max) noexcept nogil:

if ai == ai:
nobs[0] = nobs[0] + 1
elif is_max:
ai = MINfloat64
else:
ai = MAXfloat64

return ai


cdef void remove_mm(float64_t aold, Py_ssize_t *nobs) noexcept nogil:
""" remove a value from the mm calc """
if aold == aold:
nobs[0] = nobs[0] - 1


cdef float64_t calc_mm(int64_t minp, Py_ssize_t nobs,
float64_t value) noexcept nogil:
cdef:
float64_t result

if nobs >= minp:
result = value
else:
result = NaN

return result
from libc.math cimport isnan


def roll_max(ndarray[float64_t] values, ndarray[int64_t] start,
Expand Down Expand Up @@ -1068,69 +1057,103 @@ def roll_min(ndarray[float64_t] values, ndarray[int64_t] start,
return _roll_min_max(values, start, end, minp, is_max=0)


cdef _roll_min_max(ndarray[float64_t] values,
ndarray[int64_t] starti,
ndarray[int64_t] endi,
int64_t minp,
bint is_max):
def _roll_min_max(
ndarray[float64_t] values,
ndarray[int64_t] start,
ndarray[int64_t] end,
int64_t minp,
bint is_max
):
cdef:
float64_t ai
int64_t curr_win_size, start
Py_ssize_t i, k, nobs = 0, N = len(starti)
deque Q[int64_t] # min/max always the front
deque W[int64_t] # track the whole window for nobs compute
Py_ssize_t i, i_next, k, valid_start, last_end, last_start, N = len(start)
deque Q[int64_t]
stack Dominators[int64_t]
ndarray[float64_t, ndim=1] output

Py_ssize_t this_start, this_end, stash_start
int64_t q_idx

output = np.empty(N, dtype=np.float64)
Q = deque[int64_t]()
W = deque[int64_t]()
Dominators = stack[int64_t]()

# This function was "ported" / translated from sliding_min_max()
# in /pandas/core/_numba/kernels/min_max_.py.
# (See there for credits and some comments.)
# Code translation assumptions/rules:
# - min_periods --> minp
# - deque[0] --> front()
# - deque[-1] --> back()
# - stack[-1] --> top()
# - bool(stack/deque) --> !empty()
# - deque.append() --> push_back()
# - stack.append() --> push()
# - deque.popleft --> pop_front()
# - deque.pop() --> pop_back()

with nogil:
if minp < 1:
minp = 1

if N>2:
i_next = N - 1
for i in range(N - 2, -1, -1):
if start[i_next] < start[i] \
and (
Dominators.empty()
or start[Dominators.top()] > start[i_next]
):
Dominators.push(i_next)
i_next = i

# This is using a modified version of the C++ code in this
# SO post: https://stackoverflow.com/a/12239580
# The original impl didn't deal with variable window sizes
# So the code was optimized for that
valid_start = -minp

last_end = 0
last_start = -1

# first window's size
curr_win_size = endi[0] - starti[0]
# GH 32865
# Anchor output index to values index to provide custom
# BaseIndexer support
for i in range(N):
this_start = start[i]
this_end = end[i]

if (not Dominators.empty() and Dominators.top() == i):
Dominators.pop()

curr_win_size = endi[i] - starti[i]
if i == 0:
start = starti[i]
if not (this_end > last_end
or (this_end == last_end and this_start >= last_start)):
raise ValueError(
"Start/End ordering requirement is violated at index {}".format(i))

if Dominators.empty():
stash_start = this_start
else:
start = endi[i - 1]

for k in range(start, endi[i]):
ai = init_mm(values[k], &nobs, is_max)
# Discard previous entries if we find new min or max
if is_max:
while not Q.empty() and ((ai >= values[Q.back()]) or
values[Q.back()] != values[Q.back()]):
Q.pop_back()
else:
while not Q.empty() and ((ai <= values[Q.back()]) or
values[Q.back()] != values[Q.back()]):
Q.pop_back()
Q.push_back(k)
W.push_back(k)

# Discard entries outside and left of current window
while not Q.empty() and Q.front() <= starti[i] - 1:
stash_start = min(this_start, start[Dominators.top()])

while not Q.empty() and Q.front() < stash_start:
Q.pop_front()
while not W.empty() and W.front() <= starti[i] - 1:
remove_mm(values[W.front()], &nobs)
W.pop_front()

# Save output based on index in input value array
if not Q.empty() and curr_win_size > 0:
output[i] = calc_mm(minp, nobs, values[Q.front()])
else:
for k in range(last_end, this_end):
if not isnan(values[k]):
valid_start += 1
while valid_start >= 0 and isnan(values[valid_start]):
valid_start += 1

if is_max:
while not Q.empty() and values[k] >= values[Q.back()]:
Q.pop_back()
else:
while not Q.empty() and values[k] <= values[Q.back()]:
Q.pop_back()
Q.push_back(k)

if Q.empty() or this_start > valid_start:
output[i] = NaN
elif Q.front() >= this_start:
output[i] = values[Q.front()]
else:
q_idx = bisect_left(Q, this_start, lo=1)
output[i] = values[Q[q_idx]]
last_end = this_end
last_start = this_start

return output

Expand Down
123 changes: 83 additions & 40 deletions pandas/core/_numba/kernels/min_max_.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@

from __future__ import annotations

from typing import TYPE_CHECKING
from typing import (
TYPE_CHECKING,
Any,
)

import numba
import numpy as np
Expand All @@ -18,6 +21,19 @@
from pandas._typing import npt


@numba.njit(nogil=True, parallel=False)
def bisect_left(a: list[Any], x: Any, lo: int = 0, hi: int = -1) -> int:
if hi == -1:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a docstring here; I think just a single line is fine. Something like

"""Get index of smallest value greater than or equal to x between lo and hi."""

hi = len(a)
while lo < hi:
mid = (lo + hi) // 2
if a[mid] < x:
lo = mid + 1
else:
hi = mid
return lo


@numba.jit(nopython=True, nogil=True, parallel=False)
def sliding_min_max(
values: np.ndarray,
Expand All @@ -27,55 +43,82 @@ def sliding_min_max(
min_periods: int,
is_max: bool,
) -> tuple[np.ndarray, list[int]]:
# Basic idea of the algorithm: https://stackoverflow.com/a/12239580
# It was generalized to work with an arbitrary list of any window size and position
# by adding the Dominators stack.

N = len(start)
nobs = 0
output = np.empty(N, dtype=result_dtype)
na_pos = []
# Use deque once numba supports it
# https://github.com/numba/numba/issues/7417
Q: list = []
W: list = []
for i in range(N):
curr_win_size = end[i] - start[i]
if i == 0:
st = start[i]
output = np.empty(N, dtype=result_dtype)

def cmp(a: Any, b: Any, is_max: bool) -> bool:
if is_max:
return a >= b
else:
st = end[i - 1]

for k in range(st, end[i]):
ai = values[k]
if not np.isnan(ai):
nobs += 1
elif is_max:
ai = -np.inf
else:
ai = np.inf
# Discard previous entries if we find new min or max
if is_max:
while Q and ((ai >= values[Q[-1]]) or values[Q[-1]] != values[Q[-1]]):
Q.pop()
else:
while Q and ((ai <= values[Q[-1]]) or values[Q[-1]] != values[Q[-1]]):
Q.pop()
Q.append(k)
W.append(k)
return a <= b

# Discard entries outside and left of current window
while Q and Q[0] <= start[i] - 1:
Q: list = [] # this is a queue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment on what this queue tracks. Something like

# Track indices of potential extrema. Corresponding values will always be monotonic (increasing/decreasing depending on min/max).

Also, I think type-hint can be list[int].

Dominators: list = [] # this is a stack
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment on what this stack tracks. Something like

# Track `start` values we need to consider when building `Q`.

Can also be list[int].


if min_periods < 1:
min_periods = 1

if N > 2:
i_next = N - 1 # equivalent to i_next = i+1 inside the loop
for i in range(N - 2, -1, -1):
next_dominates = start[i_next] < start[i]
if next_dominates and (
not Dominators or start[Dominators[-1]] > start[i_next]
):
Dominators.append(i_next)
i_next = i

valid_start = -min_periods

last_end = 0
last_start = -1

for i in range(N):
this_start = start[i].item()
this_end = end[i].item()

if Dominators and Dominators[-1] == i:
Dominators.pop()

if not (
this_end > last_end or (this_end == last_end and this_start >= last_start)
):
raise ValueError(
"Start/End ordering requirement is violated at index " + str(i)
)

stash_start = (
this_start if not Dominators else min(this_start, start[Dominators[-1]])
)
while Q and Q[0] < stash_start:
Q.pop(0)
while W and W[0] <= start[i] - 1:
if not np.isnan(values[W[0]]):
nobs -= 1
W.pop(0)

# Save output based on index in input value array
if Q and curr_win_size > 0 and nobs >= min_periods:
output[i] = values[Q[0]]
else:
for k in range(last_end, this_end):
if not np.isnan(values[k]):
valid_start += 1
while valid_start >= 0 and np.isnan(values[valid_start]):
valid_start += 1
while Q and cmp(values[k], values[Q[-1]], is_max):
Q.pop() # Q.pop_back()
Q.append(k) # Q.push_back(k)

if not Q or (this_start > valid_start):
if values.dtype.kind != "i":
output[i] = np.nan
else:
na_pos.append(i)
elif Q[0] >= this_start:
output[i] = values[Q[0]]
else:
q_idx = bisect_left(Q, this_start, lo=1)
output[i] = values[Q[q_idx]]
last_end = this_end
last_start = this_start

return output, na_pos

Expand Down
15 changes: 15 additions & 0 deletions pandas/tests/window/test_numba.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,18 @@ def test_npfunc_no_warnings():
df = DataFrame({"col1": [1, 2, 3, 4, 5]})
with tm.assert_produces_warning(False):
df.col1.rolling(2).apply(np.prod, raw=True, engine="numba")


from .test_rolling import TestMinMax


@td.skip_if_no("numba")
class TestMinMaxNumba:
parent = TestMinMax()

@pytest.mark.parametrize("is_max, has_nan, exp_list", TestMinMax.TestData)
def test_minmax(self, is_max, has_nan, exp_list):
TestMinMaxNumba.parent.test_minmax(is_max, has_nan, exp_list, "numba")

def test_wrong_order(self):
TestMinMaxNumba.parent.test_wrong_order("numba")
Comment on lines +590 to +598
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you essentially duplicate the test in pandas/tests/window/test_rolling.py? The pandas test suite avoids test dependencies like this.

Loading
Loading