From 1866fc317b761c169252af14d05b1999cb9795a1 Mon Sep 17 00:00:00 2001 From: Fabian Neumann Date: Wed, 14 Aug 2024 12:17:53 +0200 Subject: [PATCH] add option to post monthly concurrent requests --- atlite/data.py | 15 ++++++++++++++- atlite/datasets/era5.py | 21 +++++++++++++++++---- atlite/datasets/gebco.py | 4 +++- atlite/datasets/sarah.py | 2 ++ 4 files changed, 36 insertions(+), 6 deletions(-) diff --git a/atlite/data.py b/atlite/data.py index 15d9d5fa..46ef1772 100644 --- a/atlite/data.py +++ b/atlite/data.py @@ -25,7 +25,14 @@ from atlite.datasets import modules as datamodules -def get_features(cutout, module, features, tmpdir=None, monthly_requests=False): +def get_features( + cutout, + module, + features, + tmpdir=None, + monthly_requests=False, + concurrent_requests=False, +): """ Load the feature data for a given module. @@ -44,6 +51,7 @@ def get_features(cutout, module, features, tmpdir=None, monthly_requests=False): tmpdir=tmpdir, lock=lock, monthly_requests=monthly_requests, + concurrent_requests=concurrent_requests, **parameters, ) datasets.append(feature_data) @@ -121,6 +129,7 @@ def cutout_prepare( overwrite=False, compression={"zlib": True, "complevel": 9, "shuffle": True}, monthly_requests=False, + concurrent_requests=False, ): """ Prepare all or a selection of features in a cutout. @@ -157,6 +166,9 @@ def cutout_prepare( If True, the data is requested on a monthly basis in ERA5. This is useful for large cutouts, where the data is requested in smaller chunks. The default is False + concurrent_requests : bool, optional + If True, the monthly data requests are posted concurrently. + Only has an effect if `monthly_requests` is True. The default is False. Returns ------- @@ -190,6 +202,7 @@ def cutout_prepare( missing_features, tmpdir=tmpdir, monthly_requests=monthly_requests, + concurrent_requests=concurrent_requests, ) prepared |= set(missing_features) diff --git a/atlite/datasets/era5.py b/atlite/datasets/era5.py index 16d84abe..27c8c260 100644 --- a/atlite/datasets/era5.py +++ b/atlite/datasets/era5.py @@ -21,6 +21,7 @@ import pandas as pd import xarray as xr from numpy import atleast_1d +from dask import delayed, compute from atlite.gis import maybe_swap_spatial_dims from atlite.pv.solar_position import SolarPosition @@ -392,7 +393,13 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates): def get_data( - cutout, feature, tmpdir, lock=None, monthly_requests=False, **creation_parameters + cutout, + feature, + tmpdir, + lock=None, + monthly_requests=False, + concurrent_requests=False, + **creation_parameters, ): """ Retrieve data from ECMWFs ERA5 dataset (via CDS). @@ -412,6 +419,9 @@ def get_data( If True, the data is requested on a monthly basis in ERA5. This is useful for large cutouts, where the data is requested in smaller chunks. The default is False + concurrent_requests : bool, optional + If True, the monthly data requests are posted concurrently. + Only has an effect if `monthly_requests` is True. **creation_parameters : Additional keyword arguments. The only effective argument is 'sanitize' (default True) which sets sanitization of the data on or off. @@ -448,8 +458,11 @@ def retrieve_once(time): if feature in static_features: return retrieve_once(retrieval_times(coords, static=True)).squeeze() - datasets = map( - retrieve_once, retrieval_times(coords, monthly_requests=monthly_requests) - ) + time_chunks = retrieval_times(coords, monthly_requests=monthly_requests) + if concurrent_requests: + delayed_datasets = [delayed(retrieve_once)(chunk) for chunk in time_chunks] + datasets = compute(*delayed_datasets) + else: + datasets = map(retrieve_once, time_chunks) return xr.concat(datasets, dim="time").sel(time=coords["time"]) diff --git a/atlite/datasets/gebco.py b/atlite/datasets/gebco.py index 13af7146..fa880ade 100755 --- a/atlite/datasets/gebco.py +++ b/atlite/datasets/gebco.py @@ -45,7 +45,7 @@ def get_data_gebco_height(xs, ys, gebco_path): ) -def get_data(cutout, feature, tmpdir, monthly_requests=False, **creation_parameters): +def get_data(cutout, feature, tmpdir, monthly_requests=False, concurrent_requests=False, **creation_parameters): """ Get the gebco height data. @@ -58,6 +58,8 @@ def get_data(cutout, feature, tmpdir, monthly_requests=False, **creation_paramet Takes no effect, only here for consistency with other dataset modules. monthly_requests : bool Takes no effect, only here for consistency with other dataset modules. + concurrent_requests : bool + Takes no effect, only here for consistency with other dataset modules. **creation_parameters : Must include `gebco_path`. diff --git a/atlite/datasets/sarah.py b/atlite/datasets/sarah.py index 8849f3f1..3aff621f 100644 --- a/atlite/datasets/sarah.py +++ b/atlite/datasets/sarah.py @@ -177,6 +177,8 @@ def get_data( `atlite.datasets.sarah.features` monthly_requests : bool Takes no effect, only here for consistency with other dataset modules. + concurrent_requests : bool + Takes no effect, only here for consistency with other dataset modules. **creation_parameters : Mandatory arguments are: * 'sarah_dir', str. Directory of the stored SARAH data.