Skip to content

Commit

Permalink
Issue #346 Some more ProcessArgs porting
Browse files Browse the repository at this point in the history
for less boilerplate code and better/earlier error messages
  • Loading branch information
soxofaan committed Dec 18, 2024
1 parent aaffbe5 commit fe014b4
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 163 deletions.
268 changes: 112 additions & 156 deletions openeo_driver/ProcessGraphDeserializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import time
import warnings
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Tuple, Union, Sequence
from typing import Any, Callable, Dict, Iterable, List, Tuple, Union, Sequence, Optional

import geopandas as gpd
import numpy as np
Expand Down Expand Up @@ -931,22 +931,18 @@ def save_result(args: Dict, env: EvalEnv) -> SaveResult: # TODO: return type no

@process_registry_100.add_function(spec=read_spec("openeo-processes/experimental/save_ml_model.json"))
@process_registry_2xx.add_function(spec=read_spec("openeo-processes/experimental/save_ml_model.json"))
def save_ml_model(args: dict, env: EvalEnv) -> MlModelResult:
data: DriverMlModel = extract_arg(args, "data", process_id="save_ml_model")
if not isinstance(data, DriverMlModel):
raise ProcessParameterInvalidException(
parameter="data", process="save_ml_model", reason=f"Invalid data type {type(data)!r} expected raster-cube."
)
options = args.get("options", {})
def save_ml_model(args: ProcessArgs, env: EvalEnv) -> MlModelResult:
data = args.get_required("data", expected_type=DriverMlModel)
options = args.get_optional("options", default={}, expected_type=dict)
return MlModelResult(ml_model=data, options=options)


@process_registry_100.add_function(spec=read_spec("openeo-processes/experimental/load_ml_model.json"))
@process_registry_2xx.add_function(spec=read_spec("openeo-processes/experimental/load_ml_model.json"))
def load_ml_model(args: dict, env: EvalEnv) -> DriverMlModel:
def load_ml_model(args: ProcessArgs, env: EvalEnv) -> DriverMlModel:
if env.get(ENV_DRY_RUN_TRACER):
return DriverMlModel()
job_id = extract_arg(args, "id")
job_id = args.get_required("id", expected_type=str)
return env.backend_implementation.load_ml_model(job_id)


Expand Down Expand Up @@ -1182,51 +1178,34 @@ def add_dimension(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:


@process
def drop_dimension(args: dict, env: EvalEnv) -> DriverDataCube:
data_cube = extract_arg(args, 'data')
if not isinstance(data_cube, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="drop_dimension",
reason=f"Invalid data type {type(data_cube)!r} expected raster-cube."
)
return data_cube.drop_dimension(name=extract_arg(args, 'name'))
def drop_dimension(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
name: str = args.get_required("name", expected_type=str)
return cube.drop_dimension(name=name)


@process
def dimension_labels(args: dict, env: EvalEnv) -> DriverDataCube:
data_cube = extract_arg(args, 'data')
if not isinstance(data_cube, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="dimension_labels",
reason=f"Invalid data type {type(data_cube)!r} expected raster-cube."
)
return data_cube.dimension_labels(dimension=extract_arg(args, 'dimension'))
def dimension_labels(args: ProcessArgs, env: EvalEnv) -> List[str]:
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
dimension: str = args.get_required("dimension", expected_type=str)
return cube.dimension_labels(dimension=dimension)


@process
def rename_dimension(args: dict, env: EvalEnv) -> DriverDataCube:
data_cube = extract_arg(args, 'data')
if not isinstance(data_cube, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="rename_dimension",
reason=f"Invalid data type {type(data_cube)!r} expected raster-cube."
)
return data_cube.rename_dimension(source=extract_arg(args, 'source'),target=extract_arg(args, 'target'))
def rename_dimension(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
source: str = args.get_required("source", expected_type=str)
target: str = args.get_required("target", expected_type=str)
return cube.rename_dimension(source=source, target=target)


@process
def rename_labels(args: dict, env: EvalEnv) -> DriverDataCube:
data_cube = extract_arg(args, 'data')
if not isinstance(data_cube, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="rename_labels",
reason=f"Invalid data type {type(data_cube)!r} expected raster-cube."
)
return data_cube.rename_labels(
dimension=extract_arg(args, 'dimension'),
target=extract_arg(args, 'target'),
source=args.get('source',[])
)
def rename_labels(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
dimension: str = args.get_required("dimension", expected_type=str)
target: List = args.get_required("target", expected_type=list)
source: Optional[list] = args.get_optional("source", default=None, expected_type=list)
return cube.rename_labels(dimension=dimension, target=target, source=source)


@process
Expand Down Expand Up @@ -1372,14 +1351,10 @@ def aggregate_spatial(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:


@process
def mask(args: dict, env: EvalEnv) -> DriverDataCube:
cube = extract_arg(args, 'data')
if not isinstance(cube, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="mask", reason=f"Invalid data type {type(cube)!r} expected raster-cube."
)
mask = extract_arg(args, 'mask')
replacement = args.get('replacement', None)
def mask(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
mask: DriverDataCube = args.get_required("mask", expected_type=DriverDataCube)
replacement = args.get_optional("replacement", default=None)
return cube.mask(mask=mask, replacement=replacement)


Expand Down Expand Up @@ -1411,7 +1386,10 @@ def mask_polygon(args: dict, env: EvalEnv) -> DriverDataCube:
return image_collection


def _extract_temporal_extent(args: dict, field="extent", process_id="filter_temporal") -> Tuple[str, str]:
def _extract_temporal_extent(
args: Union[dict, ProcessArgs], field="extent", process_id="filter_temporal"
) -> Tuple[str, str]:
# TODO #346: make this a ProcessArgs method?
extent = extract_arg(args, name=field, process_id=process_id)
if len(extent) != 2:
raise ProcessParameterInvalidException(
Expand All @@ -1436,29 +1414,27 @@ def _extract_temporal_extent(args: dict, field="extent", process_id="filter_temp


@process
def filter_temporal(args: dict, env: EvalEnv) -> DriverDataCube:
cube = extract_arg(args, 'data')
if not isinstance(cube, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="filter_temporal",
reason=f"Invalid data type {type(cube)!r} expected raster-cube."
)
def filter_temporal(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
extent = _extract_temporal_extent(args, field="extent", process_id="filter_temporal")
return cube.filter_temporal(start=extent[0], end=extent[1])


@process_registry_100.add_function(spec=read_spec("openeo-processes/1.x/proposals/filter_labels.json"))
@process_registry_2xx.add_function(spec=read_spec("openeo-processes/2.x/proposals/filter_labels.json"))
def filter_labels(args: dict, env: EvalEnv) -> DriverDataCube:
cube = extract_arg(args, 'data')
if not isinstance(cube, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="filter_labels",
reason=f"Invalid data type {type(cube)!r} expected cube."
)
def filter_labels(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
# TODO: validation that condition is a process graph construct
condition = args.get_required("condition", expected_type=dict)
dimension = args.get_required("dimension", expected_type=str)
context = args.get_optional("context", default=None)
return cube.filter_labels(condition=condition, dimension=dimension, context=context, env=env)

return cube.filter_labels(condition=extract_arg(args,"condition"),dimension=extract_arg(args,"dimension"),context=args.get("context",None),env=env)

def _extract_bbox_extent(args: dict, field="extent", process_id="filter_bbox", handle_geojson=False) -> dict:
def _extract_bbox_extent(
args: Union[dict, ProcessArgs], field="extent", process_id="filter_bbox", handle_geojson=False
) -> dict:
# TODO #346: make this a ProcessArgs method?
extent = extract_arg(args, name=field, process_id=process_id)
if handle_geojson and extent.get("type") in [
"Polygon",
Expand All @@ -1483,24 +1459,16 @@ def _extract_bbox_extent(args: dict, field="extent", process_id="filter_bbox", h


@process
def filter_bbox(args: Dict, env: EvalEnv) -> DriverDataCube:
cube = extract_arg(args, 'data')
if not isinstance(cube, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="filter_bbox", reason=f"Invalid data type {type(cube)!r} expected raster-cube."
)
def filter_bbox(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
spatial_extent = _extract_bbox_extent(args, "extent", process_id="filter_bbox")
return cube.filter_bbox(**spatial_extent)


@process
def filter_spatial(args: Dict, env: EvalEnv) -> DriverDataCube:
cube = extract_arg(args, 'data')
geometries = extract_arg(args, 'geometries')
if not isinstance(cube, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="filter_spatial", reason=f"Invalid data type {type(cube)!r} expected raster-cube."
)
def filter_spatial(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
geometries = args.get_required("geometries")

if isinstance(geometries, dict):
if "type" in geometries and geometries["type"] != "GeometryCollection":
Expand Down Expand Up @@ -1529,32 +1497,22 @@ def filter_spatial(args: Dict, env: EvalEnv) -> DriverDataCube:


@process
def filter_bands(args: Dict, env: EvalEnv) -> Union[DriverDataCube, DriverVectorCube]:
cube: Union[DriverDataCube, DriverVectorCube] = extract_arg(args, "data")
if not isinstance(cube, DriverDataCube) and not isinstance(cube, DriverVectorCube):
raise ProcessParameterInvalidException(
parameter="data", process="filter_bands", reason=f"Invalid data type {type(cube)!r} expected raster-cube."
)
bands = extract_arg(args, "bands", process_id="filter_bands")
def filter_bands(args: ProcessArgs, env: EvalEnv) -> Union[DriverDataCube, DriverVectorCube]:
cube: Union[DriverDataCube, DriverVectorCube] = args.get_required(
"data", expected_type=(DriverDataCube, DriverVectorCube)
)
bands = args.get_required("bands", expected_type=list)
return cube.filter_bands(bands=bands)


@process
def apply_kernel(args: Dict, env: EvalEnv) -> DriverDataCube:
image_collection = extract_arg(args, 'data')
kernel = np.asarray(extract_arg(args, 'kernel'))
factor = args.get('factor', 1.0)
border = args.get('border', 0)
if not isinstance(image_collection, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="apply_kernel",
reason=f"Invalid data type {type(image_collection)!r} expected raster-cube."
)
if border == "0":
# R-client sends `0` border as a string
border = 0
replace_invalid = args.get('replace_invalid', 0)
return image_collection.apply_kernel(kernel=kernel, factor=factor, border=border, replace_invalid=replace_invalid)
def apply_kernel(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
kernel = np.asarray(args.get_required("kernel", expected_type=list))
factor = args.get_optional("factor", default=1.0, expected_type=(int, float))
border = args.get_optional("border", default=0, expected_type=int)
replace_invalid = args.get_optional("replace_invalid", default=0, expected_type=(int, float))
return cube.apply_kernel(kernel=kernel, factor=factor, border=border, replace_invalid=replace_invalid)


@process
Expand Down Expand Up @@ -1586,16 +1544,30 @@ def resample_spatial(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:


@process
def resample_cube_spatial(args: dict, env: EvalEnv) -> DriverDataCube:
image_collection = extract_arg(args, 'data')
target_image_collection = extract_arg(args, 'target')
method = args.get('method', 'near')
if not isinstance(image_collection, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="resample_cube_spatial",
reason=f"Invalid data type {type(image_collection)!r} expected raster-cube."
)
return image_collection.resample_cube_spatial(target=target_image_collection, method=method)
def resample_cube_spatial(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
target: DriverDataCube = args.get_required("target", expected_type=DriverDataCube)
method = args.get_enum(
"method",
options=[
"average",
"bilinear",
"cubic",
"cubicspline",
"lanczos",
"max",
"med",
"min",
"mode",
"near",
"q1",
"q3",
"rms",
"sum",
],
default="near",
)
return cube.resample_cube_spatial(target=target, method=method)


@process
Expand Down Expand Up @@ -1694,20 +1666,17 @@ def run_udf(args: dict, env: EvalEnv):


@process
def linear_scale_range(args: dict, env: EvalEnv) -> DriverDataCube:
image_collection = extract_arg(args, 'x')

inputMin = extract_arg(args, "inputMin")
inputMax = extract_arg(args, "inputMax")
outputMax = args.get("outputMax", 1.0)
outputMin = args.get("outputMin", 0.0)
if not isinstance(image_collection, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="linear_scale_range",
reason=f"Invalid data type {type(image_collection)!r} expected raster-cube."
)

return image_collection.linear_scale_range(inputMin, inputMax, outputMin, outputMax)
def linear_scale_range(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
# TODO: eliminate this top-level linear_scale_range process implementation (should be used as `apply` callback)
_log.warning("DEPRECATED: linear_scale_range usage directly on cube is deprecated/non-standard.")
cube: DriverDataCube = args.get_required("x", expected_type=DriverDataCube)
# Note: non-standard camelCase parameter names (https://github.com/Open-EO/openeo-processes/issues/302)
input_min = args.get_required("inputMin")
input_max = args.get_required("inputMax")
output_min = args.get_optional("outputMin", default=0.0)
output_max = args.get_optional("outputMax", default=1.0)
# TODO linear_scale_range is defined on GeopysparkDataCube, but not on DriverDataCube
return cube.linear_scale_range(input_min, input_max, output_min, output_max)


@process
Expand Down Expand Up @@ -1984,14 +1953,10 @@ def get_geometries(args: Dict, env: EvalEnv) -> Union[DelayedVector, dict]:
.param('data', description="A raster data cube.", schema={"type": "object", "subtype": "raster-cube"})
.returns("vector-cube", schema={"type": "object", "subtype": "vector-cube"})
)
def raster_to_vector(args: Dict, env: EvalEnv):
image_collection = extract_arg(args, 'data')
if not isinstance(image_collection, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="raster_to_vector",
reason=f"Invalid data type {type(image_collection)!r} expected raster-cube."
)
return image_collection.raster_to_vector()
def raster_to_vector(args: ProcessArgs, env: EvalEnv):
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
# TODO: raster_to_vector is only defined on GeopysparkDataCube, not DriverDataCube
return cube.raster_to_vector()


@non_standard_process(
Expand Down Expand Up @@ -2231,13 +2196,8 @@ def discard_result(args: ProcessArgs, env: EvalEnv):

@process_registry_100.add_function(spec=read_spec("openeo-processes/experimental/mask_scl_dilation.json"))
@process_registry_2xx.add_function(spec=read_spec("openeo-processes/experimental/mask_scl_dilation.json"))
def mask_scl_dilation(args: Dict, env: EvalEnv):
cube: DriverDataCube = extract_arg(args, 'data')
if not isinstance(cube, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="mask_scl_dilation",
reason=f"Invalid data type {type(cube)!r} expected raster-cube."
)
def mask_scl_dilation(args: ProcessArgs, env: EvalEnv):
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
if hasattr(cube, "mask_scl_dilation"):
the_args = args.copy()
del the_args["data"]
Expand Down Expand Up @@ -2268,13 +2228,8 @@ def to_scl_dilation_mask(args: ProcessArgs, env: EvalEnv):

@process_registry_100.add_function(spec=read_spec("openeo-processes/experimental/mask_l1c.json"))
@process_registry_2xx.add_function(spec=read_spec("openeo-processes/experimental/mask_l1c.json"))
def mask_l1c(args: Dict, env: EvalEnv):
cube: DriverDataCube = extract_arg(args, 'data')
if not isinstance(cube, DriverDataCube):
raise ProcessParameterInvalidException(
parameter="data", process="mask_l1c",
reason=f"Invalid data type {type(cube)!r} expected raster-cube."
)
def mask_l1c(args: ProcessArgs, env: EvalEnv):
cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
if hasattr(cube, "mask_l1c"):
return cube.mask_l1c()
else:
Expand Down Expand Up @@ -2369,10 +2324,11 @@ def load_result(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:

@process_registry_100.add_function(spec=read_spec("openeo-processes/1.x/proposals/inspect.json"))
@process_registry_2xx.add_function(spec=read_spec("openeo-processes/2.x/proposals/inspect.json"))
def inspect(args: dict, env: EvalEnv):
data = extract_arg(args, "data")
message = args.get("message", "")
level = args.get("level", "info")
def inspect(args: ProcessArgs, env: EvalEnv):
data = args.get_required("data")
message = args.get_optional("message", default="")
code = args.get_optional("code", default="User")
level = args.get_optional("level", default="info")
if message:
_log.log(level=logging.getLevelName(level.upper()), msg=message)
data_message = str(data)
Expand Down
Loading

0 comments on commit fe014b4

Please sign in to comment.