1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

- Added `PUT /catalogs/{catalog_id}` endpoint to update existing catalogs. Allows modification of catalog metadata (title, description, etc.) while preserving internal fields like parent_ids and catalog relationships. [#673](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/673)
- Added catalog poly-hierarchy support with hierarchical catalog endpoints (`GET /catalogs/{catalog_id}/catalogs` and `POST /catalogs/{catalog_id}/catalogs`), enabling unlimited nesting levels and allowing catalogs to belong to multiple parent catalogs simultaneously. Includes cursor-based pagination and performance optimizations. [#573](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/573)
- Added an `end_datetime` alias for datetime-based indexes with `use_datetime=false`, so that `start_datetime`/`end_datetime` queries select a smaller range of indexes by bounding the range at the upper end (a sketch of the effect follows this diff).

### Fixed

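Not part of the patch, a sketch of what the new `end_datetime` alias enables: with datetime-partitioned indexes, both bounds of a `start_datetime`/`end_datetime` query can be mapped to a contiguous slice of index names instead of fanning out to every index from the start onward. The per-year naming scheme and helper below are assumptions for illustration; the real partitioning logic lives in `sfeos_helpers.search_engine`.

```python
from datetime import date

# Assumed layout for illustration: one index per year, e.g.
# "items_sentinel-2_2021". The project's actual naming may differ.
def select_indexes(collection_id: str, start: date, end: date) -> list[str]:
    # Bounding the range at the upper end is what the end_datetime alias
    # buys: without it, every index from `start` onward must be searched.
    return [
        f"items_{collection_id}_{year}" for year in range(start.year, end.year + 1)
    ]

# A two-year window touches two indexes instead of the whole series.
print(select_indexes("sentinel-2", date(2021, 6, 1), date(2022, 3, 1)))
# ['items_sentinel-2_2021', 'items_sentinel-2_2022']
```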
25 changes: 25 additions & 0 deletions Dockerfile
@@ -0,0 +1,25 @@
FROM python:3.13-slim

RUN apt-get update && apt-get install -y \
build-essential \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY README.md .
COPY stac_fastapi/core/pyproject.toml stac_fastapi/core/
COPY stac_fastapi/sfeos_helpers/pyproject.toml stac_fastapi/sfeos_helpers/
COPY stac_fastapi/opensearch/pyproject.toml stac_fastapi/opensearch/

RUN pip install --no-cache-dir --upgrade pip setuptools wheel

COPY stac_fastapi/ stac_fastapi/

RUN pip install --no-cache-dir ./stac_fastapi/core[redis]
RUN pip install --no-cache-dir ./stac_fastapi/sfeos_helpers
RUN pip install --no-cache-dir ./stac_fastapi/opensearch[server,redis]

EXPOSE 8080

CMD ["uvicorn", "stac_fastapi.opensearch.app:app", "--host", "0.0.0.0", "--port", "8080"]
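A minimal smoke test for the image above, assuming it was built and started with `-p 8080:8080` against a reachable OpenSearch backend, and that the service exposes the standard STAC landing page at `/`:

```python
import httpx

# Assumes: docker build -t sfeos-os . && docker run -p 8080:8080 sfeos-os
resp = httpx.get("http://localhost:8080/", timeout=10.0)
resp.raise_for_status()
landing = resp.json()
print(landing.get("stac_version"), landing.get("id"))
```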
20 changes: 11 additions & 9 deletions stac_fastapi/core/stac_fastapi/core/core.py
@@ -546,18 +546,20 @@ async def post_all_collections(
return await self.all_collections(
limit=search_request.limit if hasattr(search_request, "limit") else None,
bbox=search_request.bbox if hasattr(search_request, "bbox") else None,
datetime=search_request.datetime
if hasattr(search_request, "datetime")
else None,
datetime=(
search_request.datetime if hasattr(search_request, "datetime") else None
),
token=search_request.token if hasattr(search_request, "token") else None,
fields=fields,
sortby=sortby,
filter_expr=search_request.filter
if hasattr(search_request, "filter")
else None,
filter_lang=search_request.filter_lang
if hasattr(search_request, "filter_lang")
else None,
filter_expr=(
search_request.filter if hasattr(search_request, "filter") else None
),
filter_lang=(
search_request.filter_lang
if hasattr(search_request, "filter_lang")
else None
),
query=search_request.query if hasattr(search_request, "query") else None,
q=search_request.q if hasattr(search_request, "q") else None,
request=request,
@@ -68,6 +68,7 @@
from stac_fastapi.sfeos_helpers.search_engine import (
BaseIndexInserter,
BaseIndexSelector,
DatetimeIndexInserter,
IndexInsertionFactory,
IndexSelectorFactory,
)
@@ -667,7 +668,7 @@ def apply_datetime_filter(
),
],
)
return search.query(filter_query), datetime_search
return search.query(filter_query), datetime_search

@staticmethod
def apply_bbox_filter(search: Search, bbox: List):
@@ -822,7 +823,7 @@ async def execute_search(
token: Optional[str],
sort: Optional[Dict[str, Dict[str, str]]],
collection_ids: Optional[List[str]],
datetime_search: Dict[str, Optional[str]],
datetime_search: str,
ignore_unavailable: bool = True,
) -> Tuple[Iterable[Dict[str, Any]], Optional[int], Optional[str]]:
"""Execute a search query with limit and other optional parameters.
@@ -833,7 +834,7 @@
token (Optional[str]): The token used to return the next set of results.
sort (Optional[Dict[str, Dict[str, str]]]): Specifies how the results should be sorted.
collection_ids (Optional[List[str]]): The collection ids to search.
datetime_search (Dict[str, Optional[str]]): Datetime range used for index selection.
datetime_search (str): Datetime used for index selection.
ignore_unavailable (bool, optional): Whether to ignore unavailable collections. Defaults to True.

Returns:
@@ -933,7 +934,7 @@ async def aggregate(
geometry_geohash_grid_precision: int,
geometry_geotile_grid_precision: int,
datetime_frequency_interval: str,
datetime_search,
datetime_search: str,
ignore_unavailable: Optional[bool] = True,
):
"""Return aggregations of STAC Items."""
@@ -1114,19 +1115,25 @@ def bulk_sync_prep_create_item(
raise NotFoundError(f"Collection {item['collection']} does not exist")

# Check if the item already exists in the database
if not exist_ok and self.sync_client.exists(
index=index_alias_by_collection_id(item["collection"]),
id=mk_item_id(item["id"], item["collection"]),
):
error_message = (
f"Item {item['id']} in collection {item['collection']} already exists."
)
if self.sync_settings.raise_on_bulk_error:
raise ConflictError(error_message)
else:
logger.warning(
f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false."
)
alias = index_alias_by_collection_id(item["collection"])
doc_id = mk_item_id(item["id"], item["collection"])

if not exist_ok:
alias_exists = self.sync_client.indices.exists_alias(name=alias)

if alias_exists:
alias_info = self.sync_client.indices.get_alias(name=alias)
indices = list(alias_info.keys())

for index in indices:
if self.sync_client.exists(index=index, id=doc_id):
error_message = f"Item {item['id']} in collection {item['collection']} already exists."
if self.sync_settings.raise_on_bulk_error:
raise ConflictError(error_message)
else:
logger.warning(
f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false."
)

# Serialize the item into a database-compatible format
prepped_item = self.item_serializer.stac_to_db(item, base_url)
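The reworked check resolves the alias to its concrete indexes before calling `exists`, presumably because a single-document `exists` cannot target an alias that fans out to several datetime-partitioned indexes. A standalone sketch of the same pattern, with an illustrative client and helper name (opensearch-py mirrors these calls):

```python
from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")

def item_exists(alias: str, doc_id: str) -> bool:
    """Check every concrete index behind an alias for a document id."""
    if not client.indices.exists_alias(name=alias):
        return False
    # get_alias returns a mapping keyed by concrete index name.
    for index in client.indices.get_alias(name=alias):
        if client.exists(index=index, id=doc_id):
            return True
    return False
```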
@@ -1171,6 +1178,31 @@ async def create_item(
f"Creating item {item_id} in collection {collection_id} with refresh={refresh}"
)

if exist_ok and isinstance(self.async_index_inserter, DatetimeIndexInserter):
existing_item = await self.get_one_item(collection_id, item_id)
primary_datetime_name = self.async_index_inserter.primary_datetime_name

existing_primary_datetime = existing_item.get("properties", {}).get(
primary_datetime_name
)
new_primary_datetime = item.get("properties", {}).get(primary_datetime_name)

if existing_primary_datetime != new_primary_datetime:
self.async_index_inserter.validate_datetime_field_update(
f"properties/{primary_datetime_name}"
)

if primary_datetime_name == "start_datetime":
existing_end_datetime = existing_item.get("properties", {}).get(
"end_datetime"
)
new_end_datetime = item.get("properties", {}).get("end_datetime")

if existing_end_datetime != new_end_datetime:
self.async_index_inserter.validate_datetime_field_update(
"properties/end_datetime"
)

# Prepare the item for insertion
item = await self.async_prep_create_item(
item=item, base_url=base_url, exist_ok=exist_ok
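A minimal sketch of the overwrite guard above, pulled out of the class (names are assumptions mirroring the diff): it flags a changed primary datetime, and, when the primary field is `start_datetime`, a changed `end_datetime` as well, before the item is re-inserted.

```python
from typing import Any, Dict, List

def changed_datetime_paths(
    existing: Dict[str, Any], new: Dict[str, Any], primary: str
) -> List[str]:
    """Return the property paths whose datetime values differ."""
    fields = [primary] + (["end_datetime"] if primary == "start_datetime" else [])
    return [
        f"properties/{field}"
        for field in fields
        if existing.get("properties", {}).get(field)
        != new.get("properties", {}).get(field)
    ]

# Each returned path would be passed to validate_datetime_field_update(),
# which (per this diff) decides whether the change is allowed.
```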
@@ -1239,6 +1271,10 @@ async def json_patch_item(
Returns:
patched item.
"""
for operation in operations:
if operation.op in ["add", "replace", "remove"]:
self.async_index_inserter.validate_datetime_field_update(operation.path)

new_item_id = None
new_collection_id = None
script_operations = []
@@ -1259,8 +1295,6 @@
else:
script_operations.append(operation)

script = operations_to_script(script_operations, create_nest=create_nest)

try:
search_response = await self.client.search(
index=index_alias_by_collection_id(collection_id),
@@ -1273,13 +1307,18 @@
raise NotFoundError(
f"Item {item_id} does not exist inside Collection {collection_id}"
)
document_index = search_response["hits"]["hits"][0]["_index"]
await self.client.update(
index=document_index,
id=mk_item_id(item_id, collection_id),
script=script,
refresh=True,
)

if script_operations:
script = operations_to_script(
script_operations, create_nest=create_nest
)
document_index = search_response["hits"]["hits"][0]["_index"]
await self.client.update(
index=document_index,
id=mk_item_id(item_id, collection_id),
script=script,
refresh=True,
)
except ESNotFoundError:
raise NotFoundError(
f"Item {item_id} does not exist inside Collection {collection_id}"
@@ -1292,34 +1331,16 @@
item = await self.get_one_item(collection_id, item_id)

if new_collection_id:
await self.client.reindex(
body={
"dest": {
"index": f"{ITEMS_INDEX_PREFIX}{new_collection_id}"
}, # # noqa
"source": {
"index": f"{ITEMS_INDEX_PREFIX}{collection_id}",
"query": {"term": {"id": {"value": item_id}}},
},
"script": {
"lang": "painless",
"source": (
f"""ctx._id = ctx._id.replace('{collection_id}', '{new_collection_id}');""" # noqa
f"""ctx._source.collection = '{new_collection_id}';""" # noqa
),
},
},
wait_for_completion=True,
refresh=True,
)
item["collection"] = new_collection_id
item = await self.async_prep_create_item(item=item, base_url=base_url)
await self.create_item(item=item, refresh=True)

await self.delete_item(
item_id=item_id,
collection_id=collection_id,
refresh=refresh,
)

item["collection"] = new_collection_id
collection_id = new_collection_id

if new_item_id:
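For context, a hedged example of exercising the JSON Patch path over HTTP; host, ids, and the patched field are placeholders, and the `application/json-patch+json` media type follows RFC 6902. Note that, per this diff, add/replace/remove operations touching the primary datetime fields are validated first and may be rejected for datetime-partitioned collections.

```python
import httpx

patch = [
    {"op": "replace", "path": "/properties/eo:cloud_cover", "value": 12},
]
resp = httpx.patch(
    "http://localhost:8080/collections/my-collection/items/my-item",
    json=patch,
    headers={"Content-Type": "application/json-patch+json"},
)
resp.raise_for_status()
```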
@@ -1705,6 +1726,7 @@ async def delete_collection(self, collection_id: str, **kwargs: Any):
index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh
)
await delete_item_index(collection_id)
await self.async_index_inserter.refresh_cache()

async def bulk_async(
self,