Skip to content

Commit

Permalink
Add attribute retrieval functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Stevenray Janke committed Apr 29, 2024
1 parent fe5aeb5 commit bccd086
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 42 deletions.
39 changes: 35 additions & 4 deletions src/rashdf/base.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
import h5py
from .utils import hdf5_attrs_to_dict
from typing import Dict


class RasHdf(h5py.File):
"""Base class for reading RAS HDF files."""

def __init__(self, name: str, **kwargs):
"""Open a HEC-RAS HDF file.
Parameters
----------
name : str
The path to the RAS HDF file.
kwargs : dict
Additional keyword arguments to pass to h5py.File
"""
super().__init__(name, mode="r", **kwargs)
super().__init__(name, **kwargs)

@classmethod
def open_uri(cls, uri: str, fsspec_kwargs: dict = {}, h5py_kwargs: dict = {}) -> 'RasHdf':
def open_uri(cls, uri: str, fsspec_kwargs: dict = {}, h5py_kwargs: dict = {}) -> "RasHdf":
"""Open a HEC-RAS HDF file from a URI.
Parameters
Expand All @@ -41,5 +43,34 @@ def open_uri(cls, uri: str, fsspec_kwargs: dict = {}, h5py_kwargs: dict = {}) ->
>>> results_hdf = RasHdf.open_uri("s3://my-bucket/results.hdf")
"""
import fsspec

remote_file = fsspec.open(uri, mode="rb", **fsspec_kwargs)
return cls(remote_file.open(), **h5py_kwargs)
return cls(remote_file.open(), **h5py_kwargs)

def get_attrs(self, attr_path: str) -> Dict:
"""Convert attributes from a HEC-RAS HDF file into a Python dictionary for a given attribute path.
Parameters
----------
attr_path (str): The path within the HEC-RAS HDF file where the desired attributes are located (Ex. "Plan Data/Plan Parameters").
Returns
-------
plan_attrs (dict): Dictionary filled with attributes at given path, if attributes exist at that path.
"""
attr_object = self.get(attr_path)

if attr_object:
return hdf5_attrs_to_dict(attr_object.attrs)

return {}

def get_root_attrs(self):
"""Returns attributes at root level of HEC-RAS HDF file.
Returns
-------
dict
Dictionary filled with HEC-RAS HDF root attributes.
"""
return self.get_attrs("/")
103 changes: 79 additions & 24 deletions src/rashdf/geom.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .base import RasHdf
from .utils import convert_ras_hdf_string
from .utils import convert_ras_hdf_string, get_first_hdf_group, hdf5_attrs_to_dict

import numpy as np
import pandas as pd
Expand All @@ -12,10 +12,16 @@

class RasGeomHdf(RasHdf):

def __init__(self, name: str):
super().__init__(name)
self.geom_path = "Geometry"
self.geom_structures_path = "Geometry/Structures"
self.flow_area_2d_path = "Geometry/2D Flow Areas"

def projection(self) -> Optional[CRS]:
"""Return the projection of the RAS geometry as a
pyproj.CRS object.
Returns
-------
CRS
Expand All @@ -27,11 +33,11 @@ def projection(self) -> Optional[CRS]:
if type(proj_wkt) == bytes or type(proj_wkt) == np.bytes_:
proj_wkt = proj_wkt.decode("utf-8")
return CRS.from_wkt(proj_wkt)

def mesh_area_names(self) -> list:
"""Return a list of the 2D mesh area names of
"""Return a list of the 2D mesh area names of
the RAS geometry.
Returns
-------
list
Expand All @@ -43,7 +49,7 @@ def mesh_area_names(self) -> list:

def mesh_areas(self) -> GeoDataFrame:
"""Return 2D flow area perimeter polygons.
Returns
-------
GeoDataFrame
Expand All @@ -53,11 +59,13 @@ def mesh_areas(self) -> GeoDataFrame:
if not mesh_area_names:
return GeoDataFrame()
mesh_area_polygons = [Polygon(self[f"/Geometry/2D Flow Areas/{n}/Perimeter"][()]) for n in mesh_area_names]
return GeoDataFrame({"mesh_name" : mesh_area_names, "geometry" : mesh_area_polygons}, geometry="geometry", crs=self.projection())
return GeoDataFrame(
{"mesh_name": mesh_area_names, "geometry": mesh_area_polygons}, geometry="geometry", crs=self.projection()
)

def mesh_cell_polygons(self) -> GeoDataFrame:
"""Return the 2D flow mesh cell polygons.
Returns
-------
GeoDataFrame
Expand All @@ -69,30 +77,38 @@ def mesh_cell_polygons(self) -> GeoDataFrame:

face_gdf = self.mesh_cell_faces()

cell_dict = {"mesh_name":[], "cell_id":[], "geometry":[]}
cell_dict = {"mesh_name": [], "cell_id": [], "geometry": []}
for i, mesh_name in enumerate(mesh_area_names):
cell_cnt = self["/Geometry/2D Flow Areas/Cell Info"][()][i][1]
cell_ids = list(range(cell_cnt))
cell_face_info = self[f"/Geometry/2D Flow Areas/{mesh_name}/Cells Face and Orientation Info"][()]
cell_face_values = self[f"/Geometry/2D Flow Areas/{mesh_name}/Cells Face and Orientation Values"][()][:,0]
cell_face_values = self[f"/Geometry/2D Flow Areas/{mesh_name}/Cells Face and Orientation Values"][()][:, 0]
face_id_lists = list(
np.vectorize(
lambda cell_id: str(cell_face_values[cell_face_info[cell_id][0]:cell_face_info[cell_id][0]+cell_face_info[cell_id][1]])
lambda cell_id: str(
cell_face_values[
cell_face_info[cell_id][0] : cell_face_info[cell_id][0] + cell_face_info[cell_id][1]
]
)
)(cell_ids)
)
mesh_faces = face_gdf[face_gdf.mesh_name == mesh_name][["face_id", "geometry"]].set_index("face_id").to_numpy()
cell_dict["mesh_name"] += [mesh_name]*cell_cnt
mesh_faces = (
face_gdf[face_gdf.mesh_name == mesh_name][["face_id", "geometry"]].set_index("face_id").to_numpy()
)
cell_dict["mesh_name"] += [mesh_name] * cell_cnt
cell_dict["cell_id"] += cell_ids
cell_dict["geometry"] += list(
np.vectorize(
lambda face_id_list: polygonize(np.ravel(mesh_faces[np.array(face_id_list.strip("[]").split()).astype(int)])).geoms[0]
lambda face_id_list: polygonize(
np.ravel(mesh_faces[np.array(face_id_list.strip("[]").split()).astype(int)])
).geoms[0]
)(face_id_lists)
)
return GeoDataFrame(cell_dict, geometry="geometry", crs=self.projection())

def mesh_cell_points(self) -> GeoDataFrame:
"""Return the 2D flow mesh cell points.
Returns
-------
GeoDataFrame
Expand All @@ -101,18 +117,20 @@ def mesh_cell_points(self) -> GeoDataFrame:
mesh_area_names = self.mesh_area_names()
if not mesh_area_names:
return GeoDataFrame()
pnt_dict = {"mesh_name":[], "cell_id":[], "geometry":[]}
pnt_dict = {"mesh_name": [], "cell_id": [], "geometry": []}
for i, mesh_name in enumerate(mesh_area_names):
starting_row, count = self["/Geometry/2D Flow Areas/Cell Info"][()][i]
cell_pnt_coords = self["/Geometry/2D Flow Areas/Cell Points"][()][starting_row:starting_row+count]
pnt_dict["mesh_name"] += [mesh_name]*cell_pnt_coords.shape[0]
cell_pnt_coords = self["/Geometry/2D Flow Areas/Cell Points"][()][starting_row : starting_row + count]
pnt_dict["mesh_name"] += [mesh_name] * cell_pnt_coords.shape[0]
pnt_dict["cell_id"] += range(count)
pnt_dict["geometry"] += list(np.vectorize(lambda coords: Point(coords), signature="(n)->()")(cell_pnt_coords))
pnt_dict["geometry"] += list(
np.vectorize(lambda coords: Point(coords), signature="(n)->()")(cell_pnt_coords)
)
return GeoDataFrame(pnt_dict, geometry="geometry", crs=self.projection())

def mesh_cell_faces(self) -> GeoDataFrame:
"""Return the 2D flow mesh cell faces.
Returns
-------
GeoDataFrame
Expand All @@ -121,26 +139,63 @@ def mesh_cell_faces(self) -> GeoDataFrame:
mesh_area_names = self.mesh_area_names()
if not mesh_area_names:
return GeoDataFrame()
face_dict = {"mesh_name":[], "face_id":[], "geometry":[]}
face_dict = {"mesh_name": [], "face_id": [], "geometry": []}
for mesh_name in mesh_area_names:
facepoints_index = self[f"/Geometry/2D Flow Areas/{mesh_name}/Faces FacePoint Indexes"][()]
facepoints_coordinates = self[f"/Geometry/2D Flow Areas/{mesh_name}/FacePoints Coordinate"][()]
faces_perimeter_info = self[f"/Geometry/2D Flow Areas/{mesh_name}/Faces Perimeter Info"][()]
faces_perimeter_values = self[f"/Geometry/2D Flow Areas/{mesh_name}/Faces Perimeter Values"][()]
face_id = -1
for pnt_a_index, pnt_b_index in facepoints_index:
face_id+=1
face_id += 1
face_dict["mesh_name"].append(mesh_name)
face_dict["face_id"].append(face_id)
coordinates = list()
coordinates.append(facepoints_coordinates[pnt_a_index])
starting_row, count = faces_perimeter_info[face_id]
if count > 0:
coordinates += list(faces_perimeter_values[starting_row:starting_row+count])
coordinates += list(faces_perimeter_values[starting_row : starting_row + count])
coordinates.append(facepoints_coordinates[pnt_b_index])
face_dict["geometry"].append(LineString(coordinates))
return GeoDataFrame(face_dict, geometry="geometry", crs=self.projection())

def get_geom_attrs(self):
"""Returns base geometry attributes from a HEC-RAS HDF geom file.
Returns
-------
dict
Dictionary filled with base geometry attributes.
"""
return self.get_attrs(self.geom_path)

def get_geom_structures_attrs(self):
"""Returns geometry structures attributes from a HEC-RAS HDF geom file.
Returns
-------
dict
Dictionary filled with geometry structures attributes.
"""
return self.get_attrs(self.geom_structures_path)

def get_geom_2d_flow_area_attrs(self):
"""Returns geometry 2d flow area attributes from a HEC-RAS HDF geom file.
Returns
-------
dict
Dictionary filled with geometry 2d flow area attributes.
"""
try:
d2_flow_area = get_first_hdf_group(self.get(self.flow_area_2d_path))
except AttributeError:
raise AttributeError(f"Unable to get 2D Flow Area; {self.flow_area_2d_path} group not found in HDF5 file.")

d2_flow_area_attrs = hdf5_attrs_to_dict(d2_flow_area.attrs)

return d2_flow_area_attrs

def bc_lines(self) -> GeoDataFrame:
raise NotImplementedError

Expand Down Expand Up @@ -188,7 +243,7 @@ def flowpaths(self) -> GeoDataFrame:

def bank_points(self) -> GeoDataFrame:
raise NotImplementedError

def bank_lines(self) -> GeoDataFrame:
raise NotImplementedError

Expand Down
68 changes: 58 additions & 10 deletions src/rashdf/plan.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,79 @@
from .geom import RasGeomHdf
from .utils import *
from .base import RasHdf
from typing import Dict
from geopandas import GeoDataFrame


class RasPlanHdf(RasGeomHdf):
class RasPlanHdf(RasHdf):

def get_plan_attrs(self) -> Dict:
raise NotImplementedError
def __init__(self, name: str):
super().__init__(name)
self.plan_info_path = "Plan Data/Plan Information"
self.plan_params_path = "Plan Data/Plan Parameters"
self.meteorology_precip_path = "Event Conditions/Meteorology/Precipitation"
self.results_unsteady_path = "Results/Unsteady"
self.results_summary_path = "Results/Unsteady/Summary"
self.volume_accounting_path = "Results/Unsteady/Summary/Volume Accounting"

def get_plan_info_attrs(self) -> Dict:
raise NotImplementedError
"""Returns plan information attributes from a HEC-RAS HDF plan file.
Returns
-------
dict
Dictionary filled with plan information attributes.
"""
return self.get_attrs(self.plan_info_path)

def get_plan_param_attrs(self) -> Dict:
raise NotImplementedError
"""Returns plan parameter attributes from a HEC-RAS HDF plan file.
Returns
-------
dict
Dictionary filled with plan parameter attributes.
"""
return self.get_attrs(self.plan_params_path)

def get_meteorology_precip_attrs(self) -> Dict:
raise NotImplementedError
"""Returns precipitation attributes from a HEC-RAS HDF plan file.
Returns
-------
dict
Dictionary filled with precipitation attributes.
"""
return self.get_attrs(self.meteorology_precip_path)

def get_results_unsteady_attrs(self) -> Dict:
raise NotImplementedError
"""Returns unsteady attributes from a HEC-RAS HDF plan file.
Returns
-------
dict
Dictionary filled with unsteady attributes.
"""
return self.get_attrs(self.results_unsteady_path)

def get_results_summary_attrs(self) -> Dict:
raise NotImplementedError
"""Returns results summary attributes from a HEC-RAS HDF plan file.
Returns
-------
dict
Dictionary filled with results summary attributes.
"""
return self.get_attrs(self.results_summary_path)

def get_results_volume_accounting_attrs(self) -> Dict:
raise NotImplementedError
"""Returns volume accounting attributes from a HEC-RAS HDF plan file.
Returns
-------
dict
Dictionary filled with volume accounting attributes.
"""
return self.get_attrs(self.volume_accounting_path)

def enroachment_points(self) -> GeoDataFrame:
raise NotImplementedError
Loading

0 comments on commit bccd086

Please sign in to comment.