diff --git a/src/open_data_pvnet/main.py b/src/open_data_pvnet/main.py
index e364773..565845e 100644
--- a/src/open_data_pvnet/main.py
+++ b/src/open_data_pvnet/main.py
@@ -1,11 +1,19 @@
 import argparse
 import logging
-from open_data_pvnet.scripts.archive import handle_archive
 from open_data_pvnet.utils.env_loader import load_environment_variables
-from open_data_pvnet.utils.data_downloader import load_zarr_data, load_zarr_data_for_day
+from open_data_pvnet.utils.data_downloader import (
+    load_zarr_data,
+    load_zarr_data_for_day,
+    merge_hours_to_day,
+    process_month_by_days,
+    merge_days_to_month,
+)
 from pathlib import Path
 import concurrent.futures
 from typing import List, Tuple
+from open_data_pvnet.utils.data_uploader import upload_monthly_zarr, upload_to_huggingface
+from open_data_pvnet.scripts.archive import handle_archive
+from open_data_pvnet.nwp.met_office import CONFIG_PATHS
 
 logger = logging.getLogger(__name__)
 
@@ -32,46 +40,6 @@ def load_env_and_setup_logger():
         raise
 
 
-def add_provider_parser(subparsers, provider_name):
-    """Add a subparser for a specific data provider."""
-    provider_parser = subparsers.add_parser(
-        provider_name, help=f"Commands for {provider_name.capitalize()} data"
-    )
-    operation_subparsers = provider_parser.add_subparsers(
-        dest="operation", help="Operation to perform"
-    )
-
-    # Archive operation parser
-    archive_parser = operation_subparsers.add_parser("archive", help="Archive data to Hugging Face")
-    _add_common_arguments(archive_parser, provider_name)
-    archive_parser.add_argument(
-        "--archive-type",
-        choices=["zarr.zip", "tar"],
-        default="zarr.zip",
-        help="Type of archive to create (default: zarr.zip)",
-    )
-    archive_parser.add_argument(
-        "--workers",
-        type=int,
-        default=1,
-        help="Number of concurrent workers for parallel processing (default: 1)",
-    )
-
-    # Load operation parser
-    load_parser = operation_subparsers.add_parser("load", help="Load archived data")
-    _add_common_arguments(load_parser, provider_name)
-    load_parser.add_argument(
-        "--chunks",
-        type=str,
-        help="Chunking specification in format 'dim1:size1,dim2:size2' (e.g., 'time:24,latitude:100')",
-    )
-    load_parser.add_argument(
-        "--remote",
-        action="store_true",
-        help="Load data lazily from HuggingFace without downloading",
-    )
-
-
 def _add_common_arguments(parser, provider_name):
     """Add arguments common to both archive and load operations."""
     parser.add_argument("--year", type=int, required=True, help="Year of data")
@@ -79,7 +47,7 @@ def _add_common_arguments(parser, provider_name):
     parser.add_argument(
         "--day",
         type=int,
-        help="Day of data (optional - if not provided, loads entire month)",
+        help="Day of data (optional - if not provided, processes entire month)",
        default=None,
     )
@@ -94,7 +62,7 @@ def _add_common_arguments(parser, provider_name):
     parser.add_argument(
         "--region",
         choices=["global", "uk"],
-        default=DEFAULT_REGION,
+        default="global",
         help="Specify the Met Office dataset region (default: global)",
     )
@@ -161,25 +129,65 @@ def handle_load(provider: str, year: int, month: int, day: int, **kwargs):
 
 
 def configure_parser():
-    """Configure the main argument parser for the CLI tool.
+    """Configure the main argument parser for the CLI tool."""
+    parser = argparse.ArgumentParser(prog="open-data-pvnet", description="Open Data PVNet CLI")
 
-    Creates the main parser and adds subparsers for each supported data provider
-    (metoffice, gfs, dwd). Each provider subparser includes options for year,
-    month, day, hour, and operation type.
-
-    Returns:
-        argparse.ArgumentParser: The configured argument parser
-    """
-    parser = argparse.ArgumentParser(prog="open-data-pvnet", description="Open Data PVNet CLI Tool")
+    # Add the --list argument to the top-level parser
     parser.add_argument(
         "--list",
         choices=["providers"],
         help="List available options (e.g., providers)",
-        action="store",
+        nargs="?",  # Make it optional
     )
-    subparsers = parser.add_subparsers(dest="command", help="Available commands")
-    for provider in PROVIDERS:
-        add_provider_parser(subparsers, provider)
+
+    # Create subparsers for commands (providers)
+    subparsers = parser.add_subparsers(dest="command", help="Data provider")
+
+    # Add provider-specific parsers
+    for provider in ["metoffice", "gfs"]:
+        provider_parser = subparsers.add_parser(
+            provider, help=f"Commands for {provider.capitalize()} data"
+        )
+        operation_subparsers = provider_parser.add_subparsers(
+            dest="operation", help="Operation to perform"
+        )
+
+        # Archive operation parser
+        archive_parser = operation_subparsers.add_parser(
+            "archive", help="Archive data to Hugging Face"
+        )
+        _add_common_arguments(archive_parser, provider)
+        archive_parser.add_argument(
+            "--archive-type",
+            choices=["zarr.zip", "tar"],
+            default="zarr.zip",
+            help="Type of archive to create (default: zarr.zip)",
+        )
+        archive_parser.add_argument(
+            "--workers",
+            type=int,
+            default=1,
+            help="Number of concurrent workers for parallel processing (default: 1)",
+        )
+
+        # Load operation parser
+        load_parser = operation_subparsers.add_parser("load", help="Load archived data")
+        _add_common_arguments(load_parser, provider)
+        load_parser.add_argument(
+            "--chunks",
+            type=str,
+            help="Chunking specification in format 'dim1:size1,dim2:size2' (e.g., 'time:24,latitude:100')",
+        )
+        load_parser.add_argument(
+            "--remote",
+            action="store_true",
+            help="Load data lazily from HuggingFace without downloading",
+        )
+
+        # Consolidate operation parser
+        consolidate_parser = operation_subparsers.add_parser("consolidate", help="Consolidate data")
+        _add_common_arguments(consolidate_parser, provider)
+
     return parser
 
 
@@ -205,7 +213,7 @@ def archive_hours_chunk(
     """Archive a chunk of hours."""
     start_hour, end_hour = hour_range
     for hour in range(start_hour, end_hour + 1):
-        handle_archive(
+        archive_to_hf(
             provider=provider,
             year=year,
             month=month,
@@ -253,6 +261,115 @@ def parallel_archive(
                 raise future.exception()
 
 
+def handle_monthly_consolidation(provider: str, year: int, month: int, **kwargs):
+    """Handle consolidating data into zarr.zip files."""
+    chunks = parse_chunks(kwargs.get("chunks"))
+    base_path = Path("data")
+    day = kwargs.get("day")
+
+    try:
+        if day is not None:
+            # Consolidate a single day
+            logger.info(f"Consolidating day {year}-{month:02d}-{day:02d}")
+            daily_file = merge_hours_to_day(base_path, year, month, day, chunks)
+            logger.info(f"Successfully consolidated day to {daily_file}")
+            return
+
+        # First ensure all days are processed
+        logger.info(f"Processing all days in month {year}-{month:02d}")
+        successful_files = process_month_by_days(base_path, year, month, chunks)
+
+        if successful_files:
+            logger.info("\nSuccessfully created daily files:")
+            for file in successful_files:
+                logger.info(f"- {file}")
+
+            # Now create the monthly file
+            logger.info("\nCreating monthly consolidated file")
+            monthly_file = merge_days_to_month(base_path, year, month, chunks)
+            logger.info(f"Successfully created monthly file: {monthly_file}")
+        else:
+            logger.warning("No daily files were created, cannot create monthly file")
+
+    except Exception as e:
+        logger.error(f"Error in consolidation: {e}")
+        raise
+
+
+def handle_upload(provider: str, year: int, month: int, day: int = None, **kwargs):
+    """Handle uploading data to Hugging Face."""
+    config_path = Path("config.yaml")
+    overwrite = kwargs.get("overwrite", False)
+    upload_type = kwargs.get("type", "hourly")  # New parameter to specify upload type
+
+    try:
+        if upload_type == "monthly":
+            # Upload monthly consolidated file
+            logger.info(f"Uploading monthly consolidated file for {year}-{month:02d}")
+            upload_monthly_zarr(
+                config_path=config_path, year=year, month=month, overwrite=overwrite
+            )
+        else:
+            # Original hourly upload functionality
+            logger.info(f"Uploading hourly data for {year}-{month:02d}-{day:02d}")
+            upload_to_huggingface(
+                config_path=config_path,
+                folder_name=f"{year}-{month:02d}-{day:02d}",
+                year=year,
+                month=month,
+                day=day,
+                overwrite=overwrite,
+            )
+
+    except Exception as e:
+        logger.error(f"Error in upload: {e}")
+        raise
+
+
+def archive_to_hf(provider: str, year: int, month: int, day: int = None, **kwargs):
+    """Handle archiving data to Hugging Face."""
+    overwrite = kwargs.get("overwrite", False)
+    archive_type = kwargs.get("archive_type", "zarr.zip")
+
+    try:
+        if day is None:
+            # Archive monthly consolidated file
+            logger.info(f"Archiving monthly consolidated file for {year}-{month:02d}")
+            region = kwargs.get("region", "global")
+
+            if provider == "metoffice":
+                if region not in CONFIG_PATHS:
+                    raise ValueError(f"Invalid region '{region}'. Must be 'uk' or 'global'.")
+                config_path = CONFIG_PATHS[region]
+            else:
+                raise NotImplementedError(
+                    f"Monthly archive for provider {provider} not yet implemented"
+                )
+
+            upload_monthly_zarr(
+                config_path=config_path,
+                year=year,
+                month=month,
+                overwrite=overwrite,
+            )
+        else:
+            # Use provider-specific archive processing for daily data
+            handle_archive(
+                provider=provider,
+                year=year,
+                month=month,
+                day=day,
+                hour=kwargs.get("hour"),
+                region=kwargs.get("region", "global"),
+                overwrite=overwrite,
+                archive_type=archive_type,
+            )
+
+    except Exception as e:
+        logger.error(f"Error in archive: {e}")
+        raise
+
+
 def main():
     """Entry point for the Open Data PVNet CLI tool.
@@ -266,13 +383,22 @@ def main():
         open-data-pvnet metoffice archive --year 2023 --month 12 --day 1 --region uk -o --workers 4
 
         # Archive global region data for a specific hour
-        open-data-pvnet metoffice archive --year 2023 --month 12 --day 1 --hour 12 --region global -o
+        open-data-pvnet metoffice archive --year 2023 --month 12 --day 1 --hour 12 --region uk -o
 
         # Archive as tar instead of zarr.zip
         open-data-pvnet metoffice archive --year 2023 --month 12 --day 1 --hour 12 --region uk -o --archive-type tar
 
+        # Archive monthly data (requires consolidation first)
+        open-data-pvnet metoffice archive --year 2023 --month 12
+
+        # Consolidate monthly data
+        open-data-pvnet metoffice consolidate --year 2023 --month 12
+
+        # Consolidate specific day
+        open-data-pvnet metoffice consolidate --year 2023 --month 12 --day 1
+
     GFS Data:
-        Not implemented yet
+        Partially implemented
 
     DWD Data:
         Not implemented yet
@@ -290,54 +416,64 @@ def main():
     List Available Providers:
         open-data-pvnet --list providers
     """
-    load_env_and_setup_logger()
     parser = configure_parser()
     args = parser.parse_args()
 
+    # Handle the --list providers case first
     if args.list == "providers":
         print("Available providers:")
         for provider in PROVIDERS:
-            print(f"- {provider}")
-        return
-
-    if not args.command:
-        parser.print_help()
-        return
-
-    kwargs = {
-        "provider": args.command,
-        "year": args.year,
-        "month": args.month,
-        "day": args.day,
-        "hour": getattr(args, "hour", None),
-        "overwrite": args.overwrite,
-        "remote": getattr(args, "remote", False),
-    }
-
-    # Only add region for Met Office commands
-    if args.command == "metoffice":
-        kwargs["region"] = args.region
-
-    if args.operation == "archive":
-        # If specific hour is provided, use regular archiving
-        if kwargs["hour"] is not None:
-            archive_kwargs = {k: v for k, v in kwargs.items() if k != "remote"}
-            archive_kwargs["archive_type"] = args.archive_type
-            handle_archive(**archive_kwargs)
-        else:
-            # Use parallel archiving for full day
-            parallel_archive(
-                provider=kwargs["provider"],
-                year=kwargs["year"],
-                month=kwargs["month"],
-                day=kwargs["day"],
-                region=kwargs["region"],
-                overwrite=kwargs["overwrite"],
-                archive_type=args.archive_type,
-                max_workers=args.workers,
-            )
-    elif args.operation == "load":
-        kwargs["chunks"] = getattr(args, "chunks", None)
-        handle_load(**kwargs)
-    else:
+            if provider == "gfs":
+                print(f"- {provider} (partially implemented)")
+            elif provider == "dwd":
+                print(f"- {provider} (not implemented)")
+            else:
+                print(f"- {provider}")
+        return 0
+
+    # For all other commands, we need a provider and operation
+    if not args.command or not args.operation:
         parser.print_help()
+        return 1
+
+    # Load environment variables
+    load_env_and_setup_logger()
+
+    # Execute the requested operation
+    if args.operation == "load":
+        load_kwargs = {
+            "provider": args.command,
+            "year": args.year,
+            "month": args.month,
+            "day": args.day,
+            "hour": args.hour,
+            "region": args.region,
+            "overwrite": args.overwrite,
+            "chunks": args.chunks,
+            "remote": args.remote,
+        }
+        handle_load(**load_kwargs)
+    elif args.operation == "consolidate":
+        consolidate_kwargs = {
+            "provider": args.command,
+            "year": args.year,
+            "month": args.month,
+            "day": args.day,
+            "region": getattr(args, "region", None),
+            "overwrite": args.overwrite,
+        }
+        handle_monthly_consolidation(**consolidate_kwargs)
+    elif args.operation == "archive":
+        archive_kwargs = {
+            "provider": args.command,
+            "year": args.year,
+            "month": args.month,
+            "day": args.day,
+            "hour": getattr(args, "hour", None),
"region": getattr(args, "region", None), + "overwrite": args.overwrite, + "archive_type": getattr(args, "archive_type", "zarr.zip"), + } + archive_to_hf(**archive_kwargs) + + return 0 diff --git a/src/open_data_pvnet/utils/data_downloader.py b/src/open_data_pvnet/utils/data_downloader.py index 2293fcd..22d57d5 100644 --- a/src/open_data_pvnet/utils/data_downloader.py +++ b/src/open_data_pvnet/utils/data_downloader.py @@ -5,6 +5,7 @@ from typing import Union, Optional, List from huggingface_hub import hf_hub_download import fsspec +import shutil logger = logging.getLogger(__name__) @@ -191,49 +192,60 @@ def load_zarr_data( def load_zarr_data_for_day( # noqa: C901 - base_path: Path, year: int, month: int, day: int, chunks=None, remote=False, download=True -): - """Load and merge all hourly Zarr datasets for a given day.""" + base_path: Path, + year: int, + month: int, + day: int, + chunks: Optional[dict] = None, + remote: bool = False, + download: bool = True, +) -> xr.Dataset: + """Load all hourly Zarr datasets for a given day.""" datasets = [] stores = [] # Keep track of stores to close them later try: for hour in range(24): - archive_path = base_path / f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip" + # Construct paths to match HuggingFace structure + repo_path = f"data/{year}/{month:02d}/{day:02d}/{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip" + local_path = ( + base_path + / str(year) + / f"{month:02d}" + / f"{day:02d}" + / f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip" + ) + + # Log the exact paths we're using + logger.info(f"Local path: {local_path}") + logger.info(f"Repo path: {repo_path}") + try: - if remote: - dataset = _load_remote_zarr( - get_hf_url(archive_path), - chunks=chunks, - consolidated=False, - restructure=True, - ) - else: - if not archive_path.exists() and download: - download_from_hf(str(archive_path), archive_path) - - logger.info(f"Opening zarr store from {archive_path}") - logger.info(f"File size: {archive_path.stat().st_size / (1024*1024):.2f} MB") - - store = zarr.storage.ZipStore(str(archive_path), mode="r") - stores.append(store) # Keep track of the store - - zarr_groups = get_zarr_groups(store) - hour_datasets = [] - - for group in zarr_groups: - try: - group_ds = open_zarr_group(store, group, chunks, False) - hour_datasets.append(group_ds) - except Exception as e: - logger.warning(f"Could not open group {group}: {e}") - continue - - if not hour_datasets: - raise ValueError("No valid datasets found in the Zarr store") - - dataset = merge_datasets(hour_datasets) - dataset = restructure_dataset(dataset) + if not local_path.exists() and download: + download_from_hf(repo_path, local_path) + + logger.info(f"Opening zarr store from {local_path}") + logger.info(f"File size: {local_path.stat().st_size / (1024*1024):.2f} MB") + + store = zarr.storage.ZipStore(str(local_path), mode="r") + stores.append(store) # Keep track of the store + + zarr_groups = get_zarr_groups(store) + hour_datasets = [] + + for group in zarr_groups: + try: + group_ds = open_zarr_group(store, group, chunks, False) + hour_datasets.append(group_ds) + except Exception as e: + logger.warning(f"Could not open group {group}: {e}") + continue + + if not hour_datasets: + raise ValueError("No valid datasets found in the Zarr store") + + dataset = merge_datasets(hour_datasets) + dataset = restructure_dataset(dataset) datasets.append(dataset) logger.info( @@ -251,9 +263,6 @@ def load_zarr_data_for_day( # noqa: C901 merged_dataset = xr.concat(datasets, dim="time") logger.info(f"Successfully 
diff --git a/src/open_data_pvnet/utils/data_downloader.py b/src/open_data_pvnet/utils/data_downloader.py
index 2293fcd..22d57d5 100644
--- a/src/open_data_pvnet/utils/data_downloader.py
+++ b/src/open_data_pvnet/utils/data_downloader.py
@@ -5,6 +5,7 @@
 from typing import Union, Optional, List
 from huggingface_hub import hf_hub_download
 import fsspec
+import shutil
 
 logger = logging.getLogger(__name__)
 
@@ -191,49 +192,60 @@ def load_zarr_data(
 
 
 def load_zarr_data_for_day(  # noqa: C901
-    base_path: Path, year: int, month: int, day: int, chunks=None, remote=False, download=True
-):
-    """Load and merge all hourly Zarr datasets for a given day."""
+    base_path: Path,
+    year: int,
+    month: int,
+    day: int,
+    chunks: Optional[dict] = None,
+    remote: bool = False,
+    download: bool = True,
+) -> xr.Dataset:
+    """Load all hourly Zarr datasets for a given day."""
     datasets = []
     stores = []  # Keep track of stores to close them later
 
     try:
         for hour in range(24):
-            archive_path = base_path / f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip"
+            # Construct paths to match HuggingFace structure
+            repo_path = f"data/{year}/{month:02d}/{day:02d}/{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip"
+            local_path = (
+                base_path
+                / str(year)
+                / f"{month:02d}"
+                / f"{day:02d}"
+                / f"{year}-{month:02d}-{day:02d}-{hour:02d}.zarr.zip"
+            )
+
+            # Log the exact paths we're using
+            logger.info(f"Local path: {local_path}")
+            logger.info(f"Repo path: {repo_path}")
+
             try:
-                if remote:
-                    dataset = _load_remote_zarr(
-                        get_hf_url(archive_path),
-                        chunks=chunks,
-                        consolidated=False,
-                        restructure=True,
-                    )
-                else:
-                    if not archive_path.exists() and download:
-                        download_from_hf(str(archive_path), archive_path)
-
-                    logger.info(f"Opening zarr store from {archive_path}")
-                    logger.info(f"File size: {archive_path.stat().st_size / (1024*1024):.2f} MB")
-
-                    store = zarr.storage.ZipStore(str(archive_path), mode="r")
-                    stores.append(store)  # Keep track of the store
-
-                    zarr_groups = get_zarr_groups(store)
-                    hour_datasets = []
-
-                    for group in zarr_groups:
-                        try:
-                            group_ds = open_zarr_group(store, group, chunks, False)
-                            hour_datasets.append(group_ds)
-                        except Exception as e:
-                            logger.warning(f"Could not open group {group}: {e}")
-                            continue
-
-                    if not hour_datasets:
-                        raise ValueError("No valid datasets found in the Zarr store")
-
-                    dataset = merge_datasets(hour_datasets)
-                    dataset = restructure_dataset(dataset)
+                if not local_path.exists() and download:
+                    download_from_hf(repo_path, local_path)
+
+                logger.info(f"Opening zarr store from {local_path}")
+                logger.info(f"File size: {local_path.stat().st_size / (1024*1024):.2f} MB")
+
+                store = zarr.storage.ZipStore(str(local_path), mode="r")
+                stores.append(store)  # Keep track of the store
+
+                zarr_groups = get_zarr_groups(store)
+                hour_datasets = []
+
+                for group in zarr_groups:
+                    try:
+                        group_ds = open_zarr_group(store, group, chunks, False)
+                        hour_datasets.append(group_ds)
+                    except Exception as e:
+                        logger.warning(f"Could not open group {group}: {e}")
+                        continue
+
+                if not hour_datasets:
+                    raise ValueError("No valid datasets found in the Zarr store")
+
+                dataset = merge_datasets(hour_datasets)
+                dataset = restructure_dataset(dataset)
 
                 datasets.append(dataset)
                 logger.info(
@@ -251,9 +263,6 @@ def load_zarr_data_for_day(  # noqa: C901
         merged_dataset = xr.concat(datasets, dim="time")
         logger.info(f"Successfully merged {len(datasets)} hourly datasets")
 
-        # Load the merged dataset into memory before closing stores
-        merged_dataset = merged_dataset.compute()
-
         return merged_dataset
 
     finally:
@@ -263,3 +272,228 @@ def load_zarr_data_for_day(  # noqa: C901
                     store.close()
                 except Exception as e:
                     logger.warning(f"Error closing store: {e}")
+
+
+def save_consolidated_zarr(dataset: xr.Dataset, output_path: Union[str, Path]) -> None:
+    """Save a consolidated dataset to zarr format.
+
+    Args:
+        dataset: The xarray Dataset to save
+        output_path: Path where to save the zarr archive
+    """
+    logger.info(f"Saving consolidated dataset to {output_path}")
+    try:
+        # Get the first variable name from the dataset
+        first_var = list(dataset.data_vars)[0]
+
+        # Check if the dataset has chunks before creating encoding
+        encoding = None
+        if dataset[first_var].chunks is not None:
+            chunks = tuple(x[0] for x in dataset[first_var].chunks)
+            encoding = {var: {"chunks": chunks} for var in dataset.data_vars}
+
+        # Ensure parent directory exists
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+
+        # Save to temporary zarr directory first
+        temp_dir = output_path.parent / f"{output_path.stem}_temp"
+        if temp_dir.exists():
+            shutil.rmtree(temp_dir)
+        temp_dir.mkdir(parents=True, exist_ok=True)
+
+        # Save to temporary zarr directory
+        logger.info("Writing to temporary directory...")
+        dataset.to_zarr(temp_dir, mode="w", encoding=encoding, compute=True, consolidated=True)
+
+        # Create zip archive
+        logger.info("Creating zip archive...")
+        zip_store = zarr.storage.ZipStore(str(output_path), mode="w")
+        temp_store = zarr.DirectoryStore(str(temp_dir))
+
+        try:
+            zarr.copy_store(temp_store, zip_store)
+        finally:
+            zip_store.close()
+
+        logger.info(f"Successfully saved consolidated file to {output_path}")
+        logger.info(f"Final file size: {output_path.stat().st_size / (1024*1024):.2f} MB")
+
+    finally:
+        # Cleanup temporary directory
+        if temp_dir.exists():
+            shutil.rmtree(temp_dir)
+
+
+def merge_hours_to_day(
+    base_path: Path,
+    year: int,
+    month: int,
+    day: int,
+    chunks: Optional[dict] = None,
+) -> Path:
+    """Merge 24 hourly files into a single daily zarr.zip file."""
+    logger.info(f"\nMerging hours for {year}-{month:02d}-{day:02d}")
+
+    # Define paths
+    day_str = f"{day:02d}"
+    month_str = f"{month:02d}"
+    daily_dir = base_path / str(year) / month_str / day_str
+    daily_output = daily_dir / "daily" / f"{year}-{month_str}-{day_str}.zarr.zip"
+
+    if daily_output.exists():
+        logger.info(f"Daily file already exists: {daily_output}")
+        return daily_output
+
+    logger.info(f"Loading hourly data from {daily_dir}")
+    daily_dataset = load_zarr_data_for_day(
+        base_path,
+        year,
+        month,
+        day,
+        chunks=chunks,
+        remote=False,
+        download=True,
+    )
+
+    logger.info(f"Creating daily directory: {daily_output.parent}")
+    daily_output.parent.mkdir(parents=True, exist_ok=True)
+
+    logger.info(f"Saving daily dataset to: {daily_output}")
+    save_consolidated_zarr(daily_dataset, daily_output)
+    logger.info(f"Successfully created daily file: {daily_output}")
+
+    return daily_output
+
+
+def test_consolidated_zarr(output_path: Path) -> None:
+    """Test that the consolidated zarr.zip file was created correctly.
+
+    Args:
+        output_path (Path): Path to the consolidated zarr.zip file
+    """
+    logger.info(f"\nTesting consolidated file at {output_path}")
+    test_ds = xr.open_zarr(output_path)
+    logger.info("\nConsolidated dataset info:")
+    logger.info(f"Dimensions: {dict(test_ds.dims)}")
+    logger.info(f"Variables: {list(test_ds.variables)}")
+    logger.info(f"Time range: {test_ds.time.values.min()} to {test_ds.time.values.max()}")
+    logger.info(f"Number of time points: {len(test_ds.time)}")
+    test_ds.close()
+
+
+def process_month_by_days(
+    base_path: Path,
+    year: int,
+    month: int,
+    chunks: Optional[dict] = None,
+) -> List[Path]:
+    """Process all days in a month, creating daily consolidated files.
+
+    Returns:
+        List[Path]: List of paths to successfully created daily files
+    """
+    import calendar
+
+    # Get number of days in the month
+    _, num_days = calendar.monthrange(year, month)
+    successful_files = []
+
+    logger.info(f"\nProcessing all days in {year}-{month:02d}")
+
+    for day in range(1, num_days + 1):
+        try:
+            logger.info(f"\nProcessing day {year}-{month:02d}-{day:02d}")
+            daily_file = merge_hours_to_day(base_path, year, month, day, chunks)
+
+            if daily_file.exists():
+                successful_files.append(daily_file)
+                logger.info(f"Successfully processed day {day}")
+            else:
+                logger.warning(f"Failed to create daily file for day {day}")
+
+        except Exception as e:
+            logger.error(f"Error processing day {day}: {e}")
+            continue
+
+    logger.info(f"\nProcessed {len(successful_files)} days successfully out of {num_days} days")
+    return successful_files
+
+
+def merge_days_to_month(
+    base_path: Path,
+    year: int,
+    month: int,
+    chunks: Optional[dict] = None,
+) -> Path:
+    """Merge all daily zarr.zip files in a month into a single monthly zarr.zip file."""
+    logger.info(f"\nMerging daily files for {year}-{month:02d}")
+
+    # Define paths
+    month_str = f"{month:02d}"
+    month_dir = base_path / str(year) / month_str
+    monthly_output = month_dir / "monthly" / f"{year}-{month_str}.zarr.zip"
+
+    if monthly_output.exists():
+        logger.info(f"Monthly file already exists: {monthly_output}")
+        return monthly_output
+
+    # Collect all daily files
+    daily_datasets = []
+    daily_dir_pattern = month_dir / "*" / "daily" / f"{year}-{month_str}-*.zarr.zip"
+    daily_files = sorted(Path(month_dir).glob(f"*/daily/{year}-{month_str}-*.zarr.zip"))
+
+    if not daily_files:
+        raise ValueError(f"No daily files found matching pattern: {daily_dir_pattern}")
+
+    logger.info(f"Found {len(daily_files)} daily files")
+
+    # Load and concatenate all daily datasets
+    for daily_file in daily_files:
+        try:
+            logger.info(f"Loading {daily_file}")
+            store = zarr.storage.ZipStore(str(daily_file), mode="r")
+            ds = xr.open_zarr(store, consolidated=True)
+            daily_datasets.append(ds)
+            logger.info(f"Successfully loaded {daily_file}")
+        except Exception as e:
+            logger.error(f"Error loading {daily_file}: {e}")
+            store.close()
+            continue
+
+    if not daily_datasets:
+        raise ValueError("No datasets could be loaded")
+
+    # Concatenate along time dimension
+    logger.info("Concatenating datasets...")
+    monthly_dataset = xr.concat(daily_datasets, dim="time")
+    logger.info(
+        f"Combined dataset spans: {monthly_dataset.time.values.min()} to {monthly_dataset.time.values.max()}"
+    )
+
+    # Define new chunks for the monthly dataset
+    if chunks is None:
+        n_times = len(monthly_dataset.time)
+        chunks = {
+            "time": min(24, n_times),  # One day at a time, or less for partial months
+            "projection_y_coordinate": 243,
+            "projection_x_coordinate": 261,
+        }
+
+    # Rechunk the dataset with explicit chunks
+    logger.info(f"Rechunking dataset with chunks: {chunks}")
+    monthly_dataset = monthly_dataset.chunk(chunks)
+
+    # Create monthly directory
+    logger.info(f"Creating monthly directory: {monthly_output.parent}")
+    monthly_output.parent.mkdir(parents=True, exist_ok=True)
+
+    # Save consolidated monthly file
+    logger.info(f"Saving monthly dataset to: {monthly_output}")
+    save_consolidated_zarr(monthly_dataset, monthly_output)
+    logger.info(f"Successfully created monthly file: {monthly_output}")
+
+    # Close all datasets
+    for ds in daily_datasets:
+        ds.close()
+
+    return monthly_output
diff --git a/src/open_data_pvnet/utils/data_uploader.py b/src/open_data_pvnet/utils/data_uploader.py
index 1879215..ebc1bdf 100644
--- a/src/open_data_pvnet/utils/data_uploader.py
+++ b/src/open_data_pvnet/utils/data_uploader.py
@@ -249,3 +249,65 @@ def upload_to_huggingface(
     except Exception as e:
         logger.error(f"Error uploading to Hugging Face: {e}")
         raise
+
+
+def upload_monthly_zarr(
+    config_path: Path,
+    year: int,
+    month: int,
+    overwrite: bool = False,
+) -> None:
+    """
+    Upload a monthly consolidated zarr.zip file to the Hugging Face dataset repository.
+    """
+    repo_id = "openclimatefix/met-office-uk-deterministic-solar"
+
+    try:
+        # Validate token and get API instance (same as daily uploads)
+        hf_api, hf_token = _validate_token()
+        _ensure_repository(hf_api, repo_id, hf_token)
+
+        # Construct paths
+        month_str = f"{month:02d}"
+        monthly_file = f"{year}-{month_str}.zarr.zip"
+        local_path = Path("data") / str(year) / month_str / "monthly" / monthly_file
+
+        if not local_path.exists():
+            raise FileNotFoundError(f"Monthly consolidated file not found: {local_path}")
+
+        # Create the path structure: data/year/month/monthly/file.zarr.zip
+        target_path = f"data/{year}/{month_str}/monthly/{monthly_file}"
+        logger.info(f"Uploading monthly archive {local_path} to {repo_id}:{target_path}")
+
+        try:
+            if overwrite:
+                try:
+                    # Delete the file if it exists and overwrite is True
+                    hf_api.delete_file(
+                        path_in_repo=target_path,
+                        repo_id=repo_id,
+                        repo_type="dataset",
+                        token=hf_token,
+                    )
+                    logger.info(f"Deleted existing file {target_path} from repository")
+                except Exception as e:
+                    logger.debug(
+                        f"File {target_path} not found in repository or couldn't be deleted: {e}"
+                    )
+
+            # Upload the new file
+            hf_api.upload_file(
+                path_or_fileobj=str(local_path),
+                path_in_repo=target_path,
+                repo_id=repo_id,
+                repo_type="dataset",
+                token=hf_token,
+            )
+            logger.info(f"Upload completed for {local_path} to {repo_id}:{target_path}")
+
+        except Exception as e:
+            raise RuntimeError(f"Failed to upload monthly archive: {e}")
+
+    except Exception as e:
+        logger.error(f"Error uploading monthly archive to Hugging Face: {e}")
+        raise
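Reviewer note (not part of the patch): a quick, illustrative way to sanity-check a consolidated monthly archive, mirroring `test_consolidated_zarr` and the read path used in `merge_days_to_month` above. The path below is an example and assumes the layout written by `merge_days_to_month` and uploaded by `upload_monthly_zarr`.

```python
from pathlib import Path

import xarray as xr
import zarr

monthly_path = Path("data/2023/12/monthly/2023-12.zarr.zip")  # example path

# Open the zip-backed zarr store read-only and inspect the time axis.
store = zarr.storage.ZipStore(str(monthly_path), mode="r")
try:
    ds = xr.open_zarr(store, consolidated=True)
    print(dict(ds.dims))
    print(ds.time.values.min(), ds.time.values.max(), len(ds.time))
finally:
    store.close()
```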