diff --git a/src/rashdf/base.py b/src/rashdf/base.py index 4b1aaab..6a3544a 100644 --- a/src/rashdf/base.py +++ b/src/rashdf/base.py @@ -1,4 +1,6 @@ import h5py +from .utils import hdf5_attrs_to_dict +from typing import Dict class RasHdf(h5py.File): @@ -6,7 +8,7 @@ class RasHdf(h5py.File): def __init__(self, name: str, **kwargs): """Open a HEC-RAS HDF file. - + Parameters ---------- name : str @@ -14,10 +16,10 @@ def __init__(self, name: str, **kwargs): kwargs : dict Additional keyword arguments to pass to h5py.File """ - super().__init__(name, mode="r", **kwargs) + super().__init__(name, **kwargs) @classmethod - def open_uri(cls, uri: str, fsspec_kwargs: dict = {}, h5py_kwargs: dict = {}) -> 'RasHdf': + def open_uri(cls, uri: str, fsspec_kwargs: dict = {}, h5py_kwargs: dict = {}) -> "RasHdf": """Open a HEC-RAS HDF file from a URI. Parameters @@ -41,5 +43,34 @@ def open_uri(cls, uri: str, fsspec_kwargs: dict = {}, h5py_kwargs: dict = {}) -> >>> results_hdf = RasHdf.open_uri("s3://my-bucket/results.hdf") """ import fsspec + remote_file = fsspec.open(uri, mode="rb", **fsspec_kwargs) - return cls(remote_file.open(), **h5py_kwargs) \ No newline at end of file + return cls(remote_file.open(), **h5py_kwargs) + + def get_attrs(self, attr_path: str) -> Dict: + """Convert attributes from a HEC-RAS HDF file into a Python dictionary for a given attribute path. + + Parameters + ---------- + attr_path (str): The path within the HEC-RAS HDF file where the desired attributes are located (Ex. "Plan Data/Plan Parameters"). + + Returns + ------- + plan_attrs (dict): Dictionary filled with attributes at given path, if attributes exist at that path. + """ + attr_object = self.get(attr_path) + + if attr_object: + return hdf5_attrs_to_dict(attr_object.attrs) + + return {} + + def get_root_attrs(self): + """Returns attributes at root level of HEC-RAS HDF file. + + Returns + ------- + dict + Dictionary filled with HEC-RAS HDF root attributes. + """ + return self.get_attrs("/") diff --git a/src/rashdf/geom.py b/src/rashdf/geom.py index 5209655..b2ea5a7 100644 --- a/src/rashdf/geom.py +++ b/src/rashdf/geom.py @@ -1,5 +1,5 @@ from .base import RasHdf -from .utils import convert_ras_hdf_string +from .utils import convert_ras_hdf_string, get_first_hdf_group, hdf5_attrs_to_dict import numpy as np import pandas as pd @@ -12,10 +12,16 @@ class RasGeomHdf(RasHdf): + def __init__(self, name: str): + super().__init__(name) + self.geom_path = "Geometry" + self.geom_structures_path = "Geometry/Structures" + self.flow_area_2d_path = "Geometry/2D Flow Areas" + def projection(self) -> Optional[CRS]: """Return the projection of the RAS geometry as a pyproj.CRS object. - + Returns ------- CRS @@ -27,11 +33,11 @@ def projection(self) -> Optional[CRS]: if type(proj_wkt) == bytes or type(proj_wkt) == np.bytes_: proj_wkt = proj_wkt.decode("utf-8") return CRS.from_wkt(proj_wkt) - + def mesh_area_names(self) -> list: - """Return a list of the 2D mesh area names of + """Return a list of the 2D mesh area names of the RAS geometry. - + Returns ------- list @@ -43,7 +49,7 @@ def mesh_area_names(self) -> list: def mesh_areas(self) -> GeoDataFrame: """Return 2D flow area perimeter polygons. - + Returns ------- GeoDataFrame @@ -53,11 +59,13 @@ def mesh_areas(self) -> GeoDataFrame: if not mesh_area_names: return GeoDataFrame() mesh_area_polygons = [Polygon(self[f"/Geometry/2D Flow Areas/{n}/Perimeter"][()]) for n in mesh_area_names] - return GeoDataFrame({"mesh_name" : mesh_area_names, "geometry" : mesh_area_polygons}, geometry="geometry", crs=self.projection()) + return GeoDataFrame( + {"mesh_name": mesh_area_names, "geometry": mesh_area_polygons}, geometry="geometry", crs=self.projection() + ) def mesh_cell_polygons(self) -> GeoDataFrame: """Return the 2D flow mesh cell polygons. - + Returns ------- GeoDataFrame @@ -69,30 +77,38 @@ def mesh_cell_polygons(self) -> GeoDataFrame: face_gdf = self.mesh_cell_faces() - cell_dict = {"mesh_name":[], "cell_id":[], "geometry":[]} + cell_dict = {"mesh_name": [], "cell_id": [], "geometry": []} for i, mesh_name in enumerate(mesh_area_names): cell_cnt = self["/Geometry/2D Flow Areas/Cell Info"][()][i][1] cell_ids = list(range(cell_cnt)) cell_face_info = self[f"/Geometry/2D Flow Areas/{mesh_name}/Cells Face and Orientation Info"][()] - cell_face_values = self[f"/Geometry/2D Flow Areas/{mesh_name}/Cells Face and Orientation Values"][()][:,0] + cell_face_values = self[f"/Geometry/2D Flow Areas/{mesh_name}/Cells Face and Orientation Values"][()][:, 0] face_id_lists = list( np.vectorize( - lambda cell_id: str(cell_face_values[cell_face_info[cell_id][0]:cell_face_info[cell_id][0]+cell_face_info[cell_id][1]]) + lambda cell_id: str( + cell_face_values[ + cell_face_info[cell_id][0] : cell_face_info[cell_id][0] + cell_face_info[cell_id][1] + ] + ) )(cell_ids) ) - mesh_faces = face_gdf[face_gdf.mesh_name == mesh_name][["face_id", "geometry"]].set_index("face_id").to_numpy() - cell_dict["mesh_name"] += [mesh_name]*cell_cnt + mesh_faces = ( + face_gdf[face_gdf.mesh_name == mesh_name][["face_id", "geometry"]].set_index("face_id").to_numpy() + ) + cell_dict["mesh_name"] += [mesh_name] * cell_cnt cell_dict["cell_id"] += cell_ids cell_dict["geometry"] += list( np.vectorize( - lambda face_id_list: polygonize(np.ravel(mesh_faces[np.array(face_id_list.strip("[]").split()).astype(int)])).geoms[0] + lambda face_id_list: polygonize( + np.ravel(mesh_faces[np.array(face_id_list.strip("[]").split()).astype(int)]) + ).geoms[0] )(face_id_lists) ) return GeoDataFrame(cell_dict, geometry="geometry", crs=self.projection()) def mesh_cell_points(self) -> GeoDataFrame: """Return the 2D flow mesh cell points. - + Returns ------- GeoDataFrame @@ -101,18 +117,20 @@ def mesh_cell_points(self) -> GeoDataFrame: mesh_area_names = self.mesh_area_names() if not mesh_area_names: return GeoDataFrame() - pnt_dict = {"mesh_name":[], "cell_id":[], "geometry":[]} + pnt_dict = {"mesh_name": [], "cell_id": [], "geometry": []} for i, mesh_name in enumerate(mesh_area_names): starting_row, count = self["/Geometry/2D Flow Areas/Cell Info"][()][i] - cell_pnt_coords = self["/Geometry/2D Flow Areas/Cell Points"][()][starting_row:starting_row+count] - pnt_dict["mesh_name"] += [mesh_name]*cell_pnt_coords.shape[0] + cell_pnt_coords = self["/Geometry/2D Flow Areas/Cell Points"][()][starting_row : starting_row + count] + pnt_dict["mesh_name"] += [mesh_name] * cell_pnt_coords.shape[0] pnt_dict["cell_id"] += range(count) - pnt_dict["geometry"] += list(np.vectorize(lambda coords: Point(coords), signature="(n)->()")(cell_pnt_coords)) + pnt_dict["geometry"] += list( + np.vectorize(lambda coords: Point(coords), signature="(n)->()")(cell_pnt_coords) + ) return GeoDataFrame(pnt_dict, geometry="geometry", crs=self.projection()) def mesh_cell_faces(self) -> GeoDataFrame: """Return the 2D flow mesh cell faces. - + Returns ------- GeoDataFrame @@ -121,7 +139,7 @@ def mesh_cell_faces(self) -> GeoDataFrame: mesh_area_names = self.mesh_area_names() if not mesh_area_names: return GeoDataFrame() - face_dict = {"mesh_name":[], "face_id":[], "geometry":[]} + face_dict = {"mesh_name": [], "face_id": [], "geometry": []} for mesh_name in mesh_area_names: facepoints_index = self[f"/Geometry/2D Flow Areas/{mesh_name}/Faces FacePoint Indexes"][()] facepoints_coordinates = self[f"/Geometry/2D Flow Areas/{mesh_name}/FacePoints Coordinate"][()] @@ -129,18 +147,55 @@ def mesh_cell_faces(self) -> GeoDataFrame: faces_perimeter_values = self[f"/Geometry/2D Flow Areas/{mesh_name}/Faces Perimeter Values"][()] face_id = -1 for pnt_a_index, pnt_b_index in facepoints_index: - face_id+=1 + face_id += 1 face_dict["mesh_name"].append(mesh_name) face_dict["face_id"].append(face_id) coordinates = list() coordinates.append(facepoints_coordinates[pnt_a_index]) starting_row, count = faces_perimeter_info[face_id] if count > 0: - coordinates += list(faces_perimeter_values[starting_row:starting_row+count]) + coordinates += list(faces_perimeter_values[starting_row : starting_row + count]) coordinates.append(facepoints_coordinates[pnt_b_index]) face_dict["geometry"].append(LineString(coordinates)) return GeoDataFrame(face_dict, geometry="geometry", crs=self.projection()) + def get_geom_attrs(self): + """Returns base geometry attributes from a HEC-RAS HDF geom file. + + Returns + ------- + dict + Dictionary filled with base geometry attributes. + """ + return self.get_attrs(self.geom_path) + + def get_geom_structures_attrs(self): + """Returns geometry structures attributes from a HEC-RAS HDF geom file. + + Returns + ------- + dict + Dictionary filled with geometry structures attributes. + """ + return self.get_attrs(self.geom_structures_path) + + def get_geom_2d_flow_area_attrs(self): + """Returns geometry 2d flow area attributes from a HEC-RAS HDF geom file. + + Returns + ------- + dict + Dictionary filled with geometry 2d flow area attributes. + """ + try: + d2_flow_area = get_first_hdf_group(self.get(self.flow_area_2d_path)) + except AttributeError: + raise AttributeError(f"Unable to get 2D Flow Area; {self.flow_area_2d_path} group not found in HDF5 file.") + + d2_flow_area_attrs = hdf5_attrs_to_dict(d2_flow_area.attrs) + + return d2_flow_area_attrs + def bc_lines(self) -> GeoDataFrame: raise NotImplementedError @@ -188,7 +243,7 @@ def flowpaths(self) -> GeoDataFrame: def bank_points(self) -> GeoDataFrame: raise NotImplementedError - + def bank_lines(self) -> GeoDataFrame: raise NotImplementedError diff --git a/src/rashdf/plan.py b/src/rashdf/plan.py index 3a072a2..cb6acdc 100644 --- a/src/rashdf/plan.py +++ b/src/rashdf/plan.py @@ -1,31 +1,79 @@ from .geom import RasGeomHdf -from .utils import * +from .base import RasHdf from typing import Dict from geopandas import GeoDataFrame -class RasPlanHdf(RasGeomHdf): +class RasPlanHdf(RasHdf): - def get_plan_attrs(self) -> Dict: - raise NotImplementedError + def __init__(self, name: str): + super().__init__(name) + self.plan_info_path = "Plan Data/Plan Information" + self.plan_params_path = "Plan Data/Plan Parameters" + self.meteorology_precip_path = "Event Conditions/Meteorology/Precipitation" + self.results_unsteady_path = "Results/Unsteady" + self.results_summary_path = "Results/Unsteady/Summary" + self.volume_accounting_path = "Results/Unsteady/Summary/Volume Accounting" def get_plan_info_attrs(self) -> Dict: - raise NotImplementedError + """Returns plan information attributes from a HEC-RAS HDF plan file. + + Returns + ------- + dict + Dictionary filled with plan information attributes. + """ + return self.get_attrs(self.plan_info_path) def get_plan_param_attrs(self) -> Dict: - raise NotImplementedError + """Returns plan parameter attributes from a HEC-RAS HDF plan file. + + Returns + ------- + dict + Dictionary filled with plan parameter attributes. + """ + return self.get_attrs(self.plan_params_path) def get_meteorology_precip_attrs(self) -> Dict: - raise NotImplementedError + """Returns precipitation attributes from a HEC-RAS HDF plan file. + + Returns + ------- + dict + Dictionary filled with precipitation attributes. + """ + return self.get_attrs(self.meteorology_precip_path) def get_results_unsteady_attrs(self) -> Dict: - raise NotImplementedError + """Returns unsteady attributes from a HEC-RAS HDF plan file. + + Returns + ------- + dict + Dictionary filled with unsteady attributes. + """ + return self.get_attrs(self.results_unsteady_path) def get_results_summary_attrs(self) -> Dict: - raise NotImplementedError + """Returns results summary attributes from a HEC-RAS HDF plan file. + + Returns + ------- + dict + Dictionary filled with results summary attributes. + """ + return self.get_attrs(self.results_summary_path) def get_results_volume_accounting_attrs(self) -> Dict: - raise NotImplementedError + """Returns volume accounting attributes from a HEC-RAS HDF plan file. + + Returns + ------- + dict + Dictionary filled with volume accounting attributes. + """ + return self.get_attrs(self.volume_accounting_path) def enroachment_points(self) -> GeoDataFrame: raise NotImplementedError diff --git a/src/rashdf/utils.py b/src/rashdf/utils.py index 218ed28..7eeae60 100644 --- a/src/rashdf/utils.py +++ b/src/rashdf/utils.py @@ -1,6 +1,6 @@ import numpy as np - -from typing import Any, List, Tuple, Union +import h5py +from typing import Any, List, Tuple, Union, Optional from datetime import datetime, timedelta import re @@ -40,7 +40,7 @@ def parse_ras_simulation_window_datetime(datetime_str) -> datetime: def parse_run_time_window(window: str) -> Tuple[datetime, datetime]: """ Parse a run time window string into a tuple of datetime objects. - + Parameters ---------- window (str): The run time window string to be parsed. @@ -167,4 +167,49 @@ def convert_ras_hdf_value( # Convert all other types to string else: - return str(value) \ No newline at end of file + return str(value) + + +def hdf5_attrs_to_dict(attrs: dict, prefix: str = None) -> dict: + """ + Convert a dictionary of attributes from an HDF5 file into a Python dictionary. + + Parameters: + ---------- + attrs (dict): The attributes to be converted. + prefix (str, optional): An optional prefix to prepend to the keys. + + Returns: + ---------- + dict: A dictionary with the converted attributes. + """ + results = {} + for k, v in attrs.items(): + value = convert_ras_hdf_value(v) + if prefix: + key = f"{prefix}:{k}" + else: + key = k + results[key] = value + return results + + +def get_first_hdf_group(parent_group: h5py.Group) -> Optional[h5py.Group]: + """ + Get the first HDF5 group from a parent group. + + This function iterates over the items in the parent group and returns the first item that is an instance of + h5py.Group. If no such item is found, it returns None. + + Parameters: + ---------- + parent_group (h5py.Group): The parent group to search in. + + Returns: + ---------- + Optional[h5py.Group]: The first HDF5 group in the parent group, or None if no group is found. + """ + for _, item in parent_group.items(): + if isinstance(item, h5py.Group): + return item + return None