diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d58271e..04b4d793 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,8 +8,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] ### Added +- Added support for dynamically-generated queryables based on Elasticsearch/OpenSearch mappings, with extensible metadata augmentation [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351) +- Included default queryables configuration for seamless integration. [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351) ### Changed +- Refactored database logic to reduce duplication [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351) +- Replaced `fastapi-slim` with `fastapi` dependency [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351) + +### Fixed +- Improved performance of `mk_actions` and `filter-links` methods [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351) ## [v3.2.5] - 2025-04-07 diff --git a/stac_fastapi/core/setup.py b/stac_fastapi/core/setup.py index 01191c1b..aedbe231 100644 --- a/stac_fastapi/core/setup.py +++ b/stac_fastapi/core/setup.py @@ -6,7 +6,7 @@ desc = f.read() install_requires = [ - "fastapi-slim", + "fastapi", "attrs>=23.2.0", "pydantic", "stac_pydantic>=3", diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 56afcbc8..11bd34b4 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -1,10 +1,11 @@ """Core client.""" import logging +from collections import deque from datetime import datetime as datetime_type from datetime import timezone from enum import Enum -from typing import Any, Dict, List, Optional, Set, Type, Union +from typing import Any, Dict, List, Literal, Optional, Set, Type, Union from urllib.parse import unquote_plus, urljoin import 
attr @@ -41,8 +42,6 @@ logger = logging.getLogger(__name__) -NumType = Union[float, int] - @attr.s class CoreClient(AsyncBaseCoreClient): @@ -907,11 +906,81 @@ def bulk_item_insert( return f"Successfully added {len(processed_items)} Items." +_DEFAULT_QUERYABLES: Dict[str, Dict[str, Any]] = { + "id": { + "description": "ID", + "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/item.json#/definitions/core/allOf/2/properties/id", + }, + "collection": { + "description": "Collection", + "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/item.json#/definitions/core/allOf/2/then/properties/collection", + }, + "geometry": { + "description": "Geometry", + "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/item.json#/definitions/core/allOf/1/oneOf/0/properties/geometry", + }, + "datetime": { + "description": "Acquisition Timestamp", + "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/datetime.json#/properties/datetime", + }, + "created": { + "description": "Creation Timestamp", + "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/datetime.json#/properties/created", + }, + "updated": { + "description": "Update Timestamp", + "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/datetime.json#/properties/updated", + }, + "cloud_cover": { + "description": "Cloud Cover", + "$ref": "https://stac-extensions.github.io/eo/v1.0.0/schema.json#/definitions/fields/properties/eo:cloud_cover", + }, + "cloud_shadow_percentage": { + "title": "Cloud Shadow Percentage", + "description": "Cloud Shadow Percentage", + "type": "number", + "minimum": 0, + "maximum": 100, + }, + "nodata_pixel_percentage": { + "title": "No Data Pixel Percentage", + "description": "No Data Pixel Percentage", + "type": "number", + "minimum": 0, + "maximum": 100, + }, +} + +_ES_MAPPING_TYPE_TO_JSON: Dict[ + str, Literal["string", "number", "boolean", "object", "array", "null"] +] = { + "date": "string", + "date_nanos":
"string", + "keyword": "string", + "match_only_text": "string", + "text": "string", + "wildcard": "string", + "byte": "number", + "double": "number", + "float": "number", + "half_float": "number", + "long": "number", + "scaled_float": "number", + "short": "number", + "token_count": "number", + "unsigned_long": "number", + "geo_point": "object", + "geo_shape": "object", + "nested": "array", +} + + @attr.s class EsAsyncBaseFiltersClient(AsyncBaseFiltersClient): """Defines a pattern for implementing the STAC filter extension.""" - # todo: use the ES _mapping endpoint to dynamically find what fields exist + database: BaseDatabaseLogic = attr.ib() + async def get_queryables( self, collection_id: Optional[str] = None, **kwargs ) -> Dict[str, Any]: @@ -932,55 +1001,62 @@ async def get_queryables( Returns: Dict[str, Any]: A dictionary containing the queryables for the given collection. """ - return { + queryables: Dict[str, Any] = { "$schema": "https://json-schema.org/draft/2019-09/schema", "$id": "https://stac-api.example.com/queryables", "type": "object", - "title": "Queryables for Example STAC API", - "description": "Queryable names for the example STAC API Item Search filter.", - "properties": { - "id": { - "description": "ID", - "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/item.json#/definitions/core/allOf/2/properties/id", - }, - "collection": { - "description": "Collection", - "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/item.json#/definitions/core/allOf/2/then/properties/collection", - }, - "geometry": { - "description": "Geometry", - "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/item.json#/definitions/core/allOf/1/oneOf/0/properties/geometry", - }, - "datetime": { - "description": "Acquisition Timestamp", - "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/datetime.json#/properties/datetime", - }, - "created": { - "description": "Creation Timestamp", - "$ref":
"https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/datetime.json#/properties/created", - }, - "updated": { - "description": "Creation Timestamp", - "$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/datetime.json#/properties/updated", - }, - "cloud_cover": { - "description": "Cloud Cover", - "$ref": "https://stac-extensions.github.io/eo/v1.0.0/schema.json#/definitions/fields/properties/eo:cloud_cover", - }, - "cloud_shadow_percentage": { - "description": "Cloud Shadow Percentage", - "title": "Cloud Shadow Percentage", - "type": "number", - "minimum": 0, - "maximum": 100, - }, - "nodata_pixel_percentage": { - "description": "No Data Pixel Percentage", - "title": "No Data Pixel Percentage", - "type": "number", - "minimum": 0, - "maximum": 100, - }, - }, + "title": "Queryables for STAC API", + "description": "Queryable names for the STAC API Item Search filter.", + "properties": {k: dict(v) for k, v in _DEFAULT_QUERYABLES.items()}, "additionalProperties": True, } + if not collection_id: + return queryables + + properties: Dict[str, Any] = queryables["properties"] + queryables.update( + { + "properties": properties, + "additionalProperties": False, + } + ) + + mapping_data = await self.database.get_items_mapping(collection_id) + mapping_properties = next(iter(mapping_data.values()))["mappings"]["properties"] + stack = deque(mapping_properties.items()) + + while stack: + field_name, field_def = stack.popleft() + + # Iterate over nested fields + field_properties = field_def.get("properties") + if field_properties: + # Fields in Item Properties should be exposed with their un-prefixed names, + # and not require expressions to prefix them with properties, + # e.g., eo:cloud_cover instead of properties.eo:cloud_cover.
+ if field_name == "properties": + stack.extend(field_properties.items()) + else: + stack.extend( + (f"{field_name}.{k}", v) for k, v in field_properties.items() + ) + + # Skip non-indexed or disabled fields + field_type = field_def.get("type") + if not field_type or not field_def.get("enabled", True): + continue + + # Generate field properties on the per-request copy, never the shared defaults + field_result = properties.get(field_name, {}) + properties[field_name] = field_result + + field_name_human = field_name.replace("_", " ").title() + field_result.setdefault("title", field_name_human) + + field_type_json = _ES_MAPPING_TYPE_TO_JSON.get(field_type, field_type) + field_result.setdefault("type", field_type_json) + + if field_type in {"date", "date_nanos"}: + field_result.setdefault("format", "date-time") + + return queryables diff --git a/stac_fastapi/core/stac_fastapi/core/database_logic.py b/stac_fastapi/core/stac_fastapi/core/database_logic.py new file mode 100644 index 00000000..7ddd8af7 --- /dev/null +++ b/stac_fastapi/core/stac_fastapi/core/database_logic.py @@ -0,0 +1,226 @@ +"""Database logic core.""" + +import os +from functools import lru_cache +from typing import Any, Dict, List, Optional, Protocol + +from stac_fastapi.types.stac import Item + + +# stac_pydantic classes extend _GeometryBase, which doesn't have a type field, +# So create our own Protocol for typing +# Union[ Point, MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon, GeometryCollection] +class Geometry(Protocol): # noqa + type: str + coordinates: Any + + +COLLECTIONS_INDEX = os.getenv("STAC_COLLECTIONS_INDEX", "collections") +ITEMS_INDEX_PREFIX = os.getenv("STAC_ITEMS_INDEX_PREFIX", "items_") + +ES_INDEX_NAME_UNSUPPORTED_CHARS = { + "\\", + "/", + "*", + "?", + '"', + "<", + ">", + "|", + " ", + ",", + "#", + ":", +} + +_ES_INDEX_NAME_UNSUPPORTED_CHARS_TABLE = str.maketrans( + "", "", "".join(ES_INDEX_NAME_UNSUPPORTED_CHARS) +) + +ITEM_INDICES = f"{ITEMS_INDEX_PREFIX}*,-*kibana*,-{COLLECTIONS_INDEX}*" +
+DEFAULT_SORT = { + "properties.datetime": {"order": "desc"}, + "id": {"order": "desc"}, + "collection": {"order": "desc"}, +} + +ES_ITEMS_SETTINGS = { + "index": { + "sort.field": list(DEFAULT_SORT.keys()), + "sort.order": [v["order"] for v in DEFAULT_SORT.values()], + } +} + +ES_MAPPINGS_DYNAMIC_TEMPLATES = [ + # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md + { + "descriptions": { + "match_mapping_type": "string", + "match": "description", + "mapping": {"type": "text"}, + } + }, + { + "titles": { + "match_mapping_type": "string", + "match": "title", + "mapping": {"type": "text"}, + } + }, + # Projection Extension https://github.com/stac-extensions/projection + {"proj_epsg": {"match": "proj:epsg", "mapping": {"type": "integer"}}}, + { + "proj_projjson": { + "match": "proj:projjson", + "mapping": {"type": "object", "enabled": False}, + } + }, + { + "proj_centroid": { + "match": "proj:centroid", + "mapping": {"type": "geo_point"}, + } + }, + { + "proj_geometry": { + "match": "proj:geometry", + "mapping": {"type": "object", "enabled": False}, + } + }, + { + "no_index_href": { + "match": "href", + "mapping": {"type": "text", "index": False}, + } + }, + # Default all other strings not otherwise specified to keyword + {"strings": {"match_mapping_type": "string", "mapping": {"type": "keyword"}}}, + {"numerics": {"match_mapping_type": "long", "mapping": {"type": "float"}}}, +] + +ES_ITEMS_MAPPINGS = { + "numeric_detection": False, + "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES, + "properties": { + "id": {"type": "keyword"}, + "collection": {"type": "keyword"}, + "geometry": {"type": "geo_shape"}, + "assets": {"type": "object", "enabled": False}, + "links": {"type": "object", "enabled": False}, + "properties": { + "type": "object", + "properties": { + # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md + "datetime": {"type": "date"}, + "start_datetime": {"type": "date"}, +
"end_datetime": {"type": "date"}, + "created": {"type": "date"}, + "updated": {"type": "date"}, + # Satellite Extension https://github.com/stac-extensions/sat + "sat:absolute_orbit": {"type": "integer"}, + "sat:relative_orbit": {"type": "integer"}, + }, + }, + }, +} + +ES_COLLECTIONS_MAPPINGS = { + "numeric_detection": False, + "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES, + "properties": { + "id": {"type": "keyword"}, + "extent.spatial.bbox": {"type": "long"}, + "extent.temporal.interval": {"type": "date"}, + "providers": {"type": "object", "enabled": False}, + "links": {"type": "object", "enabled": False}, + "item_assets": {"type": "object", "enabled": False}, + }, +} + + +@lru_cache(256) +def index_by_collection_id(collection_id: str) -> str: + """ + Translate a collection id into an Elasticsearch index name. + + Args: + collection_id (str): The collection id to translate into an index name. + + Returns: + str: The index name derived from the collection id. + """ + cleaned = collection_id.translate(_ES_INDEX_NAME_UNSUPPORTED_CHARS_TABLE) + return ( + f"{ITEMS_INDEX_PREFIX}{cleaned.lower()}_{collection_id.encode('utf-8').hex()}" + ) + + +@lru_cache(256) +def index_alias_by_collection_id(collection_id: str) -> str: + """ + Translate a collection id into an Elasticsearch index alias. + + Args: + collection_id (str): The collection id to translate into an index alias. + + Returns: + str: The index alias derived from the collection id. + """ + cleaned = collection_id.translate(_ES_INDEX_NAME_UNSUPPORTED_CHARS_TABLE) + return f"{ITEMS_INDEX_PREFIX}{cleaned}" + + +def indices(collection_ids: Optional[List[str]]) -> str: + """ + Get a comma-separated string of index aliases for a given list of collection ids. + + Args: + collection_ids: A list of collection ids. + + Returns: + A string of comma-separated index aliases. If `collection_ids` is None or empty, returns the default indices.
+ """ + return ( + ",".join(map(index_alias_by_collection_id, collection_ids)) + if collection_ids + else ITEM_INDICES + ) + + +def mk_item_id(item_id: str, collection_id: str) -> str: + """Create the document id for an Item in Elasticsearch. + + Args: + item_id (str): The id of the Item. + collection_id (str): The id of the Collection that the Item belongs to. + + Returns: + str: The document id for the Item, combining the Item id and the Collection id, separated by a `|` character. + """ + return f"{item_id}|{collection_id}" + + +def mk_actions(collection_id: str, processed_items: List[Item]) -> List[Dict[str, Any]]: + """Create Elasticsearch bulk actions for a list of processed items. + + Args: + collection_id (str): The identifier for the collection the items belong to. + processed_items (List[Item]): The list of processed items to be bulk indexed. + + Returns: + List[Dict[str, Any]]: The list of bulk actions to be executed, + each action being a dictionary with the following keys: + - `_index`: the index to store the document in. + - `_id`: the document's identifier. + - `_source`: the source of the document.
+ """ + index_alias = index_alias_by_collection_id(collection_id) + return [ + { + "_index": index_alias, + "_id": mk_item_id(item["id"], item["collection"]), + "_source": item, + } + for item in processed_items + ] diff --git a/stac_fastapi/core/stac_fastapi/core/extensions/query.py b/stac_fastapi/core/stac_fastapi/core/extensions/query.py index 97342c66..3084cbf8 100644 --- a/stac_fastapi/core/stac_fastapi/core/extensions/query.py +++ b/stac_fastapi/core/stac_fastapi/core/extensions/query.py @@ -8,7 +8,7 @@ from dataclasses import dataclass from enum import auto from types import DynamicClassAttribute -from typing import Any, Callable, Dict, Optional, Union +from typing import Any, Callable, Dict, Optional from pydantic import BaseModel, root_validator from stac_pydantic.utils import AutoValueEnum @@ -17,8 +17,6 @@ logger = logging.getLogger("uvicorn") logger.setLevel(logging.INFO) -# Be careful: https://github.com/samuelcolvin/pydantic/issues/1423#issuecomment-642797287 -NumType = Union[float, int] class Operator(str, AutoValueEnum): diff --git a/stac_fastapi/core/stac_fastapi/core/models/links.py b/stac_fastapi/core/stac_fastapi/core/models/links.py index 76f0ce5b..f72d4ed4 100644 --- a/stac_fastapi/core/stac_fastapi/core/models/links.py +++ b/stac_fastapi/core/stac_fastapi/core/models/links.py @@ -12,7 +12,7 @@ # These can be inferred from the item/collection, so they aren't included in the database # Instead they are dynamically generated when querying the database using the classes defined below -INFERRED_LINK_RELS = ["self", "item", "parent", "collection", "root"] +INFERRED_LINK_RELS = {"self", "item", "parent", "collection", "root"} def merge_params(url: str, newparams: Dict) -> str: diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py index 5e6307e7..9510eaa6 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py +++
b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py @@ -39,13 +39,15 @@ settings = ElasticsearchSettings() session = Session.create_from_settings(settings) -filter_extension = FilterExtension(client=EsAsyncBaseFiltersClient()) +database_logic = DatabaseLogic() + +filter_extension = FilterExtension( + client=EsAsyncBaseFiltersClient(database=database_logic) +) filter_extension.conformance_classes.append( "http://www.opengis.net/spec/cql2/1.0/conf/advanced-comparison-operators" ) -database_logic = DatabaseLogic() - aggregation_extension = AggregationExtension( client=EsAsyncAggregationClient( database=database_logic, session=session, settings=settings diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 0f272218..c46b208d 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -3,16 +3,30 @@ import asyncio import json import logging -import os from base64 import urlsafe_b64decode, urlsafe_b64encode from copy import deepcopy -from typing import Any, Dict, Iterable, List, Optional, Protocol, Tuple, Type, Union +from typing import Any, Dict, Iterable, List, Optional, Tuple, Type import attr from elasticsearch_dsl import Q, Search from starlette.requests import Request from elasticsearch import exceptions, helpers # type: ignore +from stac_fastapi.core.database_logic import ( + COLLECTIONS_INDEX, + DEFAULT_SORT, + ES_COLLECTIONS_MAPPINGS, + ES_ITEMS_MAPPINGS, + ES_ITEMS_SETTINGS, + ITEM_INDICES, + ITEMS_INDEX_PREFIX, + Geometry, + index_alias_by_collection_id, + index_by_collection_id, + indices, + mk_actions, + mk_item_id, +) from stac_fastapi.core.extensions import filter from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon @@ -25,168 +39,6 @@ logger = 
logging.getLogger(__name__) -NumType = Union[float, int] - -COLLECTIONS_INDEX = os.getenv("STAC_COLLECTIONS_INDEX", "collections") -ITEMS_INDEX_PREFIX = os.getenv("STAC_ITEMS_INDEX_PREFIX", "items_") -ES_INDEX_NAME_UNSUPPORTED_CHARS = { - "\\", - "/", - "*", - "?", - '"', - "<", - ">", - "|", - " ", - ",", - "#", - ":", -} - -ITEM_INDICES = f"{ITEMS_INDEX_PREFIX}*,-*kibana*,-{COLLECTIONS_INDEX}*" - -DEFAULT_SORT = { - "properties.datetime": {"order": "desc"}, - "id": {"order": "desc"}, - "collection": {"order": "desc"}, -} - -ES_ITEMS_SETTINGS = { - "index": { - "sort.field": list(DEFAULT_SORT.keys()), - "sort.order": [v["order"] for v in DEFAULT_SORT.values()], - } -} - -ES_MAPPINGS_DYNAMIC_TEMPLATES = [ - # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md - { - "descriptions": { - "match_mapping_type": "string", - "match": "description", - "mapping": {"type": "text"}, - } - }, - { - "titles": { - "match_mapping_type": "string", - "match": "title", - "mapping": {"type": "text"}, - } - }, - # Projection Extension https://github.com/stac-extensions/projection - {"proj_epsg": {"match": "proj:epsg", "mapping": {"type": "integer"}}}, - { - "proj_projjson": { - "match": "proj:projjson", - "mapping": {"type": "object", "enabled": False}, - } - }, - { - "proj_centroid": { - "match": "proj:centroid", - "mapping": {"type": "geo_point"}, - } - }, - { - "proj_geometry": { - "match": "proj:geometry", - "mapping": {"type": "object", "enabled": False}, - } - }, - { - "no_index_href": { - "match": "href", - "mapping": {"type": "text", "index": False}, - } - }, - # Default all other strings not otherwise specified to keyword - {"strings": {"match_mapping_type": "string", "mapping": {"type": "keyword"}}}, - {"numerics": {"match_mapping_type": "long", "mapping": {"type": "float"}}}, -] - -ES_ITEMS_MAPPINGS = { - "numeric_detection": False, - "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES, - "properties": { - "id": {"type": "keyword"}, - 
"collection": {"type": "keyword"}, - "geometry": {"type": "geo_shape"}, - "assets": {"type": "object", "enabled": False}, - "links": {"type": "object", "enabled": False}, - "properties": { - "type": "object", - "properties": { - # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md - "datetime": {"type": "date"}, - "start_datetime": {"type": "date"}, - "end_datetime": {"type": "date"}, - "created": {"type": "date"}, - "updated": {"type": "date"}, - # Satellite Extension https://github.com/stac-extensions/sat - "sat:absolute_orbit": {"type": "integer"}, - "sat:relative_orbit": {"type": "integer"}, - }, - }, - }, -} - -ES_COLLECTIONS_MAPPINGS = { - "numeric_detection": False, - "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES, - "properties": { - "id": {"type": "keyword"}, - "extent.spatial.bbox": {"type": "long"}, - "extent.temporal.interval": {"type": "date"}, - "providers": {"type": "object", "enabled": False}, - "links": {"type": "object", "enabled": False}, - "item_assets": {"type": "object", "enabled": False}, - }, -} - - -def index_by_collection_id(collection_id: str) -> str: - """ - Translate a collection id into an Elasticsearch index name. - - Args: - collection_id (str): The collection id to translate into an index name. - - Returns: - str: The index name derived from the collection id. - """ - return f"{ITEMS_INDEX_PREFIX}{''.join(c for c in collection_id.lower() if c not in ES_INDEX_NAME_UNSUPPORTED_CHARS)}_{collection_id.encode('utf-8').hex()}" - - -def index_alias_by_collection_id(collection_id: str) -> str: - """ - Translate a collection id into an Elasticsearch index alias. - - Args: - collection_id (str): The collection id to translate into an index alias. - - Returns: - str: The index alias derived from the collection id. 
- """ - return f"{ITEMS_INDEX_PREFIX}{''.join(c for c in collection_id if c not in ES_INDEX_NAME_UNSUPPORTED_CHARS)}" - - -def indices(collection_ids: Optional[List[str]]) -> str: - """ - Get a comma-separated string of index names for a given list of collection ids. - - Args: - collection_ids: A list of collection ids. - - Returns: - A string of comma-separated index names. If `collection_ids` is None, returns the default indices. - """ - if collection_ids is None or collection_ids == []: - return ITEM_INDICES - else: - return ",".join([index_alias_by_collection_id(c) for c in collection_ids]) - async def create_index_templates() -> None: """ @@ -271,51 +123,6 @@ async def delete_item_index(collection_id: str): await client.close() -def mk_item_id(item_id: str, collection_id: str): - """Create the document id for an Item in Elasticsearch. - - Args: - item_id (str): The id of the Item. - collection_id (str): The id of the Collection that the Item belongs to. - - Returns: - str: The document id for the Item, combining the Item id and the Collection id, separated by a `|` character. - """ - return f"{item_id}|{collection_id}" - - -def mk_actions(collection_id: str, processed_items: List[Item]): - """Create Elasticsearch bulk actions for a list of processed items. - - Args: - collection_id (str): The identifier for the collection the items belong to. - processed_items (List[Item]): The list of processed items to be bulk indexed. - - Returns: - List[Dict[str, Union[str, Dict]]]: The list of bulk actions to be executed, - each action being a dictionary with the following keys: - - `_index`: the index to store the document in. - - `_id`: the document's identifier. - - `_source`: the source of the document. 
- """ - return [ - { - "_index": index_alias_by_collection_id(collection_id), - "_id": mk_item_id(item["id"], item["collection"]), - "_source": item, - } - for item in processed_items - ] - - -# stac_pydantic classes extend _GeometryBase, which doesn't have a type field, -# So create our own Protocol for typing -# Union[ Point, MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon, GeometryCollection] -class Geometry(Protocol): # noqa - type: str - coordinates: Any - - @attr.s class DatabaseLogic: """Database logic.""" @@ -466,7 +273,7 @@ async def get_one_item(self, collection_id: str, item_id: str) -> Dict: ) except exceptions.NotFoundError: raise NotFoundError( - f"Item {item_id} does not exist in Collection {collection_id}" + f"Item {item_id} does not exist inside Collection {collection_id}" ) return item["_source"] @@ -918,6 +725,24 @@ async def delete_item( f"Item {item_id} in collection {collection_id} not found" ) + async def get_items_mapping(self, collection_id: str) -> Dict[str, Any]: + """Get the mapping for the specified collection's items index. + + Args: + collection_id (str): The ID of the collection to get items mapping for. + + Returns: + Dict[str, Any]: The mapping information. + """ + index_name = index_alias_by_collection_id(collection_id) + try: + mapping = await self.client.indices.get_mapping( + index=index_name, allow_no_indices=False + ) + return mapping.body + except exceptions.NotFoundError as err: + raise NotFoundError(f"Mapping for index {index_name} not found") from err + async def create_collection(self, collection: Collection, refresh: bool = False): """Create a single collection in the database.
@@ -1001,7 +826,7 @@ async def update_collection( "source": {"index": f"{ITEMS_INDEX_PREFIX}{collection_id}"}, "script": { "lang": "painless", - "source": f"""ctx._id = ctx._id.replace('{collection_id}', '{collection["id"]}'); ctx._source.collection = '{collection["id"]}' ;""", + "source": f"""ctx._id = ctx._id.replace('{collection_id}', '{collection["id"]}'); ctx._source.collection = '{collection["id"]}' ;""", # noqa: E702 }, }, wait_for_completion=True, diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/app.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/app.py index 8be0eafd..90038302 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/app.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/app.py @@ -39,13 +39,15 @@ settings = OpensearchSettings() session = Session.create_from_settings(settings) -filter_extension = FilterExtension(client=EsAsyncBaseFiltersClient()) +database_logic = DatabaseLogic() + +filter_extension = FilterExtension( + client=EsAsyncBaseFiltersClient(database=database_logic) +) filter_extension.conformance_classes.append( "http://www.opengis.net/spec/cql2/1.0/conf/advanced-comparison-operators" ) -database_logic = DatabaseLogic() - aggregation_extension = AggregationExtension( client=EsAsyncAggregationClient( database=database_logic, session=session, settings=settings diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 498c9c01..7bb7ac33 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -3,10 +3,9 @@ import asyncio import json import logging -import os from base64 import urlsafe_b64decode, urlsafe_b64encode from copy import deepcopy -from typing import Any, Dict, Iterable, List, Optional, Protocol, Tuple, Type, Union +from typing import Any, Dict, Iterable, List, Optional, Tuple, Type import attr from 
opensearchpy import exceptions, helpers @@ -16,6 +15,21 @@ from starlette.requests import Request from stac_fastapi.core import serializers +from stac_fastapi.core.database_logic import ( + COLLECTIONS_INDEX, + DEFAULT_SORT, + ES_COLLECTIONS_MAPPINGS, + ES_ITEMS_MAPPINGS, + ES_ITEMS_SETTINGS, + ITEM_INDICES, + ITEMS_INDEX_PREFIX, + Geometry, + index_alias_by_collection_id, + index_by_collection_id, + indices, + mk_actions, + mk_item_id, +) from stac_fastapi.core.extensions import filter from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon from stac_fastapi.opensearch.config import ( @@ -27,168 +41,6 @@ logger = logging.getLogger(__name__) -NumType = Union[float, int] - -COLLECTIONS_INDEX = os.getenv("STAC_COLLECTIONS_INDEX", "collections") -ITEMS_INDEX_PREFIX = os.getenv("STAC_ITEMS_INDEX_PREFIX", "items_") -ES_INDEX_NAME_UNSUPPORTED_CHARS = { - "\\", - "/", - "*", - "?", - '"', - "<", - ">", - "|", - " ", - ",", - "#", - ":", -} - -ITEM_INDICES = f"{ITEMS_INDEX_PREFIX}*,-*kibana*,-{COLLECTIONS_INDEX}*" - -DEFAULT_SORT = { - "properties.datetime": {"order": "desc"}, - "id": {"order": "desc"}, - "collection": {"order": "desc"}, -} - -ES_ITEMS_SETTINGS = { - "index": { - "sort.field": list(DEFAULT_SORT.keys()), - "sort.order": [v["order"] for v in DEFAULT_SORT.values()], - } -} - -ES_MAPPINGS_DYNAMIC_TEMPLATES = [ - # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md - { - "descriptions": { - "match_mapping_type": "string", - "match": "description", - "mapping": {"type": "text"}, - } - }, - { - "titles": { - "match_mapping_type": "string", - "match": "title", - "mapping": {"type": "text"}, - } - }, - # Projection Extension https://github.com/stac-extensions/projection - {"proj_epsg": {"match": "proj:epsg", "mapping": {"type": "integer"}}}, - { - "proj_projjson": { - "match": "proj:projjson", - "mapping": {"type": "object", "enabled": False}, - } - }, - { - "proj_centroid": { - "match": "proj:centroid", - "mapping": 
{"type": "geo_point"}, - } - }, - { - "proj_geometry": { - "match": "proj:geometry", - "mapping": {"type": "object", "enabled": False}, - } - }, - { - "no_index_href": { - "match": "href", - "mapping": {"type": "text", "index": False}, - } - }, - # Default all other strings not otherwise specified to keyword - {"strings": {"match_mapping_type": "string", "mapping": {"type": "keyword"}}}, - {"numerics": {"match_mapping_type": "long", "mapping": {"type": "float"}}}, -] - -ES_ITEMS_MAPPINGS = { - "numeric_detection": False, - "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES, - "properties": { - "id": {"type": "keyword"}, - "collection": {"type": "keyword"}, - "geometry": {"type": "geo_shape"}, - "assets": {"type": "object", "enabled": False}, - "links": {"type": "object", "enabled": False}, - "properties": { - "type": "object", - "properties": { - # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md - "datetime": {"type": "date"}, - "start_datetime": {"type": "date"}, - "end_datetime": {"type": "date"}, - "created": {"type": "date"}, - "updated": {"type": "date"}, - # Satellite Extension https://github.com/stac-extensions/sat - "sat:absolute_orbit": {"type": "integer"}, - "sat:relative_orbit": {"type": "integer"}, - }, - }, - }, -} - -ES_COLLECTIONS_MAPPINGS = { - "numeric_detection": False, - "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES, - "properties": { - "id": {"type": "keyword"}, - "extent.spatial.bbox": {"type": "long"}, - "extent.temporal.interval": {"type": "date"}, - "providers": {"type": "object", "enabled": False}, - "links": {"type": "object", "enabled": False}, - "item_assets": {"type": "object", "enabled": False}, - }, -} - - -def index_by_collection_id(collection_id: str) -> str: - """ - Translate a collection id into an Elasticsearch index name. - - Args: - collection_id (str): The collection id to translate into an index name. - - Returns: - str: The index name derived from the collection id. 
- """ - return f"{ITEMS_INDEX_PREFIX}{''.join(c for c in collection_id.lower() if c not in ES_INDEX_NAME_UNSUPPORTED_CHARS)}_{collection_id.encode('utf-8').hex()}" - - -def index_alias_by_collection_id(collection_id: str) -> str: - """ - Translate a collection id into an Elasticsearch index alias. - - Args: - collection_id (str): The collection id to translate into an index alias. - - Returns: - str: The index alias derived from the collection id. - """ - return f"{ITEMS_INDEX_PREFIX}{''.join(c for c in collection_id if c not in ES_INDEX_NAME_UNSUPPORTED_CHARS)}" - - -def indices(collection_ids: Optional[List[str]]) -> str: - """ - Get a comma-separated string of index names for a given list of collection ids. - - Args: - collection_ids: A list of collection ids. - - Returns: - A string of comma-separated index names. If `collection_ids` is None, returns the default indices. - """ - if collection_ids is None or collection_ids == []: - return ITEM_INDICES - else: - return ",".join([index_alias_by_collection_id(c) for c in collection_ids]) - async def create_index_templates() -> None: """ @@ -292,51 +144,6 @@ async def delete_item_index(collection_id: str): await client.close() -def mk_item_id(item_id: str, collection_id: str): - """Create the document id for an Item in Elasticsearch. - - Args: - item_id (str): The id of the Item. - collection_id (str): The id of the Collection that the Item belongs to. - - Returns: - str: The document id for the Item, combining the Item id and the Collection id, separated by a `|` character. - """ - return f"{item_id}|{collection_id}" - - -def mk_actions(collection_id: str, processed_items: List[Item]): - """Create Elasticsearch bulk actions for a list of processed items. - - Args: - collection_id (str): The identifier for the collection the items belong to. - processed_items (List[Item]): The list of processed items to be bulk indexed. 
- - Returns: - List[Dict[str, Union[str, Dict]]]: The list of bulk actions to be executed, - each action being a dictionary with the following keys: - - `_index`: the index to store the document in. - - `_id`: the document's identifier. - - `_source`: the source of the document. - """ - return [ - { - "_index": index_alias_by_collection_id(collection_id), - "_id": mk_item_id(item["id"], item["collection"]), - "_source": item, - } - for item in processed_items - ] - - -# stac_pydantic classes extend _GeometryBase, which doesn't have a type field, -# So create our own Protocol for typing -# Union[ Point, MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon, GeometryCollection] -class Geometry(Protocol): # noqa - type: str - coordinates: Any - - @attr.s class DatabaseLogic: """Database logic.""" @@ -495,7 +302,7 @@ async def get_one_item(self, collection_id: str, item_id: str) -> Dict: ) except exceptions.NotFoundError: raise NotFoundError( - f"Item {item_id} does not exist in Collection {collection_id}" + f"Item {item_id} does not exist inside Collection {collection_id}" ) return item["_source"] @@ -950,6 +757,24 @@ async def delete_item( f"Item {item_id} in collection {collection_id} not found" ) + async def get_items_mapping(self, collection_id: str) -> Dict[str, Any]: + """Get the mapping for the specified collection's items index. + + Args: + collection_id (str): The ID of the collection to get items mapping for. + + Returns: + Dict[str, Any]: The mapping information. + """ + index_name = index_alias_by_collection_id(collection_id) + try: + mapping = await self.client.indices.get_mapping( + index=index_name, params={"allow_no_indices": "false"} + ) + return mapping + except exceptions.NotFoundError as err: + raise NotFoundError(f"Mapping for index {index_name} not found") from err + async def create_collection(self, collection: Collection, refresh: bool = False): """Create a single collection in the database.
@@ -1033,7 +858,7 @@ async def update_collection( "source": {"index": f"{ITEMS_INDEX_PREFIX}{collection_id}"}, "script": { "lang": "painless", - "source": f"""ctx._id = ctx._id.replace('{collection_id}', '{collection["id"]}'); ctx._source.collection = '{collection["id"]}' ;""", + "source": f"""ctx._id = ctx._id.replace('{collection_id}', '{collection["id"]}'); ctx._source.collection = '{collection["id"]}' ;""", # noqa: E702 }, }, wait_for_completion=True, diff --git a/stac_fastapi/tests/rate_limit/test_rate_limit.py b/stac_fastapi/tests/rate_limit/test_rate_limit.py index fd6b5bce..4a7a7da5 100644 --- a/stac_fastapi/tests/rate_limit/test_rate_limit.py +++ b/stac_fastapi/tests/rate_limit/test_rate_limit.py @@ -18,7 +18,7 @@ async def test_rate_limit(app_client_rate_limit: AsyncClient, ctx): except RateLimitExceeded: status_code = 429 - logger.info(f"Request {i+1}: Status code {status_code}") + logger.info(f"Request {i + 1}: Status code {status_code}") assert ( status_code == expected_status_code ), f"Expected status code {expected_status_code}, but got {status_code}" @@ -32,7 +32,7 @@ async def test_rate_limit_no_limit(app_client: AsyncClient, ctx): response = await app_client.get("/collections") status_code = response.status_code - logger.info(f"Request {i+1}: Status code {status_code}") + logger.info(f"Request {i + 1}: Status code {status_code}") assert ( status_code == expected_status_code ), f"Expected status code {expected_status_code}, but got {status_code}"