@@ -354,7 +354,7 @@ def rechunk_for_cohorts(
354
354
def rechunk_for_blockwise (array : DaskArray , axis : T_Axis , labels : np .ndarray ) -> DaskArray :
355
355
"""
356
356
Rechunks array so that group boundaries line up with chunk boundaries, allowing
357
- embarassingly parallel group reductions.
357
+ embarrassingly parallel group reductions.
358
358
359
359
This only works when the groups are sequential
360
360
(e.g. labels = ``[0,0,0,1,1,1,1,2,2]``).
@@ -849,7 +849,7 @@ def _finalize_results(
849
849
"""
850
850
squeezed = _squeeze_results (results , axis )
851
851
852
- if agg .min_count is not None :
852
+ if agg .min_count > 0 :
853
853
counts = squeezed ["intermediates" ][- 1 ]
854
854
squeezed ["intermediates" ] = squeezed ["intermediates" ][:- 1 ]
855
855
@@ -860,7 +860,7 @@ def _finalize_results(
860
860
else :
861
861
finalized [agg .name ] = agg .finalize (* squeezed ["intermediates" ], ** agg .finalize_kwargs )
862
862
863
- if agg .min_count is not None :
863
+ if agg .min_count > 0 :
864
864
count_mask = counts < agg .min_count
865
865
if count_mask .any ():
866
866
# For one count_mask.any() prevents promoting bool to dtype(fill_value) unless
@@ -1598,7 +1598,11 @@ def _lazy_factorize_wrapper(*by: T_By, **kwargs) -> np.ndarray:
1598
1598
1599
1599
1600
1600
def _factorize_multiple (
1601
- by : T_Bys , expected_groups : T_ExpectIndexTuple , any_by_dask : bool , reindex : bool
1601
+ by : T_Bys ,
1602
+ expected_groups : T_ExpectIndexTuple ,
1603
+ any_by_dask : bool ,
1604
+ reindex : bool ,
1605
+ sort : bool = True ,
1602
1606
) -> tuple [tuple [np .ndarray ], tuple [np .ndarray , ...], tuple [int , ...]]:
1603
1607
if any_by_dask :
1604
1608
import dask .array
@@ -1617,6 +1621,7 @@ def _factorize_multiple(
1617
1621
expected_groups = expected_groups ,
1618
1622
fastpath = True ,
1619
1623
reindex = reindex ,
1624
+ sort = sort ,
1620
1625
)
1621
1626
1622
1627
fg , gs = [], []
@@ -1643,6 +1648,7 @@ def _factorize_multiple(
1643
1648
expected_groups = expected_groups ,
1644
1649
fastpath = True ,
1645
1650
reindex = reindex ,
1651
+ sort = sort ,
1646
1652
)
1647
1653
1648
1654
return (group_idx ,), found_groups , grp_shape
@@ -1653,10 +1659,16 @@ def _validate_expected_groups(nby: int, expected_groups: T_ExpectedGroupsOpt) ->
1653
1659
return (None ,) * nby
1654
1660
1655
1661
if nby == 1 and not isinstance (expected_groups , tuple ):
1656
- if isinstance (expected_groups , pd .Index ):
1662
+ if isinstance (expected_groups , ( pd .Index , np . ndarray ) ):
1657
1663
return (expected_groups ,)
1658
1664
else :
1659
- return (np .asarray (expected_groups ),)
1665
+ array = np .asarray (expected_groups )
1666
+ if np .issubdtype (array .dtype , np .integer ):
1667
+ # preserve default dtypes
1668
+ # on pandas 1.5/2, on windows
1669
+ # when a list is passed
1670
+ array = array .astype (np .int64 )
1671
+ return (array ,)
1660
1672
1661
1673
if nby > 1 and not isinstance (expected_groups , tuple ): # TODO: test for list
1662
1674
raise ValueError (
@@ -1833,21 +1845,28 @@ def groupby_reduce(
1833
1845
# (pd.IntervalIndex or not)
1834
1846
expected_groups = _convert_expected_groups_to_index (expected_groups , isbins , sort )
1835
1847
1836
- is_binning = any ([ isinstance ( e , pd . IntervalIndex ) for e in expected_groups ])
1837
-
1838
- # TODO: could restrict this to dask-only
1839
- factorize_early = ( nby > 1 ) or (
1840
- is_binning and method == "cohorts" and is_duck_dask_array ( array )
1848
+ # Factorize early in every case except when
1849
+ # grouping by dask arrays without expected_groups
1850
+ factorize_early = not (
1851
+ # can't do it if we are grouping by dask array but don't have expected_groups
1852
+ any ( is_dask and ex_ is None for is_dask , ex_ in zip ( by_is_dask , expected_groups ) )
1841
1853
)
1842
1854
if factorize_early :
1843
1855
bys , final_groups , grp_shape = _factorize_multiple (
1844
- bys , expected_groups , any_by_dask = any_by_dask , reindex = reindex
1856
+ bys ,
1857
+ expected_groups ,
1858
+ any_by_dask = any_by_dask ,
1859
+ # This is the only way it makes sense I think.
1860
+ # reindex controls what's actually allocated in chunk_reduce
1861
+ # At this point, we care about an accurate conversion to codes.
1862
+ reindex = True ,
1863
+ sort = sort ,
1845
1864
)
1846
1865
expected_groups = (pd .RangeIndex (math .prod (grp_shape )),)
1847
1866
1848
1867
assert len (bys ) == 1
1849
- by_ = bys [ 0 ]
1850
- expected_groups = expected_groups [ 0 ]
1868
+ ( by_ ,) = bys
1869
+ ( expected_groups ,) = expected_groups
1851
1870
1852
1871
if axis is None :
1853
1872
axis_ = tuple (array .ndim + np .arange (- by_ .ndim , 0 ))
@@ -1898,7 +1917,12 @@ def groupby_reduce(
1898
1917
min_count = 1
1899
1918
1900
1919
# TODO: set in xarray?
1901
- if min_count is not None and func in ["nansum" , "nanprod" ] and fill_value is None :
1920
+ if (
1921
+ min_count is not None
1922
+ and min_count > 0
1923
+ and func in ["nansum" , "nanprod" ]
1924
+ and fill_value is None
1925
+ ):
1902
1926
# nansum, nanprod have fill_value=0, 1
1903
1927
# overwrite that when min_count is set
1904
1928
fill_value = np .nan
0 commit comments