Skip to content

Commit

Permalink
[#8] initial adapter for feature aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
pkdash committed Mar 26, 2024
1 parent d676590 commit 4a2e6c5
Show file tree
Hide file tree
Showing 3 changed files with 289 additions and 34 deletions.
289 changes: 266 additions & 23 deletions hsextract/adapters/hydroshare.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,35 +327,60 @@ def to_catalog_dataset(self):
return aggregation_metadata


class BandInformation(BaseModel):
class RasterBandInformation(BaseModel):
name: str
variable_name: str
variable_unit: str
maximum_value: float
minimum_value: float
no_data_value: float

def to_aggregation_band_as_additional_property(self):
band = schema.PropertyValue.construct()
band.name = self.name
band.name = "bandInformation"
band.maxValue = self.maximum_value
band.minValue = self.minimum_value
band.value = schema.PropertyValue.construct()
band.value.name = "no_data_value"
band.value.value = self.no_data_value
band.value = []
band_name = schema.PropertyValue.construct()
band_name.name = "name"
band_name.value = self.name
band.value.append(band_name)
band_no_data = schema.PropertyValue.construct()
band_no_data.name = "noDataValue"
band_no_data.value = self.no_data_value
band.value.append(band_no_data)
variable_name = schema.PropertyValue.construct()
variable_name.name = "variableName"
variable_name.value = self.variable_name
band.value.append(variable_name)
variable_unit = schema.PropertyValue.construct()
variable_unit.name = "variableUnit"
variable_unit.value = self.variable_unit
band.value.append(variable_unit)
return band


class SpatialReference(BaseModel):
class RasterSpatialReference(BaseModel):
projection_string: str
projection: str
datum: str
eastlimit: float
northlimit: float
westlimit: Optional[float]
southlimit: Optional[float]
units: str

def to_aggregation_spatial_reference_as_additional_property(self):
def to_aggregation_spatial_reference_as_additional_property(self, coverage_type: str):
spatial_reference = schema.PropertyValue.construct()
spatial_reference.name = "spatial_reference"
spatial_reference.name = "spatialReference"
spatial_reference.unitCode = self.units
spatial_reference.value = []

datum = schema.PropertyValue.construct()
datum.name = "datum"
datum.value = self.datum
spatial_reference.value.append(datum)

proj_str = schema.PropertyValue.construct()
proj_str.name = "projection_string"
proj_str.value = self.projection_string
Expand All @@ -375,36 +400,84 @@ def to_aggregation_spatial_reference_as_additional_property(self):
north_limit.name = "northlimit"
north_limit.value = self.northlimit
spatial_reference.value.append(north_limit)
if coverage_type == "box":
west_limit = schema.PropertyValue.construct()
west_limit.name = "westlimit"
west_limit.value = self.westlimit
spatial_reference.value.append(west_limit)

south_limit = schema.PropertyValue.construct()
south_limit.name = "southlimit"
south_limit.value = self.southlimit
spatial_reference.value.append(south_limit)

return spatial_reference


class RasterCellInformation(BaseModel):
name: str
cell_data_type: str
cell_size_x_value: float
cell_size_y_value: float
columns: int
rows: int

def to_aggregation_cell_info_as_additional_properties(self):
cell_info = schema.PropertyValue.construct()
cell_info.name = "cellInformation"
cell_info.value = []

cell_name = schema.PropertyValue.construct()
cell_name.name = "name"
cell_name.value = self.name
cell_info.value.append(cell_name)

cell_data_type = schema.PropertyValue.construct()
cell_data_type.name = "cellDataType"
cell_data_type.value = self.cell_data_type
cell_info.value.append(cell_data_type)

cell_size_x = schema.PropertyValue.construct()
cell_size_x.name = "cellSizeXValue"
cell_size_x.value = self.cell_size_x_value
cell_info.value.append(cell_size_x)

cell_size_y = schema.PropertyValue.construct()
cell_size_y.name = "cellSizeYValue"
cell_size_y.value = self.cell_size_y_value
cell_info.value.append(cell_size_y)

columns = schema.PropertyValue.construct()
columns.name = "columns"
columns.value = self.columns
cell_info.value.append(columns)

rows = schema.PropertyValue.construct()
rows.name = "rows"
rows.value = self.rows
cell_info.value.append(rows)
return cell_info


class _RasterAggregationMetadata(BaseModel):
title: Optional[str]
spatial_coverage: Optional[Union[SpatialCoverageBox, SpatialCoveragePoint]]
period_coverage: Optional[TemporalCoverage]
# the extracted file (media object) metadata is already in schema.MediaObject format
associatedMedia: Optional[List[schema.MediaObject]]
band_information: BandInformation
cell_information: dict
spatial_reference: SpatialReference

def to_aggregation_cell_info_as_additional_properties(self):
additional_properties = []
if self.cell_information:
for key, value in self.cell_information.items():
prop = schema.PropertyValue.construct()
prop.name = key
prop.value = value
additional_properties.append(prop)
return additional_properties
band_information: RasterBandInformation
cell_information: RasterCellInformation
spatial_reference: RasterSpatialReference

def to_aggregation_spatial_coverage(self):
if self.spatial_coverage:
coverage_type = "box" if isinstance(self.spatial_coverage, SpatialCoverageBox) else "point"
aggr_spatial_coverage = self.spatial_coverage.to_dataset_spatial_coverage()
if aggr_spatial_coverage:
aggr_spatial_coverage.additionalProperty = [
self.spatial_reference.to_aggregation_spatial_reference_as_additional_property()]
self.spatial_reference.to_aggregation_spatial_reference_as_additional_property(
coverage_type=coverage_type)
]

return aggr_spatial_coverage
return None
Expand All @@ -419,7 +492,9 @@ def to_catalog_dataset(self):
aggregation_metadata.name = self.title
aggregation_metadata.spatialCoverage = self.to_aggregation_spatial_coverage()
aggregation_metadata.temporalCoverage = self.to_aggregation_period_coverage()
aggregation_metadata.additionalProperty = self.to_aggregation_cell_info_as_additional_properties()
aggregation_metadata.additionalProperty = []
aggregation_metadata.additionalProperty.append(
self.cell_information.to_aggregation_cell_info_as_additional_properties())
aggregation_metadata.additionalProperty.append(
self.band_information.to_aggregation_band_as_additional_property())
aggregation_metadata.associatedMedia = self.associatedMedia
Expand All @@ -432,3 +507,171 @@ def to_catalog_record(aggr_metadata: dict):
"""Converts extracted raster aggregation metadata to a catalog dataset record"""
aggr_model = _RasterAggregationMetadata(**aggr_metadata)
return aggr_model.to_catalog_dataset()


class FieldInformation(BaseModel):
field_name: str
field_precision: int
field_type: str
field_type_code: int
field_width: int

def to_aggregation_field_info_as_additional_property(self, field_info):
field_name = schema.PropertyValue.construct()
field_name.name = "fieldName"
field_name.value = self.field_name
field_info.value.append(field_name)

field_precision = schema.PropertyValue.construct()
field_precision.name = "fieldPrecision"
field_precision.value = self.field_precision
field_info.value.append(field_precision)

field_type = schema.PropertyValue.construct()
field_type.name = "fieldType"
field_type.value = self.field_type
field_info.value.append(field_type)

field_type_code = schema.PropertyValue.construct()
field_type_code.name = "fieldTypeCode"
field_type_code.value = self.field_type_code
field_info.value.append(field_type_code)

field_width = schema.PropertyValue.construct()
field_width.name = "fieldWidth"
field_width.value = self.field_width
field_info.value.append(field_width)


class GeometryInformation(BaseModel):
feature_count: int
geometry_type: str

def to_aggregation_geometry_info_as_additional_property(self):
geometry_info = schema.PropertyValue.construct()
geometry_info.name = "geometryInformation"
geometry_info.value = []

feature_count = schema.PropertyValue.construct()
feature_count.name = "featureCount"
feature_count.value = self.feature_count
geometry_info.value.append(feature_count)

geometry_type = schema.PropertyValue.construct()
geometry_type.name = "geometryType"
geometry_type.value = self.geometry_type
geometry_info.value.append(geometry_type)
return geometry_info


class FeatureSpatialReference(BaseModel):
projection_string: str
projection_name: str
datum: str
eastlimit: float
northlimit: float
westlimit: Optional[float]
southlimit: Optional[float]
units: str

def to_aggregation_spatial_reference_as_additional_property(self, coverage_type: str):
spatial_reference = schema.PropertyValue.construct()
spatial_reference.name = "spatialReference"
spatial_reference.unitCode = self.units
spatial_reference.value = []

datum = schema.PropertyValue.construct()
datum.name = "datum"
datum.value = self.datum
spatial_reference.value.append(datum)

proj_str = schema.PropertyValue.construct()
proj_str.name = "projection_string"
proj_str.value = self.projection_string
spatial_reference.value.append(proj_str)

projection = schema.PropertyValue.construct()
projection.name = "projection_name"
projection.value = self.projection_name
spatial_reference.value.append(projection)

east_limit = schema.PropertyValue.construct()
east_limit.name = "eastlimit"
east_limit.value = self.eastlimit
spatial_reference.value.append(east_limit)

north_limit = schema.PropertyValue.construct()
north_limit.name = "northlimit"
north_limit.value = self.northlimit
spatial_reference.value.append(north_limit)
if coverage_type == "box":
west_limit = schema.PropertyValue.construct()
west_limit.name = "westlimit"
west_limit.value = self.westlimit
spatial_reference.value.append(west_limit)

south_limit = schema.PropertyValue.construct()
south_limit.name = "southlimit"
south_limit.value = self.southlimit
spatial_reference.value.append(south_limit)
return spatial_reference


class _FeatureAggregationMetadata(BaseModel):
title: Optional[str]
abstract: Optional[str]
spatial_coverage: Optional[Union[SpatialCoverageBox, SpatialCoveragePoint]]
period_coverage: Optional[TemporalCoverage]
# the extracted file (media object) metadata is already in schema.MediaObject format
associatedMedia: Optional[List[schema.MediaObject]]
geometry_information: GeometryInformation
field_information: List[FieldInformation]
spatial_reference: FeatureSpatialReference

def to_aggregation_spatial_coverage(self):
if self.spatial_coverage:
coverage_type = "box" if isinstance(self.spatial_coverage, SpatialCoverageBox) else "point"
aggr_spatial_coverage = self.spatial_coverage.to_dataset_spatial_coverage()
if aggr_spatial_coverage:
aggr_spatial_coverage.additionalProperty = [
self.spatial_reference.to_aggregation_spatial_reference_as_additional_property(
coverage_type=coverage_type)
]

return aggr_spatial_coverage
return None

def to_aggregation_period_coverage(self):
if self.period_coverage:
return self.period_coverage.to_dataset_temporal_coverage()
return None

def to_aggregation_field_info_as_additional_property(self):
field_info = schema.PropertyValue.construct()
field_info.name = "fieldInformation"
field_info.value = []
for field in self.field_information:
field.to_aggregation_field_info_as_additional_property(field_info=field_info)
return field_info

def to_catalog_dataset(self):
aggregation_metadata = schema.FeatureAggregationMetadata.construct()
aggregation_metadata.name = self.title
aggregation_metadata.description = self.abstract
aggregation_metadata.spatialCoverage = self.to_aggregation_spatial_coverage()
aggregation_metadata.temporalCoverage = self.to_aggregation_period_coverage()
aggregation_metadata.additionalProperty = []
aggregation_metadata.additionalProperty.append(self.to_aggregation_field_info_as_additional_property())

aggregation_metadata.additionalProperty.append(
self.geometry_information.to_aggregation_geometry_info_as_additional_property())
aggregation_metadata.associatedMedia = self.associatedMedia
return aggregation_metadata


class FeatureAggregationMetadataAdapter:
@staticmethod
def to_catalog_record(aggr_metadata: dict):
"""Converts extracted feature aggregation metadata to a catalog dataset record"""
aggr_model = _FeatureAggregationMetadata(**aggr_metadata)
return aggr_model.to_catalog_dataset()
Loading

0 comments on commit 4a2e6c5

Please sign in to comment.