From b1334c5931bed5b46b0007e9ca550fe5dacc938b Mon Sep 17 00:00:00 2001 From: Baudouin Raoult Date: Sun, 28 Apr 2024 10:09:19 +0100 Subject: [PATCH 01/14] Use pyproject --- .gitignore | 2 + docs/building/handling-missing-dates.rst | 2 + setup.py | 97 ------------------- {anemoi => src/anemoi}/datasets/__init__.py | 3 +- {anemoi => src/anemoi}/datasets/__main__.py | 0 .../anemoi}/datasets/commands/__init__.py | 0 .../anemoi}/datasets/commands/compare.py | 0 .../anemoi}/datasets/commands/copy.py | 0 .../anemoi}/datasets/commands/create.py | 0 .../datasets/commands/inspect/__init__.py | 0 .../anemoi}/datasets/commands/inspect/zarr.py | 0 .../anemoi}/datasets/commands/scan.py | 0 .../anemoi}/datasets/create/__init__.py | 0 .../anemoi}/datasets/create/check.py | 0 .../anemoi}/datasets/create/config.py | 0 .../datasets/create/functions/__init__.py | 0 .../create/functions/actions/__init__.py | 0 .../create/functions/actions/accumulations.py | 0 .../create/functions/actions/constants.py | 0 .../create/functions/actions/empty.py | 0 .../create/functions/actions/forcings.py | 0 .../datasets/create/functions/actions/grib.py | 0 .../datasets/create/functions/actions/mars.py | 0 .../create/functions/actions/netcdf.py | 0 .../create/functions/actions/opendap.py | 0 .../create/functions/actions/perturbations.py | 0 .../create/functions/actions/source.py | 0 .../create/functions/actions/tendencies.py | 0 .../create/functions/filters/__init__.py | 0 .../create/functions/filters/empty.py | 0 .../datasets/create/functions/filters/noop.py | 0 .../create/functions/filters/rename.py | 0 .../create/functions/filters/rotate_winds.py | 0 .../functions/filters/unrotate_winds.py | 0 .../anemoi}/datasets/create/input.py | 0 .../anemoi}/datasets/create/loaders.py | 0 .../anemoi}/datasets/create/patch.py | 0 .../anemoi}/datasets/create/statistics.py | 0 .../anemoi}/datasets/create/template.py | 0 .../anemoi}/datasets/create/utils.py | 0 .../anemoi}/datasets/create/writer.py | 0 .../anemoi}/datasets/create/zarr.py | 0 .../anemoi}/datasets/data/__init__.py | 0 .../anemoi}/datasets/data/concat.py | 0 .../anemoi}/datasets/data/dataset.py | 0 {anemoi => src/anemoi}/datasets/data/debug.py | 0 .../anemoi}/datasets/data/ensemble.py | 0 .../anemoi}/datasets/data/forewards.py | 0 {anemoi => src/anemoi}/datasets/data/grids.py | 0 .../anemoi}/datasets/data/indexing.py | 0 {anemoi => src/anemoi}/datasets/data/join.py | 0 .../anemoi}/datasets/data/masked.py | 0 {anemoi => src/anemoi}/datasets/data/misc.py | 0 .../anemoi}/datasets/data/select.py | 0 .../anemoi}/datasets/data/statistics.py | 0 .../anemoi}/datasets/data/stores.py | 0 .../anemoi}/datasets/data/subset.py | 0 .../anemoi}/datasets/data/unchecked.py | 0 .../anemoi}/datasets/dates/__init__.py | 0 .../anemoi}/datasets/dates/groups.py | 0 {anemoi => src/anemoi}/datasets/grids.py | 0 .../anemoi}/datasets/utils/__init__.py | 0 62 files changed, 5 insertions(+), 99 deletions(-) delete mode 100644 setup.py rename {anemoi => src/anemoi}/datasets/__init__.py (95%) rename {anemoi => src/anemoi}/datasets/__main__.py (100%) rename {anemoi => src/anemoi}/datasets/commands/__init__.py (100%) rename {anemoi => src/anemoi}/datasets/commands/compare.py (100%) rename {anemoi => src/anemoi}/datasets/commands/copy.py (100%) rename {anemoi => src/anemoi}/datasets/commands/create.py (100%) rename {anemoi => src/anemoi}/datasets/commands/inspect/__init__.py (100%) rename {anemoi => src/anemoi}/datasets/commands/inspect/zarr.py (100%) rename {anemoi => src/anemoi}/datasets/commands/scan.py (100%) 
rename {anemoi => src/anemoi}/datasets/create/__init__.py (100%) rename {anemoi => src/anemoi}/datasets/create/check.py (100%) rename {anemoi => src/anemoi}/datasets/create/config.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/__init__.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/actions/__init__.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/actions/accumulations.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/actions/constants.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/actions/empty.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/actions/forcings.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/actions/grib.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/actions/mars.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/actions/netcdf.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/actions/opendap.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/actions/perturbations.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/actions/source.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/actions/tendencies.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/filters/__init__.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/filters/empty.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/filters/noop.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/filters/rename.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/filters/rotate_winds.py (100%) rename {anemoi => src/anemoi}/datasets/create/functions/filters/unrotate_winds.py (100%) rename {anemoi => src/anemoi}/datasets/create/input.py (100%) rename {anemoi => src/anemoi}/datasets/create/loaders.py (100%) rename {anemoi => src/anemoi}/datasets/create/patch.py (100%) rename {anemoi => src/anemoi}/datasets/create/statistics.py (100%) rename {anemoi => src/anemoi}/datasets/create/template.py (100%) rename {anemoi => src/anemoi}/datasets/create/utils.py (100%) rename {anemoi => src/anemoi}/datasets/create/writer.py (100%) rename {anemoi => src/anemoi}/datasets/create/zarr.py (100%) rename {anemoi => src/anemoi}/datasets/data/__init__.py (100%) rename {anemoi => src/anemoi}/datasets/data/concat.py (100%) rename {anemoi => src/anemoi}/datasets/data/dataset.py (100%) rename {anemoi => src/anemoi}/datasets/data/debug.py (100%) rename {anemoi => src/anemoi}/datasets/data/ensemble.py (100%) rename {anemoi => src/anemoi}/datasets/data/forewards.py (100%) rename {anemoi => src/anemoi}/datasets/data/grids.py (100%) rename {anemoi => src/anemoi}/datasets/data/indexing.py (100%) rename {anemoi => src/anemoi}/datasets/data/join.py (100%) rename {anemoi => src/anemoi}/datasets/data/masked.py (100%) rename {anemoi => src/anemoi}/datasets/data/misc.py (100%) rename {anemoi => src/anemoi}/datasets/data/select.py (100%) rename {anemoi => src/anemoi}/datasets/data/statistics.py (100%) rename {anemoi => src/anemoi}/datasets/data/stores.py (100%) rename {anemoi => src/anemoi}/datasets/data/subset.py (100%) rename {anemoi => src/anemoi}/datasets/data/unchecked.py (100%) rename {anemoi => src/anemoi}/datasets/dates/__init__.py (100%) rename {anemoi => src/anemoi}/datasets/dates/groups.py (100%) rename {anemoi => src/anemoi}/datasets/grids.py (100%) rename {anemoi => src/anemoi}/datasets/utils/__init__.py (100%) diff --git a/.gitignore b/.gitignore index 
fdc24f30..6901caac 100644
--- a/.gitignore
+++ b/.gitignore
@@ -159,6 +159,8 @@ cython_debug/
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
 
+_version.py
+
 *.grib
 *.onnx
 *.ckpt
diff --git a/docs/building/handling-missing-dates.rst b/docs/building/handling-missing-dates.rst
index 3d68b377..0473cd11 100644
--- a/docs/building/handling-missing-dates.rst
+++ b/docs/building/handling-missing-dates.rst
@@ -2,6 +2,8 @@
 Handling missing dates
 ########################
 
+By default, the package will raise an error if there are missing dates.
+
 Missing dates can be handled by specifying a list of dates in the
 configuration file. The dates should be in the same format as the dates in
 the time series. The missing dates will be filled with ``np.nan`` values.
diff --git a/setup.py b/setup.py
deleted file mode 100644
index 6f02b4d3..00000000
--- a/setup.py
+++ /dev/null
@@ -1,97 +0,0 @@
-#!/usr/bin/env python
-# (C) Copyright 2023 ECMWF.
-#
-# This software is licensed under the terms of the Apache Licence Version 2.0
-# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
-# In applying this licence, ECMWF does not waive the privileges and immunities
-# granted to it by virtue of its status as an intergovernmental organisation
-# nor does it submit to any jurisdiction.
-#
-
-
-import io
-import os
-
-import setuptools
-
-
-def read(fname):
-    file_path = os.path.join(os.path.dirname(__file__), fname)
-    return io.open(file_path, encoding="utf-8").read()
-
-
-version = None
-for line in read("anemoi/datasets/__init__.py").split("\n"):
-    if line.startswith("__version__"):
-        version = line.split("=")[-1].strip()[1:-1]
-
-
-assert version
-
-
-data_requires = [
-    "anemoi-utils[provenance]",
-    "zarr",
-    "pyyaml",
-    "numpy",
-    "tqdm",
-    "semantic-version",
-]
-
-remote_requires = [
-    "boto3",
-    "requests",
-    "s3fs",  # prepml copy only
-]
-
-
-create_requires = [
-    "zarr",
-    "numpy",
-    "tqdm",
-    "climetlab",  # "earthkit-data"
-    "earthkit-meteo",
-    "pyproj",
-    "ecmwflibs>=0.6.3",
-]
-
-
-all_requires = data_requires + create_requires + remote_requires
-dev_requires = ["sphinx", "sphinx_rtd_theme", "nbsphinx", "pandoc"] + all_requires
-
-setuptools.setup(
-    name="anemoi-datasets",
-    version=version,
-    description="A package to hold various functions to support training of ML models on ECMWF data.",
-    long_description=read("README.md"),
-    long_description_content_type="text/markdown",
-    author="European Centre for Medium-Range Weather Forecasts (ECMWF)",
-    author_email="software.support@ecmwf.int",
-    license="Apache License Version 2.0",
-    url="https://github.com/ecmwf/anemoi-datasets",
-    packages=setuptools.find_namespace_packages(include=["anemoi.*"]),
-    include_package_data=True,
-    install_requires=data_requires,
-    extras_require={
-        "data": [],
-        "remote": data_requires + remote_requires,
-        "create": create_requires,
-        "dev": dev_requires,
-        "all": all_requires,
-    },
-    zip_safe=True,
-    keywords="tool",
-    classifiers=[
-        "Development Status :: 3 - Alpha",
-        "Intended Audience :: Developers",
-        "License :: OSI Approved :: Apache Software License",
-        "Programming Language :: Python :: 3",
-        "Programming Language :: Python :: 3.9",
-        "Programming Language :: Python :: 3.10",
-        "Programming Language :: Python :: 3.11",
-        "Programming Language :: Python :: Implementation :: CPython",
-        "Programming Language :: Python :: Implementation :: PyPy",
-        "Operating System :: OS Independent",
-    ],
-    entry_points={"console_scripts":
["anemoi-datasets=anemoi.datasets.__main__:main"]}, -) diff --git a/anemoi/datasets/__init__.py b/src/anemoi/datasets/__init__.py similarity index 95% rename from anemoi/datasets/__init__.py rename to src/anemoi/datasets/__init__.py index fb99d284..0e29a95b 100644 --- a/anemoi/datasets/__init__.py +++ b/src/anemoi/datasets/__init__.py @@ -5,13 +5,12 @@ # granted to it by virtue of its status as an intergovernmental organisation # nor does it submit to any jurisdiction. +from ._version import __version__ from .data import MissingDateError from .data import add_dataset_path from .data import add_named_dataset from .data import open_dataset -__version__ = "0.1.2" - __all__ = [ "open_dataset", "MissingDateError", diff --git a/anemoi/datasets/__main__.py b/src/anemoi/datasets/__main__.py similarity index 100% rename from anemoi/datasets/__main__.py rename to src/anemoi/datasets/__main__.py diff --git a/anemoi/datasets/commands/__init__.py b/src/anemoi/datasets/commands/__init__.py similarity index 100% rename from anemoi/datasets/commands/__init__.py rename to src/anemoi/datasets/commands/__init__.py diff --git a/anemoi/datasets/commands/compare.py b/src/anemoi/datasets/commands/compare.py similarity index 100% rename from anemoi/datasets/commands/compare.py rename to src/anemoi/datasets/commands/compare.py diff --git a/anemoi/datasets/commands/copy.py b/src/anemoi/datasets/commands/copy.py similarity index 100% rename from anemoi/datasets/commands/copy.py rename to src/anemoi/datasets/commands/copy.py diff --git a/anemoi/datasets/commands/create.py b/src/anemoi/datasets/commands/create.py similarity index 100% rename from anemoi/datasets/commands/create.py rename to src/anemoi/datasets/commands/create.py diff --git a/anemoi/datasets/commands/inspect/__init__.py b/src/anemoi/datasets/commands/inspect/__init__.py similarity index 100% rename from anemoi/datasets/commands/inspect/__init__.py rename to src/anemoi/datasets/commands/inspect/__init__.py diff --git a/anemoi/datasets/commands/inspect/zarr.py b/src/anemoi/datasets/commands/inspect/zarr.py similarity index 100% rename from anemoi/datasets/commands/inspect/zarr.py rename to src/anemoi/datasets/commands/inspect/zarr.py diff --git a/anemoi/datasets/commands/scan.py b/src/anemoi/datasets/commands/scan.py similarity index 100% rename from anemoi/datasets/commands/scan.py rename to src/anemoi/datasets/commands/scan.py diff --git a/anemoi/datasets/create/__init__.py b/src/anemoi/datasets/create/__init__.py similarity index 100% rename from anemoi/datasets/create/__init__.py rename to src/anemoi/datasets/create/__init__.py diff --git a/anemoi/datasets/create/check.py b/src/anemoi/datasets/create/check.py similarity index 100% rename from anemoi/datasets/create/check.py rename to src/anemoi/datasets/create/check.py diff --git a/anemoi/datasets/create/config.py b/src/anemoi/datasets/create/config.py similarity index 100% rename from anemoi/datasets/create/config.py rename to src/anemoi/datasets/create/config.py diff --git a/anemoi/datasets/create/functions/__init__.py b/src/anemoi/datasets/create/functions/__init__.py similarity index 100% rename from anemoi/datasets/create/functions/__init__.py rename to src/anemoi/datasets/create/functions/__init__.py diff --git a/anemoi/datasets/create/functions/actions/__init__.py b/src/anemoi/datasets/create/functions/actions/__init__.py similarity index 100% rename from anemoi/datasets/create/functions/actions/__init__.py rename to src/anemoi/datasets/create/functions/actions/__init__.py diff --git 
a/anemoi/datasets/create/functions/actions/accumulations.py b/src/anemoi/datasets/create/functions/actions/accumulations.py similarity index 100% rename from anemoi/datasets/create/functions/actions/accumulations.py rename to src/anemoi/datasets/create/functions/actions/accumulations.py diff --git a/anemoi/datasets/create/functions/actions/constants.py b/src/anemoi/datasets/create/functions/actions/constants.py similarity index 100% rename from anemoi/datasets/create/functions/actions/constants.py rename to src/anemoi/datasets/create/functions/actions/constants.py diff --git a/anemoi/datasets/create/functions/actions/empty.py b/src/anemoi/datasets/create/functions/actions/empty.py similarity index 100% rename from anemoi/datasets/create/functions/actions/empty.py rename to src/anemoi/datasets/create/functions/actions/empty.py diff --git a/anemoi/datasets/create/functions/actions/forcings.py b/src/anemoi/datasets/create/functions/actions/forcings.py similarity index 100% rename from anemoi/datasets/create/functions/actions/forcings.py rename to src/anemoi/datasets/create/functions/actions/forcings.py diff --git a/anemoi/datasets/create/functions/actions/grib.py b/src/anemoi/datasets/create/functions/actions/grib.py similarity index 100% rename from anemoi/datasets/create/functions/actions/grib.py rename to src/anemoi/datasets/create/functions/actions/grib.py diff --git a/anemoi/datasets/create/functions/actions/mars.py b/src/anemoi/datasets/create/functions/actions/mars.py similarity index 100% rename from anemoi/datasets/create/functions/actions/mars.py rename to src/anemoi/datasets/create/functions/actions/mars.py diff --git a/anemoi/datasets/create/functions/actions/netcdf.py b/src/anemoi/datasets/create/functions/actions/netcdf.py similarity index 100% rename from anemoi/datasets/create/functions/actions/netcdf.py rename to src/anemoi/datasets/create/functions/actions/netcdf.py diff --git a/anemoi/datasets/create/functions/actions/opendap.py b/src/anemoi/datasets/create/functions/actions/opendap.py similarity index 100% rename from anemoi/datasets/create/functions/actions/opendap.py rename to src/anemoi/datasets/create/functions/actions/opendap.py diff --git a/anemoi/datasets/create/functions/actions/perturbations.py b/src/anemoi/datasets/create/functions/actions/perturbations.py similarity index 100% rename from anemoi/datasets/create/functions/actions/perturbations.py rename to src/anemoi/datasets/create/functions/actions/perturbations.py diff --git a/anemoi/datasets/create/functions/actions/source.py b/src/anemoi/datasets/create/functions/actions/source.py similarity index 100% rename from anemoi/datasets/create/functions/actions/source.py rename to src/anemoi/datasets/create/functions/actions/source.py diff --git a/anemoi/datasets/create/functions/actions/tendencies.py b/src/anemoi/datasets/create/functions/actions/tendencies.py similarity index 100% rename from anemoi/datasets/create/functions/actions/tendencies.py rename to src/anemoi/datasets/create/functions/actions/tendencies.py diff --git a/anemoi/datasets/create/functions/filters/__init__.py b/src/anemoi/datasets/create/functions/filters/__init__.py similarity index 100% rename from anemoi/datasets/create/functions/filters/__init__.py rename to src/anemoi/datasets/create/functions/filters/__init__.py diff --git a/anemoi/datasets/create/functions/filters/empty.py b/src/anemoi/datasets/create/functions/filters/empty.py similarity index 100% rename from anemoi/datasets/create/functions/filters/empty.py rename to 
src/anemoi/datasets/create/functions/filters/empty.py diff --git a/anemoi/datasets/create/functions/filters/noop.py b/src/anemoi/datasets/create/functions/filters/noop.py similarity index 100% rename from anemoi/datasets/create/functions/filters/noop.py rename to src/anemoi/datasets/create/functions/filters/noop.py diff --git a/anemoi/datasets/create/functions/filters/rename.py b/src/anemoi/datasets/create/functions/filters/rename.py similarity index 100% rename from anemoi/datasets/create/functions/filters/rename.py rename to src/anemoi/datasets/create/functions/filters/rename.py diff --git a/anemoi/datasets/create/functions/filters/rotate_winds.py b/src/anemoi/datasets/create/functions/filters/rotate_winds.py similarity index 100% rename from anemoi/datasets/create/functions/filters/rotate_winds.py rename to src/anemoi/datasets/create/functions/filters/rotate_winds.py diff --git a/anemoi/datasets/create/functions/filters/unrotate_winds.py b/src/anemoi/datasets/create/functions/filters/unrotate_winds.py similarity index 100% rename from anemoi/datasets/create/functions/filters/unrotate_winds.py rename to src/anemoi/datasets/create/functions/filters/unrotate_winds.py diff --git a/anemoi/datasets/create/input.py b/src/anemoi/datasets/create/input.py similarity index 100% rename from anemoi/datasets/create/input.py rename to src/anemoi/datasets/create/input.py diff --git a/anemoi/datasets/create/loaders.py b/src/anemoi/datasets/create/loaders.py similarity index 100% rename from anemoi/datasets/create/loaders.py rename to src/anemoi/datasets/create/loaders.py diff --git a/anemoi/datasets/create/patch.py b/src/anemoi/datasets/create/patch.py similarity index 100% rename from anemoi/datasets/create/patch.py rename to src/anemoi/datasets/create/patch.py diff --git a/anemoi/datasets/create/statistics.py b/src/anemoi/datasets/create/statistics.py similarity index 100% rename from anemoi/datasets/create/statistics.py rename to src/anemoi/datasets/create/statistics.py diff --git a/anemoi/datasets/create/template.py b/src/anemoi/datasets/create/template.py similarity index 100% rename from anemoi/datasets/create/template.py rename to src/anemoi/datasets/create/template.py diff --git a/anemoi/datasets/create/utils.py b/src/anemoi/datasets/create/utils.py similarity index 100% rename from anemoi/datasets/create/utils.py rename to src/anemoi/datasets/create/utils.py diff --git a/anemoi/datasets/create/writer.py b/src/anemoi/datasets/create/writer.py similarity index 100% rename from anemoi/datasets/create/writer.py rename to src/anemoi/datasets/create/writer.py diff --git a/anemoi/datasets/create/zarr.py b/src/anemoi/datasets/create/zarr.py similarity index 100% rename from anemoi/datasets/create/zarr.py rename to src/anemoi/datasets/create/zarr.py diff --git a/anemoi/datasets/data/__init__.py b/src/anemoi/datasets/data/__init__.py similarity index 100% rename from anemoi/datasets/data/__init__.py rename to src/anemoi/datasets/data/__init__.py diff --git a/anemoi/datasets/data/concat.py b/src/anemoi/datasets/data/concat.py similarity index 100% rename from anemoi/datasets/data/concat.py rename to src/anemoi/datasets/data/concat.py diff --git a/anemoi/datasets/data/dataset.py b/src/anemoi/datasets/data/dataset.py similarity index 100% rename from anemoi/datasets/data/dataset.py rename to src/anemoi/datasets/data/dataset.py diff --git a/anemoi/datasets/data/debug.py b/src/anemoi/datasets/data/debug.py similarity index 100% rename from anemoi/datasets/data/debug.py rename to 
src/anemoi/datasets/data/debug.py diff --git a/anemoi/datasets/data/ensemble.py b/src/anemoi/datasets/data/ensemble.py similarity index 100% rename from anemoi/datasets/data/ensemble.py rename to src/anemoi/datasets/data/ensemble.py diff --git a/anemoi/datasets/data/forewards.py b/src/anemoi/datasets/data/forewards.py similarity index 100% rename from anemoi/datasets/data/forewards.py rename to src/anemoi/datasets/data/forewards.py diff --git a/anemoi/datasets/data/grids.py b/src/anemoi/datasets/data/grids.py similarity index 100% rename from anemoi/datasets/data/grids.py rename to src/anemoi/datasets/data/grids.py diff --git a/anemoi/datasets/data/indexing.py b/src/anemoi/datasets/data/indexing.py similarity index 100% rename from anemoi/datasets/data/indexing.py rename to src/anemoi/datasets/data/indexing.py diff --git a/anemoi/datasets/data/join.py b/src/anemoi/datasets/data/join.py similarity index 100% rename from anemoi/datasets/data/join.py rename to src/anemoi/datasets/data/join.py diff --git a/anemoi/datasets/data/masked.py b/src/anemoi/datasets/data/masked.py similarity index 100% rename from anemoi/datasets/data/masked.py rename to src/anemoi/datasets/data/masked.py diff --git a/anemoi/datasets/data/misc.py b/src/anemoi/datasets/data/misc.py similarity index 100% rename from anemoi/datasets/data/misc.py rename to src/anemoi/datasets/data/misc.py diff --git a/anemoi/datasets/data/select.py b/src/anemoi/datasets/data/select.py similarity index 100% rename from anemoi/datasets/data/select.py rename to src/anemoi/datasets/data/select.py diff --git a/anemoi/datasets/data/statistics.py b/src/anemoi/datasets/data/statistics.py similarity index 100% rename from anemoi/datasets/data/statistics.py rename to src/anemoi/datasets/data/statistics.py diff --git a/anemoi/datasets/data/stores.py b/src/anemoi/datasets/data/stores.py similarity index 100% rename from anemoi/datasets/data/stores.py rename to src/anemoi/datasets/data/stores.py diff --git a/anemoi/datasets/data/subset.py b/src/anemoi/datasets/data/subset.py similarity index 100% rename from anemoi/datasets/data/subset.py rename to src/anemoi/datasets/data/subset.py diff --git a/anemoi/datasets/data/unchecked.py b/src/anemoi/datasets/data/unchecked.py similarity index 100% rename from anemoi/datasets/data/unchecked.py rename to src/anemoi/datasets/data/unchecked.py diff --git a/anemoi/datasets/dates/__init__.py b/src/anemoi/datasets/dates/__init__.py similarity index 100% rename from anemoi/datasets/dates/__init__.py rename to src/anemoi/datasets/dates/__init__.py diff --git a/anemoi/datasets/dates/groups.py b/src/anemoi/datasets/dates/groups.py similarity index 100% rename from anemoi/datasets/dates/groups.py rename to src/anemoi/datasets/dates/groups.py diff --git a/anemoi/datasets/grids.py b/src/anemoi/datasets/grids.py similarity index 100% rename from anemoi/datasets/grids.py rename to src/anemoi/datasets/grids.py diff --git a/anemoi/datasets/utils/__init__.py b/src/anemoi/datasets/utils/__init__.py similarity index 100% rename from anemoi/datasets/utils/__init__.py rename to src/anemoi/datasets/utils/__init__.py From f45907928f305190dc895ea63172c0c7ed0ebf31 Mon Sep 17 00:00:00 2001 From: Baudouin Raoult Date: Sun, 28 Apr 2024 10:15:13 +0100 Subject: [PATCH 02/14] Use pyproject --- pyproject.toml | 98 ++++++++++++++++++++++++++++++++++++++++++++++++++ tox.ini | 16 --------- 2 files changed, 98 insertions(+), 16 deletions(-) create mode 100644 pyproject.toml delete mode 100644 tox.ini diff --git a/pyproject.toml b/pyproject.toml 
new file mode 100644
index 00000000..cf9906dd
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,98 @@
+#!/usr/bin/env python
+# (C) Copyright 2024 ECMWF.
+#
+# This software is licensed under the terms of the Apache Licence Version 2.0
+# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
+# In applying this licence, ECMWF does not waive the privileges and immunities
+# granted to it by virtue of its status as an intergovernmental organisation
+# nor does it submit to any jurisdiction.
+
+# https://packaging.python.org/en/latest/guides/writing-pyproject-toml/
+
+[build-system]
+requires = ["setuptools>=60", "setuptools-scm>=8.0"]
+
+[project]
+description = "A package to hold various functions to support training of ML models on ECMWF data."
+name = "anemoi-datasets"
+
+dynamic = ["version"]
+license = { file = "LICENSE" }
+requires-python = ">=3.9"
+
+authors = [
+    { name = "European Centre for Medium-Range Weather Forecasts (ECMWF)", email = "software.support@ecmwf.int" },
+]
+
+keywords = ["tools", "datasets", "ai"]
+
+classifiers = [
+    "Development Status :: 4 - Beta",
+    "Intended Audience :: Developers",
+    "License :: OSI Approved :: Apache Software License",
+    "Programming Language :: Python :: 3",
+    "Programming Language :: Python :: 3.9",
+    "Programming Language :: Python :: 3.10",
+    "Programming Language :: Python :: 3.11",
+    "Programming Language :: Python :: Implementation :: CPython",
+    "Programming Language :: Python :: Implementation :: PyPy",
+    "Operating System :: OS Independent",
+]
+
+dependencies = [
+    "anemoi-utils[provenance]",
+    "zarr",
+    "pyyaml",
+    "numpy",
+    "tqdm",
+    "semantic-version",
+]
+
+[project.optional-dependencies]
+remote = ["boto3", "requests", "s3fs"]
+
+create = [
+    "climetlab", # "earthkit-data"
+    "earthkit-meteo",
+    "pyproj",
+    "ecmwflibs>=0.6.3",
+]
+
+docs = ["sphinx", "sphinx_rtd_theme", "nbsphinx", "pandoc"]
+
+all = [
+    "boto3",
+    "requests",
+    "s3fs",
+    "climetlab", # "earthkit-data"
+    "earthkit-meteo",
+    "pyproj",
+    "ecmwflibs>=0.6.3",
+]
+
+dev = [
+    "boto3",
+    "requests",
+    "s3fs",
+    "climetlab", # "earthkit-data"
+    "earthkit-meteo",
+    "pyproj",
+    "ecmwflibs>=0.6.3",
+    "sphinx",
+    "sphinx_rtd_theme",
+    "nbsphinx",
+    "pandoc",
+]
+
+[project.urls]
+Homepage = "https://github.com/ecmwf/anemoi-datasets/"
+Documentation = "https://anemoi-datasets.readthedocs.io/"
+Repository = "https://github.com/ecmwf/anemoi-datasets/"
+Issues = "https://github.com/ecmwf/anemoi-datasets/issues"
+# Changelog = "https://github.com/ecmwf/anemoi-datasets/CHANGELOG.md"
+
+[project.scripts]
+anemoi-datasets = "anemoi.datasets.__main__:main"
+
+[tool.setuptools_scm]
+version_file = "src/anemoi/datasets/_version.py"
diff --git a/tox.ini b/tox.ini
deleted file mode 100644
index cb1b3118..00000000
--- a/tox.ini
+++ /dev/null
@@ -1,16 +0,0 @@
-[isort]
-profile = black
-
-[flake8]
-; ignore = E226,E302,E41
-max-line-length = 120
-; exclude = tests/*
-; See https://black.readthedocs.io/en/stable/the_black_code_style.html
-exclude =
-    dev/*
-    experiments
-    ?.py
-extend-ignore = E203, E721
-
-[black]
-line-length= 120
From 32f649b44ae70115f12b47bce11618fec12c9054 Mon Sep 17 00:00:00 2001
From: Baudouin Raoult
Date: Mon, 29 Apr 2024 10:59:59 +0000
Subject: [PATCH 03/14] fix bug in missing dates

---
 anemoi/datasets/data/subset.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/anemoi/datasets/data/subset.py b/anemoi/datasets/data/subset.py
index cf4547f8..571f6c8d 100644
--- a/anemoi/datasets/data/subset.py
+++ b/anemoi/datasets/data/subset.py
@@ -100,7 +100,12 @@ def __repr__(self):
 
     @cached_property
     def missing(self):
-        return {self.indices[i] for i in self.dataset.missing if i in self.indices}
+        missing = self.dataset.missing
+        result = set()
+        for j, i in enumerate(self.indices):
+            if i in missing:
+                result.add(j)
+        return result
 
     def tree(self):
         return Node(self, [self.dataset.tree()], **self.reason)

From 277beeff4b5638c0a1df59277070e7fec6aa385c Mon Sep 17 00:00:00 2001
From: Baudouin Raoult
Date: Mon, 29 Apr 2024 17:33:36 +0100
Subject: [PATCH 04/14] use pyproject.toml

---
 .github/workflows/python-publish.yml | 46 ++--------------------------------------------
 .readthedocs.yaml                    |  8 +++--
 docs/conf.py                         |  2 +-
 3 files changed, 10 insertions(+), 46 deletions(-)

diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml
index 1bbc9a1e..32074d93 100644
--- a/.github/workflows/python-publish.yml
+++ b/.github/workflows/python-publish.yml
@@ -49,38 +49,6 @@ jobs:
     - name: Tests
       run: pytest
 
-  # notify-failure:
-  #   if: failure()
-  #   runs-on: ubuntu-latest
-  #   needs: [quality, checks]
-  #   name: Notify failure
-  #   steps:
-  #     - uses: jdcargile/ms-teams-notification@v1.4
-  #       with:
-  #         github-token: ${{ github.token }}
-  #         ms-teams-webhook-uri: ${{ secrets.MS_TEAMS_WEBHOOK_URI_F }}
-  #         # notification-summary: ${{ steps.qa.outputs.status }}
-  #         notification-summary: ❌ Build failed on anemoi.datasets!
-  #         notification-color: dc3545
-  #         timezone: Europe/Paris
-  #         verbose-logging: true
-
-  # notify-success:
-  #   if: success()
-  #   runs-on: ubuntu-latest
-  #   needs: [quality, checks]
-  #   name: Notify success
-  #   steps:
-  #     - uses: jdcargile/ms-teams-notification@v1.4
-  #       with:
-  #         github-token: ${{ github.token }}
-  #         ms-teams-webhook-uri: ${{ secrets.MS_TEAMS_WEBHOOK_URI_F }}
-  #         # notification-summary: ${{ steps.qa.outputs.status }}
-  #         notification-summary: ✅ New commit on anemoi.datasets
-  #         notification-color: 17a2b8
-  #         timezone: Europe/Paris
-  #         verbose-logging: true
-
 
   deploy:
     if: ${{ github.event_name == 'release' }}
@@ -93,24 +61,16 @@ jobs:
     - name: Set up Python
       uses: actions/setup-python@v2
       with:
-        python-version: '3.10'
-
-    - name: Check that tag version matches code version
-      run: |
-        tag=${GITHUB_REF#refs/tags/}
-        version=$(python setup.py --version)
-        echo 'tag='$tag
-        echo "version file="$version
-        test "$tag" == "$version"
+        python-version: 3.x
 
     - name: Install dependencies
       run: |
        python -m pip install --upgrade pip
-       pip install setuptools wheel twine
+       pip install build wheel twine
    - name: Build and publish
      env:
       TWINE_USERNAME: __token__
       TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }}
      run: |
-       python setup.py sdist
+       python -m build
        twine upload dist/*
diff --git a/.readthedocs.yaml b/.readthedocs.yaml
index 45958d1f..c03429e5 100644
--- a/.readthedocs.yaml
+++ b/.readthedocs.yaml
@@ -3,11 +3,15 @@ version: 2
 build:
   os: ubuntu-22.04
   tools:
-    python: "3.10"
+    python: "3.11"
 
 sphinx:
   configuration: docs/conf.py
 
 python:
   install:
-  - requirements: docs/requirements.txt
+  - requirements: docs/requirements.txt
+  - method: pip
+    path: .
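
The `Subset.missing` fix in PATCH 03 above is worth spelling out: the old expression returned positions in the underlying dataset, while callers expect positions within the subset, and indexing `self.indices` with a dataset position can even raise `IndexError`. A minimal sketch of both behaviours, using made-up numbers rather than anemoi-datasets code (the set comprehension below is equivalent to the patched loop):

    # `indices` maps subset positions to dataset positions.
    indices = [10, 11, 12]  # the subset exposes dataset rows 10, 11 and 12
    missing = {11}          # dataset row 11 has no data

    # Old behaviour: indexes `indices` with a dataset position (11),
    # which raises IndexError here, and returns dataset positions
    # (not subset positions) when it does not.
    try:
        {indices[i] for i in missing if i in indices}
    except IndexError:
        pass

    # New behaviour: report the subset positions of the missing rows.
    new = {j for j, i in enumerate(indices) if i in missing}
    assert new == {1}  # subset position 1 corresponds to dataset row 11
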
+ extra_requirements: + - docs diff --git a/docs/conf.py b/docs/conf.py index 62a1f298..c7d40d58 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -12,7 +12,7 @@ # # import os # import sys -# sys.path.insert(0, os.path.abspath('.')) +# sys.path.insert(0, os.path.join(os.path.abspath('.'), 'src')) import datetime import os From 2aeecbbfed68be01dd3b901b0329b8c06d66234b Mon Sep 17 00:00:00 2001 From: Baudouin Raoult Date: Mon, 29 Apr 2024 18:16:49 +0100 Subject: [PATCH 05/14] tidy --- .pre-commit-config.yaml | 27 ++++--------------- .../datasets/commands/inspect/__init__.py | 2 +- .../create/functions/filters/rotate_winds.py | 4 +-- .../functions/filters/unrotate_winds.py | 4 +-- src/anemoi/datasets/create/input.py | 3 ++- src/anemoi/datasets/create/loaders.py | 3 +-- src/anemoi/datasets/create/utils.py | 3 +-- src/anemoi/datasets/data/indexing.py | 19 ++++--------- src/anemoi/datasets/data/stores.py | 6 +++-- src/anemoi/datasets/data/unchecked.py | 4 +-- src/anemoi/datasets/dates/groups.py | 3 +-- src/anemoi/datasets/grids.py | 8 ++---- tests/test_data.py | 3 ++- 13 files changed, 27 insertions(+), 62 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 05d460cb..cfcfcc8e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -51,23 +51,6 @@ repos: - --exit-non-zero-on-fix - --preview -# - repo: https://github.com/thclark/pre-commit-sphinx -# rev: 0.0.1 -# hooks: -# - id: build-docs -# additional_dependencies: -# - sphinx -# - sphinx_rtd_theme -# - nbsphinx -# - pandoc -# args: -# - --cache-dir -# - docs/_build/doctrees -# - --html-dir -# - docs/_build/html -# - --source-dir -# - docs -# language_version: python3 - repo: https://github.com/sphinx-contrib/sphinx-lint rev: v0.9.1 @@ -80,8 +63,8 @@ repos: hooks: - id: rstfmt -# - repo: https://github.com/b8raoult/pre-commit-docconvert -# rev: "0.1.0" -# hooks: -# - id: docconvert -# args: ["-o", "numpy"] +- repo: https://github.com/b8raoult/pre-commit-docconvert + rev: "0.1.4" + hooks: + - id: docconvert + args: ["numpy"] diff --git a/src/anemoi/datasets/commands/inspect/__init__.py b/src/anemoi/datasets/commands/inspect/__init__.py index d62ab192..d9937118 100644 --- a/src/anemoi/datasets/commands/inspect/__init__.py +++ b/src/anemoi/datasets/commands/inspect/__init__.py @@ -9,9 +9,9 @@ import os from .. import Command +from .zarr import InspectZarr # from .checkpoint import InspectCheckpoint -from .zarr import InspectZarr class Inspect(Command, InspectZarr): diff --git a/src/anemoi/datasets/create/functions/filters/rotate_winds.py b/src/anemoi/datasets/create/functions/filters/rotate_winds.py index a41e2d78..f8ca190a 100644 --- a/src/anemoi/datasets/create/functions/filters/rotate_winds.py +++ b/src/anemoi/datasets/create/functions/filters/rotate_winds.py @@ -13,9 +13,7 @@ def rotate_winds(lats, lons, x_wind, y_wind, source_projection, target_projection): - """ - Code provided by MetNO - """ + """Code provided by MetNO""" import numpy as np import pyproj diff --git a/src/anemoi/datasets/create/functions/filters/unrotate_winds.py b/src/anemoi/datasets/create/functions/filters/unrotate_winds.py index c5fcde08..074d0806 100644 --- a/src/anemoi/datasets/create/functions/filters/unrotate_winds.py +++ b/src/anemoi/datasets/create/functions/filters/unrotate_winds.py @@ -78,9 +78,7 @@ def __getattr__(self, name): def execute(context, input, u, v): - """ - Unrotate the wind components of a GRIB file. 
- """ + """Unrotate the wind components of a GRIB file.""" result = FieldArray() wind_params = (u, v) diff --git a/src/anemoi/datasets/create/input.py b/src/anemoi/datasets/create/input.py index 68854666..d06e0105 100644 --- a/src/anemoi/datasets/create/input.py +++ b/src/anemoi/datasets/create/input.py @@ -869,7 +869,8 @@ def step_factory(config, context, action_path, previous_step): class FunctionContext: """A FunctionContext is passed to all functions, it will be used to pass information - to the functions from the other actions and filters and results.""" + to the functions from the other actions and filters and results. + """ def __init__(self, owner): self.owner = owner diff --git a/src/anemoi/datasets/create/loaders.py b/src/anemoi/datasets/create/loaders.py index f20f230c..499c29de 100644 --- a/src/anemoi/datasets/create/loaders.py +++ b/src/anemoi/datasets/create/loaders.py @@ -42,8 +42,7 @@ def default_statistics_dates(dates): - """ - Calculate default statistics dates based on the given list of dates. + """Calculate default statistics dates based on the given list of dates. Args: dates (list): List of datetime objects representing dates. diff --git a/src/anemoi/datasets/create/utils.py b/src/anemoi/datasets/create/utils.py index 8579038f..aa0e8137 100644 --- a/src/anemoi/datasets/create/utils.py +++ b/src/anemoi/datasets/create/utils.py @@ -31,8 +31,7 @@ def no_cache_context(): def bytes(n): - """ - >>> bytes(4096) + """>>> bytes(4096) '4 KiB' >>> bytes(4000) '3.9 KiB' diff --git a/src/anemoi/datasets/data/indexing.py b/src/anemoi/datasets/data/indexing.py index dfb123cc..f51ae2fa 100644 --- a/src/anemoi/datasets/data/indexing.py +++ b/src/anemoi/datasets/data/indexing.py @@ -11,9 +11,7 @@ def _tuple_with_slices(t, shape): - """ - Replace all integers in a tuple with slices, so we preserve the dimensionality. - """ + """Replace all integers in a tuple with slices, so we preserve the dimensionality.""" result = tuple(slice(i, i + 1) if isinstance(i, int) else i for i in t) changes = tuple(j for (j, i) in enumerate(t) if isinstance(i, int)) @@ -52,9 +50,7 @@ def _index_to_tuple(index, shape): def index_to_slices(index, shape): - """ - Convert an index to a tuple of slices, with the same dimensionality as the shape. - """ + """Convert an index to a tuple of slices, with the same dimensionality as the shape.""" return _tuple_with_slices(_index_to_tuple(index, shape), shape) @@ -68,9 +64,7 @@ def apply_index_to_slices_changes(result, changes): def update_tuple(t, index, value): - """ - Replace the elements of a tuple at the given index with a new value. - """ + """Replace the elements of a tuple at the given index with a new value.""" t = list(t) prev = t[index] t[index] = value @@ -78,9 +72,7 @@ def update_tuple(t, index, value): def length_to_slices(index, lengths): - """ - Convert an index to a list of slices, given the lengths of the dimensions. - """ + """Convert an index to a list of slices, given the lengths of the dimensions.""" total = sum(lengths) start, stop, step = index.indices(total) @@ -127,8 +119,7 @@ def _(i): def expand_list_indexing(method): - """ - Allows to use slices, lists, and tuples to select data from the dataset. + """Allows to use slices, lists, and tuples to select data from the dataset. Zarr does not support indexing with lists/arrays directly, so we need to implement it ourselves. 
""" diff --git a/src/anemoi/datasets/data/stores.py b/src/anemoi/datasets/data/stores.py index a9311bbd..698838e7 100644 --- a/src/anemoi/datasets/data/stores.py +++ b/src/anemoi/datasets/data/stores.py @@ -40,7 +40,8 @@ def __iter__(self): class HTTPStore(ReadOnlyStore): """We write our own HTTPStore because the one used by zarr (fsspec) does not play - well with fork() and multiprocessing.""" + well with fork() and multiprocessing. + """ def __init__(self, url): self.url = url @@ -59,7 +60,8 @@ def __getitem__(self, key): class S3Store(ReadOnlyStore): """We write our own S3Store because the one used by zarr (fsspec) does not play well - with fork() and multiprocessing.""" + with fork() and multiprocessing. + """ def __init__(self, url): import boto3 diff --git a/src/anemoi/datasets/data/unchecked.py b/src/anemoi/datasets/data/unchecked.py index f041bd77..7d9b02b1 100644 --- a/src/anemoi/datasets/data/unchecked.py +++ b/src/anemoi/datasets/data/unchecked.py @@ -131,9 +131,7 @@ def missing(self): class Chain(ConcatMixin, Unchecked): - """ - Same as Concat, but with no checks - """ + """Same as Concat, but with no checks""" def __len__(self): return sum(len(d) for d in self.datasets) diff --git a/src/anemoi/datasets/dates/groups.py b/src/anemoi/datasets/dates/groups.py index 8233054a..a83e789b 100644 --- a/src/anemoi/datasets/dates/groups.py +++ b/src/anemoi/datasets/dates/groups.py @@ -13,8 +13,7 @@ class Groups: - """ - >>> list(Groups(group_by="daily", start="2023-01-01 00:00", end="2023-01-05 00:00", frequency=12))[0] + """>>> list(Groups(group_by="daily", start="2023-01-01 00:00", end="2023-01-05 00:00", frequency=12))[0] [datetime.datetime(2023, 1, 1, 0, 0), datetime.datetime(2023, 1, 1, 12, 0)] >>> list(Groups(group_by="daily", start="2023-01-01 00:00", end="2023-01-05 00:00", frequency=12))[1] diff --git a/src/anemoi/datasets/grids.py b/src/anemoi/datasets/grids.py index ff36ed07..1646eedb 100644 --- a/src/anemoi/datasets/grids.py +++ b/src/anemoi/datasets/grids.py @@ -124,9 +124,7 @@ def cutout_mask( min_distance_km=None, plot=None, ): - """ - Return a mask for the points in [global_lats, global_lons] that are inside of [lats, lons] - """ + """Return a mask for the points in [global_lats, global_lons] that are inside of [lats, lons]""" from scipy.spatial import KDTree # TODO: transform min_distance from lat/lon to xyz @@ -235,9 +233,7 @@ def thinning_mask( global_lons, cropping_distance=2.0, ): - """ - Return the list of points in [lats, lons] closest to [global_lats, global_lons] - """ + """Return the list of points in [lats, lons] closest to [global_lats, global_lons]""" from scipy.spatial import KDTree assert global_lats.ndim == 1 diff --git a/tests/test_data.py b/tests/test_data.py index f25d619e..87876691 100644 --- a/tests/test_data.py +++ b/tests/test_data.py @@ -43,7 +43,8 @@ def wrapper(*args, **kwargs): @cache def _(date, var, k=0, e=0, values=VALUES): """Create a simple array of values based on the date and variable name, ensemble, - grid and a few other parameters.""" + grid and a few other parameters. 
+ """ d = date.year * 10000 + date.month * 100 + date.day v = ord(var) - ord("a") + 1 assert 0 <= k <= 9 From 7d3fb41fde14d675de98e232f4221f414bd69cdd Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Tue, 30 Apr 2024 07:40:41 +0000 Subject: [PATCH 06/14] rename "actions" to "sources" --- .../create/functions/{actions => sources}/__init__.py | 0 .../create/functions/{actions => sources}/accumulations.py | 0 .../create/functions/{actions => sources}/constants.py | 0 .../datasets/create/functions/{actions => sources}/empty.py | 0 .../create/functions/{actions => sources}/forcings.py | 0 .../datasets/create/functions/{actions => sources}/grib.py | 0 .../datasets/create/functions/{actions => sources}/mars.py | 0 .../datasets/create/functions/{actions => sources}/netcdf.py | 0 .../datasets/create/functions/{actions => sources}/opendap.py | 0 .../create/functions/{actions => sources}/perturbations.py | 0 .../datasets/create/functions/{actions => sources}/source.py | 0 .../create/functions/{actions => sources}/tendencies.py | 0 src/anemoi/datasets/create/input.py | 4 ++-- 13 files changed, 2 insertions(+), 2 deletions(-) rename src/anemoi/datasets/create/functions/{actions => sources}/__init__.py (100%) rename src/anemoi/datasets/create/functions/{actions => sources}/accumulations.py (100%) rename src/anemoi/datasets/create/functions/{actions => sources}/constants.py (100%) rename src/anemoi/datasets/create/functions/{actions => sources}/empty.py (100%) rename src/anemoi/datasets/create/functions/{actions => sources}/forcings.py (100%) rename src/anemoi/datasets/create/functions/{actions => sources}/grib.py (100%) rename src/anemoi/datasets/create/functions/{actions => sources}/mars.py (100%) rename src/anemoi/datasets/create/functions/{actions => sources}/netcdf.py (100%) rename src/anemoi/datasets/create/functions/{actions => sources}/opendap.py (100%) rename src/anemoi/datasets/create/functions/{actions => sources}/perturbations.py (100%) rename src/anemoi/datasets/create/functions/{actions => sources}/source.py (100%) rename src/anemoi/datasets/create/functions/{actions => sources}/tendencies.py (100%) diff --git a/src/anemoi/datasets/create/functions/actions/__init__.py b/src/anemoi/datasets/create/functions/sources/__init__.py similarity index 100% rename from src/anemoi/datasets/create/functions/actions/__init__.py rename to src/anemoi/datasets/create/functions/sources/__init__.py diff --git a/src/anemoi/datasets/create/functions/actions/accumulations.py b/src/anemoi/datasets/create/functions/sources/accumulations.py similarity index 100% rename from src/anemoi/datasets/create/functions/actions/accumulations.py rename to src/anemoi/datasets/create/functions/sources/accumulations.py diff --git a/src/anemoi/datasets/create/functions/actions/constants.py b/src/anemoi/datasets/create/functions/sources/constants.py similarity index 100% rename from src/anemoi/datasets/create/functions/actions/constants.py rename to src/anemoi/datasets/create/functions/sources/constants.py diff --git a/src/anemoi/datasets/create/functions/actions/empty.py b/src/anemoi/datasets/create/functions/sources/empty.py similarity index 100% rename from src/anemoi/datasets/create/functions/actions/empty.py rename to src/anemoi/datasets/create/functions/sources/empty.py diff --git a/src/anemoi/datasets/create/functions/actions/forcings.py b/src/anemoi/datasets/create/functions/sources/forcings.py similarity index 100% rename from src/anemoi/datasets/create/functions/actions/forcings.py rename to 
src/anemoi/datasets/create/functions/sources/forcings.py diff --git a/src/anemoi/datasets/create/functions/actions/grib.py b/src/anemoi/datasets/create/functions/sources/grib.py similarity index 100% rename from src/anemoi/datasets/create/functions/actions/grib.py rename to src/anemoi/datasets/create/functions/sources/grib.py diff --git a/src/anemoi/datasets/create/functions/actions/mars.py b/src/anemoi/datasets/create/functions/sources/mars.py similarity index 100% rename from src/anemoi/datasets/create/functions/actions/mars.py rename to src/anemoi/datasets/create/functions/sources/mars.py diff --git a/src/anemoi/datasets/create/functions/actions/netcdf.py b/src/anemoi/datasets/create/functions/sources/netcdf.py similarity index 100% rename from src/anemoi/datasets/create/functions/actions/netcdf.py rename to src/anemoi/datasets/create/functions/sources/netcdf.py diff --git a/src/anemoi/datasets/create/functions/actions/opendap.py b/src/anemoi/datasets/create/functions/sources/opendap.py similarity index 100% rename from src/anemoi/datasets/create/functions/actions/opendap.py rename to src/anemoi/datasets/create/functions/sources/opendap.py diff --git a/src/anemoi/datasets/create/functions/actions/perturbations.py b/src/anemoi/datasets/create/functions/sources/perturbations.py similarity index 100% rename from src/anemoi/datasets/create/functions/actions/perturbations.py rename to src/anemoi/datasets/create/functions/sources/perturbations.py diff --git a/src/anemoi/datasets/create/functions/actions/source.py b/src/anemoi/datasets/create/functions/sources/source.py similarity index 100% rename from src/anemoi/datasets/create/functions/actions/source.py rename to src/anemoi/datasets/create/functions/sources/source.py diff --git a/src/anemoi/datasets/create/functions/actions/tendencies.py b/src/anemoi/datasets/create/functions/sources/tendencies.py similarity index 100% rename from src/anemoi/datasets/create/functions/actions/tendencies.py rename to src/anemoi/datasets/create/functions/sources/tendencies.py diff --git a/src/anemoi/datasets/create/input.py b/src/anemoi/datasets/create/input.py index d06e0105..e0611902 100644 --- a/src/anemoi/datasets/create/input.py +++ b/src/anemoi/datasets/create/input.py @@ -561,7 +561,7 @@ def select(self, dates): @property def function(self): # name, delta = parse_function_name(self.name) - return import_function(self.name, "actions") + return import_function(self.name, "sources") def __repr__(self): content = "" @@ -825,7 +825,7 @@ def action_factory(config, context, action_path): }.get(key) if cls is None: - if not is_function(key, "actions"): + if not is_function(key, "sources"): raise ValueError(f"Unknown action '{key}' in {config}") cls = FunctionAction args = [key] + args From cb63d762dad1aa0e92d9fd22d1692a07f3c12d00 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Tue, 30 Apr 2024 12:00:15 +0000 Subject: [PATCH 07/14] dataset generation, added build:use_grib_paramid --- src/anemoi/datasets/create/config.py | 1 + .../datasets/create/functions/sources/mars.py | 18 ++++++++++++++++++ src/anemoi/datasets/create/input.py | 4 +++- src/anemoi/datasets/create/loaders.py | 1 + 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/anemoi/datasets/create/config.py b/src/anemoi/datasets/create/config.py index 7ff760fa..6acc17ac 100644 --- a/src/anemoi/datasets/create/config.py +++ b/src/anemoi/datasets/create/config.py @@ -153,6 +153,7 @@ def __init__(self, config, *args, **kwargs): self.setdefault("build", Config()) 
self.build.setdefault("group_by", "monthly") + self.build.setdefault("use_grib_paramid", False) self.setdefault("output", Config()) self.output.setdefault("order_by", ["valid_datetime", "param_level", "number"]) diff --git a/src/anemoi/datasets/create/functions/sources/mars.py b/src/anemoi/datasets/create/functions/sources/mars.py index d133e9df..e2a9fe24 100644 --- a/src/anemoi/datasets/create/functions/sources/mars.py +++ b/src/anemoi/datasets/create/functions/sources/mars.py @@ -82,6 +82,20 @@ def factorise_requests(dates, *requests): yield r +def use_grib_paramid(r): + from anemoi.utils.grib import shortname_to_paramid + + params = r["param"] + if isinstance(params, str): + params = params.split("/") + assert isinstance(params, (list, tuple)), params + + params = [shortname_to_paramid(p) for p in params] + r["param"] = "/".join(str(p) for p in params) + + return r + + def mars(context, dates, *requests, **kwargs): if not requests: requests = [kwargs] @@ -90,6 +104,10 @@ def mars(context, dates, *requests, **kwargs): ds = load_source("empty") for r in requests: r = {k: v for k, v in r.items() if v != ("-",)} + + if context.use_grib_paramid and "param" in r: + r = use_grib_paramid(r) + if DEBUG: context.trace("✅", f"load_source(mars, {r}") diff --git a/src/anemoi/datasets/create/input.py b/src/anemoi/datasets/create/input.py index e0611902..ab08798d 100644 --- a/src/anemoi/datasets/create/input.py +++ b/src/anemoi/datasets/create/input.py @@ -874,17 +874,19 @@ class FunctionContext: def __init__(self, owner): self.owner = owner + self.use_grib_paramid = owner.context.use_grib_paramid def trace(self, emoji, *args): trace(emoji, *args) class ActionContext(Context): - def __init__(self, /, order_by, flatten_grid, remapping): + def __init__(self, /, order_by, flatten_grid, remapping, use_grib_paramid): super().__init__() self.order_by = order_by self.flatten_grid = flatten_grid self.remapping = build_remapping(remapping) + self.use_grib_paramid = use_grib_paramid class InputBuilder: diff --git a/src/anemoi/datasets/create/loaders.py b/src/anemoi/datasets/create/loaders.py index 499c29de..17beebd0 100644 --- a/src/anemoi/datasets/create/loaders.py +++ b/src/anemoi/datasets/create/loaders.py @@ -125,6 +125,7 @@ def build_input(self): order_by=self.output.order_by, flatten_grid=self.output.flatten_grid, remapping=build_remapping(self.output.remapping), + use_grib_paramid=self.main_config.build.use_grib_paramid, ) LOG.info("✅ INPUT_BUILDER") LOG.info(builder) From 75cfe84e5c0a5168a3c9ce95c15f6f20be77ce80 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Tue, 30 Apr 2024 12:30:02 +0000 Subject: [PATCH 08/14] fix example --- tools/examples/an-oper-2023-2023-2p5-6h-v1.yaml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tools/examples/an-oper-2023-2023-2p5-6h-v1.yaml b/tools/examples/an-oper-2023-2023-2p5-6h-v1.yaml index 33a60441..fd4a44a6 100644 --- a/tools/examples/an-oper-2023-2023-2p5-6h-v1.yaml +++ b/tools/examples/an-oper-2023-2023-2p5-6h-v1.yaml @@ -1,9 +1,7 @@ description: "Example for the anemoi documentation" -dataset_status: stable -purpose: aifs name: an-oper-2023-2023-2p5-6h-v1 licence: CC-BY-4.0 -copyright: ECMWF +attribution: ECMWF dates: start: 2023-01-01 00:00:00 From b5cdff4cb2257c81e63f77d990a364c75d9355de Mon Sep 17 00:00:00 2001 From: Baudouin Raoult Date: Tue, 30 Apr 2024 18:20:45 +0000 Subject: [PATCH 09/14] some debug stuff --- .gitignore | 1 + pyproject.toml | 3 ++ src/anemoi/datasets/data/dataset.py | 4 +++ src/anemoi/datasets/data/debug.css | 12 
++++++++ src/anemoi/datasets/data/debug.py | 47 +++++++++++++++++++---------- src/anemoi/datasets/data/grids.py | 4 +-- src/anemoi/datasets/data/misc.py | 2 +- src/anemoi/datasets/data/stores.py | 4 +++ 8 files changed, 58 insertions(+), 19 deletions(-) create mode 100644 src/anemoi/datasets/data/debug.css diff --git a/.gitignore b/.gitignore index 6901caac..64958f06 100644 --- a/.gitignore +++ b/.gitignore @@ -186,3 +186,4 @@ _build/ ?.* ~* *.sync +*.dot diff --git a/pyproject.toml b/pyproject.toml index cf9906dd..dd6158b5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,3 +96,6 @@ anemoi-datasets = "anemoi.datasets.__main__:main" [tool.setuptools_scm] version_file = "src/anemoi/datasets/_version.py" + +[tool.setuptools.package-data] +"anemoi.datasets.data" = ["*.css"] diff --git a/src/anemoi/datasets/data/dataset.py b/src/anemoi/datasets/data/dataset.py index ae9f3e3f..879af680 100644 --- a/src/anemoi/datasets/data/dataset.py +++ b/src/anemoi/datasets/data/dataset.py @@ -222,3 +222,7 @@ def _check(ds): def _repr_html_(self): return self.tree().html() + + @property + def label(self): + return self.__class__.__name__.lower() diff --git a/src/anemoi/datasets/data/debug.css b/src/anemoi/datasets/data/debug.css new file mode 100644 index 00000000..edde87d0 --- /dev/null +++ b/src/anemoi/datasets/data/debug.css @@ -0,0 +1,12 @@ +table.dataset td { + vertical-align: top; + text-align: left !important; +} + +table.dataset span.dataset { + font-weight: bold !important; +} + +table.dataset span.values { + font-style: italic !important; +} diff --git a/src/anemoi/datasets/data/debug.py b/src/anemoi/datasets/data/debug.py index c03c08a7..98ea3d9d 100644 --- a/src/anemoi/datasets/data/debug.py +++ b/src/anemoi/datasets/data/debug.py @@ -21,6 +21,12 @@ # a.flags.writeable = False +def css(name): + path = os.path.join(os.path.dirname(__file__), f"{name}.css") + with open(path) as f: + return f"" + + class Node: def __init__(self, dataset, kids, **kwargs): self.dataset = dataset @@ -46,7 +52,7 @@ def __repr__(self): return "\n".join(result) def graph(self, digraph, nodes): - label = self.dataset.__class__.__name__.lower() + label = self.dataset.label # dataset.__class__.__name__.lower() if self.kwargs: param = [] for k, v in self.kwargs.items(): @@ -107,7 +113,7 @@ def _html(self, indent, rows): if k == "path": v = v[::-1] kwargs[k] = v - label = self.dataset.__class__.__name__.lower() + label = self.dataset.label label = f'{label}' if len(kwargs) == 1: k, v = list(kwargs.items())[0] @@ -116,25 +122,14 @@ def _html(self, indent, rows): rows.append([indent] + [label]) for k, v in kwargs.items(): - rows.append([indent] + [k, v]) + rows.append([indent] + [f"{k}", f"{v}"]) for kid in self.kids: kid._html(indent + "   ", rows) def html(self): - result = [ - """ - - """ - ] + result = [css("debug")] + result.append('') rows = [] @@ -147,6 +142,26 @@ def html(self): result.append("
") return "\n".join(result) + def _as_tree(self, tree): + + for kid in self.kids: + n = tree.node(kid) + kid._as_tree(n) + + def as_tree(self): + from anemoi.utils.text import Tree + + tree = Tree(self) + self._as_tree(tree) + return tree + + @property + def summary(self): + return self.dataset.label + + def as_dict(self): + return {} + class Source: """Class used to follow the provenance of a data point.""" diff --git a/src/anemoi/datasets/data/grids.py b/src/anemoi/datasets/data/grids.py index 6d1bc006..23519087 100644 --- a/src/anemoi/datasets/data/grids.py +++ b/src/anemoi/datasets/data/grids.py @@ -127,7 +127,7 @@ def tree(self): return Node(self, [d.tree() for d in self.datasets], mode="concat") -class CutoutGrids(Grids): +class Cutout(Grids): def __init__(self, datasets, axis): from anemoi.datasets.grids import cutout_mask @@ -236,4 +236,4 @@ def cutout_factory(args, kwargs): datasets = [_open(e) for e in cutout] datasets, kwargs = _auto_adjust(datasets, kwargs) - return CutoutGrids(datasets, axis=axis)._subset(**kwargs) + return Cutout(datasets, axis=axis)._subset(**kwargs) diff --git a/src/anemoi/datasets/data/misc.py b/src/anemoi/datasets/data/misc.py index f590d008..dbc1946c 100644 --- a/src/anemoi/datasets/data/misc.py +++ b/src/anemoi/datasets/data/misc.py @@ -237,7 +237,7 @@ def _open(a): if isinstance(a, (list, tuple)): return _open_dataset(*a) - raise NotImplementedError("Unsupported argument: " + type(a)) + raise NotImplementedError(f"Unsupported argument: {type(a)}") def _auto_adjust(datasets, kwargs): diff --git a/src/anemoi/datasets/data/stores.py b/src/anemoi/datasets/data/stores.py index 698838e7..bdce9cd6 100644 --- a/src/anemoi/datasets/data/stores.py +++ b/src/anemoi/datasets/data/stores.py @@ -328,6 +328,10 @@ def _report_missing(self, n): def tree(self): return Node(self, [], path=self.path, missing=sorted(self.missing)) + @property + def label(self): + return "zarr*" + def zarr_lookup(name): From e9622c4f7b4f1f7c1946a8c4a1224b5216a0aafc Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Thu, 2 May 2024 09:40:54 +0000 Subject: [PATCH 10/14] update dependency on climetlab --- pyproject.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index dd6158b5..0e1871a9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,7 @@ dependencies = [ remote = ["boto3", "requests", "s3fs"] create = [ - "climetlab", # "earthkit-data" + "climetlab>=0.22.1", # "earthkit-data" "earthkit-meteo", "pyproj", "ecmwflibs>=0.6.3", @@ -64,7 +64,7 @@ all = [ "boto3", "requests", "s3fs", - "climetlab", # "earthkit-data" + "climetlab>=0.22.1", # "earthkit-data" "earthkit-meteo", "pyproj", "ecmwflibs>=0.6.3", @@ -74,7 +74,7 @@ dev = [ "boto3", "requests", "s3fs", - "climetlab", # "earthkit-data" + "climetlab>=0.22.1", # "earthkit-data" "earthkit-meteo", "pyproj", "ecmwflibs>=0.6.3", From 4426fd78cfb3076cc5706543ff0865b1cfccac59 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Thu, 2 May 2024 09:44:06 +0000 Subject: [PATCH 11/14] implemented statistics with tendencies --- src/anemoi/datasets/create/__init__.py | 79 +++- src/anemoi/datasets/create/check.py | 15 - src/anemoi/datasets/create/chunks.py | 78 ++++ .../create/functions/sources/perturbations.py | 1 + src/anemoi/datasets/create/loaders.py | 437 ++++++++++++------ src/anemoi/datasets/create/persistent.py | 152 ++++++ src/anemoi/datasets/create/size.py | 33 ++ .../{statistics.py => statistics/__init__.py} | 252 ++++++---- .../datasets/create/statistics/summary.py | 108 
+++++ src/anemoi/datasets/create/utils.py | 15 - src/anemoi/datasets/create/writer.py | 40 -- tests/test_chunks.py | 89 ++++ tests/test_dates.py | 2 +- 13 files changed, 1000 insertions(+), 301 deletions(-) create mode 100644 src/anemoi/datasets/create/chunks.py create mode 100644 src/anemoi/datasets/create/persistent.py create mode 100644 src/anemoi/datasets/create/size.py rename src/anemoi/datasets/create/{statistics.py => statistics/__init__.py} (61%) create mode 100644 src/anemoi/datasets/create/statistics/summary.py create mode 100644 tests/test_chunks.py diff --git a/src/anemoi/datasets/create/__init__.py b/src/anemoi/datasets/create/__init__.py index 3d85d2a0..b829f641 100644 --- a/src/anemoi/datasets/create/__init__.py +++ b/src/anemoi/datasets/create/__init__.py @@ -32,13 +32,13 @@ def init(self, check_name=False): # check path _, ext = os.path.splitext(self.path) assert ext != "zarr", f"Unsupported extension={ext}" - from .loaders import InitialiseLoader + from .loaders import InitialiserLoader if self._path_readable() and not self.overwrite: raise Exception(f"{self.path} already exists. Use overwrite=True to overwrite.") with self._cache_context(): - obj = InitialiseLoader.from_config( + obj = InitialiserLoader.from_config( path=self.path, config=self.config, statistics_tmp=self.statistics_tmp, @@ -59,12 +59,11 @@ def load(self, parts=None): loader.load() def statistics(self, force=False, output=None, start=None, end=None): - from .loaders import StatisticsLoader + from .loaders import StatisticsAdder - loader = StatisticsLoader.from_dataset( + loader = StatisticsAdder.from_dataset( path=self.path, print=self.print, - force=force, statistics_tmp=self.statistics_tmp, statistics_output=output, recompute=False, @@ -74,26 +73,72 @@ def statistics(self, force=False, output=None, start=None, end=None): loader.run() def size(self): - from .loaders import SizeLoader + from .loaders import DatasetHandler + from .size import compute_directory_sizes - loader = SizeLoader.from_dataset(path=self.path, print=self.print) - loader.add_total_size() + metadata = compute_directory_sizes(self.path) + handle = DatasetHandler.from_dataset(path=self.path, print=self.print) + handle.update_metadata(**metadata) def cleanup(self): - from .loaders import CleanupLoader + from .loaders import DatasetHandlerWithStatistics - loader = CleanupLoader.from_dataset( - path=self.path, - print=self.print, - statistics_tmp=self.statistics_tmp, + cleaner = DatasetHandlerWithStatistics.from_dataset( + path=self.path, print=self.print, statistics_tmp=self.statistics_tmp ) - loader.run() + cleaner.tmp_statistics.delete() + cleaner.registry.clean() def patch(self, **kwargs): from .patch import apply_patch apply_patch(self.path, **kwargs) + def init_additions(self, delta=[1, 3, 6, 12]): + from .loaders import StatisticsAddition + from .loaders import TendenciesStatisticsAddition + from .loaders import TendenciesStatisticsDeltaNotMultipleOfFrequency + + a = StatisticsAddition.from_dataset(path=self.path, print=self.print) + a.initialise() + + for d in delta: + try: + a = TendenciesStatisticsAddition.from_dataset(path=self.path, print=self.print, delta=d) + a.initialise() + except TendenciesStatisticsDeltaNotMultipleOfFrequency: + self.print(f"Skipping delta={d} as it is not a multiple of the frequency.") + + def run_additions(self, parts=None, delta=[1, 3, 6, 12]): + from .loaders import StatisticsAddition + from .loaders import TendenciesStatisticsAddition + from .loaders import 
TendenciesStatisticsDeltaNotMultipleOfFrequency
+
+        a = StatisticsAddition.from_dataset(path=self.path, print=self.print)
+        a.run(parts)
+
+        for d in delta:
+            try:
+                a = TendenciesStatisticsAddition.from_dataset(path=self.path, print=self.print, delta=d)
+                a.run(parts)
+            except TendenciesStatisticsDeltaNotMultipleOfFrequency:
+                self.print(f"Skipping delta={d} as it is not a multiple of the frequency.")
+
+    def finalise_additions(self, delta=[1, 3, 6, 12]):
+        from .loaders import StatisticsAddition
+        from .loaders import TendenciesStatisticsAddition
+        from .loaders import TendenciesStatisticsDeltaNotMultipleOfFrequency
+
+        a = StatisticsAddition.from_dataset(path=self.path, print=self.print)
+        a.finalise()
+
+        for d in delta:
+            try:
+                a = TendenciesStatisticsAddition.from_dataset(path=self.path, print=self.print, delta=d)
+                a.finalise()
+            except TendenciesStatisticsDeltaNotMultipleOfFrequency:
+                self.print(f"Skipping delta={d} as it is not a multiple of the frequency.")
+
     def finalise(self, **kwargs):
         self.statistics(**kwargs)
         self.size()
@@ -102,8 +147,14 @@ def create(self):
         self.init()
         self.load()
         self.finalise()
+        self.additions()
         self.cleanup()

+    def additions(self):
+        self.init_additions()
+        self.run_additions()
+        self.finalise_additions()
+
     def _cache_context(self):
         from .utils import cache_context

diff --git a/src/anemoi/datasets/create/check.py b/src/anemoi/datasets/create/check.py
index 20398356..e24c7e02 100644
--- a/src/anemoi/datasets/create/check.py
+++ b/src/anemoi/datasets/create/check.py
@@ -8,29 +8,14 @@
 #

 import logging
-import os
 import re
 import warnings

 import numpy as np
-import tqdm

 LOG = logging.getLogger(__name__)


-def compute_directory_size(path):
-    if not os.path.isdir(path):
-        return None
-    size = 0
-    n = 0
-    for dirpath, _, filenames in tqdm.tqdm(os.walk(path), desc="Computing size", leave=False):
-        for filename in filenames:
-            file_path = os.path.join(dirpath, filename)
-            size += os.path.getsize(file_path)
-            n += 1
-    return size, n
-
-
 class DatasetName:
     def __init__(
         self,
diff --git a/src/anemoi/datasets/create/chunks.py b/src/anemoi/datasets/create/chunks.py
new file mode 100644
index 00000000..4dc988f6
--- /dev/null
+++ b/src/anemoi/datasets/create/chunks.py
@@ -0,0 +1,78 @@
+# (C) Copyright 2024 ECMWF.
+#
+# This software is licensed under the terms of the Apache Licence Version 2.0
+# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
+# In applying this licence, ECMWF does not waive the privileges and immunities
+# granted to it by virtue of its status as an intergovernmental organisation
+# nor does it submit to any jurisdiction.
+#
+import logging
+import warnings
+
+LOG = logging.getLogger(__name__)
+
+ALL = object()
+
+
+class ChunkFilter:
+    def __init__(self, *, parts, total):
+        self.total = total
+
+        if isinstance(parts, list):
+            if len(parts) == 1:
+                parts = parts[0]
+            elif len(parts) == 0:
+                parts = None
+            else:
+                raise ValueError(f"Invalid parts format: {parts}. Must be in the form 'i/n'.")
+
+        if not parts:
+            parts = "all"
+
+        assert isinstance(parts, str), f"Argument parts must be a string, got {parts}."
+
+        if parts.lower() == "all" or parts == "*":
+            self.allowed = ALL
+            return
+
+        assert "/" in parts, f"Invalid parts format: {parts}. Must be in the form 'i/n'."
+
+        i, n = parts.split("/")
+        i, n = int(i), int(n)
+
+        assert i > 0, f"Chunk number {i} must be positive."
+        assert i <= n, f"Chunk number {i} must be less than or equal to the total number of chunks {n}."
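+        # Note: parts are 1-indexed on input ("1/3" selects the first of three
+        # chunks of the work), while __call__ and __iter__ below operate on
+        # 0-indexed chunk numbers.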
+ if n > total: + warnings.warn( + f"Number of chunks {n} is larger than the total number of chunks: {total}. " + "Some chunks will be empty." + ) + + chunk_size = total / n + parts = [x for x in range(total) if x >= (i - 1) * chunk_size and x < i * chunk_size] + + for i in parts: + if i < 0 or i >= total: + raise AssertionError(f"Invalid chunk number {i}. Must be between 0 and {total - 1}.") + if not parts: + warnings.warn(f"Nothing to do for chunk {i}/{n}.") + + LOG.info(f"Running parts: {parts}") + + self.allowed = parts + + def __call__(self, i): + if i < 0 or i >= self.total: + raise AssertionError(f"Invalid chunk number {i}. Must be between 0 and {self.total - 1}.") + + if self.allowed == ALL: + return True + return i in self.allowed + + def __iter__(self): + for i in range(self.total): + if self(i): + yield i + + def __len__(self): + return len([_ for _ in self]) diff --git a/src/anemoi/datasets/create/functions/sources/perturbations.py b/src/anemoi/datasets/create/functions/sources/perturbations.py index a678afed..3300b05f 100644 --- a/src/anemoi/datasets/create/functions/sources/perturbations.py +++ b/src/anemoi/datasets/create/functions/sources/perturbations.py @@ -58,6 +58,7 @@ def load_if_needed(context, dates, dict_or_dataset): def perturbations(context, dates, members, center, remapping={}, patches={}): members = load_if_needed(context, dates, members) center = load_if_needed(context, dates, center) + # return perturbations(member, centers....) keys = ["param", "level", "valid_datetime", "date", "time", "step", "number"] diff --git a/src/anemoi/datasets/create/loaders.py b/src/anemoi/datasets/create/loaders.py index 17beebd0..063405f6 100644 --- a/src/anemoi/datasets/create/loaders.py +++ b/src/anemoi/datasets/create/loaders.py @@ -9,29 +9,33 @@ import os import time import uuid +import warnings from functools import cached_property import numpy as np import zarr +from anemoi.datasets import MissingDateError from anemoi.datasets import open_dataset +from anemoi.datasets.create.persistent import build_storage from anemoi.datasets.data.misc import as_first_date from anemoi.datasets.data.misc import as_last_date from anemoi.datasets.dates.groups import Groups from .check import DatasetName from .check import check_data_values +from .chunks import ChunkFilter from .config import build_output from .config import loader_config from .input import build_input -from .statistics import TempStatistics +from .statistics import Summary +from .statistics import TmpStatistics +from .statistics import check_variance from .statistics import compute_statistics -from .utils import bytes -from .utils import compute_directory_sizes +from .statistics import default_statistics_dates from .utils import normalize_and_check_dates from .utils import progress_bar from .utils import seconds -from .writer import CubesFilter from .writer import ViewCacheArray from .zarr import ZarrBuiltRegistry from .zarr import add_zarr_dataset @@ -41,48 +45,7 @@ VERSION = "0.20" -def default_statistics_dates(dates): - """Calculate default statistics dates based on the given list of dates. - - Args: - dates (list): List of datetime objects representing dates. - - Returns: - tuple: A tuple containing the default start and end dates. 
- """ - - def to_datetime(d): - if isinstance(d, np.datetime64): - return d.tolist() - assert isinstance(d, datetime.datetime), d - return d - - first = dates[0] - last = dates[-1] - - first = to_datetime(first) - last = to_datetime(last) - - n_years = round((last - first).total_seconds() / (365.25 * 24 * 60 * 60)) - - if n_years < 10: - # leave out 20% of the data - k = int(len(dates) * 0.8) - end = dates[k - 1] - LOG.info(f"Number of years {n_years} < 10, leaving out 20%. {end=}") - return dates[0], end - - delta = 1 - if n_years >= 20: - delta = 3 - LOG.info(f"Number of years {n_years}, leaving out {delta} years.") - end_year = last.year - delta - - end = max(d for d in dates if to_datetime(d).year == end_year) - return dates[0], end - - -class Loader: +class GenericDatasetHandler: def __init__(self, *, path, print=print, **kwargs): # Catch all floating point errors, including overflow, sqrt(<0), etc np.seterr(all="raise", under="warn") @@ -93,10 +56,6 @@ def __init__(self, *, path, print=print, **kwargs): self.kwargs = kwargs self.print = print - statistics_tmp = kwargs.get("statistics_tmp") or self.path + ".statistics" - - self.statistics_registry = TempStatistics(statistics_tmp) - @classmethod def from_config(cls, *, config, path, print=print, **kwargs): # config is the path to the config file or a dict with the config @@ -116,38 +75,6 @@ def from_dataset(cls, *, path, **kwargs): assert os.path.exists(path), f"Path {path} does not exist." return cls(path=path, **kwargs) - def build_input(self): - from climetlab.core.order import build_remapping - - builder = build_input( - self.main_config.input, - data_sources=self.main_config.get("data_sources", {}), - order_by=self.output.order_by, - flatten_grid=self.output.flatten_grid, - remapping=build_remapping(self.output.remapping), - use_grib_paramid=self.main_config.build.use_grib_paramid, - ) - LOG.info("✅ INPUT_BUILDER") - LOG.info(builder) - return builder - - def build_statistics_dates(self, start, end): - ds = open_dataset(self.path) - dates = ds.dates - - default_start, default_end = default_statistics_dates(dates) - if start is None: - start = default_start - if end is None: - end = default_end - - start = as_first_date(start, dates) - end = as_last_date(end, dates) - - start = start.astype(datetime.datetime) - end = end.astype(datetime.datetime) - return (start.isoformat(), end.isoformat()) - def read_dataset_metadata(self): ds = open_dataset(self.path) self.dataset_shape = ds.shape @@ -155,21 +82,17 @@ def read_dataset_metadata(self): assert len(self.variables_names) == ds.shape[1], self.dataset_shape self.dates = ds.dates - z = zarr.open(self.path, "r") - self.missing_dates = z.attrs.get("missing_dates", []) - self.missing_dates = [np.datetime64(d) for d in self.missing_dates] + self.missing_dates = sorted(list([self.dates[i] for i in ds.missing])) - def allow_nan(self, name): - return name in self.main_config.statistics.get("allow_nans", []) + z = zarr.open(self.path, "r") + missing_dates = z.attrs.get("missing_dates", []) + missing_dates = sorted([np.datetime64(d) for d in missing_dates]) + assert missing_dates == self.missing_dates, (missing_dates, self.missing_dates) @cached_property def registry(self): return ZarrBuiltRegistry(self.path) - def initialise_dataset_backend(self): - z = zarr.open(self.path, mode="w") - z.create_group("_build") - def update_metadata(self, **kwargs): LOG.info(f"Updating metadata {kwargs}") z = zarr.open(self.path, mode="w+") @@ -196,12 +119,43 @@ def print_info(self): LOG.info(e) -class 
InitialiseLoader(Loader):
+class DatasetHandler(GenericDatasetHandler):
+    pass
+
+
+class DatasetHandlerWithStatistics(GenericDatasetHandler):
+    def __init__(self, statistics_tmp=None, **kwargs):
+        super().__init__(**kwargs)
+        statistics_tmp = statistics_tmp or os.path.join(self.path + ".tmp_data", "statistics")
+        self.tmp_statistics = TmpStatistics(statistics_tmp)
+
+
+class Loader(DatasetHandlerWithStatistics):
+    def build_input(self):
+        from climetlab.core.order import build_remapping
+
+        builder = build_input(
+            self.main_config.input,
+            data_sources=self.main_config.get("data_sources", {}),
+            order_by=self.output.order_by,
+            flatten_grid=self.output.flatten_grid,
+            remapping=build_remapping(self.output.remapping),
+            use_grib_paramid=self.main_config.build.use_grib_paramid,
+        )
+        LOG.info("✅ INPUT_BUILDER")
+        LOG.info(builder)
+        return builder
+
+    def allow_nan(self, name):
+        return name in self.main_config.statistics.get("allow_nans", [])
+
+
+class InitialiserLoader(Loader):
     def __init__(self, config, **kwargs):
         super().__init__(**kwargs)

         self.main_config = loader_config(config)
-        self.statistics_registry.delete()
+        self.tmp_statistics.delete()

         LOG.info(self.main_config.dates)
         self.groups = Groups(**self.main_config.dates)
@@ -217,6 +171,27 @@ def __init__(self, config, **kwargs):
         LOG.info("MINIMAL INPUT :")
         LOG.info(self.minimal_input)

+    def build_statistics_dates(self, start, end):
+        ds = open_dataset(self.path)
+        dates = ds.dates
+
+        default_start, default_end = default_statistics_dates(dates)
+        if start is None:
+            start = default_start
+        if end is None:
+            end = default_end
+
+        start = as_first_date(start, dates)
+        end = as_last_date(end, dates)
+
+        start = start.astype(datetime.datetime)
+        end = end.astype(datetime.datetime)
+        return (start.isoformat(), end.isoformat())
+
+    def initialise_dataset_backend(self):
+        z = zarr.open(self.path, mode="w")
+        z.create_group("_build")
+
     def initialise(self, check_name=True):
         """Create empty dataset."""

@@ -330,8 +305,8 @@ def initialise(self, check_name=True):
         self._add_dataset(name="longitudes", array=grid_points[1])

         self.registry.create(lengths=lengths)
-        self.statistics_registry.create(exist_ok=False)
-        self.registry.add_to_history("statistics_registry_initialised", version=self.statistics_registry.version)
+        self.tmp_statistics.create(exist_ok=False)
+        self.registry.add_to_history("tmp_statistics_initialised", version=self.tmp_statistics.version)

         statistics_start, statistics_end = self.build_statistics_dates(
             self.main_config.statistics.get("start"),
@@ -360,7 +335,7 @@ def __init__(self, config, parts, **kwargs):
         self.parts = parts

         total = len(self.registry.get_flags())
-        self.cube_filter = CubesFilter(parts=self.parts, total=total)
+        self.chunk_filter = ChunkFilter(parts=self.parts, total=total)

         self.data_array = zarr.open(self.path, mode="r+")["data"]
         self.n_groups = len(self.groups)
@@ -369,12 +344,12 @@ def load(self):
         self.registry.add_to_history("loading_data_start", parts=self.parts)

         for igroup, group in enumerate(self.groups):
-            if not self.cube_filter(igroup):
+            if not self.chunk_filter(igroup):
                 continue
             if self.registry.get_flag(igroup):
                 LOG.info(f" -> Skipping {igroup} total={len(self.groups)} (already done)")
                 continue
-            self.print(f" -> Processing {igroup} total={len(self.groups)}")
+            # self.print(f" -> Processing {igroup} total={len(self.groups)}")
             # print("========", group)

             assert isinstance(group[0], datetime.datetime), group
@@ -392,7 +367,7 @@
self.registry.add_to_history("loading_data_end", parts=self.parts) self.registry.add_provenance(name="provenance_load") - self.statistics_registry.add_provenance(name="provenance_load", config=self.main_config) + self.tmp_statistics.add_provenance(name="provenance_load", config=self.main_config) self.print_info() @@ -430,7 +405,7 @@ def dates_to_indexes(dates, all_dates): self.load_cube(cube, array) stats = compute_statistics(array.cache, self.variables_names, allow_nan=self.allow_nan) - self.statistics_registry.write(indexes, stats, dates=dates_in_data) + self.tmp_statistics.write(indexes, stats, dates=dates_in_data) array.flush() @@ -476,16 +451,12 @@ def load_cube(self, cube, array): LOG.info(msg) -class StatisticsLoader(Loader): - main_config = {} - +class StatisticsAdder(DatasetHandlerWithStatistics): def __init__( self, - config=None, statistics_output=None, statistics_start=None, statistics_end=None, - force=False, **kwargs, ): super().__init__(**kwargs) @@ -499,11 +470,16 @@ def __init__( "-": self.write_stats_to_stdout, }.get(self.statistics_output, self.write_stats_to_file) - if config: - self.main_config = loader_config(config) - self.read_dataset_metadata() + def allow_nan(self, name): + z = zarr.open(self.path, mode="r") + if "variables_with_nans" in z.attrs: + return name in z.attrs["variables_with_nans"] + + warnings.warn(f"Cannot find 'variables_with_nans' in {self.path}. Assuming nans allowed for {name}.") + return True + def _get_statistics_dates(self): dates = self.dates dtype = type(dates[0]) @@ -513,10 +489,7 @@ def assert_dtype(d): # remove missing dates if self.missing_dates: - assert type(self.missing_dates[0]) is dtype, ( - type(self.missing_dates[0]), - dtype, - ) + assert_dtype(self.missing_dates[0]) dates = [d for d in dates if d not in self.missing_dates] # filter dates according the the start and end dates in the metadata @@ -543,11 +516,11 @@ def assert_dtype(d): def run(self): dates = self._get_statistics_dates() - stats = self.statistics_registry.get_aggregated(dates, self.variables_names, self.allow_nan) + stats = self.tmp_statistics.get_aggregated(dates, self.variables_names, self.allow_nan) self.output_writer(stats) def write_stats_to_file(self, stats): - stats.save(self.statistics_output, provenance=dict(config=self.main_config)) + stats.save(self.statistics_output) LOG.info(f"✅ Statistics written in {self.statistics_output}") def write_stats_to_dataset(self, stats): @@ -572,24 +545,230 @@ def write_stats_to_stdout(self, stats): LOG.info(stats) -class SizeLoader(Loader): - def __init__(self, path, print): - self.path = path - self.print = print +class GenericAdditions(GenericDatasetHandler): + def __init__(self, name="", **kwargs): + super().__init__(**kwargs) + self.name = name - def add_total_size(self): - dic = compute_directory_sizes(self.path) + storage_path = os.path.join(self.path + ".tmp_data", name) + self.tmp_storage = build_storage(directory=storage_path, create=True) - size = dic["total_size"] - n = dic["total_number_of_files"] + def initialise(self): + self.tmp_storage.delete() + self.tmp_storage.create() + LOG.info(f"Dataset {self.path} additions initialized.") - LOG.info(f"Total size: {bytes(size)}") - LOG.info(f"Total number of files: {n}") + @cached_property + def _variables_with_nans(self): + z = zarr.open(self.path, mode="r") + if "variables_with_nans" in z.attrs: + return z.attrs["variables_with_nans"] + return None - self.update_metadata(total_size=size, total_number_of_files=n) + def allow_nan(self, name): + if 
self._variables_with_nans is not None: + return name in self._variables_with_nans + warnings.warn(f"❗Cannot find 'variables_with_nans' in {self.path}, Assuming nans allowed for {name}.") + return True + @classmethod + def _check_type_equal(cls, a, b): + a = list(a) + b = list(b) + a = a[0] if a else None + b = b[0] if b else None + assert type(a) is type(b), (type(a), type(b)) + + def finalise(self): + shape = (len(self.dates), len(self.variables)) + agg = dict( + minimum=np.full(shape, np.nan, dtype=np.float64), + maximum=np.full(shape, np.nan, dtype=np.float64), + sums=np.full(shape, np.nan, dtype=np.float64), + squares=np.full(shape, np.nan, dtype=np.float64), + count=np.full(shape, -1, dtype=np.int64), + has_nans=np.full(shape, False, dtype=np.bool_), + ) + LOG.info(f"Aggregating {self.name} statistics on shape={shape}. Variables : {self.variables}") + + found = set() + ifound = set() + missing = set() + for _date, (date, i, stats) in self.tmp_storage.items(): + assert _date == date + if stats == "missing": + missing.add(date) + continue -class CleanupLoader(Loader): - def run(self): - self.statistics_registry.delete() - self.registry.clean() + assert date not in found, f"Duplicates found {date}" + found.add(date) + ifound.add(i) + + for k in ["minimum", "maximum", "sums", "squares", "count", "has_nans"]: + agg[k][i, ...] = stats[k] + + assert len(found) + len(missing) == len(self.dates), (len(found), len(missing), len(self.dates)) + assert found.union(missing) == set(self.dates), (found, missing, set(self.dates)) + + mask = sorted(list(ifound)) + for k in ["minimum", "maximum", "sums", "squares", "count", "has_nans"]: + agg[k] = agg[k][mask, ...] + + for k in ["minimum", "maximum", "sums", "squares", "count", "has_nans"]: + assert agg[k].shape == agg["count"].shape, (agg[k].shape, agg["count"].shape) + + minimum = np.nanmin(agg["minimum"], axis=0) + maximum = np.nanmax(agg["maximum"], axis=0) + sums = np.nansum(agg["sums"], axis=0) + squares = np.nansum(agg["squares"], axis=0) + count = np.nansum(agg["count"], axis=0) + has_nans = np.any(agg["has_nans"], axis=0) + + assert sums.shape == count.shape + assert sums.shape == squares.shape + assert sums.shape == minimum.shape + assert sums.shape == maximum.shape + assert sums.shape == has_nans.shape + + mean = sums / count + assert sums.shape == mean.shape + + x = squares / count - mean * mean + # remove negative variance due to numerical errors + # x[- 1e-15 < (x / (np.sqrt(squares / count) + np.abs(mean))) < 0] = 0 + check_variance(x, self.variables, minimum, maximum, mean, count, sums, squares) + + stdev = np.sqrt(x) + assert sums.shape == stdev.shape + + self.summary = Summary( + minimum=minimum, + maximum=maximum, + mean=mean, + count=count, + sums=sums, + squares=squares, + stdev=stdev, + variables_names=self.variables, + has_nans=has_nans, + ) + LOG.info(f"Dataset {self.path} additions finalized.") + self.check_statistics() + self._write(self.summary) + self.tmp_storage.delete() + + def _write(self, summary): + for k in ["mean", "stdev", "minimum", "maximum", "sums", "squares", "count", "has_nans"]: + self._add_dataset(name=k, array=summary[k]) + self.registry.add_to_history("compute_statistics_end") + LOG.info(f"Wrote {self.name} additions in {self.path}") + + def check_statistics(self): + pass + + +class StatisticsAddition(GenericAdditions): + def __init__(self, **kwargs): + super().__init__("statistics_", **kwargs) + + z = zarr.open(self.path, mode="r") + start = z.attrs["statistics_start_date"] + end = 
z.attrs["statistics_end_date"] + self.ds = open_dataset(self.path, start=start, end=end) + + self.variables = self.ds.variables + self.dates = self.ds.dates + + assert len(self.variables) == self.ds.shape[1], self.ds.shape + self.total = len(self.dates) + + def run(self, parts): + chunk_filter = ChunkFilter(parts=parts, total=self.total) + for i in range(0, self.total): + if not chunk_filter(i): + continue + date = self.dates[i] + try: + arr = self.ds[i : i + 1, ...] + stats = compute_statistics(arr, self.variables, allow_nan=self.allow_nan) + self.tmp_storage.add([date, i, stats], key=date) + except MissingDateError: + self.tmp_storage.add([date, i, "missing"], key=date) + self.tmp_storage.flush() + LOG.info(f"Dataset {self.path} additions run.") + + def check_statistics(self): + ds = open_dataset(self.path) + ref = ds.statistics + for k in ds.statistics: + assert np.all(np.isclose(ref[k], self.summary[k], rtol=1e-4, atol=1e-4)), ( + k, + ref[k], + self.summary[k], + ) + + +class DeltaDataset: + def __init__(self, ds, idelta): + self.ds = ds + self.idelta = idelta + + def __getitem__(self, i): + j = i - self.idelta + if j < 0: + raise MissingDateError(f"Missing date {j}") + return self.ds[i : i + 1, ...] - self.ds[j : j + 1, ...] + + +class TendenciesStatisticsDeltaNotMultipleOfFrequency(ValueError): + pass + + +class TendenciesStatisticsAddition(GenericAdditions): + def __init__(self, path, delta=None, **kwargs): + full_ds = open_dataset(path) + self.variables = full_ds.variables + + frequency = full_ds.frequency + if delta is None: + delta = frequency + assert isinstance(delta, int), delta + if not delta % frequency == 0: + raise TendenciesStatisticsDeltaNotMultipleOfFrequency( + f"Delta {delta} is not a multiple of frequency {frequency}" + ) + idelta = delta // frequency + + super().__init__(path=path, name=f"tendencies_statistics_{delta}h", **kwargs) + + z = zarr.open(self.path, mode="r") + start = z.attrs["statistics_start_date"] + end = z.attrs["statistics_end_date"] + start = datetime.datetime.fromisoformat(start) + ds = open_dataset(self.path, start=start + datetime.timedelta(hours=delta), end=end) + self.dates = ds.dates + self.total = len(self.dates) + + ds = open_dataset(self.path, start=start, end=end) + self.ds = DeltaDataset(ds, idelta) + + def run(self, parts): + chunk_filter = ChunkFilter(parts=parts, total=self.total) + for i in range(0, self.total): + if not chunk_filter(i): + continue + date = self.dates[i] + try: + arr = self.ds[i] + stats = compute_statistics(arr, self.variables, allow_nan=self.allow_nan) + self.tmp_storage.add([date, i, stats], key=date) + except MissingDateError: + self.tmp_storage.add([date, i, "missing"], key=date) + self.tmp_storage.flush() + LOG.info(f"Dataset {self.path} additions run.") + + def _write(self, summary): + for k in ["mean", "stdev", "minimum", "maximum", "sums", "squares", "count", "has_nans"]: + self._add_dataset(name=f"{self.name}_{k}", array=summary[k]) + self.registry.add_to_history(f"compute_{self.name}_end") + LOG.info(f"Wrote {self.name} additions in {self.path}") diff --git a/src/anemoi/datasets/create/persistent.py b/src/anemoi/datasets/create/persistent.py new file mode 100644 index 00000000..29950553 --- /dev/null +++ b/src/anemoi/datasets/create/persistent.py @@ -0,0 +1,152 @@ +# (C) Copyright 2023 ECMWF. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. 
+# In applying this licence, ECMWF does not waive the privileges and immunities
+# granted to it by virtue of its status as an intergovernmental organisation
+# nor does it submit to any jurisdiction.
+#
+
+import glob
+import hashlib
+import json
+import logging
+import os
+import pickle
+import shutil
+import socket
+
+import numpy as np
+from anemoi.utils.provenance import gather_provenance_info
+
+LOG = logging.getLogger(__name__)
+
+
+class PersistentDict:
+    version = 3
+
+    # Used in parallel, during data loading,
+    # to write data in pickle files.
+    def __init__(self, directory, create=True):
+        """directory: str
+        The directory where the data will be stored.
+        """
+        self.dirname = directory
+        self.name, self.ext = os.path.splitext(os.path.basename(self.dirname))
+        if create:
+            self.create()
+
+    def create(self):
+        os.makedirs(self.dirname, exist_ok=True)
+
+    def delete(self):
+        try:
+            shutil.rmtree(self.dirname)
+        except FileNotFoundError:
+            pass
+
+    def __str__(self):
+        return f"{self.__class__.__name__}({self.dirname})"
+
+    def items(self):
+        # use glob to read all pickles
+        files = glob.glob(self.dirname + "/*.pickle")
+        LOG.info(f"Reading {self.name} data, found {len(files)} files in {self.dirname}")
+        assert len(files) > 0, f"No files found in {self.dirname}"
+        for f in files:
+            with open(f, "rb") as f:
+                yield pickle.load(f)
+
+    def add_provenance(self, **kwargs):
+        out = dict(provenance=gather_provenance_info(), **kwargs)
+        with open(os.path.join(self.dirname, "provenance.json"), "w") as f:
+            json.dump(out, f)
+
+    def add(self, elt, *, key):
+        self[key] = elt
+
+    def __setitem__(self, key, elt):
+        h = hashlib.sha256(str(key).encode("utf-8")).hexdigest()
+        path = os.path.join(self.dirname, f"{h}.pickle")
+
+        if os.path.exists(path):
+            LOG.warning(f"{path} already exists")
+
+        tmp_path = path + f".tmp-{os.getpid()}-on-{socket.gethostname()}"
+        with open(tmp_path, "wb") as f:
+            pickle.dump((key, elt), f)
+        shutil.move(tmp_path, path)
+
+        LOG.debug(f"Written {self.name} data for key {key} in {path}")
+
+    def flush(self):
+        pass
+
+
+class BufferedPersistentDict(PersistentDict):
+    def __init__(self, buffer_size=1000, **kwargs):
+        self.buffer_size = buffer_size
+        self.elements = []
+        self.keys = []
+        self.storage = PersistentDict(**kwargs)
+
+    def add(self, elt, *, key):
+        self.elements.append(elt)
+        self.keys.append(key)
+        if len(self.keys) > self.buffer_size:
+            self.flush()
+
+    def flush(self):
+        # sort keys and elements together so they stay paired when read back
+        pairs = sorted(zip(self.keys, self.elements), key=lambda p: p[0])
+        k = [key for key, _ in pairs]
+        self.storage.add([elt for _, elt in pairs], key=k)
+        self.elements = []
+        self.keys = []
+
+    def items(self):
+        for keys, elements in self.storage.items():
+            for key, elt in zip(keys, elements):
+                yield key, elt
+
+    def delete(self):
+        self.storage.delete()
+
+    def create(self):
+        self.storage.create()
+
+
+def build_storage(directory, create=True):
+    return BufferedPersistentDict(directory=directory, create=create)
+
+
+if __name__ == "__main__":
+    N = 3
+    P = 2
+    directory = "h"
+    p = PersistentDict(directory=directory)
+    print(p)
+    assert os.path.exists(directory)
+
+    arrs = [np.random.randint(1, 101, size=(P,)) for _ in range(N)]
+    dates = [np.array([np.datetime64(f"2021-01-0{_+1}") + np.timedelta64(i, "h") for i in range(P)]) for _ in range(N)]
+
+    print()
+    print("Writing the data")
+    for i in range(N):
+        _arr = arrs[i]
+        _dates = dates[i]
+        print(f"Writing : {i=}, {_arr=} {_dates=}")
+        p[_dates] = (i, _arr)
+
+    print()
+    print("Reading the data back")
+
+    p = PersistentDict(directory="h")
+    for _dates, (i, _arr) in p.items():
+        print(f"{i=}, {_arr=}, {_dates=}")
+
+        assert np.allclose(_arr, arrs[i])
+
+        assert len(_dates) == len(dates[i])
+        for a, b in zip(_dates, dates[i]):
+            assert a == b
diff --git a/src/anemoi/datasets/create/size.py b/src/anemoi/datasets/create/size.py
new file mode 100644
index 00000000..1671290f
--- /dev/null
+++ b/src/anemoi/datasets/create/size.py
@@ -0,0 +1,33 @@
+# (C) Copyright 2023 ECMWF.
+#
+# This software is licensed under the terms of the Apache Licence Version 2.0
+# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
+# In applying this licence, ECMWF does not waive the privileges and immunities
+# granted to it by virtue of its status as an intergovernmental organisation
+# nor does it submit to any jurisdiction.
+#
+
+import logging
+import os
+
+from anemoi.datasets.create.utils import progress_bar
+from anemoi.utils.humanize import bytes  # human-readable sizes; without this the builtin bytes() would be called below
+
+LOG = logging.getLogger(__name__)
+
+
+def compute_directory_sizes(path):
+    if not os.path.isdir(path):
+        return None
+
+    size, n = 0, 0
+    bar = progress_bar(iterable=os.walk(path), desc=f"Computing size of {path}")
+    for dirpath, _, filenames in bar:
+        for filename in filenames:
+            file_path = os.path.join(dirpath, filename)
+            size += os.path.getsize(file_path)
+            n += 1
+
+    LOG.info(f"Total size: {bytes(size)}")
+    LOG.info(f"Total number of files: {n}")
+
+    return dict(total_size=size, total_number_of_files=n)
diff --git a/src/anemoi/datasets/create/statistics.py b/src/anemoi/datasets/create/statistics/__init__.py
similarity index 61%
rename from src/anemoi/datasets/create/statistics.py
rename to src/anemoi/datasets/create/statistics/__init__.py
index 9abe2e7a..cc3d8095 100644
--- a/src/anemoi/datasets/create/statistics.py
+++ b/src/anemoi/datasets/create/statistics/__init__.py
@@ -15,18 +15,58 @@
 import pickle
 import shutil
 import socket
-from collections import defaultdict

 import numpy as np
 from anemoi.utils.provenance import gather_provenance_info

-from .check import StatisticsValueError
-from .check import check_data_values
-from .check import check_stats
+from ..check import check_data_values
+from .summary import Summary

 LOG = logging.getLogger(__name__)


+def default_statistics_dates(dates):
+    """
+    Calculate default statistics dates based on the given list of dates.
+
+    Args:
+        dates (list): List of datetime objects representing dates.
+
+    Returns:
+        tuple: A tuple containing the default start and end dates.
+    """
+
+    def to_datetime(d):
+        if isinstance(d, np.datetime64):
+            return d.tolist()
+        assert isinstance(d, datetime.datetime), d
+        return d
+
+    first = dates[0]
+    last = dates[-1]
+
+    first = to_datetime(first)
+    last = to_datetime(last)
+
+    n_years = round((last - first).total_seconds() / (365.25 * 24 * 60 * 60))
+
+    if n_years < 10:
+        # leave out 20% of the data
+        k = int(len(dates) * 0.8)
+        end = dates[k - 1]
+        LOG.info(f"Number of years {n_years} < 10, leaving out 20%. 
{end=}") + return dates[0], end + + delta = 1 + if n_years >= 20: + delta = 3 + LOG.info(f"Number of years {n_years}, leaving out {delta} years.") + end_year = last.year - delta + + end = max(d for d in dates if to_datetime(d).year == end_year) + return dates[0], end + + def to_datetime(date): if isinstance(date, str): return np.datetime64(date) @@ -45,11 +85,12 @@ def check_variance(x, variables_names, minimum, maximum, mean, count, sums, squa print(x) print(variables_names) print(count) - for i, (var, y) in enumerate(zip(variables_names, x)): + for i, (name, y) in enumerate(zip(variables_names, x)): if y >= 0: continue + print("---") print( - var, + name, y, maximum[i], minimum[i], @@ -59,9 +100,9 @@ def check_variance(x, variables_names, minimum, maximum, mean, count, sums, squa squares[i], ) - print(var, np.min(sums[i]), np.max(sums[i]), np.argmin(sums[i])) - print(var, np.min(squares[i]), np.max(squares[i]), np.argmin(squares[i])) - print(var, np.min(count[i]), np.max(count[i]), np.argmin(count[i])) + print(name, np.min(sums[i]), np.max(sums[i]), np.argmin(sums[i])) + print(name, np.min(squares[i]), np.max(squares[i]), np.argmin(squares[i])) + print(name, np.min(count[i]), np.max(count[i]), np.argmin(count[i])) raise ValueError("Negative variance") @@ -69,7 +110,7 @@ def check_variance(x, variables_names, minimum, maximum, mean, count, sums, squa def compute_statistics(array, check_variables_names=None, allow_nan=False): nvars = array.shape[1] - LOG.info("Stats %s", (nvars, array.shape, check_variables_names)) + LOG.info(f"Stats {nvars}, {array.shape}, {check_variables_names}") if check_variables_names: assert nvars == len(check_variables_names), (nvars, check_variables_names) stats_shape = (array.shape[0], nvars) @@ -108,7 +149,7 @@ def compute_statistics(array, check_variables_names=None, allow_nan=False): } -class TempStatistics: +class TmpStatistics: version = 3 # Used in parrallel, during data loading, # to write statistics in pickled npz files. @@ -162,7 +203,7 @@ def get_aggregated(self, *args, **kwargs): return aggregator.aggregate() def __str__(self): - return f"TempStatistics({self.dirname})" + return f"TmpStatistics({self.dirname})" def normalise_date(d): @@ -289,7 +330,7 @@ def aggregate(self): allow_nan=False, ) - return Statistics( + return Summary( minimum=minimum, maximum=maximum, mean=mean, @@ -302,82 +343,119 @@ def aggregate(self): ) -class Statistics(dict): - STATS_NAMES = ["minimum", "maximum", "mean", "stdev", "has_nans"] # order matter for __str__. +class SummaryAggregator: + NAMES = ["minimum", "maximum", "sums", "squares", "count", "has_nans"] - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.check() + def __init__(self, owner, dates, variables_names, allow_nan): + dates = sorted(dates) + dates = to_datetimes(dates) + assert dates, "No dates selected" + self.owner = owner + self.dates = dates + self.variables_names = variables_names + self.allow_nan = allow_nan - @property - def size(self): - return len(self["variables_names"]) + self.shape = (len(self.dates), len(self.variables_names)) + LOG.info(f"Aggregating statistics on shape={self.shape}. 
Variables : {self.variables_names}") - def check(self): - for k, v in self.items(): - if k == "variables_names": - assert len(v) == self.size - continue - assert v.shape == (self.size,) - if k == "count": - assert (v >= 0).all(), (k, v) - assert v.dtype == np.int64, (k, v) - continue - if k == "has_nans": - assert v.dtype == np.bool_, (k, v) - continue - if k == "stdev": - assert (v >= 0).all(), (k, v) - assert v.dtype == np.float64, (k, v) - - for i, name in enumerate(self["variables_names"]): - try: - check_stats(**{k: v[i] for k, v in self.items()}, msg=f"{i} {name}") - check_data_values(self["minimum"][i], name=name) - check_data_values(self["maximum"][i], name=name) - check_data_values(self["mean"][i], name=name) - except StatisticsValueError as e: - e.args += (i, name) - raise + self.minimum = np.full(self.shape, np.nan, dtype=np.float64) + self.maximum = np.full(self.shape, np.nan, dtype=np.float64) + self.sums = np.full(self.shape, np.nan, dtype=np.float64) + self.squares = np.full(self.shape, np.nan, dtype=np.float64) + self.count = np.full(self.shape, -1, dtype=np.int64) + self.has_nans = np.full(self.shape, False, dtype=np.bool_) - def __str__(self): - header = ["Variables"] + self.STATS_NAMES - out = [" ".join(header)] - - out += [ - " ".join([v] + [f"{self[n][i]:.2f}" for n in self.STATS_NAMES]) - for i, v in enumerate(self["variables_names"]) - ] - return "\n".join(out) - - def save(self, filename, provenance=None): - assert filename.endswith(".json"), filename - dic = {} - for k in self.STATS_NAMES: - dic[k] = list(self[k]) - - out = dict(data=defaultdict(dict)) - for i, name in enumerate(self["variables_names"]): - for k in self.STATS_NAMES: - out["data"][name][k] = dic[k][i] - - out["provenance"] = provenance - - with open(filename, "w") as f: - json.dump(out, f, indent=2) - - def load(self, filename): - assert filename.endswith(".json"), filename - with open(filename) as f: - dic = json.load(f) - - dic_ = {} - for k, v in dic.items(): - if k == "count": - dic_[k] = np.array(v, dtype=np.int64) - continue - if k == "variables": - dic_[k] = v + self._read() + + def _read(self): + def check_type(a, b): + a = list(a) + b = list(b) + a = a[0] if a else None + b = b[0] if b else None + assert type(a) is type(b), (type(a), type(b)) + + found = set() + offset = 0 + for _, _dates, stats in self.owner._gather_data(): + for n in self.NAMES: + assert n in stats, (n, list(stats.keys())) + _dates = to_datetimes(_dates) + check_type(_dates, self.dates) + if found: + check_type(found, self.dates) + assert found.isdisjoint(_dates), "Duplicate dates found in precomputed statistics" + + # filter dates + dates = set(_dates) & set(self.dates) + + if not dates: + # dates have been completely filtered for this chunk continue - dic_[k] = np.array(v, dtype=np.float64) - return Statistics(dic_) + + # filter data + bitmap = np.isin(_dates, self.dates) + for k in self.NAMES: + stats[k] = stats[k][bitmap] + + assert stats["minimum"].shape[0] == len(dates), ( + stats["minimum"].shape, + len(dates), + ) + + # store data in self + found |= set(dates) + for name in self.NAMES: + array = getattr(self, name) + assert stats[name].shape[0] == len(dates), ( + stats[name].shape, + len(dates), + ) + array[offset : offset + len(dates)] = stats[name] + offset += len(dates) + + for d in self.dates: + assert d in found, f"Statistics for date {d} not precomputed." 
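+
+        # at this point every requested date was found in exactly one
+        # pre-computed chunk, so the arrays above are completely filled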
+        assert len(self.dates) == len(found), "Not all dates found in precomputed statistics"
+        assert len(self.dates) == offset, "Not all dates found in precomputed statistics."
+        LOG.info(f"Statistics for {len(found)} dates found.")
+
+    def aggregate(self):
+        minimum = np.nanmin(self.minimum, axis=0)
+        maximum = np.nanmax(self.maximum, axis=0)
+        sums = np.nansum(self.sums, axis=0)
+        squares = np.nansum(self.squares, axis=0)
+        count = np.nansum(self.count, axis=0)
+        has_nans = np.any(self.has_nans, axis=0)
+        mean = sums / count
+
+        assert sums.shape == count.shape == squares.shape == mean.shape == minimum.shape == maximum.shape
+
+        x = squares / count - mean * mean
+        # remove negative variance due to numerical errors
+        # x[- 1e-15 < (x / (np.sqrt(squares / count) + np.abs(mean))) < 0] = 0
+        check_variance(x, self.variables_names, minimum, maximum, mean, count, sums, squares)
+        stdev = np.sqrt(x)
+
+        for j, name in enumerate(self.variables_names):
+            check_data_values(
+                np.array(
+                    [
+                        mean[j],
+                    ]
+                ),
+                name=name,
+                allow_nan=False,
+            )
+
+        return Summary(
+            minimum=minimum,
+            maximum=maximum,
+            mean=mean,
+            count=count,
+            sums=sums,
+            squares=squares,
+            stdev=stdev,
+            variables_names=self.variables_names,
+            has_nans=has_nans,
+        )
diff --git a/src/anemoi/datasets/create/statistics/summary.py b/src/anemoi/datasets/create/statistics/summary.py
new file mode 100644
index 00000000..1434688e
--- /dev/null
+++ b/src/anemoi/datasets/create/statistics/summary.py
@@ -0,0 +1,108 @@
+# (C) Copyright 2024 ECMWF.
+#
+# This software is licensed under the terms of the Apache Licence Version 2.0
+# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
+# In applying this licence, ECMWF does not waive the privileges and immunities
+# granted to it by virtue of its status as an intergovernmental organisation
+# nor does it submit to any jurisdiction.
+#
+import json
+from collections import defaultdict
+
+import numpy as np
+
+from ..check import StatisticsValueError
+from ..check import check_data_values
+from ..check import check_stats
+
+
+class Summary(dict):
+    """This class is used to store the summary statistics of a dataset.
+    It can be saved to and loaded from a JSON file, and performs some
+    basic checks on the data.
+    """
+
+    STATS_NAMES = [
+        "minimum",
+        "maximum",
+        "mean",
+        "stdev",
+        "has_nans",
+    ]  # order matters for __str__.
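+    # save() and __str__ expose only STATS_NAMES; count, sums and squares
+    # are kept internally for the consistency checks below.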
+ + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.check() + + @property + def size(self): + return len(self["variables_names"]) + + def check(self): + for k, v in self.items(): + if k == "variables_names": + assert len(v) == self.size + continue + assert v.shape == (self.size,) + if k == "count": + assert (v >= 0).all(), (k, v) + assert v.dtype == np.int64, (k, v) + continue + if k == "has_nans": + assert v.dtype == np.bool_, (k, v) + continue + if k == "stdev": + assert (v >= 0).all(), (k, v) + assert v.dtype == np.float64, (k, v) + + for i, name in enumerate(self["variables_names"]): + try: + check_stats(**{k: v[i] for k, v in self.items()}, msg=f"{i} {name}") + check_data_values(self["minimum"][i], name=name) + check_data_values(self["maximum"][i], name=name) + check_data_values(self["mean"][i], name=name) + except StatisticsValueError as e: + e.args += (i, name) + raise + + def __str__(self): + header = ["Variables"] + self.STATS_NAMES + out = [" ".join(header)] + + out += [ + " ".join([v] + [f"{self[n][i]:.2f}" for n in self.STATS_NAMES]) + for i, v in enumerate(self["variables_names"]) + ] + return "\n".join(out) + + def save(self, filename, **metadata): + assert filename.endswith(".json"), filename + dic = {} + for k in self.STATS_NAMES: + dic[k] = list(self[k]) + + out = dict(data=defaultdict(dict)) + for i, name in enumerate(self["variables_names"]): + for k in self.STATS_NAMES: + out["data"][name][k] = dic[k][i] + + out["metadata"] = metadata + + with open(filename, "w") as f: + json.dump(out, f, indent=2) + + def load(self, filename): + assert filename.endswith(".json"), filename + with open(filename) as f: + dic = json.load(f) + + dic_ = {} + for k, v in dic.items(): + if k == "count": + dic_[k] = np.array(v, dtype=np.int64) + continue + if k == "variables": + dic_[k] = v + continue + dic_[k] = np.array(v, dtype=np.float64) + return Summary(dic_) diff --git a/src/anemoi/datasets/create/utils.py b/src/anemoi/datasets/create/utils.py index aa0e8137..71c273b6 100644 --- a/src/anemoi/datasets/create/utils.py +++ b/src/anemoi/datasets/create/utils.py @@ -71,21 +71,6 @@ def load_json_or_yaml(path): raise ValueError(f"Cannot read file {path}. Need json or yaml with appropriate extension.") -def compute_directory_sizes(path): - if not os.path.isdir(path): - return None - - size, n = 0, 0 - bar = progress_bar(iterable=os.walk(path), desc=f"Computing size of {path}") - for dirpath, _, filenames in bar: - for filename in filenames: - file_path = os.path.join(dirpath, filename) - size += os.path.getsize(file_path) - n += 1 - - return dict(total_size=size, total_number_of_files=n) - - def make_list_int(value): if isinstance(value, str): if "/" not in value: diff --git a/src/anemoi/datasets/create/writer.py b/src/anemoi/datasets/create/writer.py index 2182f8af..3117f5d1 100644 --- a/src/anemoi/datasets/create/writer.py +++ b/src/anemoi/datasets/create/writer.py @@ -8,52 +8,12 @@ # import logging -import warnings import numpy as np LOG = logging.getLogger(__name__) -class CubesFilter: - def __init__(self, *, parts, total): - if parts is None: - self.parts = None - return - - if len(parts) == 1: - part = parts[0] - if part.lower() in ["all", "*"]: - self.parts = None - return - - if "/" in part: - i_chunk, n_chunks = part.split("/") - i_chunk, n_chunks = int(i_chunk), int(n_chunks) - - assert i_chunk > 0, f"Chunk number {i_chunk} must be positive." - if n_chunks > total: - warnings.warn( - f"Number of chunks {n_chunks} is larger than the total number of chunks: {total}+1. 
" - "Some chunks will be empty." - ) - - chunk_size = total / n_chunks - parts = [x for x in range(total) if x >= (i_chunk - 1) * chunk_size and x < i_chunk * chunk_size] - - parts = [int(_) for _ in parts] - LOG.info(f"Running parts: {parts}") - if not parts: - warnings.warn(f"Nothing to do for chunk {i_chunk}/{n_chunks}.") - - self.parts = parts - - def __call__(self, i): - if self.parts is None: - return True - return i in self.parts - - class ViewCacheArray: """A class that provides a caching mechanism for writing to a NumPy-like array. diff --git a/tests/test_chunks.py b/tests/test_chunks.py new file mode 100644 index 00000000..4dfc38e7 --- /dev/null +++ b/tests/test_chunks.py @@ -0,0 +1,89 @@ +# (C) Copyright 2024 European Centre for Medium-Range Weather Forecasts. +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + + +import pytest + +from anemoi.datasets.create.chunks import ChunkFilter + + +def test_chunk_filter(): + # Test case 1: no filter + cf = ChunkFilter(parts=None, total=10) + assert cf(5) is True + assert len(list(cf)) == len(cf) + + cf = ChunkFilter(parts="", total=10) + assert cf(5) is True + assert len(list(cf)) == len(cf) + + cf = ChunkFilter(parts=[], total=10) + assert cf(5) is True + assert len(list(cf)) == len(cf) + + # Test case 2: wrong input + with pytest.raises(AssertionError): + cf = ChunkFilter(parts="4/3", total=10) + + cf = ChunkFilter(parts="1/3", total=10) + with pytest.raises(AssertionError): + cf(-1) + with pytest.raises(AssertionError): + cf(10) + + # Test case 3: parts is a string representation of fraction + cf = ChunkFilter(parts="1/3", total=10) + cf_ = ChunkFilter(parts=["1/3"], total=10) + assert cf(0) is cf_(0) is True + assert cf(1) is cf_(1) is True + assert cf(2) is cf_(2) is True + assert cf(3) is cf_(3) is True + assert cf(4) is cf_(4) is False + assert cf(5) is cf_(5) is False + assert cf(6) is cf_(6) is False + assert cf(7) is cf_(7) is False + assert cf(8) is cf_(8) is False + assert cf(9) is cf_(9) is False + assert len(list(cf)) == len(cf) + + cf = ChunkFilter(parts="2/3", total=10) + cf_ = ChunkFilter(parts=["2/3"], total=10) + assert cf(0) is cf_(0) is False + assert cf(1) is cf_(1) is False + assert cf(2) is cf_(2) is False + assert cf(3) is cf_(3) is False + assert cf(4) is cf_(4) is True + assert cf(5) is cf_(5) is True + assert cf(6) is cf_(6) is True + assert cf(7) is cf_(7) is False + assert cf(8) is cf_(8) is False + assert cf(9) is cf_(9) is False + assert len(list(cf)) == len(cf) + + cf = ChunkFilter(parts="3/3", total=10) + cf_ = ChunkFilter(parts=["3/3"], total=10) + assert cf(0) is cf_(0) is False + assert cf(1) is cf_(1) is False + assert cf(2) is cf_(2) is False + assert cf(3) is cf_(3) is False + assert cf(4) is cf_(4) is False + assert cf(5) is cf_(5) is False + assert cf(6) is cf_(6) is False + assert cf(7) is cf_(7) is True + assert cf(8) is cf_(8) is True + assert cf(9) is cf_(9) is True + assert len(list(cf)) == len(cf) + + # Test case 4: test __iter__ + cf = ChunkFilter(parts="2/3", total=10) + for i in cf: + assert cf(i) is True + assert len(list(cf)) == 3 + + +if __name__ == "__main__": + test_chunk_filter() diff --git a/tests/test_dates.py b/tests/test_dates.py index 77f6ebdc..c16185b2 100644 --- 
a/tests/test_dates.py +++ b/tests/test_dates.py @@ -10,7 +10,7 @@ import numpy as np import pytest -from anemoi.datasets.create.loaders import default_statistics_dates +from anemoi.datasets.create.statistics import default_statistics_dates _ = datetime.datetime From 7961f1622cac8f2de865771ea28b31912324839f Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Thu, 2 May 2024 09:45:35 +0000 Subject: [PATCH 12/14] add optional caching in open_zarr --- src/anemoi/datasets/data/stores.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/anemoi/datasets/data/stores.py b/src/anemoi/datasets/data/stores.py index bdce9cd6..dc5d3d63 100644 --- a/src/anemoi/datasets/data/stores.py +++ b/src/anemoi/datasets/data/stores.py @@ -102,7 +102,7 @@ def __contains__(self, key): return key in self.store -def open_zarr(path, dont_fail=False): +def open_zarr(path, dont_fail=False, cache=None): try: store = path @@ -117,6 +117,9 @@ def open_zarr(path, dont_fail=False): store = zarr.storage.DirectoryStore(store) store = DebugStore(store) + if cache is not None: + store = zarr.LRUStoreCache(store, max_size=cache) + return zarr.convenience.open(store, "r") except zarr.errors.PathNotFoundError: if not dont_fail: From 64378e21e716493365c60253c77f19a52f72c290 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Thu, 2 May 2024 12:45:17 +0000 Subject: [PATCH 13/14] perturbations in anemoi.datasets.compute.* --- pyproject.toml | 2 +- src/anemoi/datasets/compute/perturbations.py | 109 ++++++++++++++++++ .../create/functions/sources/perturbations.py | 98 +--------------- 3 files changed, 113 insertions(+), 96 deletions(-) create mode 100644 src/anemoi/datasets/compute/perturbations.py diff --git a/pyproject.toml b/pyproject.toml index 0e1871a9..619ba9f5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ classifiers = [ ] dependencies = [ - "anemoi-utils[provenance]", + "anemoi-utils[provenance]>=0.1.7", "zarr", "pyyaml", "numpy", diff --git a/src/anemoi/datasets/compute/perturbations.py b/src/anemoi/datasets/compute/perturbations.py new file mode 100644 index 00000000..c09041f3 --- /dev/null +++ b/src/anemoi/datasets/compute/perturbations.py @@ -0,0 +1,109 @@ +# (C) Copyright 2024 ECMWF. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. +# + +import warnings + +import numpy as np +from climetlab.core.temporary import temp_file +from climetlab.readers.grib.output import new_grib_output + +from anemoi.datasets.create.check import check_data_values +from anemoi.datasets.create.functions import assert_is_fieldset + + +def perturbations( + members, + center, + positive_clipping_variables=[ + "q", + "cp", + "lsp", + "tp", + ], # add "swl4", "swl3", "swl2", "swl1", "swl0", and more ? 
+): + + keys = ["param", "level", "valid_datetime", "date", "time", "step", "number"] + + def check_compatible(f1, f2, ignore=["number"]): + for k in keys + ["grid", "shape"]: + if k in ignore: + continue + assert f1.metadata(k) == f2.metadata(k), (k, f1.metadata(k), f2.metadata(k)) + + print(f"Retrieving ensemble data with {members}") + print(f"Retrieving center data with {center}") + + members = members.order_by(*keys) + center = center.order_by(*keys) + + number_list = members.unique_values("number")["number"] + n_numbers = len(number_list) + + if len(center) * n_numbers != len(members): + print(len(center), n_numbers, len(members)) + for f in members: + print("Member: ", f) + for f in center: + print("Center: ", f) + raise ValueError(f"Inconsistent number of fields: {len(center)} * {n_numbers} != {len(members)}") + + # prepare output tmp file so we can read it back + tmp = temp_file() + path = tmp.path + out = new_grib_output(path) + + for i, center_field in enumerate(center): + param = center_field.metadata("param") + + # load the center field + center_np = center_field.to_numpy() + + # load the ensemble fields and compute the mean + members_np = np.zeros((n_numbers, *center_np.shape)) + + for j in range(n_numbers): + ensemble_field = members[i * n_numbers + j] + check_compatible(center_field, ensemble_field) + members_np[j] = ensemble_field.to_numpy() + + mean_np = members_np.mean(axis=0) + + for j in range(n_numbers): + template = members[i * n_numbers + j] + e = members_np[j] + m = mean_np + c = center_np + + assert e.shape == c.shape == m.shape, (e.shape, c.shape, m.shape) + + x = c - m + e + + if param in positive_clipping_variables: + warnings.warn(f"Clipping {param} to be positive") + x = np.maximum(x, 0) + + assert x.shape == e.shape, (x.shape, e.shape) + + check_data_values(x, name=param) + out.write(x, template=template) + template = None + + out.close() + + from climetlab import load_source + + ds = load_source("file", path) + assert_is_fieldset(ds) + # save a reference to the tmp file so it is deleted + # only when the dataset is not used anymore + ds._tmp = tmp + + assert len(ds) == len(members), (len(ds), len(members)) + + return ds diff --git a/src/anemoi/datasets/create/functions/sources/perturbations.py b/src/anemoi/datasets/create/functions/sources/perturbations.py index 3300b05f..53428da8 100644 --- a/src/anemoi/datasets/create/functions/sources/perturbations.py +++ b/src/anemoi/datasets/create/functions/sources/perturbations.py @@ -6,16 +6,11 @@ # granted to it by virtue of its status as an intergovernmental organisation # nor does it submit to any jurisdiction. # -import warnings from copy import deepcopy -import numpy as np -from climetlab.core.temporary import temp_file -from climetlab.readers.grib.output import new_grib_output +from anemoi.datasets.compute.perturbations import perturbations as compute_perturbations -from anemoi.datasets.create.check import check_data_values -from anemoi.datasets.create.functions import assert_is_fieldset -from anemoi.datasets.create.functions.actions.mars import mars +from .mars import mars def to_list(x): @@ -58,94 +53,7 @@ def load_if_needed(context, dates, dict_or_dataset): def perturbations(context, dates, members, center, remapping={}, patches={}): members = load_if_needed(context, dates, members) center = load_if_needed(context, dates, center) - # return perturbations(member, centers....) 
- - keys = ["param", "level", "valid_datetime", "date", "time", "step", "number"] - - def check_compatible(f1, f2, ignore=["number"]): - for k in keys + ["grid", "shape"]: - if k in ignore: - continue - assert f1.metadata(k) == f2.metadata(k), (k, f1.metadata(k), f2.metadata(k)) - - print(f"Retrieving ensemble data with {members}") - print(f"Retrieving center data with {center}") - - members = members.order_by(*keys) - center = center.order_by(*keys) - - number_list = members.unique_values("number")["number"] - n_numbers = len(number_list) - - if len(center) * n_numbers != len(members): - print(len(center), n_numbers, len(members)) - for f in members: - print("Member: ", f) - for f in center: - print("Center: ", f) - raise ValueError(f"Inconsistent number of fields: {len(center)} * {n_numbers} != {len(members)}") - - # prepare output tmp file so we can read it back - tmp = temp_file() - path = tmp.path - out = new_grib_output(path) - - for i, center_field in enumerate(center): - param = center_field.metadata("param") - - # load the center field - center_np = center_field.to_numpy() - - # load the ensemble fields and compute the mean - members_np = np.zeros((n_numbers, *center_np.shape)) - - for j in range(n_numbers): - ensemble_field = members[i * n_numbers + j] - check_compatible(center_field, ensemble_field) - members_np[j] = ensemble_field.to_numpy() - - mean_np = members_np.mean(axis=0) - - for j in range(n_numbers): - template = members[i * n_numbers + j] - e = members_np[j] - m = mean_np - c = center_np - - assert e.shape == c.shape == m.shape, (e.shape, c.shape, m.shape) - - FORCED_POSITIVE = [ - "q", - "cp", - "lsp", - "tp", - ] # add "swl4", "swl3", "swl2", "swl1", "swl0", and more ? - - x = c - m + e - - if param in FORCED_POSITIVE: - warnings.warn(f"Clipping {param} to be positive") - x = np.maximum(x, 0) - - assert x.shape == e.shape, (x.shape, e.shape) - - check_data_values(x, name=param) - out.write(x, template=template) - template = None - - out.close() - - from climetlab import load_source - - ds = load_source("file", path) - assert_is_fieldset(ds) - # save a reference to the tmp file so it is deleted - # only when the dataset is not used anymore - ds._tmp = tmp - - assert len(ds) == len(members), (len(ds), len(members)) - - return ds + return compute_perturbations(members, center) execute = perturbations From 69256c0600c86b229ef535eb1ca04f94a90760aa Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Fri, 3 May 2024 13:01:36 +0000 Subject: [PATCH 14/14] adding read for statistics tendencies --- src/anemoi/datasets/create/loaders.py | 6 ++++-- src/anemoi/datasets/data/forewards.py | 5 +++++ src/anemoi/datasets/data/join.py | 8 ++++++++ src/anemoi/datasets/data/select.py | 5 +++++ src/anemoi/datasets/data/statistics.py | 5 +++++ src/anemoi/datasets/data/stores.py | 15 +++++++++++++++ src/anemoi/datasets/data/unchecked.py | 4 ++++ 7 files changed, 46 insertions(+), 2 deletions(-) diff --git a/src/anemoi/datasets/create/loaders.py b/src/anemoi/datasets/create/loaders.py index 063405f6..780a078e 100644 --- a/src/anemoi/datasets/create/loaders.py +++ b/src/anemoi/datasets/create/loaders.py @@ -550,7 +550,7 @@ def __init__(self, name="", **kwargs): super().__init__(**kwargs) self.name = name - storage_path = os.path.join(self.path + ".tmp_data", name) + storage_path = f"{self.path}.tmp_storage_{name}" self.tmp_storage = build_storage(directory=storage_path, create=True) def initialise(self): @@ -725,6 +725,8 @@ class TendenciesStatisticsDeltaNotMultipleOfFrequency(ValueError): 
 class TendenciesStatisticsAddition(GenericAdditions):
+    DATASET_NAME_PATTERN = "statistics_tendencies_{delta}"
+
     def __init__(self, path, delta=None, **kwargs):
         full_ds = open_dataset(path)
         self.variables = full_ds.variables
@@ -739,7 +741,7 @@ def __init__(self, path, delta=None, **kwargs):
             )
         idelta = delta // frequency
 
-        super().__init__(path=path, name=f"tendencies_statistics_{delta}h", **kwargs)
+        super().__init__(path=path, name=self.DATASET_NAME_PATTERN.format(delta=f"{delta}h"), **kwargs)
 
         z = zarr.open(self.path, mode="r")
         start = z.attrs["statistics_start_date"]
diff --git a/src/anemoi/datasets/data/forewards.py b/src/anemoi/datasets/data/forewards.py
index 078c68ae..197e2fcd 100644
--- a/src/anemoi/datasets/data/forewards.py
+++ b/src/anemoi/datasets/data/forewards.py
@@ -67,6 +67,11 @@ def variables(self):
     def statistics(self):
         return self.forward.statistics
 
+    def statistics_tendencies(self, delta=None):
+        if delta is None:
+            delta = self.frequency
+        return self.forward.statistics_tendencies(delta)
+
     @property
     def shape(self):
         return self.forward.shape
diff --git a/src/anemoi/datasets/data/join.py b/src/anemoi/datasets/data/join.py
index fd382ccd..b47f7762 100644
--- a/src/anemoi/datasets/data/join.py
+++ b/src/anemoi/datasets/data/join.py
@@ -121,6 +121,14 @@ def statistics(self):
             k: np.concatenate([d.statistics[k] for d in self.datasets], axis=0) for k in self.datasets[0].statistics
         }
 
+    def statistics_tendencies(self, delta=None):
+        if delta is None:
+            delta = self.frequency
+        return {
+            k: np.concatenate([d.statistics_tendencies(delta)[k] for d in self.datasets], axis=0)
+            for k in self.datasets[0].statistics_tendencies(delta)
+        }
+
     def source(self, index):
         i = index
         for dataset in self.datasets:
diff --git a/src/anemoi/datasets/data/select.py b/src/anemoi/datasets/data/select.py
index aaf2f224..feae504b 100644
--- a/src/anemoi/datasets/data/select.py
+++ b/src/anemoi/datasets/data/select.py
@@ -74,6 +74,11 @@ def name_to_index(self):
     def statistics(self):
         return {k: v[self.indices] for k, v in self.dataset.statistics.items()}
 
+    def statistics_tendencies(self, delta=None):
+        if delta is None:
+            delta = self.frequency
+        return {k: v[self.indices] for k, v in self.dataset.statistics_tendencies(delta).items()}
+
     def metadata_specific(self, **kwargs):
         return super().metadata_specific(indices=self.indices, **kwargs)
 
diff --git a/src/anemoi/datasets/data/statistics.py b/src/anemoi/datasets/data/statistics.py
index 78f0e1cb..90b3115c 100644
--- a/src/anemoi/datasets/data/statistics.py
+++ b/src/anemoi/datasets/data/statistics.py
@@ -29,6 +29,11 @@ def __init__(self, dataset, statistic):
     def statistics(self):
         return self._statistic.statistics
 
+    def statistics_tendencies(self, delta=None):
+        if delta is None:
+            delta = self.frequency
+        return self._statistic.statistics_tendencies(delta)
+
     def metadata_specific(self, **kwargs):
         return super().metadata_specific(
             statistics=self._statistic.metadata_specific(),
diff --git a/src/anemoi/datasets/data/stores.py b/src/anemoi/datasets/data/stores.py
index dc5d3d63..62173e08 100644
--- a/src/anemoi/datasets/data/stores.py
+++ b/src/anemoi/datasets/data/stores.py
@@ -218,6 +218,21 @@ def statistics(self):
             minimum=self.z.minimum[:],
         )
 
+    def statistics_tendencies(self, delta=None):
+        if delta is None:
+            delta = self.frequency
+        if isinstance(delta, int):
+            delta = f"{delta}h"
+        from anemoi.datasets.create.loaders import TendenciesStatisticsAddition
+
+        prefix = TendenciesStatisticsAddition.DATASET_NAME_PATTERN.format(delta=delta) + "_"
+        return dict(
+            mean=self.z[f"{prefix}mean"][:],
+            stdev=self.z[f"{prefix}stdev"][:],
+            maximum=self.z[f"{prefix}maximum"][:],
+            minimum=self.z[f"{prefix}minimum"][:],
+        )
+
     @property
     def resolution(self):
         return self.z.attrs["resolution"]
diff --git a/src/anemoi/datasets/data/unchecked.py b/src/anemoi/datasets/data/unchecked.py
index 7d9b02b1..776a0f92 100644
--- a/src/anemoi/datasets/data/unchecked.py
+++ b/src/anemoi/datasets/data/unchecked.py
@@ -101,6 +101,10 @@ def variables(self):
     def statistics(self):
         raise NotImplementedError()
 
+    @check("check_same_variables")
+    def statistics_tendencies(self, delta=None):
+        raise NotImplementedError()
+
     @property
     def shape(self):
         raise NotImplementedError()
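
Note on the computation refactored above: the code moved into
anemoi.datasets.compute.perturbations recenters each ensemble member e on the
center field c by removing the ensemble mean m, i.e. x = c - m + e, and clips
selected variables (q, cp, lsp, tp in the code removed above) to be
non-negative. The following is a minimal NumPy sketch of that numerical step
only, independent of the GRIB handling; the function and argument names are
illustrative, not part of the package API:

    import numpy as np

    def recenter(members, center, clip_non_negative=False):
        # members: array of shape (n_members, *grid); center: array of shape (*grid)
        mean = members.mean(axis=0)  # ensemble mean m
        x = center - mean + members  # x_j = c - m + e_j, broadcast over the members
        if clip_non_negative:  # e.g. humidity or precipitation variables
            x = np.maximum(x, 0)
        return x

    # usage: 10 members on a 5x5 grid
    rng = np.random.default_rng(0)
    members = rng.normal(size=(10, 5, 5))
    center = rng.normal(size=(5, 5))
    x = recenter(members, center)
    assert np.allclose(x.mean(axis=0), center)  # the recentered ensemble mean is the center field

The statistics tendencies read by the last patch are extra arrays in the
dataset's Zarr store, named from TendenciesStatisticsAddition.DATASET_NAME_PATTERN
plus a statistic suffix: for example, for a delta of "6h" the store would hold
statistics_tendencies_6h_mean, statistics_tendencies_6h_stdev,
statistics_tendencies_6h_maximum and statistics_tendencies_6h_minimum.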