Merge pull request #56 from openclimatefix/feature/issue-31
Feature/issue 31
jcamier authored Feb 4, 2025
2 parents 53a29da + 4fd0f44 commit 07ee51b
Showing 2 changed files with 112 additions and 19 deletions.
56 changes: 37 additions & 19 deletions src/open_data_pvnet/main.py
@@ -2,7 +2,7 @@
import logging
from open_data_pvnet.scripts.archive import handle_archive
from open_data_pvnet.utils.env_loader import load_environment_variables
from open_data_pvnet.utils.data_downloader import load_zarr_data
from open_data_pvnet.utils.data_downloader import load_zarr_data, load_zarr_data_for_day
from pathlib import Path
import concurrent.futures
from typing import List, Tuple
@@ -76,7 +76,12 @@ def _add_common_arguments(parser, provider_name):
"""Add arguments common to both archive and load operations."""
parser.add_argument("--year", type=int, required=True, help="Year of data")
parser.add_argument("--month", type=int, required=True, help="Month of data")
parser.add_argument("--day", type=int, required=True, help="Day of data")
parser.add_argument(
"--day",
type=int,
help="Day of data (optional - if not provided, loads entire month)",
default=None,
)

# Add Met Office specific arguments
if provider_name == "metoffice":
@@ -114,28 +119,41 @@ def parse_chunks(chunks_str):

def handle_load(provider: str, year: int, month: int, day: int, **kwargs):
"""Handle loading archived data."""
hour = kwargs.get("hour", 0) # Default to hour 0 if not specified
chunks = parse_chunks(kwargs.get("chunks"))
remote = kwargs.get("remote", False)
hour = kwargs.get("hour")

# Construct the archive path based on provider and parameters
# Format: data/2023/01/16/2023-01-16-00.zarr.zip
archive_path = (
Path("data")
/ str(year)
/ f"{month:02d}"
/ f"{day:02d}"
/ f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip"
)
# Base path for the data
base_path = Path("data") / str(year) / f"{month:02d}" / f"{day:02d}"

try:
dataset = load_zarr_data(
archive_path,
chunks=chunks,
remote=remote,
download=not remote, # Don't try to download if remote=True
)
logger.info(f"Successfully loaded dataset for {year}-{month:02d}-{day:02d} hour {hour:02d}")
if hour is not None:
# Load specific hour
archive_path = base_path / f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip"
dataset = load_zarr_data(
archive_path,
chunks=chunks,
remote=remote,
download=not remote,
)
logger.info(
f"Successfully loaded dataset for {year}-{month:02d}-{day:02d} hour {hour:02d}"
)
else:
# Load all hours for the day
dataset = load_zarr_data_for_day(
base_path,
year,
month,
day,
chunks=chunks,
remote=remote,
download=not remote,
)
logger.info(
f"Successfully loaded all available datasets for {year}-{month:02d}-{day:02d}"
)

return dataset
except Exception as e:
logger.error(f"Error loading dataset: {e}")
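With this change, handle_load dispatches on the optional hour keyword: passing an hour loads a single archive via load_zarr_data, while omitting it falls through to the new load_zarr_data_for_day helper shown below. A minimal usage sketch, assuming the metoffice provider and the data/<year>/<month>/<day>/ layout from the comment above; the date and keyword values are illustrative, not taken from the repository:

    from open_data_pvnet.main import handle_load

    # With an hour: loads a single archive such as data/2023/01/16/2023-01-16-00.zarr.zip
    ds_hour = handle_load("metoffice", 2023, 1, 16, hour=0, remote=False)

    # Without an hour: merges every hourly archive found for 2023-01-16
    ds_day = handle_load("metoffice", 2023, 1, 16, remote=False)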
75 changes: 75 additions & 0 deletions src/open_data_pvnet/utils/data_downloader.py
@@ -188,3 +188,78 @@ def load_zarr_data(
except Exception as e:
logger.error(f"Error loading zarr dataset: {e}")
raise


def load_zarr_data_for_day( # noqa: C901
base_path: Path, year: int, month: int, day: int, chunks=None, remote=False, download=True
):
"""Load and merge all hourly Zarr datasets for a given day."""
datasets = []
stores = [] # Keep track of stores to close them later

try:
for hour in range(24):
archive_path = base_path / f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip"
try:
if remote:
dataset = _load_remote_zarr(
get_hf_url(archive_path),
chunks=chunks,
consolidated=False,
restructure=True,
)
else:
if not archive_path.exists() and download:
download_from_hf(str(archive_path), archive_path)

logger.info(f"Opening zarr store from {archive_path}")
logger.info(f"File size: {archive_path.stat().st_size / (1024*1024):.2f} MB")

store = zarr.storage.ZipStore(str(archive_path), mode="r")
stores.append(store) # Keep track of the store

zarr_groups = get_zarr_groups(store)
hour_datasets = []

for group in zarr_groups:
try:
group_ds = open_zarr_group(store, group, chunks, False)
hour_datasets.append(group_ds)
except Exception as e:
logger.warning(f"Could not open group {group}: {e}")
continue

if not hour_datasets:
raise ValueError("No valid datasets found in the Zarr store")

dataset = merge_datasets(hour_datasets)
dataset = restructure_dataset(dataset)

datasets.append(dataset)
logger.info(
f"Successfully loaded dataset for {year}-{month:02d}-{day:02d} hour {hour:02d}"
)

except Exception as e:
logger.warning(f"Could not load dataset for hour {hour}: {e}")
continue

if not datasets:
raise ValueError(f"No datasets could be loaded for {year}-{month:02d}-{day:02d}")

# Merge all datasets along the time dimension
merged_dataset = xr.concat(datasets, dim="time")
logger.info(f"Successfully merged {len(datasets)} hourly datasets")

# Load the merged dataset into memory before closing stores
merged_dataset = merged_dataset.compute()

return merged_dataset

finally:
# Close all stores in the finally block
for store in stores:
try:
store.close()
except Exception as e:
logger.warning(f"Error closing store: {e}")

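For reference, a sketch of calling the new helper directly; the chunks dict is an assumption mirroring xarray's usual {dim: size} convention, and the date is illustrative:

    from pathlib import Path
    from open_data_pvnet.utils.data_downloader import load_zarr_data_for_day

    base_path = Path("data") / "2023" / "01" / "16"

    # Hours that fail to open are logged and skipped; the remaining hourly
    # datasets are concatenated along "time" and materialised with .compute()
    # before the underlying ZipStores are closed.
    ds = load_zarr_data_for_day(
        base_path, 2023, 1, 16, chunks={"time": 24}, remote=False, download=True
    )
    print(ds.sizes)

Calling .compute() before the finally block closes the stores matters here: without it, a lazily chunked dataset would try to read from ZipStores that have already been closed.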