memory error with large expected_groups #428

Open
solomon-negusse opened this issue Apr 1, 2025 · 18 comments

@solomon-negusse

solomon-negusse commented Apr 1, 2025

Hi, I'm testing out flox (with dask) as a replacement for Scala-based zonal stats on global rasters (30m resolution) and getting promising performance results with cleaner and much smaller code! However, I'm running into a memory issue and wanted to ask whether there's a cleaner solution than my current workaround.

Here is the simple code for calculating tree cover loss area at three political boundary levels, grouped by a total of six dask array layers with expected_groups sizes of (23, 5, 7, 248, 854, 86).

tcl_by_year = xarray_reduce(
    areas.band_data,
    tcl_data,
    drivers_data,
    tcd_thresholds_data,
    gadm_adm0_data,
    gadm_adm1_data,
    gadm_adm2_data,
    func='sum',
    expected_groups=(tcl_years, drivers_cats, tcd_threshold_levels, gadm_adm0_ids, gadm_adm1_ids, gadm_adm2_ids),
).compute()

That runs into this error:

MemoryError: Unable to allocate 109 GiB for an array with shape (14662360160,) and data type int64

I'm getting around this by chunking the group-by layer with the highest number of unique labels (854), running the above in a dask delayed function over the chunks, and concatenating the results.

import dask
import xarray as xr
from dask import delayed

chunk_size = 200
# reduce_chunk wraps the xarray_reduce call above for a subset of the ADM2 ids
tasks = [
    delayed(reduce_chunk)(gadm_adm2_ids[i:i+chunk_size])
    for i in range(0, len(gadm_adm2_ids), chunk_size)
]

results = dask.compute(*tasks)

combined = xr.concat(results, dim="gadm_adm2")

chunked = combined.chunk({'tcl_year': -1, 'drivers': -1, 'tcd_threshold': -1, 'gadm_adm0': 1, 'gadm_adm2': -1, 'gadm_adm1': -1})

chunked.to_zarr("s3://**/gadm_results.zarr", mode="w")

This works, but I may not be persisting some of these layers correctly for use across the expected_groups chunks, and it runs slower than expected. Is there a more efficient and elegant way to handle this situation? It'd be great, for example, if this MultiIndex could be built dynamically on the workers from the available groups. Thanks.

Environment:
Flox: 0.10.1
Dask: 2025.2.0
Numpy: 2.2.3
Xarray: 2025.1.2

@dcherian
Collaborator

dcherian commented Apr 1, 2025

👏🏾 👏🏾 I've been waiting for you ;)

For dynamic discovery of groups at compute-time, you'd need reindex=False. I built it for a similar workload, but you're the first to actually ask about it AFAIR. Be warned, I haven't really used that option in a while but it is tested.

(I welcome any suggestions for making this option easier to understand, or better documented. It is rarely needed).
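
For reference, a rough sketch of what reindex=False looks like with flox's core groupby_reduce (hedged: illustrative arrays, not this workload):

# Rough sketch (illustrative arrays, not this workload): with reindex=False,
# each block's intermediate result keeps only the groups it actually saw, and
# the full set of groups is discovered while combining at compute time.
import dask.array as da
from flox import groupby_reduce

values = da.random.random((20_000, 20_000), chunks=4_000)
labels = da.random.randint(0, 854, size=(20_000, 20_000), chunks=4_000)

result, groups = groupby_reduce(values, labels, func="sum", reindex=False)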


Though now that I think about it more, this is not going to work directly; it will accumulate to a single 50 GB chunk at the end. What is tcl_year? What dimensions does it have? 🤞🏾 it has only a time dimension.


Now for the more interesting part.

I've been thinking about how to do this smarter, in particular, like this, since we can determine the bounding boxes for each polygon relatively cheaply from the vector geometries (it's incredibly expensive to do it from a 30m global raster). In that doc example, the "zone" raster is only 2GB so flox can scan it in memory and figure out how to do it.

This giant-raster-zonal-stat problem is the only one I've seen that actually needs something a lot more complicated. I'm quite interested in working it out (hopefully just-in-time for Cloud Native Geospatial)
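
For context, getting per-polygon bounding boxes from the vector side is cheap; a hedged sketch with geopandas (the file name is hypothetical):

import geopandas as gpd

# Per-polygon bounding boxes straight from the vector geometries; no need to
# scan the 30 m raster for this.
gdf = gpd.read_file("gadm_adm2.gpkg")   # hypothetical path
bboxes = gdf.geometry.bounds            # columns: minx, miny, maxx, maxy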

So some questions:

  1. Would you mind showing me reprs for the areas.band_data, tcl_data, drivers_data, tcd_thresholds_data?
  2. How did you rasterize GADM to this grid? I didn't find any solutions a month ago and wrote this
  3. Are you effectively grouping by ADM_2 geometries? Or are you getting a result for every ADM_0, ADM_1, and ADM_2 geometry?

@dcherian
Collaborator

dcherian commented Apr 1, 2025

Ah, so here are the numbers of geometries at each admin level:

| Admin Layer | Number of Geometries | Unique IDs |
|---|---|---|
| 0 | 263 | 263 |
| 1 | 3,662 | 83 |
| 2 | 47,217 | 854 |
| 3 | 144,193 | |
| 4 | 153,410 | |
| 5 | 51,427 | |

Which means at the ADM2 level, the sparsity is (47217/263/83/854) ~0.002. So another option here is to use sparse arrays for the intermediates.
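
For concreteness, the arithmetic behind the 109 GiB allocation and the sparsity estimate (a quick back-of-the-envelope check using the numbers in this thread):

import numpy as np

group_sizes = (23, 5, 7, 248, 86, 854)   # expected_groups cardinalities
n_dense = int(np.prod(group_sizes))      # 14,662,360,160 group combinations
print(n_dense)                           # matches the shape in the MemoryError
print(n_dense * 8 / 2**30)               # ~109 GiB as int64

# Only ~47,217 ADM2 geometries exist out of 263 * 83 * 854 admin combinations:
print(47217 / (263 * 83 * 854))          # ~0.0025, so most combinations are empty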

@solomon-negusse
Author

Thanks for the quick response and for looking into this! :)
I actually tried reindex=False but I get a ValueError: Please provide expected_groups if not grouping by a numpy array. error. This was the same xarray_reduce call as above with that kwarg added and expected_groups removed. Am I using it wrong? My plan, if that worked, was to write the partial results from the beefy dask workers to zarr and read subsets back for post-processing.

Here is some more info:

Would you mind showing me reprs for the areas.band_data, tcl_data, drivers_data, tcd_thresholds_data?

Areas.band_data:

(repr screenshot attached)

All the groupers have the same shape as the areas dataset; they all fit the internal gridding system we use. tcl_data is just an aligned form of tcl_year, btw.

(repr screenshot attached)

How did you rasterize GADM to this grid? I didn't find any solutions a month ago and wrote this

We have a legacy pipeline that does this with gdal_rasterize, parallelized using AWS Batch. I had experimented with datashader for this a while ago, which has a dask array backend. Now that I use flox, I may revisit that.

Are you effectively grouping by ADM_2 geometries? Or are you getting a result for every ADM_0, ADM_1, and ADM_2 geometry?

We are doing the latter to report at every admin level. I tested out running without ADM_2 and that worked smoothly FYI.

@dcherian
Collaborator

dcherian commented Apr 3, 2025

On #430 you should be able to

import numpy as np

from flox import ReindexArrayType, ReindexStrategy
from flox.xarray import xarray_reduce

# raise_if_dask_computes is a test helper that errors if building this graph
# triggers an eager compute
with raise_if_dask_computes():
    result = xarray_reduce(
        ds.areas,
        ds.tcl_year,
        ds.drivers,
        ds.tcd_thresholds,
        gadm.adm0,
        gadm.adm1,
        gadm.adm2,
        expected_groups=(
            np.arange(23),
            np.arange(1, 6),
            np.arange(1, 8),
            np.arange(248),
            np.arange(86),
            np.arange(854),
        ),
        func="sum",
        reindex=ReindexStrategy(
            blockwise=False, array_type=ReindexArrayType.SPARSE_COO
        ),
    )

It's a bit slow (I suspect #298 will improve things) but memory use seems low. I didn't run it for too long.

Are you able to try it out?


There's got to be a smarter way to do this starting with just the geometries.

@dcherian
Collaborator

dcherian commented Apr 3, 2025

I actually tried reindex=False but I get a ValueError: Please provide expected_groups if not grouping by a numpy array. error. This was the same xarray_reduce call as above with that kwarg added and expected_groups removed. Am I using it wrong?

Forgot to respond: yes, you'll need to provide expected_groups when grouping by dask arrays AND using Xarray. Are you able to try out #430?

@solomon-negusse
Author

Thanks for the updates, @dcherian. I tested it out and I'm getting this error when writing the results. As an immediate solution, I can create a single gadm layer that encodes all three levels and see if that works.

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[23], line 25
      1 from flox import ReindexArrayType, ReindexStrategy
      3 result = xarray_reduce(
      4     areas.band_data,
      5     tcl_data,
   (...)     22     ),
     23 )
---> 25 result.to_zarr("s3://gfw-data-lake/tsc_tree_cover_loss_drivers/v2023/raster/epsg-4326/zarr/gadm_adm2_results.zarr", mode="w")

File /opt/coiled/env/lib/python3.11/site-packages/xarray/core/dataarray.py:4428, in DataArray.to_zarr(self, store, chunk_store, mode, synchronizer, group, encoding, compute, consolidated, append_dim, region, safe_chunks, storage_options, zarr_version)
   4424 else:
   4425     # No problems with the name - so we're fine!
   4426     dataset = self.to_dataset()
-> 4428 return to_zarr(  # type: ignore[call-overload,misc]
   4429     dataset,
   4430     store=store,
   4431     chunk_store=chunk_store,
   4432     mode=mode,
   4433     synchronizer=synchronizer,
   4434     group=group,
   4435     encoding=encoding,
   4436     compute=compute,
   4437     consolidated=consolidated,
   4438     append_dim=append_dim,
   4439     region=region,
   4440     safe_chunks=safe_chunks,
   4441     storage_options=storage_options,
   4442     zarr_version=zarr_version,
   4443 )

File /opt/coiled/env/lib/python3.11/site-packages/xarray/backends/api.py:2240, in to_zarr(dataset, store, chunk_store, mode, synchronizer, group, encoding, compute, consolidated, append_dim, region, safe_chunks, storage_options, zarr_version, zarr_format, write_empty_chunks, chunkmanager_store_kwargs)
   2238 # TODO: figure out how to properly handle unlimited_dims
   2239 dump_to_store(dataset, zstore, writer, encoding=encoding)
-> 2240 writes = writer.sync(
   2241     compute=compute, chunkmanager_store_kwargs=chunkmanager_store_kwargs
   2242 )
   2244 if compute:
   2245     _finalize_store(writes, zstore)

File /opt/coiled/env/lib/python3.11/site-packages/xarray/backends/common.py:358, in ArrayWriter.sync(self, compute, chunkmanager_store_kwargs)
    355 if chunkmanager_store_kwargs is None:
    356     chunkmanager_store_kwargs = {}
--> 358 delayed_store = chunkmanager.store(
    359     self.sources,
    360     self.targets,
    361     lock=self.lock,
    362     compute=compute,
    363     flush=True,
    364     regions=self.regions,
    365     **chunkmanager_store_kwargs,
    366 )
    367 self.sources = []
    368 self.targets = []

File /opt/coiled/env/lib/python3.11/site-packages/xarray/namedarray/daskmanager.py:247, in DaskManager.store(self, sources, targets, **kwargs)
    239 def store(
    240     self,
    241     sources: Any | Sequence[Any],
    242     targets: Any,
    243     **kwargs: Any,
    244 ) -> Any:
    245     from dask.array import store
--> 247     return store(
    248         sources=sources,
    249         targets=targets,
    250         **kwargs,
    251     )

File /opt/coiled/env/lib/python3.11/site-packages/dask/array/core.py:1257, in store(***failed resolving arguments***)
   1255 elif compute:
   1256     store_dsk = HighLevelGraph(layers, dependencies)
-> 1257     compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
   1258     return None
   1260 else:

File /opt/coiled/env/lib/python3.11/site-packages/dask/base.py:399, in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
    397 schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
    398 dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 399 return schedule(dsk2, keys, **kwargs)

File /opt/coiled/env/lib/python3.11/site-packages/distributed/client.py:3492, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3490         should_rejoin = False
   3491 try:
-> 3492     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3493 finally:
   3494     for f in futures.values():

File /opt/coiled/env/lib/python3.11/site-packages/distributed/client.py:2565, in Client.gather(self, futures, errors, direct, asynchronous)
   2562     local_worker = None
   2564 with shorten_traceback():
-> 2565     return self.sync(
   2566         self._gather,
   2567         futures,
   2568         errors=errors,
   2569         direct=direct,
   2570         local_worker=local_worker,
   2571         asynchronous=asynchronous,
   2572     )

File /opt/coiled/env/lib/python3.11/site-packages/flox/core.py:1330, in _simple_combine()
   1326 if not reindex.blockwise:
   1327     # We didn't reindex at the blockwise step
   1328     # So now reindex before combining by reducing along DUMMY_AXIS
   1329     unique_groups = _find_unique_groups(x_chunk)
-> 1330     x_chunk = deepmap(
   1331         partial(
   1332             reindex_intermediates,
   1333             agg=agg,
   1334             unique_groups=unique_groups,
   1335             array_type=reindex.array_type,
   1336         ),
   1337         x_chunk,
   1338     )
   1339 else:
   1340     unique_groups = deepfirst(x_chunk)["groups"]

File /opt/coiled/env/lib/python3.11/site-packages/flox/core.py:1379, in reindex_intermediates()
   1377 new_shape = x["groups"].shape[:-1] + (len(unique_groups),)
   1378 newx: IntermediateDict = {"groups": np.broadcast_to(unique_groups, new_shape)}
-> 1379 newx["intermediates"] = tuple(
   1380     reindex_(
   1381         v,
   1382         from_=np.atleast_1d(x["groups"].squeeze()),
   1383         to=pd.Index(unique_groups),
   1384         fill_value=f,
   1385         array_type=array_type,
   1386     )
   1387     for v, f in zip(x["intermediates"], agg.fill_value["intermediate"])
   1388 )
   1389 return newx

File /opt/coiled/env/lib/python3.11/site-packages/flox/core.py:1380, in <genexpr>()
   1377 new_shape = x["groups"].shape[:-1] + (len(unique_groups),)
   1378 newx: IntermediateDict = {"groups": np.broadcast_to(unique_groups, new_shape)}
   1379 newx["intermediates"] = tuple(
-> 1380     reindex_(
   1381         v,
   1382         from_=np.atleast_1d(x["groups"].squeeze()),
   1383         to=pd.Index(unique_groups),
   1384         fill_value=f,
   1385         array_type=array_type,
   1386     )
   1387     for v, f in zip(x["intermediates"], agg.fill_value["intermediate"])
   1388 )
   1389 return newx

File /opt/coiled/env/lib/python3.11/site-packages/flox/core.py:797, in reindex_()
    795     reindexed = reindex_numpy(array, from_, to, fill_value, new_dtype, axis)
    796 elif array_type is ReindexArrayType.SPARSE_COO:
--> 797     reindexed = reindex_pydata_sparse_coo(array, from_, to, fill_value, new_dtype, axis)
    798 return reindexed

File /opt/coiled/env/lib/python3.11/site-packages/flox/core.py:741, in reindex_pydata_sparse_coo()
    738 ranges = np.broadcast_arrays(*np.ix_(*(tuple(np.arange(size) for size in shape[:axis]) + (idx,))))
    739 coords = np.stack(ranges, axis=0).reshape(array.ndim, -1)
--> 741 reindexed = sparse.COO(
    742     coords=coords,
    743     data=array.reshape(-1).astype(dtype, copy=False),
    744     shape=(*array.shape[:axis], to.size),
    745 )
    746 return reindexed

File /opt/coiled/env/lib/python3.11/site-packages/sparse/numba_backend/_coo/core.py:229, in __init__()
    226         self.enable_caching()
    227     return
--> 229 self.data = np.asarray(data)
    230 self.coords = np.asarray(coords)
    232 if self.coords.ndim == 1:

File /opt/coiled/env/lib/python3.11/site-packages/sparse/numba_backend/_sparse_array.py:276, in __array__()
    273 from ._settings import AUTO_DENSIFY
    275 if not AUTO_DENSIFY:
--> 276     raise RuntimeError(
    277         "Cannot convert a sparse array to dense automatically. To manually densify, use the todense method."
    278     )
    280 return np.asarray(self.todense(), *args, **kwargs)

RuntimeError: Cannot convert a sparse array to dense automatically. To manually densify, use the todense method.

@dcherian
Collaborator

dcherian commented Apr 4, 2025

This is fixed on the latest version of #430. But yes, using a single array with all 3 levels encoded will work better.

dcherian added a commit that referenced this issue Apr 5, 2025
@dcherian
Collaborator

dcherian commented Apr 5, 2025

I just tagged v0.10.2 which should work for you.

I wrote a quick doc page for this workload: https://flox.readthedocs.io/en/latest/user-stories/large-zonal-stats.html

Let me know how it goes!

@solomon-negusse
Author

Thanks for the user story docs and release, @dcherian! Looks like it made it further with your latest fix, but I'm hitting this error now:

File /opt/coiled/env/lib/python3.11/site-packages/flox/core.py:1376, in _simple_combine()
   1372 if not reindex.blockwise:
   1373     # We didn't reindex at the blockwise step
   1374     # So now reindex before combining by reducing along DUMMY_AXIS
   1375     unique_groups = _find_unique_groups(x_chunk)
-> 1376     x_chunk = deepmap(
   1377         partial(
   1378             reindex_intermediates,
   1379             agg=agg,
   1380             unique_groups=unique_groups,
   1381             array_type=reindex.array_type,
   1382         ),
   1383         x_chunk,
   1384     )
   1385 else:
   1386     unique_groups = deepfirst(x_chunk)["groups"]

File /opt/coiled/env/lib/python3.11/site-packages/flox/core.py:1426, in reindex_intermediates()
   1424 new_shape = x["groups"].shape[:-1] + (len(unique_groups),)
   1425 newx: IntermediateDict = {"groups": np.broadcast_to(unique_groups, new_shape)}
-> 1426 newx["intermediates"] = tuple(
   1427     reindex_(
   1428         v,
   1429         from_=np.atleast_1d(x["groups"].squeeze()),
   1430         to=pd.Index(unique_groups),
   1431         fill_value=f,
   1432         array_type=array_type,
   1433     )
   1434     for v, f in zip(x["intermediates"], agg.fill_value["intermediate"])
   1435 )
   1436 return newx

File /opt/coiled/env/lib/python3.11/site-packages/flox/core.py:1427, in <genexpr>()
   1424 new_shape = x["groups"].shape[:-1] + (len(unique_groups),)
   1425 newx: IntermediateDict = {"groups": np.broadcast_to(unique_groups, new_shape)}
   1426 newx["intermediates"] = tuple(
-> 1427     reindex_(
   1428         v,
   1429         from_=np.atleast_1d(x["groups"].squeeze()),
   1430         to=pd.Index(unique_groups),
   1431         fill_value=f,
   1432         array_type=array_type,
   1433     )
   1434     for v, f in zip(x["intermediates"], agg.fill_value["intermediate"])
   1435 )
   1436 return newx

File /opt/coiled/env/lib/python3.11/site-packages/flox/core.py:825, in reindex_()
    823     reindexed = reindex_numpy(array, from_, to, fill_value, new_dtype, axis)
    824 elif array_type is ReindexArrayType.SPARSE_COO:
--> 825     reindexed = reindex_pydata_sparse_coo(array, from_, to, fill_value, new_dtype, axis)
    826 return reindexed

File /opt/coiled/env/lib/python3.11/site-packages/flox/core.py:768, in reindex_pydata_sparse_coo()
    764 coords = np.stack(ranges, axis=0).reshape(array.ndim, -1)
    766 data = array.data if isinstance(array, sparse.COO) else array.reshape(-1)
--> 768 reindexed = sparse.COO(
    769     coords=coords,
    770     data=data.astype(dtype, copy=False),
    771     shape=(*array.shape[:axis], to.size),
    772 )
    774 return reindexed

File /opt/coiled/env/lib/python3.11/site-packages/sparse/numba_backend/_coo/core.py:264, in __init__()
    262 if len(self.data) != self.coords.shape[1]:
    263     msg = "The data length does not match the coordinates given.\nlen(data) = {}, but {} coords specified."
--> 264     raise ValueError(msg.format(len(data), self.coords.shape[1]))
    265 if len(self.shape) != self.coords.shape[0]:
    266     msg = (
    267         "Shape specified by `shape` doesn't match the "
    268         "shape of `coords`; len(shape)={} != coords.shape[0]={}"
    269         "(and coords.shape={})"
    270     )

ValueError: The data length does not match the coordinates given.
len(data) = 0, but 1 coords specified.

@dcherian
Collaborator

dcherian commented Apr 7, 2025

Oops, thanks for testing it out.

With #437, I now test quite a few edge cases. You will also need to specify fill_value=0 (the value for groups that don't actually exist). Please let me know how that goes.
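
Hedged sketch of that change, reusing the names from the earlier snippet; the only new piece is the fill_value kwarg:

result = xarray_reduce(
    ds.areas,
    ds.tcl_year,
    ds.drivers,
    ds.tcd_thresholds,
    gadm.adm0,
    gadm.adm1,
    gadm.adm2,
    expected_groups=expected_groups,   # same tuple of np.arange(...) as before
    func="sum",
    fill_value=0,                      # value for group combinations that never occur
    reindex=ReindexStrategy(blockwise=False, array_type=ReindexArrayType.SPARSE_COO),
)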

@solomon-negusse
Author

solomon-negusse commented Apr 7, 2025

Great news, @dcherian: this works now with all but the drivers grouper included! The density of the final sparse result is 0.00154, so the strategy is super efficient for this use case. There's something weird about the drivers layer (the size explodes when aligned, for example) and it's throwing the sparse error ValueError: The data length does not match the coordinates given. len(data) = 0, but 1 coords specified. I'll inspect the data tomorrow and report back with exact details. Thanks much for this work.

@dcherian
Collaborator

dcherian commented Apr 8, 2025

There's something weird about the drivers layer (the size explodes when aligned, for example)

Hmmm, this is probably a floating point mismatch in the coordinate values. Using xr.align(..., join="override") should fix that.
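
A minimal sketch of that suggestion (dataset names are illustrative): override the coordinates rather than outer-joining them when they differ only by floating point noise:

import xarray as xr

# join="override" keeps the first object's coordinates instead of building a
# (potentially exploding) union of nearly-identical float coordinates.
areas, drivers_data = xr.align(areas, drivers_data, join="override")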

it's throwing the sparse error ValueError: The data length does not match the coordinates given. len(data) = 0, but 1 coords specified.

That's weird. I just checked and I'm certainly running all the edge cases in the test suite with this option.

@solomon-negusse
Author

The drivers data had the same coordinates as areas but got cast to float32 when being aligned up to the other datasets' dimensions. The sparse ValueError above may have been because of the mismatch between the data and the expected_groups dtypes; I hadn't noticed the type change (user error :)).
I now clipped all the datasets to the drivers' extents, as that's our primary grouper, and this kept it in uint8. However, I'm getting the original memory error when I use all groupers, and the error reports the full 23 x 5 x 7 x 248 x 86 x 854 size of the groupers (I double-checked I'm using v0.10.3). It works if I take out either the tcd or drivers grouper. I also tried keeping both but limiting the tcd expected_groups to just two elements (suspecting the result may be too dense). This doesn't raise the memory error on the client side, but the workers get killed with signal 15 after getting to the last reshape task (attaching the last few minutes of worker logs). I'm testing this with 50 workers with 64 GB each.

2025-04-09T11_38_50.714Z_to_2025-04-09T11_50_52.764Z_tcl_dask_logs.csv.csv

Memory error with all groupers:

---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)
Cell In[22], line 1
----> 1 result = tcl_by_year.compute()

File /opt/coiled/env/lib/python3.11/site-packages/xarray/core/dataarray.py:1206, in DataArray.compute(self, **kwargs)
   1181 """Manually trigger loading of this array's data from disk or a
   1182 remote source into memory and return a new array.
   1183 
   (...)   1203 dask.compute
   1204 """
   1205 new = self.copy(deep=False)
-> 1206 return new.load(**kwargs)

File /opt/coiled/env/lib/python3.11/site-packages/xarray/core/dataarray.py:1174, in DataArray.load(self, **kwargs)
   1154 def load(self, **kwargs) -> Self:
   1155     """Manually trigger loading of this array's data from disk or a
   1156     remote source into memory and return this array.
   1157 
   (...)   1172     dask.compute
   1173     """
-> 1174     ds = self._to_temp_dataset().load(**kwargs)
   1175     new = self._from_temp_dataset(ds)
   1176     self._variable = new._variable

File /opt/coiled/env/lib/python3.11/site-packages/xarray/core/dataset.py:900, in Dataset.load(self, **kwargs)
    897 chunkmanager = get_chunked_array_type(*lazy_data.values())
    899 # evaluate all the chunked arrays simultaneously
--> 900 evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
    901     *lazy_data.values(), **kwargs
    902 )
    904 for k, data in zip(lazy_data, evaluated_data, strict=False):
    905     self.variables[k].data = data

File /opt/coiled/env/lib/python3.11/site-packages/xarray/namedarray/daskmanager.py:85, in DaskManager.compute(self, *data, **kwargs)
     80 def compute(
     81     self, *data: Any, **kwargs: Any
     82 ) -> tuple[np.ndarray[Any, _DType_co], ...]:
     83     from dask.array import compute
---> 85     return compute(*data, **kwargs)

File /opt/coiled/env/lib/python3.11/site-packages/dask/base.py:662, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    659     postcomputes.append(x.__dask_postcompute__())
    661 with shorten_traceback():
--> 662     results = schedule(dsk, keys, **kwargs)
    664 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /opt/coiled/env/lib/python3.11/site-packages/flox/core.py:1360, in _aggregate()
   1358 """Final aggregation step of tree reduction"""
   1359 results = combine(x_chunk, agg, axis, keepdims, is_aggregate=True)
-> 1360 return _finalize_results(results, agg, axis, expected_groups, reindex=reindex)

File /opt/coiled/env/lib/python3.11/site-packages/flox/core.py:1333, in _finalize_results()
   1331 # Final reindexing has to be here to be lazy
   1332 if not reindex.blockwise and expected_groups is not None:
-> 1333     finalized[agg.name] = reindex_(
   1334         finalized[agg.name],
   1335         squeezed["groups"],
   1336         expected_groups,
   1337         fill_value=fill_value,
   1338         array_type=reindex.array_type,
   1339     )
   1340     finalized["groups"] = expected_groups
   1341 else:

File /opt/coiled/env/lib/python3.11/site-packages/flox/core.py:854, in reindex_()
    852     reindexed = reindex_numpy(array, from_, to, fill_value, new_dtype, axis)
    853 elif array_type is ReindexArrayType.SPARSE_COO:
--> 854     reindexed = reindex_pydata_sparse_coo(array, from_, to, fill_value, new_dtype, axis)
    855 return reindexed

File /opt/coiled/env/lib/python3.11/site-packages/flox/core.py:765, in reindex_pydata_sparse_coo()
    761 import sparse
    763 assert axis == -1
--> 765 needs_reindex = (from_.get_indexer(to) == -1).any()
    766 if needs_reindex and fill_value is None:
    767     raise ValueError("Filling is required. fill_value cannot be None.")

File properties.pyx:36, in pandas._libs.properties.CachedProperty.__get__()

File /opt/coiled/env/lib/python3.11/site-packages/pandas/core/indexes/range.py:244, in _data()
    237 @cache_readonly
    238 def _data(self) -> np.ndarray:  # type: ignore[override]
    239     """
    240     An int array that for performance reasons is created only when needed.
    241 
    242     The constructed array is saved in ``_cache``.
    243     """
--> 244     return np.arange(self.start, self.stop, self.step, dtype=np.int64)

MemoryError: Unable to allocate 109. GiB for an array with shape (14662360160,) and data type int64

@dcherian
Copy link
Collaborator

dcherian commented Apr 9, 2025

Ah, so close! That was the very last step. Should be fixed in #440.

I also fixed another case where the RangeIndex was getting realized in memory, so you should see faster graph construction too, I think.
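
A hedged illustration of why the RangeIndex point matters here: describing the 14,662,360,160 expected labels lazily is essentially free, while materializing them is the ~109 GiB allocation in the traceback above:

import numpy as np
import pandas as pd

n = 14_662_360_160
idx = pd.RangeIndex(n)            # lazy description of 0..n-1, tiny footprint
print(idx.size, idx.memory_usage())

# labels = np.arange(n)           # this is the ~109 GiB allocation that failed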

The sparse ValueError above may have been because of the mismatch between the data and the expected_groups types, not having noticed the type change (user error:).

Weird, that shouldn't affect things but seems like you don't see it any more.

I'm testing this with 50 64GB workers.

You should be able to use a lot less memory now!

@solomon-negusse
Author

This is great @dcherian! Confirming that it works with all the groupers now and didn't cost more than the case without the adm2 level in the older version. Thanks!

@dcherian
Collaborator

dcherian commented Apr 9, 2025

🥳 thanks for the fun problem hehe.

Are you able to tell me how long it took, how much peak memory it used, and the cost, please? I'm hoping you didn't have to run it with 64 GB nodes.

@solomon-negusse
Author

The whole run, including reading the GeoTIFFs (migrating to zarr for analysis is on our roadmap) located in the same AWS region, took about 24 minutes (I'll have to disaggregate this) and cost about $9. I had to run it on the 64 GB nodes as some step (aligning?) takes a lot of memory, but I'll need to spend more time on the dask dashboard to diagnose that issue!

(screenshot of memory usage over the run)

@dcherian
Collaborator

Yeah, something is really weird. Why is it using so much memory in the beginning, for example?
