From 947b6df430b654b0d208843149b6cc7421c109e5 Mon Sep 17 00:00:00 2001
From: jcamier
Date: Mon, 27 Jan 2025 07:30:32 -0600
Subject: [PATCH 1/2] start working on loading day and month in xarray

---
 src/open_data_pvnet/main.py                  |  53 +++++++-
 src/open_data_pvnet/utils/data_downloader.py | 124 +++++++++++++++++++
 2 files changed, 171 insertions(+), 6 deletions(-)

diff --git a/src/open_data_pvnet/main.py b/src/open_data_pvnet/main.py
index d8271dc..1a06e42 100644
--- a/src/open_data_pvnet/main.py
+++ b/src/open_data_pvnet/main.py
@@ -2,7 +2,11 @@
 import logging
 from open_data_pvnet.scripts.archive import handle_archive
 from open_data_pvnet.utils.env_loader import load_environment_variables
-from open_data_pvnet.utils.data_downloader import load_zarr_data
+from open_data_pvnet.utils.data_downloader import (
+    load_zarr_data,
+    load_month_zarr_data,
+    load_day_zarr_data,
+)
 from pathlib import Path
 import concurrent.futures
 from typing import List, Tuple
@@ -76,7 +80,12 @@ def _add_common_arguments(parser, provider_name):
     """Add arguments common to both archive and load operations."""
     parser.add_argument("--year", type=int, required=True, help="Year of data")
     parser.add_argument("--month", type=int, required=True, help="Month of data")
-    parser.add_argument("--day", type=int, required=True, help="Day of data")
+    parser.add_argument(
+        "--day",
+        type=int,
+        help="Day of data (optional - if not provided, loads entire month)",
+        default=None,
+    )
     # Add Met Office specific arguments
     if provider_name == "metoffice":
@@ -114,12 +123,44 @@ def parse_chunks(chunks_str):
 def handle_load(provider: str, year: int, month: int, day: int, **kwargs):
     """Handle loading archived data."""
-    hour = kwargs.get("hour", 0)  # Default to hour 0 if not specified
+    # If day is provided but hour is not, load the entire day
+    if day is not None and kwargs.get("hour") is None:
+        try:
+            dataset = load_day_zarr_data(
+                year=year,
+                month=month,
+                day=day,
+                region=kwargs.get("region", "uk"),
+                chunks=parse_chunks(kwargs.get("chunks")),
+                remote=kwargs.get("remote", False),
+            )
+            logger.info(f"Successfully loaded dataset for {year}-{month:02d}-{day:02d}")
+            return dataset
+        except Exception as e:
+            logger.error(f"Error loading daily dataset: {e}")
+            raise
+
+    # If day is None, load the entire month
+    if day is None:
+        try:
+            dataset = load_month_zarr_data(
+                year=year,
+                month=month,
+                region=kwargs.get("region", "uk"),
+                chunks=parse_chunks(kwargs.get("chunks")),
+                remote=kwargs.get("remote", False),
+            )
+            logger.info(f"Successfully loaded dataset for {year}-{month:02d}")
+            return dataset
+        except Exception as e:
+            logger.error(f"Error loading monthly dataset: {e}")
+            raise
+
+    # Single hour loading logic
+    hour = kwargs.get("hour", 0)
     chunks = parse_chunks(kwargs.get("chunks"))
     remote = kwargs.get("remote", False)
-    # Construct the archive path based on provider and parameters
-    # Format: data/2023/01/16/2023-01-16-00.zarr.zip
     archive_path = (
         Path("data")
         / str(year)
         / f"{month:02d}"
         / f"{day:02d}"
         / f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip"
     )
@@ -133,7 +174,7 @@ def handle_load(provider: str, year: int, month: int, day: int, **kwargs):
         archive_path,
         chunks=chunks,
         remote=remote,
-        download=not remote,  # Don't try to download if remote=True
+        download=not remote,
     )
     logger.info(f"Successfully loaded dataset for {year}-{month:02d}-{day:02d} hour {hour:02d}")
     return dataset
diff --git a/src/open_data_pvnet/utils/data_downloader.py b/src/open_data_pvnet/utils/data_downloader.py
index 0751af8..503641c 100644
--- a/src/open_data_pvnet/utils/data_downloader.py
+++ 
b/src/open_data_pvnet/utils/data_downloader.py @@ -5,6 +5,7 @@ from typing import Union, Optional, List from huggingface_hub import hf_hub_download import fsspec +import calendar logger = logging.getLogger(__name__) @@ -188,3 +189,126 @@ def load_zarr_data( except Exception as e: logger.error(f"Error loading zarr dataset: {e}") raise + + +def load_day_zarr_data( + year: int, + month: int, + day: int, + region: str = "uk", + chunks: Optional[dict] = None, + remote: bool = False, +) -> xr.Dataset: + """ + Load and combine all hourly zarr datasets for a single day. + + Args: + year (int): The year to load data for + month (int): The month to load data for (1-12) + day (int): The day to load data for + region (str): Region of data ('uk' or 'global') + chunks (Optional[dict]): Dictionary specifying chunk sizes + remote (bool): Whether to load the data lazily from HuggingFace + + Returns: + xr.Dataset: Combined dataset for the entire day + """ + datasets = [] + + logger.info(f"Loading data for {year}-{month:02d}-{day:02d}") + + for hour in range(24): + # Construct the archive path + archive_path = ( + Path("data") + / str(year) + / f"{month:02d}" + / f"{day:02d}" + / f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip" + ) + + try: + ds = load_zarr_data( + archive_path=archive_path, + chunks=chunks, + download=not remote, + remote=remote, + restructure=True, + ) + datasets.append(ds) + except Exception as e: + logger.warning(f"Could not load data for hour {hour:02d}: {e}") + continue + + if not datasets: + raise ValueError(f"No datasets could be loaded for {year}-{month:02d}-{day:02d}") + + # Concatenate all datasets along the time dimension + logger.info(f"Combining {len(datasets)} hourly datasets") + combined_dataset = xr.concat(datasets, dim="time") + logger.info(f"Successfully created combined dataset with shape: {dict(combined_dataset.dims)}") + + return combined_dataset + + +def load_month_zarr_data( + year: int, + month: int, + region: str = "uk", + chunks: Optional[dict] = None, + remote: bool = False, +) -> xr.Dataset: + """ + Load and combine all hourly zarr datasets for an entire month. 
+ + Args: + year (int): The year to load data for + month (int): The month to load data for (1-12) + region (str): Region of data ('uk' or 'global') + chunks (Optional[dict]): Dictionary specifying chunk sizes + remote (bool): Whether to load the data lazily from HuggingFace + + Returns: + xr.Dataset: Combined dataset for the entire month + """ + # Get number of days in the month + num_days = calendar.monthrange(year, month)[1] + datasets = [] + + logger.info(f"Loading data for {year}-{month:02d} ({num_days} days)") + + for day in range(1, num_days + 1): + for hour in range(24): + # Construct the archive path + archive_path = ( + Path("data") + / str(year) + / f"{month:02d}" + / f"{day:02d}" + / f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip" + ) + + try: + ds = load_zarr_data( + archive_path=archive_path, + chunks=chunks, + download=not remote, + remote=remote, + restructure=True, + ) + datasets.append(ds) + except Exception as e: + logger.warning( + f"Could not load data for {year}-{month:02d}-{day:02d} hour {hour:02d}: {e}" + ) + continue + + if not datasets: + raise ValueError(f"No datasets could be loaded for {year}-{month:02d}") + + # Concatenate all datasets along the time dimension + logger.info(f"Combining {len(datasets)} hourly datasets") + combined_dataset = xr.concat(datasets, dim="time") + logger.info(f"Successfully created combined dataset with shape: {dict(combined_dataset.dims)}") + + return combined_dataset From 4fd0f449268a4cc2e7b0f42f414d7bda7faefd99 Mon Sep 17 00:00:00 2001 From: jcamier Date: Tue, 4 Feb 2025 07:26:19 -0600 Subject: [PATCH 2/2] fix load for day --- src/open_data_pvnet/main.py | 85 ++++----- src/open_data_pvnet/utils/data_downloader.py | 175 +++++++------------ 2 files changed, 94 insertions(+), 166 deletions(-) diff --git a/src/open_data_pvnet/main.py b/src/open_data_pvnet/main.py index 1a06e42..e364773 100644 --- a/src/open_data_pvnet/main.py +++ b/src/open_data_pvnet/main.py @@ -2,11 +2,7 @@ import logging from open_data_pvnet.scripts.archive import handle_archive from open_data_pvnet.utils.env_loader import load_environment_variables -from open_data_pvnet.utils.data_downloader import ( - load_zarr_data, - load_month_zarr_data, - load_day_zarr_data, -) +from open_data_pvnet.utils.data_downloader import load_zarr_data, load_zarr_data_for_day from pathlib import Path import concurrent.futures from typing import List, Tuple @@ -123,60 +119,41 @@ def parse_chunks(chunks_str): def handle_load(provider: str, year: int, month: int, day: int, **kwargs): """Handle loading archived data.""" - # If day is provided but hour is not, load the entire day - if day is not None and kwargs.get("hour") is None: - try: - dataset = load_day_zarr_data( - year=year, - month=month, - day=day, - region=kwargs.get("region", "uk"), - chunks=parse_chunks(kwargs.get("chunks")), - remote=kwargs.get("remote", False), - ) - logger.info(f"Successfully loaded dataset for {year}-{month:02d}-{day:02d}") - return dataset - except Exception as e: - logger.error(f"Error loading daily dataset: {e}") - raise - - # If day is None, load the entire month - if day is None: - try: - dataset = load_month_zarr_data( - year=year, - month=month, - region=kwargs.get("region", "uk"), - chunks=parse_chunks(kwargs.get("chunks")), - remote=kwargs.get("remote", False), - ) - logger.info(f"Successfully loaded dataset for {year}-{month:02d}") - return dataset - except Exception as e: - logger.error(f"Error loading monthly dataset: {e}") - raise - - # Single hour loading logic - hour = kwargs.get("hour", 
0) chunks = parse_chunks(kwargs.get("chunks")) remote = kwargs.get("remote", False) + hour = kwargs.get("hour") - archive_path = ( - Path("data") - / str(year) - / f"{month:02d}" - / f"{day:02d}" - / f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip" - ) + # Base path for the data + base_path = Path("data") / str(year) / f"{month:02d}" / f"{day:02d}" try: - dataset = load_zarr_data( - archive_path, - chunks=chunks, - remote=remote, - download=not remote, - ) - logger.info(f"Successfully loaded dataset for {year}-{month:02d}-{day:02d} hour {hour:02d}") + if hour is not None: + # Load specific hour + archive_path = base_path / f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip" + dataset = load_zarr_data( + archive_path, + chunks=chunks, + remote=remote, + download=not remote, + ) + logger.info( + f"Successfully loaded dataset for {year}-{month:02d}-{day:02d} hour {hour:02d}" + ) + else: + # Load all hours for the day + dataset = load_zarr_data_for_day( + base_path, + year, + month, + day, + chunks=chunks, + remote=remote, + download=not remote, + ) + logger.info( + f"Successfully loaded all available datasets for {year}-{month:02d}-{day:02d}" + ) + return dataset except Exception as e: logger.error(f"Error loading dataset: {e}") diff --git a/src/open_data_pvnet/utils/data_downloader.py b/src/open_data_pvnet/utils/data_downloader.py index 503641c..2293fcd 100644 --- a/src/open_data_pvnet/utils/data_downloader.py +++ b/src/open_data_pvnet/utils/data_downloader.py @@ -5,7 +5,6 @@ from typing import Union, Optional, List from huggingface_hub import hf_hub_download import fsspec -import calendar logger = logging.getLogger(__name__) @@ -191,124 +190,76 @@ def load_zarr_data( raise -def load_day_zarr_data( - year: int, - month: int, - day: int, - region: str = "uk", - chunks: Optional[dict] = None, - remote: bool = False, -) -> xr.Dataset: - """ - Load and combine all hourly zarr datasets for a single day. 
- - Args: - year (int): The year to load data for - month (int): The month to load data for (1-12) - day (int): The day to load data for - region (str): Region of data ('uk' or 'global') - chunks (Optional[dict]): Dictionary specifying chunk sizes - remote (bool): Whether to load the data lazily from HuggingFace - - Returns: - xr.Dataset: Combined dataset for the entire day - """ +def load_zarr_data_for_day( # noqa: C901 + base_path: Path, year: int, month: int, day: int, chunks=None, remote=False, download=True +): + """Load and merge all hourly Zarr datasets for a given day.""" datasets = [] + stores = [] # Keep track of stores to close them later - logger.info(f"Loading data for {year}-{month:02d}-{day:02d}") - - for hour in range(24): - # Construct the archive path - archive_path = ( - Path("data") - / str(year) - / f"{month:02d}" - / f"{day:02d}" - / f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip" - ) - - try: - ds = load_zarr_data( - archive_path=archive_path, - chunks=chunks, - download=not remote, - remote=remote, - restructure=True, - ) - datasets.append(ds) - except Exception as e: - logger.warning(f"Could not load data for hour {hour:02d}: {e}") - continue - - if not datasets: - raise ValueError(f"No datasets could be loaded for {year}-{month:02d}-{day:02d}") - - # Concatenate all datasets along the time dimension - logger.info(f"Combining {len(datasets)} hourly datasets") - combined_dataset = xr.concat(datasets, dim="time") - logger.info(f"Successfully created combined dataset with shape: {dict(combined_dataset.dims)}") - - return combined_dataset - - -def load_month_zarr_data( - year: int, - month: int, - region: str = "uk", - chunks: Optional[dict] = None, - remote: bool = False, -) -> xr.Dataset: - """ - Load and combine all hourly zarr datasets for an entire month. 
- - Args: - year (int): The year to load data for - month (int): The month to load data for (1-12) - region (str): Region of data ('uk' or 'global') - chunks (Optional[dict]): Dictionary specifying chunk sizes - remote (bool): Whether to load the data lazily from HuggingFace - - Returns: - xr.Dataset: Combined dataset for the entire month - """ - # Get number of days in the month - num_days = calendar.monthrange(year, month)[1] - datasets = [] - - logger.info(f"Loading data for {year}-{month:02d} ({num_days} days)") - - for day in range(1, num_days + 1): + try: for hour in range(24): - # Construct the archive path - archive_path = ( - Path("data") - / str(year) - / f"{month:02d}" - / f"{day:02d}" - / f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip" - ) - + archive_path = base_path / f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip" try: - ds = load_zarr_data( - archive_path=archive_path, - chunks=chunks, - download=not remote, - remote=remote, - restructure=True, + if remote: + dataset = _load_remote_zarr( + get_hf_url(archive_path), + chunks=chunks, + consolidated=False, + restructure=True, + ) + else: + if not archive_path.exists() and download: + download_from_hf(str(archive_path), archive_path) + + logger.info(f"Opening zarr store from {archive_path}") + logger.info(f"File size: {archive_path.stat().st_size / (1024*1024):.2f} MB") + + store = zarr.storage.ZipStore(str(archive_path), mode="r") + stores.append(store) # Keep track of the store + + zarr_groups = get_zarr_groups(store) + hour_datasets = [] + + for group in zarr_groups: + try: + group_ds = open_zarr_group(store, group, chunks, False) + hour_datasets.append(group_ds) + except Exception as e: + logger.warning(f"Could not open group {group}: {e}") + continue + + if not hour_datasets: + raise ValueError("No valid datasets found in the Zarr store") + + dataset = merge_datasets(hour_datasets) + dataset = restructure_dataset(dataset) + + datasets.append(dataset) + logger.info( + f"Successfully loaded dataset for {year}-{month:02d}-{day:02d} hour {hour:02d}" ) - datasets.append(ds) + except Exception as e: - logger.warning( - f"Could not load data for {year}-{month:02d}-{day:02d} hour {hour:02d}: {e}" - ) + logger.warning(f"Could not load dataset for hour {hour}: {e}") continue - if not datasets: - raise ValueError(f"No datasets could be loaded for {year}-{month:02d}") + if not datasets: + raise ValueError(f"No datasets could be loaded for {year}-{month:02d}-{day:02d}") - # Concatenate all datasets along the time dimension - logger.info(f"Combining {len(datasets)} hourly datasets") - combined_dataset = xr.concat(datasets, dim="time") - logger.info(f"Successfully created combined dataset with shape: {dict(combined_dataset.dims)}") + # Merge all datasets along the time dimension + merged_dataset = xr.concat(datasets, dim="time") + logger.info(f"Successfully merged {len(datasets)} hourly datasets") - return combined_dataset + # Load the merged dataset into memory before closing stores + merged_dataset = merged_dataset.compute() + + return merged_dataset + + finally: + # Close all stores in the finally block + for store in stores: + try: + store.close() + except Exception as e: + logger.warning(f"Error closing store: {e}")
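
Usage sketch (not part of the patch): a minimal example of the two loading paths available once PATCH 2/2 is applied. It only calls names visible in this diff (load_zarr_data and load_zarr_data_for_day); the example date and the local data/ directory layout are assumptions inferred from the paths the code builds, so adjust them to wherever the hourly archives actually live.

    from pathlib import Path

    from open_data_pvnet.utils.data_downloader import load_zarr_data, load_zarr_data_for_day

    year, month, day = 2023, 1, 16  # assumed example date

    # Single hour: the same archive path handle_load() builds when --hour is given.
    archive_path = (
        Path("data")
        / str(year)
        / f"{month:02d}"
        / f"{day:02d}"
        / f"{year}-{month:02d}-{day:02d}-00.zarr.zip"
    )
    hourly_ds = load_zarr_data(archive_path, chunks=None, remote=False, download=True)

    # Whole day: handle_load() falls through to this helper when --hour is omitted.
    # It loops over hours 00-23, skips archives it cannot open, concatenates the rest
    # along "time", and loads the result into memory before closing the zip stores.
    base_path = Path("data") / str(year) / f"{month:02d}" / f"{day:02d}"
    daily_ds = load_zarr_data_for_day(
        base_path, year, month, day, chunks=None, remote=False, download=True
    )
    print(dict(daily_ds.dims))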