Skip to content

Commit

Permalink
Merge pull request #17 from MAIF/feature/use_tempfile_to_handle_grib_…
Browse files Browse the repository at this point in the history
…data

feat(grib): use `tempfile` to handle grib data
  • Loading branch information
GratienDSX authored Jan 10, 2025
2 parents 9efd4bd + a1cc958 commit 2327399
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 124 deletions.
140 changes: 56 additions & 84 deletions src/meteole/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

import datetime as dt
import logging
import os
import re
import tempfile
from abc import ABC, abstractmethod
from functools import reduce
from pathlib import Path
from typing import Any
from warnings import warn

Expand Down Expand Up @@ -371,11 +370,42 @@ def _get_coverage_description(self, coverage_id: str) -> dict:
response = self._client.get(url, params=params)
return xmltodict.parse(response.text)

def _transform_grib_to_df(self) -> pd.DataFrame:
"Transform grib file into pandas dataframe"
def _grib_bytes_to_df(self, grib_str: bytes) -> pd.DataFrame:
    """
    Convert binary GRIB data into a pandas DataFrame.

    The bytes are written to a temporary file, that file is opened with
    xarray's `cfgrib` engine, and the resulting Dataset is flattened with
    `to_dataframe().reset_index()`.

    Args:
        grib_str (bytes): Binary GRIB data as a byte string.

    Returns:
        pd.DataFrame: The content of the GRIB Dataset, with the former index
            levels exposed as regular columns (typically `time`, `latitude`,
            `longitude`) next to the variables found in the GRIB message.

    Notes:
        - The method requires the `cfgrib` engine to be installed.
        - The temporary file is deleted automatically when the `with` block exits.
        - No explicit type validation is done here: a non-bytes input fails
          when it is written to the (binary-mode) temporary file.
        - NOTE(review): re-opening a `NamedTemporaryFile` by name while it is
          still open is not supported on every platform (see the `tempfile`
          documentation) - confirm if Windows support is required.
    """
    with tempfile.NamedTemporaryFile() as temp_file:
        # Write the GRIB binary data and flush so cfgrib sees the full content on disk
        temp_file.write(grib_str)
        temp_file.flush()

        # indexpath="" stops cfgrib from writing a "<file>.<hash>.idx" sidecar
        # next to the temporary file; that sidecar would otherwise be left
        # behind once the temporary file itself is deleted.
        # The Dataset is used as a context manager so its file handle is
        # released before the temporary file is removed.
        with xr.open_dataset(
            temp_file.name,
            engine="cfgrib",
            backend_kwargs={"indexpath": ""},
        ) as ds:
            # to_dataframe() materialises the data, so `df` stays valid after close
            df = ds.to_dataframe().reset_index()

    return df

def _get_data_single_forecast(
Expand All @@ -401,7 +431,7 @@ def _get_data_single_forecast(
pd.DataFrame: The forecast for the specified time.
"""

filepath = self._get_coverage_file(
grib_binary: bytes = self._get_coverage_file(
coverage_id=coverage_id,
height=height,
pressure=pressure,
Expand All @@ -410,10 +440,9 @@ def _get_data_single_forecast(
long=long,
)

df = self._transform_grib_to_df()

self._remove_coverage_files(filepath)
df: pd.DataFrame = self._grib_bytes_to_df(grib_binary)

# Drop and rename columns
df.drop(columns=["surface", "valid_time"], errors="ignore", inplace=True)
df.rename(
columns={
Expand All @@ -422,7 +451,6 @@ def _get_data_single_forecast(
},
inplace=True,
)

known_columns = {"latitude", "longitude", "run", "forecast_horizon", "heightAboveGround", "isobaricInhPa"}
indicator_column = (set(df.columns) - known_columns).pop()

Expand All @@ -449,42 +477,6 @@ def _get_data_single_forecast(

return df

def _remove_coverage_files(self, filepath: Path) -> None:
    """
    Remove a coverage file and its associated index files (.idx).

    If the parent directory is empty once the files are gone, it is removed too.

    Args:
        filepath (Path): Path to the main coverage file to be removed
            (a plain string is accepted and converted to `Path`).

    Raises:
        FileNotFoundError: If the specified file does not exist.
        PermissionError: If the parent directory cannot be listed or removed;
            any `OSError` raised during that step is re-raised as `PermissionError`.
    """
    # Ensure filepath is a Path object
    filepath = Path(filepath)
    parent_dir = filepath.parent

    # Delete the main file exactly once. A missing file raises
    # FileNotFoundError, so no follow-up existence check is needed.
    filepath.unlink()

    # Remove the index files written next to the coverage file
    for idx_file in parent_dir.glob(f"{filepath.name}.*.idx"):
        idx_file.unlink()

    # Remove the parent directory if nothing is left in it
    try:
        if not any(parent_dir.iterdir()):
            parent_dir.rmdir()
    except OSError as e:
        # Handle potential errors (e.g., directory in use or permissions issue)
        raise PermissionError(f"Failed to remove directory '{parent_dir}': {e}") from e

def _get_coverage_file(
self,
coverage_id: str,
Expand All @@ -493,9 +485,7 @@ def _get_coverage_file(
forecast_horizon_in_seconds: int = 0,
lat: tuple = (37.5, 55.4),
long: tuple = (-12, 16),
file_format: str = "grib",
filepath: Path | None = None,
) -> Path:
) -> bytes:
"""
Retrieves raster data for a specified model prediction and returns it as raw GRIB bytes.
Expand Down Expand Up @@ -528,41 +518,23 @@ def _get_coverage_file(
See Also:
raster.plot_tiff_file: Method for plotting raster data stored in TIFF format.
"""
self.filepath = filepath

file_extension = "tiff" if file_format == "tiff" else "grib"

filename = (
f"{height or '_'}m_{forecast_horizon_in_seconds}Z_{lat[0]}-{lat[1]}_{long[0]}-{long[1]}.{file_extension}"
)

if self.filepath is None:
current_working_directory = Path(os.getcwd())
self.filepath = current_working_directory / coverage_id / filename
self.folderpath = current_working_directory / coverage_id
logger.debug(f"{self.filepath}")
logger.debug("File not found in Cache, fetching data")
url = f"{self._model_base_path}/{self._entry_point}/GetCoverage"
params = {
"service": "WCS",
"version": "2.0.1",
"coverageid": coverage_id,
"format": "application/wmo-grib",
"subset": [
*([f"pressure({pressure})"] if pressure is not None else []),
*([f"height({height})"] if height is not None else []),
f"time({forecast_horizon_in_seconds})",
f"lat({lat[0]},{lat[1]})",
f"long({long[0]},{long[1]})",
],
}
response = self._client.get(url, params=params)

self.filepath.parent.mkdir(parents=True, exist_ok=True)
with open(self.filepath, "wb") as f:
f.write(response.content)
url = f"{self._model_base_path}/{self._entry_point}/GetCoverage"
params = {
"service": "WCS",
"version": "2.0.1",
"coverageid": coverage_id,
"format": "application/wmo-grib",
"subset": [
*([f"pressure({pressure})"] if pressure is not None else []),
*([f"height({height})"] if height is not None else []),
f"time({forecast_horizon_in_seconds})",
f"lat({lat[0]},{lat[1]})",
f"long({long[0]},{long[1]})",
],
}
response = self._client.get(url, params=params)

return self.filepath
return response.content

@staticmethod
def _get_available_feature(grid_axis, feature_name):
Expand Down
47 changes: 7 additions & 40 deletions tests/test_forecasts.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,43 +156,10 @@ def test_get_coverage_description(self, mock_get_request, mock_get_capabilities)
self.assertIn("wcs:CoverageDescriptions", description)

@patch("meteole._arome.AromeForecast.get_capabilities")
@patch("meteole.clients.MeteoFranceClient.get")
def test_get_coverage_file(self, mock_get_request, mock_get_capabilities):
mock_response = MagicMock()
mock_response.content = b"fake_data"
mock_get_request.return_value = mock_response
mock_get_capabilities.return_value = None

forecast = AromeForecast(
self.client,
precision=self.precision,
territory=self.territory,
)

coverage_id = "coverage_1"
path = forecast._get_coverage_file(
coverage_id=coverage_id,
height=2,
forecast_horizon_in_seconds=0,
lat=(37.5, 55.4),
long=(-12, 16),
)

expected_path = Path(os.getcwd()) / coverage_id / "2m_0Z_37.5-55.4_-12-16.grib"
self.assertTrue(expected_path.exists())
self.assertTrue(expected_path == path)

# remove the folder created in _get_coverage_file
forecast._remove_coverage_files(path)

@patch("meteole._arome.AromeForecast.get_capabilities")
@patch("meteole._arome.AromeForecast._transform_grib_to_df")
@patch("meteole._arome.AromeForecast._grib_bytes_to_df")
@patch("meteole._arome.AromeForecast._get_coverage_file")
@patch("meteole._arome.AromeForecast._remove_coverage_files")
def test_get_data_single_forecast(
self, mock_remove_coverage_files, mock_get_coverage_file, mock_transform_grib_to_df, mock_get_capabilities
):
mock_transform_grib_to_df.return_value = pd.DataFrame({"data": [1, 2, 3]})
def test_get_data_single_forecast(self, mock_get_coverage_file, mock_grib_bytes_to_df, mock_get_capabilities):
mock_grib_bytes_to_df.return_value = pd.DataFrame({"data": [1, 2, 3]})

forecast = AromeForecast(
self.client,
Expand All @@ -212,13 +179,13 @@ def test_get_data_single_forecast(
self.assertTrue("data" in df.columns)

@patch("meteole._arome.AromeForecast.get_capabilities")
@patch("meteole._arome.AromeForecast._transform_grib_to_df")
@patch("meteole._arome.AromeForecast._grib_bytes_to_df")
@patch("meteole._arome.AromeForecast._get_coverage_file")
@patch("meteole._arome.AromeForecast._remove_coverage_files")
def test_get_data_single_forecast_with_height(
self, mock_remove_coverage_files, mock_get_coverage_file, mock_transform_grib_to_df, mock_get_capabilities
self, mock_get_coverage_file, mock_grib_bytes_to_df, mock_get_capabilities
):
mock_transform_grib_to_df.return_value = pd.DataFrame({"data": [1, 2, 3], "heightAboveGround": ["2", "2", "2"]})
mock_get_coverage_file.return_value = ""
mock_grib_bytes_to_df.return_value = pd.DataFrame({"data": [1, 2, 3], "heightAboveGround": ["2", "2", "2"]})

forecast = AromeForecast(
self.client,
Expand Down

0 comments on commit 2327399

Please sign in to comment.