Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

toggle for (concurrent) monthly ERA5 requests; progressbar disabled by default #372

Merged
merged 12 commits into from
Aug 23, 2024
Merged
10 changes: 9 additions & 1 deletion RELEASE_NOTES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@ Release Notes
Upcoming Release
================

- Adds option to toggle whether ERA5 downloads are requested in monthly or
annual chunks with keyword argument ``cutout.prepare(monthly_requests=True)``.
The default is now annual requests. The monthly requests can also be posted
concurrently using ``cutout.prepare(monthly_requests=True,
concurrent_requests=True)``.

- Improved parallelization of ``atlite.convert.build_line_rating`` by adding
keyword arguments for ``dask.compute`` (``dask_kwargs={}``) and an option to
disable the progressbar (``show_progress=False``).

- Default to ``show_progress=False`` for performance reasons.

Version 0.2.13
==============
Expand Down
10 changes: 5 additions & 5 deletions atlite/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def convert_and_aggregate(
return_capacity=False,
capacity_factor=False,
capacity_factor_timeseries=False,
show_progress=True,
show_progress=False,
dask_kwargs={},
**convert_kwds,
):
Expand Down Expand Up @@ -91,7 +91,7 @@ def convert_and_aggregate(
capacity_factor_timeseries : boolean
If True, the capacity factor time series of the chosen resource for
each grid cell is computed.
show_progress : boolean, default True
show_progress : boolean, default False
Whether to show a progress bar.
dask_kwargs : dict, default {}
Dict with keyword arguments passed to `dask.compute`.
Expand Down Expand Up @@ -882,7 +882,7 @@ def hydro(
hydrobasins,
flowspeed=1,
weight_with_height=False,
show_progress=True,
show_progress=False,
**kwargs,
):
"""
Expand Down Expand Up @@ -1047,7 +1047,7 @@ def convert_line_rating(


def line_rating(
cutout, shapes, line_resistance, show_progress=True, dask_kwargs={}, **params
cutout, shapes, line_resistance, show_progress=False, dask_kwargs={}, **params
):
"""
Create a dynamic line rating time series based on the IEEE-738 standard.
Expand All @@ -1073,7 +1073,7 @@ def line_rating(
line_resistance : float/series
Resistance of the lines in Ohm/meter. Alternatively in p.u. system in
Ohm/1000km (see example below).
show_progress : boolean, default True
show_progress : boolean, default False
Whether to show a progress bar.
dask_kwargs : dict, default {}
Dict with keyword arguments passed to `dask.compute`.
Expand Down
51 changes: 46 additions & 5 deletions atlite/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,14 @@
from atlite.datasets import modules as datamodules


def get_features(cutout, module, features, tmpdir=None):
def get_features(
cutout,
module,
features,
tmpdir=None,
monthly_requests=False,
concurrent_requests=False,
):
"""
Load the feature data for a given module.

Expand All @@ -39,7 +46,13 @@ def get_features(cutout, module, features, tmpdir=None):

for feature in features:
feature_data = delayed(get_data)(
cutout, feature, tmpdir=tmpdir, lock=lock, **parameters
cutout,
feature,
tmpdir=tmpdir,
lock=lock,
monthly_requests=monthly_requests,
concurrent_requests=concurrent_requests,
**parameters,
)
datasets.append(feature_data)

Expand Down Expand Up @@ -115,6 +128,10 @@ def cutout_prepare(
tmpdir=None,
overwrite=False,
compression={"zlib": True, "complevel": 9, "shuffle": True},
show_progress=False,
dask_kwargs=None,
monthly_requests=False,
concurrent_requests=False,
):
"""
Prepare all or a selection of features in a cutout.
Expand Down Expand Up @@ -147,12 +164,26 @@ def cutout_prepare(
To efficiently reduce cutout sizes, specify the number of 'least_significant_digits': n here.
To disable compression, set "complevel" to None.
Default is {'zlib': True, 'complevel': 9, 'shuffle': True}.
show_progress : bool, optional
If True, a progress bar is shown. The default is False.
dask_kwargs : dict, default {}
Dict with keyword arguments passed to `dask.compute`.
monthly_requests : bool, optional
If True, the data is requested on a monthly basis in ERA5. This is useful for
large cutouts, where the data is requested in smaller chunks. The
default is False
concurrent_requests : bool, optional
If True, the monthly data requests are posted concurrently.
Only has an effect if `monthly_requests` is True. The default is False.

Returns
-------
cutout : atlite.Cutout
Cutout with prepared data. The variables are stored in `cutout.data`.
"""
if dask_kwargs is None:
dask_kwargs = {}

if cutout.prepared and not overwrite:
logger.info("Cutout already prepared.")
return cutout
Expand All @@ -174,7 +205,14 @@ def cutout_prepare(
continue
logger.info(f"Calculating and writing with module {module}:")
missing_features = missing_vars.index.unique("feature")
ds = get_features(cutout, module, missing_features, tmpdir=tmpdir)
ds = get_features(
cutout,
module,
missing_features,
tmpdir=tmpdir,
monthly_requests=monthly_requests,
concurrent_requests=concurrent_requests,
)
prepared |= set(missing_features)

cutout.data.attrs.update(dict(prepared_features=list(prepared)))
Expand All @@ -198,8 +236,11 @@ def cutout_prepare(
# Delayed writing for large cutout
# cf. https://stackoverflow.com/questions/69810367/python-how-to-write-large-netcdf-with-xarray
write_job = ds.to_netcdf(tmp, compute=False)
with ProgressBar():
write_job.compute()
if show_progress:
with ProgressBar(minimum=2):
write_job.compute(**dask_kwargs)
else:
write_job.compute(**dask_kwargs)
if cutout.path.exists():
cutout.data.close()
cutout.path.unlink()
Expand Down
49 changes: 42 additions & 7 deletions atlite/datasets/era5.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import numpy as np
import pandas as pd
import xarray as xr
from dask import compute, delayed
from numpy import atleast_1d

from atlite.gis import maybe_swap_spatial_dims
Expand Down Expand Up @@ -274,7 +275,7 @@ def _area(coords):
return [y1, x0, y0, x1]


def retrieval_times(coords, static=False):
def retrieval_times(coords, static=False, monthly_requests=False):
    """
    Get list of retrieval cdsapi arguments for time dimension in coordinates.

    If static is False, this function creates one query per year of the time
    axis in `coords` (or one query per month of each year when
    `monthly_requests` is True), so that individual CDS requests stay within
    the API's request-size limits.

    Parameters
    ----------
    coords : atlite.Cutout.coords
        Coordinates of the cutout; must contain a "time" coordinate whose
        values expose ``.to_index()`` returning a pandas DatetimeIndex.
    static : bool, optional
        If True, return a single query for the first time step only, as
        needed for time-invariant (static) features. The default is False.
    monthly_requests : bool, optional
        If True, the data is requested on a monthly basis. This is useful for
        large cutouts, where the data is requested in smaller chunks. The
        default is False.

    Returns
    -------
    list of dicts with retrieval arguments for cdsapi, or a single dict if
    `static` is True.
    """
    time = coords["time"].to_index()
    if static:
        # Time-invariant data: a single timestamp is sufficient.
        return {
            "year": str(time[0].year),
            "month": str(time[0].month),
            "day": str(time[0].day),
            "time": time[0].strftime("%H:00"),
        }

    times = []
    for year in time.year.unique():
        t = time[time.year == year]
        if monthly_requests:
            # One request per calendar month present in this year.
            for month in t.month.unique():
                query = {
                    "year": str(year),
                    "month": str(month),
                    "day": list(t[t.month == month].day.unique()),
                    "time": ["%02d:00" % h for h in t[t.month == month].hour.unique()],
                }
                times.append(query)
        else:
            # One request covering the whole year at once.
            query = {
                "year": str(year),
                "month": list(t.month.unique()),
                "day": list(t.day.unique()),
                "time": ["%02d:00" % h for h in t.hour.unique()],
            }
            times.append(query)
    return times
Expand Down Expand Up @@ -377,7 +392,15 @@ def retrieve_data(product, chunks=None, tmpdir=None, lock=None, **updates):
return ds


def get_data(cutout, feature, tmpdir, lock=None, **creation_parameters):
def get_data(
cutout,
feature,
tmpdir,
lock=None,
monthly_requests=False,
concurrent_requests=False,
**creation_parameters,
):
"""
Retrieve data from ECMWFs ERA5 dataset (via CDS).

Expand All @@ -392,6 +415,13 @@ def get_data(cutout, feature, tmpdir, lock=None, **creation_parameters):
`atlite.datasets.era5.features`
tmpdir : str/Path
Directory where the temporary netcdf files are stored.
monthly_requests : bool, optional
If True, the data is requested on a monthly basis in ERA5. This is useful for
large cutouts, where the data is requested in smaller chunks. The
default is False
concurrent_requests : bool, optional
If True, the monthly data requests are posted concurrently.
Only has an effect if `monthly_requests` is True.
**creation_parameters :
Additional keyword arguments. The only effective argument is 'sanitize'
(default True) which sets sanitization of the data on or off.
Expand Down Expand Up @@ -428,6 +458,11 @@ def retrieve_once(time):
if feature in static_features:
return retrieve_once(retrieval_times(coords, static=True)).squeeze()

datasets = map(retrieve_once, retrieval_times(coords))
time_chunks = retrieval_times(coords, monthly_requests=monthly_requests)
if concurrent_requests:
delayed_datasets = [delayed(retrieve_once)(chunk) for chunk in time_chunks]
datasets = compute(*delayed_datasets)
else:
datasets = map(retrieve_once, time_chunks)

return xr.concat(datasets, dim="time").sel(time=coords["time"])
13 changes: 12 additions & 1 deletion atlite/datasets/gebco.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ def get_data_gebco_height(xs, ys, gebco_path):
)


def get_data(cutout, feature, tmpdir, **creation_parameters):
def get_data(
cutout,
feature,
tmpdir,
monthly_requests=False,
concurrent_requests=False,
**creation_parameters
):
"""
Get the gebco height data.

Expand All @@ -56,6 +63,10 @@ def get_data(cutout, feature, tmpdir, **creation_parameters):
Takes no effect, only here for consistency with other dataset modules.
tmpdir : str
Takes no effect, only here for consistency with other dataset modules.
monthly_requests : bool
Takes no effect, only here for consistency with other dataset modules.
concurrent_requests : bool
Takes no effect, only here for consistency with other dataset modules.
**creation_parameters :
Must include `gebco_path`.

Expand Down
8 changes: 7 additions & 1 deletion atlite/datasets/sarah.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ def hourly_mean(ds):
return ds


def get_data(cutout, feature, tmpdir, lock=None, **creation_parameters):
def get_data(
cutout, feature, tmpdir, lock=None, monthly_requests=False, **creation_parameters
):
"""
Load stored SARAH data and reformat to matching the given cutout.

Expand All @@ -173,6 +175,10 @@ def get_data(cutout, feature, tmpdir, lock=None, **creation_parameters):
feature : str
Name of the feature data to retrieve. Must be in
`atlite.datasets.sarah.features`
monthly_requests : bool
Takes no effect, only here for consistency with other dataset modules.
concurrent_requests : bool
Takes no effect, only here for consistency with other dataset modules.
**creation_parameters :
Mandatory arguments are:
* 'sarah_dir', str. Directory of the stored SARAH data.
Expand Down
2 changes: 1 addition & 1 deletion atlite/gis.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ def _process_func(i):


def compute_availabilitymatrix(
cutout, shapes, excluder, nprocesses=None, disable_progressbar=False
cutout, shapes, excluder, nprocesses=None, disable_progressbar=True
):
"""
Compute the eligible share within cutout cells in the overlap with shapes.
Expand Down
4 changes: 2 additions & 2 deletions atlite/hydro.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def find_upstream_basins(meta, hid):
return hids


def determine_basins(plants, hydrobasins, show_progress=True):
def determine_basins(plants, hydrobasins, show_progress=False):
if isinstance(hydrobasins, str):
hydrobasins = gpd.read_file(hydrobasins)

Expand Down Expand Up @@ -81,7 +81,7 @@ def determine_basins(plants, hydrobasins, show_progress=True):


def shift_and_aggregate_runoff_for_plants(
basins, runoff, flowspeed=1, show_progress=True
basins, runoff, flowspeed=1, show_progress=False
):
inflow = xr.DataArray(
np.zeros((len(basins.plants), runoff.indexes["time"].size)),
Expand Down
7 changes: 7 additions & 0 deletions test/test_preparation_and_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,13 @@ def test_pv_era5_and_era5t(tmp_path_factory):
cutout, time=str(first_day_prev_month), skip_optimal_sum_test=True
)

@staticmethod
@pytest.mark.parametrize("concurrent_requests", [True, False])
def test_era5_monthly_requests(tmp_path_factory, concurrent_requests):
    # Build a small ERA5 cutout using monthly CDS requests, exercising both
    # the sequential and the concurrent request paths.
    tmp_dir = tmp_path_factory.mktemp("era5_mon")
    cutout = Cutout(path=tmp_dir / "era5", module="era5", bounds=BOUNDS, time=TIME)
    cutout.prepare(monthly_requests=True, concurrent_requests=concurrent_requests)

@staticmethod
def test_wind_era5(cutout_era5):
return wind_test(cutout_era5)
Expand Down
Loading