Skip to content

Commit 4426fd7

Browse files
committed
implemented statistics with tendencies
1 parent e9622c4 commit 4426fd7

File tree

13 files changed

+1000
-301
lines changed

13 files changed

+1000
-301
lines changed

src/anemoi/datasets/create/__init__.py

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ def init(self, check_name=False):
3232
# check path
3333
_, ext = os.path.splitext(self.path)
3434
assert ext != "zarr", f"Unsupported extension={ext}"
35-
from .loaders import InitialiseLoader
35+
from .loaders import InitialiserLoader
3636

3737
if self._path_readable() and not self.overwrite:
3838
raise Exception(f"{self.path} already exists. Use overwrite=True to overwrite.")
3939

4040
with self._cache_context():
41-
obj = InitialiseLoader.from_config(
41+
obj = InitialiserLoader.from_config(
4242
path=self.path,
4343
config=self.config,
4444
statistics_tmp=self.statistics_tmp,
@@ -59,12 +59,11 @@ def load(self, parts=None):
5959
loader.load()
6060

6161
def statistics(self, force=False, output=None, start=None, end=None):
62-
from .loaders import StatisticsLoader
62+
from .loaders import StatisticsAdder
6363

64-
loader = StatisticsLoader.from_dataset(
64+
loader = StatisticsAdder.from_dataset(
6565
path=self.path,
6666
print=self.print,
67-
force=force,
6867
statistics_tmp=self.statistics_tmp,
6968
statistics_output=output,
7069
recompute=False,
@@ -74,26 +73,72 @@ def statistics(self, force=False, output=None, start=None, end=None):
7473
loader.run()
7574

7675
def size(self):
77-
from .loaders import SizeLoader
76+
from .loaders import DatasetHandler
77+
from .size import compute_directory_sizes
7878

79-
loader = SizeLoader.from_dataset(path=self.path, print=self.print)
80-
loader.add_total_size()
79+
metadata = compute_directory_sizes(self.path)
80+
handle = DatasetHandler.from_dataset(path=self.path, print=self.print)
81+
handle.update_metadata(**metadata)
8182

8283
def cleanup(self):
83-
from .loaders import CleanupLoader
84+
from .loaders import DatasetHandlerWithStatistics
8485

85-
loader = CleanupLoader.from_dataset(
86-
path=self.path,
87-
print=self.print,
88-
statistics_tmp=self.statistics_tmp,
86+
cleaner = DatasetHandlerWithStatistics.from_dataset(
87+
path=self.path, print=self.print, statistics_tmp=self.statistics_tmp
8988
)
90-
loader.run()
89+
cleaner.tmp_statistics.delete()
90+
cleaner.registry.clean()
9191

9292
def patch(self, **kwargs):
9393
from .patch import apply_patch
9494

9595
apply_patch(self.path, **kwargs)
9696

97+
def init_additions(self, delta=[1, 3, 6, 12]):
98+
from .loaders import StatisticsAddition
99+
from .loaders import TendenciesStatisticsAddition
100+
from .loaders import TendenciesStatisticsDeltaNotMultipleOfFrequency
101+
102+
a = StatisticsAddition.from_dataset(path=self.path, print=self.print)
103+
a.initialise()
104+
105+
for d in delta:
106+
try:
107+
a = TendenciesStatisticsAddition.from_dataset(path=self.path, print=self.print, delta=d)
108+
a.initialise()
109+
except TendenciesStatisticsDeltaNotMultipleOfFrequency:
110+
self.print(f"Skipping delta={d} as it is not a multiple of the frequency.")
111+
112+
def run_additions(self, parts=None, delta=[1, 3, 6, 12]):
113+
from .loaders import StatisticsAddition
114+
from .loaders import TendenciesStatisticsAddition
115+
from .loaders import TendenciesStatisticsDeltaNotMultipleOfFrequency
116+
117+
a = StatisticsAddition.from_dataset(path=self.path, print=self.print)
118+
a.run(parts)
119+
120+
for d in delta:
121+
try:
122+
a = TendenciesStatisticsAddition.from_dataset(path=self.path, print=self.print, delta=d)
123+
a.run(parts)
124+
except TendenciesStatisticsDeltaNotMultipleOfFrequency:
125+
self.print(f"Skipping delta={d} as it is not a multiple of the frequency.")
126+
127+
def finalise_additions(self, delta=[1, 3, 6, 12]):
128+
from .loaders import StatisticsAddition
129+
from .loaders import TendenciesStatisticsAddition
130+
from .loaders import TendenciesStatisticsDeltaNotMultipleOfFrequency
131+
132+
a = StatisticsAddition.from_dataset(path=self.path, print=self.print)
133+
a.finalise()
134+
135+
for d in delta:
136+
try:
137+
a = TendenciesStatisticsAddition.from_dataset(path=self.path, print=self.print, delta=d)
138+
a.finalise()
139+
except TendenciesStatisticsDeltaNotMultipleOfFrequency:
140+
self.print(f"Skipping delta={d} as it is not a multiple of the frequency.")
141+
97142
def finalise(self, **kwargs):
98143
self.statistics(**kwargs)
99144
self.size()
@@ -102,8 +147,14 @@ def create(self):
102147
self.init()
103148
self.load()
104149
self.finalise()
150+
self.additions()
105151
self.cleanup()
106152

153+
def additions(self):
154+
self.init_additions()
155+
self.run_additions()
156+
self.finalise_additions()
157+
107158
def _cache_context(self):
108159
from .utils import cache_context
109160

src/anemoi/datasets/create/check.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,14 @@
88
#
99

1010
import logging
11-
import os
1211
import re
1312
import warnings
1413

1514
import numpy as np
16-
import tqdm
1715

1816
LOG = logging.getLogger(__name__)
1917

2018

21-
def compute_directory_size(path):
22-
if not os.path.isdir(path):
23-
return None
24-
size = 0
25-
n = 0
26-
for dirpath, _, filenames in tqdm.tqdm(os.walk(path), desc="Computing size", leave=False):
27-
for filename in filenames:
28-
file_path = os.path.join(dirpath, filename)
29-
size += os.path.getsize(file_path)
30-
n += 1
31-
return size, n
32-
33-
3419
class DatasetName:
3520
def __init__(
3621
self,

src/anemoi/datasets/create/chunks.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# (C) Copyright 2024 ECMWF.
2+
#
3+
# This software is licensed under the terms of the Apache Licence Version 2.0
4+
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
5+
# In applying this licence, ECMWF does not waive the privileges and immunities
6+
# granted to it by virtue of its status as an intergovernmental organisation
7+
# nor does it submit to any jurisdiction.
8+
#
9+
import logging
10+
import warnings
11+
12+
LOG = logging.getLogger(__name__)
13+
14+
ALL = object()
15+
16+
17+
class ChunkFilter:
18+
def __init__(self, *, parts, total):
19+
self.total = total
20+
21+
if isinstance(parts, list):
22+
if len(parts) == 1:
23+
parts = parts[0]
24+
elif len(parts) == 0:
25+
parts = None
26+
else:
27+
raise ValueError(f"Invalid parts format: {parts}. Must be in the form 'i/n'.")
28+
29+
if not parts:
30+
parts = "all"
31+
32+
assert isinstance(parts, str), f"Argument parts must be a string, got {parts}."
33+
34+
if parts.lower() == "all" or parts == "*":
35+
self.allowed = ALL
36+
return
37+
38+
assert "/" in parts, f"Invalid parts format: {parts}. Must be in the form 'i/n'."
39+
40+
i, n = parts.split("/")
41+
i, n = int(i), int(n)
42+
43+
assert i > 0, f"Chunk number {i} must be positive."
44+
assert i <= n, f"Chunk number {i} must be less than total chunks {n}."
45+
if n > total:
46+
warnings.warn(
47+
f"Number of chunks {n} is larger than the total number of chunks: {total}. "
48+
"Some chunks will be empty."
49+
)
50+
51+
chunk_size = total / n
52+
parts = [x for x in range(total) if x >= (i - 1) * chunk_size and x < i * chunk_size]
53+
54+
for i in parts:
55+
if i < 0 or i >= total:
56+
raise AssertionError(f"Invalid chunk number {i}. Must be between 0 and {total - 1}.")
57+
if not parts:
58+
warnings.warn(f"Nothing to do for chunk {i}/{n}.")
59+
60+
LOG.info(f"Running parts: {parts}")
61+
62+
self.allowed = parts
63+
64+
def __call__(self, i):
65+
if i < 0 or i >= self.total:
66+
raise AssertionError(f"Invalid chunk number {i}. Must be between 0 and {self.total - 1}.")
67+
68+
if self.allowed == ALL:
69+
return True
70+
return i in self.allowed
71+
72+
def __iter__(self):
73+
for i in range(self.total):
74+
if self(i):
75+
yield i
76+
77+
def __len__(self):
78+
return len([_ for _ in self])

src/anemoi/datasets/create/functions/sources/perturbations.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def load_if_needed(context, dates, dict_or_dataset):
5858
def perturbations(context, dates, members, center, remapping={}, patches={}):
5959
members = load_if_needed(context, dates, members)
6060
center = load_if_needed(context, dates, center)
61+
# return perturbations(member, centers....)
6162

6263
keys = ["param", "level", "valid_datetime", "date", "time", "step", "number"]
6364

0 commit comments

Comments
 (0)