Skip to content

Commit

Permalink
add option to post monthly concurrent requests
Browse files Browse the repository at this point in the history
  • Loading branch information
fneum committed Aug 14, 2024
1 parent d04aff0 commit 1866fc3
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 6 deletions.
15 changes: 14 additions & 1 deletion atlite/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,14 @@
from atlite.datasets import modules as datamodules


def get_features(cutout, module, features, tmpdir=None, monthly_requests=False):
def get_features(
cutout,
module,
features,
tmpdir=None,
monthly_requests=False,
concurrent_requests=False,
):
"""
Load the feature data for a given module.
Expand All @@ -44,6 +51,7 @@ def get_features(cutout, module, features, tmpdir=None, monthly_requests=False):
tmpdir=tmpdir,
lock=lock,
monthly_requests=monthly_requests,
concurrent_requests=concurrent_requests,
**parameters,
)
datasets.append(feature_data)
Expand Down Expand Up @@ -121,6 +129,7 @@ def cutout_prepare(
overwrite=False,
compression={"zlib": True, "complevel": 9, "shuffle": True},
monthly_requests=False,
concurrent_requests=False,
):
"""
Prepare all or a selection of features in a cutout.
Expand Down Expand Up @@ -157,6 +166,9 @@ def cutout_prepare(
If True, the data is requested on a monthly basis in ERA5. This is useful for
large cutouts, where the data is requested in smaller chunks. The
default is False.
concurrent_requests : bool, optional
If True, the monthly data requests are posted concurrently.
Only has an effect if `monthly_requests` is True. The default is False.
Returns
-------
Expand Down Expand Up @@ -190,6 +202,7 @@ def cutout_prepare(
missing_features,
tmpdir=tmpdir,
monthly_requests=monthly_requests,
concurrent_requests=concurrent_requests,
)
prepared |= set(missing_features)

Expand Down
21 changes: 17 additions & 4 deletions atlite/datasets/era5.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import pandas as pd
import xarray as xr
from numpy import atleast_1d
from dask import delayed, compute

from atlite.gis import maybe_swap_spatial_dims
from atlite.pv.solar_position import SolarPosition
Expand Down Expand Up @@ -392,7 +393,13 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):


def get_data(
cutout, feature, tmpdir, lock=None, monthly_requests=False, **creation_parameters
cutout,
feature,
tmpdir,
lock=None,
monthly_requests=False,
concurrent_requests=False,
**creation_parameters,
):
"""
Retrieve data from ECMWF's ERA5 dataset (via CDS).
Expand All @@ -412,6 +419,9 @@ def get_data(
If True, the data is requested on a monthly basis in ERA5. This is useful for
large cutouts, where the data is requested in smaller chunks. The
default is False.
concurrent_requests : bool, optional
If True, the monthly data requests are posted concurrently.
Only has an effect if `monthly_requests` is True. The default is False.
**creation_parameters :
Additional keyword arguments. The only effective argument is 'sanitize'
(default True) which sets sanitization of the data on or off.
Expand Down Expand Up @@ -448,8 +458,11 @@ def retrieve_once(time):
if feature in static_features:
return retrieve_once(retrieval_times(coords, static=True)).squeeze()

datasets = map(
retrieve_once, retrieval_times(coords, monthly_requests=monthly_requests)
)
time_chunks = retrieval_times(coords, monthly_requests=monthly_requests)
if concurrent_requests:
delayed_datasets = [delayed(retrieve_once)(chunk) for chunk in time_chunks]
datasets = compute(*delayed_datasets)
else:
datasets = map(retrieve_once, time_chunks)

return xr.concat(datasets, dim="time").sel(time=coords["time"])
4 changes: 3 additions & 1 deletion atlite/datasets/gebco.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def get_data_gebco_height(xs, ys, gebco_path):
)


def get_data(cutout, feature, tmpdir, monthly_requests=False, **creation_parameters):
def get_data(cutout, feature, tmpdir, monthly_requests=False, concurrent_requests=False, **creation_parameters):
"""
Get the GEBCO height data.
Expand All @@ -58,6 +58,8 @@ def get_data(cutout, feature, tmpdir, monthly_requests=False, **creation_paramet
Takes no effect, only here for consistency with other dataset modules.
monthly_requests : bool
Takes no effect, only here for consistency with other dataset modules.
concurrent_requests : bool
Takes no effect, only here for consistency with other dataset modules.
**creation_parameters :
Must include `gebco_path`.
Expand Down
2 changes: 2 additions & 0 deletions atlite/datasets/sarah.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ def get_data(
`atlite.datasets.sarah.features`
monthly_requests : bool
Takes no effect, only here for consistency with other dataset modules.
concurrent_requests : bool
Takes no effect, only here for consistency with other dataset modules.
**creation_parameters :
Mandatory arguments are:
* 'sarah_dir', str. Directory of the stored SARAH data.
Expand Down

0 comments on commit 1866fc3

Please sign in to comment.