From 382216dea0aa26697a02123c08c742b2cd7e7330 Mon Sep 17 00:00:00 2001
From: rhaegar325
Date: Wed, 3 Dec 2025 14:57:39 +1100
Subject: [PATCH 1/3] fix memory exceeded error in write

---
 src/access_moppy/base.py | 84 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 84 insertions(+)

diff --git a/src/access_moppy/base.py b/src/access_moppy/base.py
index e8f9c92..4d7c262 100644
--- a/src/access_moppy/base.py
+++ b/src/access_moppy/base.py
@@ -1,11 +1,16 @@
 import warnings
+import psutil
 from datetime import datetime
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Union
 
 import netCDF4 as nc
 import xarray as xr
+import netCDF4 as nc
 from cftime import num2date
+from dask.distributed import get_client
+from pathlib import Path
+from cftime import num2date
 
 from access_moppy.utilities import (
     FrequencyMismatchError,
@@ -274,6 +279,85 @@ def write(self):
             raise ValueError(
                 f"Missing required CMIP6 global attributes for filename: {missing}"
             )
+
+        # ========== Memory Check ==========
+        # This section estimates the data size and compares it against available memory
+        # to prevent out-of-memory errors during the write operation.
+
+        def estimate_data_size(ds, cmor_name):
+            total_size = 0
+            for var in ds.variables:
+                vdat = ds[var]
+                # Start with the size of a single element (e.g., 4 bytes for float32)
+                var_size = vdat.dtype.itemsize
+                # Multiply by the size of each dimension to get total elements
+                for dim in vdat.dims:
+                    var_size *= ds.sizes[dim]
+                total_size += var_size
+            # Apply 1.5x overhead factor for safe memory estimation
+            return int(total_size * 1.5)
+
+        # Calculate the estimated data size for this dataset
+        data_size = estimate_data_size(self.ds, self.cmor_name)
+
+        # Get system memory information using psutil
+        available_memory = psutil.virtual_memory().available
+
+        # ========== Dask Client Detection ==========
+        # Check if a Dask distributed client exists, as this affects how we handle
+        # memory management. Dask clusters have their own memory limits separate
+        # from system memory.
+
+        client = None
+        worker_memory = None # Memory limit of a single worker
+        total_cluster_memory = None # Sum of all workers' memory limits
+
+        try:
+            # Attempt to get an existing Dask client
+            client = get_client()
+
+            # Retrieve information about all workers in the cluster
+            worker_info = client.scheduler_info()["workers"]
+
+            if worker_info:
+                # Get the minimum memory_limit across all workers
+                worker_memory = min(w["memory_limit"] for w in worker_info.values())
+
+                # Sum up all workers' memory for total cluster capacity
+                total_cluster_memory = sum(w["memory_limit"] for w in worker_info.values())
+
+        except ValueError:
+            # No Dask client exists - we'll use local/system memory for writing
+            pass
+
+        # ========== Memory Validation Logic ==========
+        # This section implements a decision tree based on data size vs available memory:
+
+        if client is not None:
+            # Dask client exists - check against cluster memory limits
+            if data_size > worker_memory:
+                # WARNING: Data fits in total cluster memory but exceeds single worker capacity
+                print(
+                    f"Warning: Data size ({data_size / 1024**3:.2f} GB) exceeds single worker memory "
+                    f"({worker_memory / 1024**3:.2f} GB) but fits in total cluster memory "
+                    f"({total_cluster_memory / 1024**3:.2f} GB)."
+                )
+                print("Closing Dask client to use local memory for writing...")
+                client.close()
+                client = None
+
+        # If data < worker_memory: No action needed, proceed with write
+
+        if data_size > available_memory:
+            # Data exceeds available system memory
+            raise MemoryError(
+                f"Data size ({data_size / 1024**3:.2f} GB) exceeds available system memory "
+                f"({available_memory / 1024**3:.2f} GB). "
+                f"Consider using write_parallel() for chunked writing."
+            )
+
+        # Log the memory status for user awareness
+        print(f"Data size: {data_size / 1024**3:.2f} GB, Available memory: {available_memory / 1024**3:.2f} GB")
 
         time_var = self.ds[self.cmor_name].coords["time"]
         units = time_var.attrs["units"]

From cb5d14a9d202968e5c313ab90d94f8727a4c62f4 Mon Sep 17 00:00:00 2001
From: rhaegar325
Date: Wed, 3 Dec 2025 14:59:21 +1100
Subject: [PATCH 2/3] fix memory exceeded error in write

---
 src/access_moppy/base.py | 33 +++++++++++++++++----------------
 1 file changed, 17 insertions(+), 16 deletions(-)

diff --git a/src/access_moppy/base.py b/src/access_moppy/base.py
index 4d7c262..b2c2930 100644
--- a/src/access_moppy/base.py
+++ b/src/access_moppy/base.py
@@ -1,16 +1,13 @@
 import warnings
-import psutil
 from datetime import datetime
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Union
 
 import netCDF4 as nc
+import psutil
 import xarray as xr
-import netCDF4 as nc
 from cftime import num2date
 from dask.distributed import get_client
-from pathlib import Path
-from cftime import num2date
 
 from access_moppy.utilities import (
     FrequencyMismatchError,
@@ -279,7 +276,7 @@ def write(self):
             raise ValueError(
                 f"Missing required CMIP6 global attributes for filename: {missing}"
             )
-    
+
         # ========== Memory Check ==========
         # This section estimates the data size and compares it against available memory
         # to prevent out-of-memory errors during the write operation.
@@ -309,23 +306,25 @@ def estimate_data_size(ds, cmor_name):
         # from system memory.
 
         client = None
-        worker_memory = None # Memory limit of a single worker
-        total_cluster_memory = None # Sum of all workers' memory limits
+        worker_memory = None  # Memory limit of a single worker
+        total_cluster_memory = None  # Sum of all workers' memory limits
 
         try:
             # Attempt to get an existing Dask client
             client = get_client()
-            
+
             # Retrieve information about all workers in the cluster
             worker_info = client.scheduler_info()["workers"]
-            
+
             if worker_info:
                 # Get the minimum memory_limit across all workers
                 worker_memory = min(w["memory_limit"] for w in worker_info.values())
-                
+
                 # Sum up all workers' memory for total cluster capacity
-                total_cluster_memory = sum(w["memory_limit"] for w in worker_info.values())
-                
+                total_cluster_memory = sum(
+                    w["memory_limit"] for w in worker_info.values()
+                )
+
         except ValueError:
             # No Dask client exists - we'll use local/system memory for writing
             pass
@@ -336,7 +335,7 @@ def estimate_data_size(ds, cmor_name):
         if client is not None:
             # Dask client exists - check against cluster memory limits
             if data_size > worker_memory:
-                # WARNING: Data fits in total cluster memory but exceeds single worker capacity 
+                # WARNING: Data fits in total cluster memory but exceeds single worker capacity
                 print(
                     f"Warning: Data size ({data_size / 1024**3:.2f} GB) exceeds single worker memory "
                     f"({worker_memory / 1024**3:.2f} GB) but fits in total cluster memory "
@@ -345,9 +344,9 @@ def estimate_data_size(ds, cmor_name):
                 print("Closing Dask client to use local memory for writing...")
                 client.close()
                 client = None
-    
+
         # If data < worker_memory: No action needed, proceed with write
-    
+
         if data_size > available_memory:
             # Data exceeds available system memory
             raise MemoryError(
@@ -357,7 +356,9 @@ def estimate_data_size(ds, cmor_name):
             )
 
         # Log the memory status for user awareness
-        print(f"Data size: {data_size / 1024**3:.2f} GB, Available memory: {available_memory / 1024**3:.2f} GB")
+        print(
+            f"Data size: {data_size / 1024**3:.2f} GB, Available memory: {available_memory / 1024**3:.2f} GB"
+        )
 
         time_var = self.ds[self.cmor_name].coords["time"]
         units = time_var.attrs["units"]

From 575255b66f9355bac766d4902ab9f3585533cadc Mon Sep 17 00:00:00 2001
From: rhaegar325
Date: Wed, 3 Dec 2025 15:14:53 +1100
Subject: [PATCH 3/3] update pyproject.toml to fix CI issue

---
 pyproject.toml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pyproject.toml b/pyproject.toml
index efe54fd..5a1e833 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,6 +25,7 @@ dependencies = [
     "netCDF4",
     "cftime",
     "dask",
+    "distributed>=2024.0.0",
     "pyyaml",
     "tqdm",
     "requests",
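
For anyone who wants to try the new memory guard outside the package, below is a minimal, self-contained sketch of the logic PATCH 1/3 adds to write(). The dataset here (a hypothetical "tas" field with a 12 x 180 x 360 float32 shape) is an illustrative assumption, not from the patch; the size-estimation arithmetic, the psutil check, and the get_client() probe mirror the patched code.

    import numpy as np
    import psutil
    import xarray as xr
    from dask.distributed import get_client

    # Illustrative dataset; the variable name and shape are assumptions for the demo.
    ds = xr.Dataset(
        {"tas": (("time", "lat", "lon"), np.zeros((12, 180, 360), dtype="float32"))}
    )


    def estimate_data_size(ds):
        # Same arithmetic as the patch: bytes per element times the product of
        # the dimension sizes, summed over variables, with a 1.5x safety margin.
        total_size = 0
        for var in ds.variables:
            vdat = ds[var]
            var_size = vdat.dtype.itemsize
            for dim in vdat.dims:
                var_size *= ds.sizes[dim]
            total_size += var_size
        return int(total_size * 1.5)


    data_size = estimate_data_size(ds)
    available_memory = psutil.virtual_memory().available

    try:
        # get_client() raises ValueError when no distributed client exists;
        # the patch uses that signal to fall back to the system-memory check.
        workers = get_client().scheduler_info()["workers"]
        if workers:
            worker_memory = min(w["memory_limit"] for w in workers.values())
            print(f"Smallest worker limit: {worker_memory / 1024**3:.2f} GB")
    except ValueError:
        print("No Dask client; checking against system memory only.")

    if data_size > available_memory:
        raise MemoryError("Estimated data size exceeds available system memory")
    print(
        f"Estimated: {data_size / 1024**3:.3f} GB, "
        f"available: {available_memory / 1024**3:.2f} GB"
    )

For comparison, xarray's ds.nbytes reports the exact in-memory footprint of the dense arrays; the patch's estimate is that figure plus a 1.5x margin to leave headroom for intermediate copies made during the write.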