diff --git a/README.md b/README.md index d4b8754c..e787962b 100644 --- a/README.md +++ b/README.md @@ -36,13 +36,25 @@ ### To install from PyPI: ```shell -pip install stac_fastapi.elasticsearch +pip install stac_fastapi.elasticsearch[server, es] + +``` +if you are using serverless by elasticsearch + +```shell +pip install stac_fastapi.elasticsearch[server, serverless_es] + ``` or ``` pip install stac_fastapi.opensearch ``` +from sources: +``` +RUN pip install --no-cache-dir -e ./stac_fastapi/elasticsearch[dev,server,serverless_es] + +``` ## Build Elasticsearch API backend ```shell diff --git a/docker-compose.yml b/docker-compose.yml index 23455e2e..15e8dab6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,7 +7,7 @@ services: restart: always build: context: . - dockerfile: dockerfiles/Dockerfile.dev.es + dockerfile: dockerfiles/Dockerfile.dev.serverless.es environment: - STAC_FASTAPI_TITLE=stac-fastapi-elasticsearch - STAC_FASTAPI_DESCRIPTION=A STAC FastAPI with an Elasticsearch backend diff --git a/dockerfiles/Dockerfile.dev.es b/dockerfiles/Dockerfile.dev.es index 009f9681..f14e2f0b 100644 --- a/dockerfiles/Dockerfile.dev.es +++ b/dockerfiles/Dockerfile.dev.es @@ -16,4 +16,4 @@ WORKDIR /app COPY . /app RUN pip install --no-cache-dir -e ./stac_fastapi/core -RUN pip install --no-cache-dir -e ./stac_fastapi/elasticsearch[dev,server] +RUN pip install --no-cache-dir -e ./stac_fastapi/elasticsearch[dev,server,es] diff --git a/dockerfiles/Dockerfile.dev.serverless.es b/dockerfiles/Dockerfile.dev.serverless.es new file mode 100644 index 00000000..20e9672f --- /dev/null +++ b/dockerfiles/Dockerfile.dev.serverless.es @@ -0,0 +1,19 @@ +FROM python:3.10-slim + + +# update apt pkgs, and install build-essential for ciso8601 +RUN apt-get update && \ + apt-get -y upgrade && \ + apt-get install -y build-essential git && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# update certs used by Requests +ENV CURL_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt + +WORKDIR /app + +COPY . /app + +RUN pip install --no-cache-dir -e ./stac_fastapi/core +RUN pip install --no-cache-dir -e ./stac_fastapi/elasticsearch[dev,server,serverless_es] diff --git a/stac_fastapi/elasticsearch/setup.py b/stac_fastapi/elasticsearch/setup.py index 37fc81b3..472e0e24 100644 --- a/stac_fastapi/elasticsearch/setup.py +++ b/stac_fastapi/elasticsearch/setup.py @@ -7,12 +7,21 @@ install_requires = [ "stac-fastapi.core==3.0.0", - "elasticsearch[async]==8.11.0", - "elasticsearch-dsl==8.11.0", "uvicorn", "starlette", ] +install_requires_es = [ + "elasticsearch[async]==8.11.0", + "elasticsearch-dsl==8.11.0", +] + +install_requires_serverless_es = [ + "elasticsearch_serverless[async]", + "elasticsearch-dsl", + "requests", +] + extra_reqs = { "dev": [ "pytest", @@ -22,9 +31,12 @@ "requests", "ciso8601", "httpx", + "debugpy", ], "docs": ["mkdocs", "mkdocs-material", "pdocs"], "server": ["uvicorn[standard]==0.19.0"], + "es": install_requires_es, + "serverless_es": install_requires_serverless_es, } setup( diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py index 40318860..fe365cbe 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py @@ -5,10 +5,44 @@ from typing import Any, Dict, Set import certifi +import requests + +# RIGHT https://elasticsearch-py.readthedocs.io/en/latest/async.html +# https://elasticsearch-serverless-python.readthedocs.io/en/stable/api.html#module-elasticsearch_serverless +# from elasticsearch import AsyncElasticsearch, Elasticsearch # type: ignore +from elasticsearch_serverless import AsyncElasticsearch, Elasticsearch -from elasticsearch import AsyncElasticsearch, Elasticsearch # type: ignore from stac_fastapi.types.config import ApiSettings +# WRONG https://elasticsearch-serverless-python.readthedocs.io/en/latest/api.html#elasticsearch_serverless.client.AsyncSearchClient +# from elasticsearch_serverless.client import AsyncSearchClient + + +def check_serverless_elasticsearch(): + """Check serverless availability.""" + use_ssl = os.getenv("ES_USE_SSL", "true").lower() == "true" + scheme = "https" if use_ssl else "http" + + # Configure the hosts parameter with the correct scheme + host = f"{scheme}://{os.getenv('ES_HOST')}:{os.getenv('ES_PORT')}" + + headers = {"Authorization": f"ApiKey {os.getenv('ES_API_KEY')}"} + response = requests.get(host, headers=headers) + if response.ok: + data = response.json() + # Look for specific serverless indicators in the response + if "version" in data and "serverless" == data["version"].get( + "build_flavor", "" + ): + return True, "Serverless Elasticsearch found" + else: + return False, "No serverless indicator found" + else: + return False, "Error accessing Elasticsearch endpoint" + + +serverless, message = check_serverless_elasticsearch() + def _es_config() -> Dict[str, Any]: # Determine the scheme (http or https) @@ -19,9 +53,24 @@ def _es_config() -> Dict[str, Any]: hosts = [f"{scheme}://{os.getenv('ES_HOST')}:{os.getenv('ES_PORT')}"] # Initialize the configuration dictionary + accept = None + if serverless: + accept = "application/vnd.elasticsearch+json; compatible-with=8" + else: + accept = "application/vnd.elasticsearch+json; compatible-with=7" + + headers = {"accept": accept} + + # Handle API key + api_key = os.getenv("ES_API_KEY") + if serverless: + headers.update({"Authorization": f"ApiKey {api_key}"}) + else: + headers.update({"x-api-key": api_key}) + config = { "hosts": hosts, - "headers": {"accept": "application/vnd.elasticsearch+json; compatible-with=7"}, + "headers": headers, } # Handle API key diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 348b8784..7f1f5e29 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -17,6 +17,7 @@ from stac_fastapi.elasticsearch.config import ( ElasticsearchSettings as SyncElasticsearchSettings, ) +from stac_fastapi.elasticsearch.config import check_serverless_elasticsearch from stac_fastapi.types.errors import ConflictError, NotFoundError from stac_fastapi.types.stac import Collection, Item @@ -24,6 +25,9 @@ NumType = Union[float, int] +serverless, message = check_serverless_elasticsearch() +logger.info(message) + COLLECTIONS_INDEX = os.getenv("STAC_COLLECTIONS_INDEX", "collections") ITEMS_INDEX_PREFIX = os.getenv("STAC_ITEMS_INDEX_PREFIX", "items_") ES_INDEX_NAME_UNSUPPORTED_CHARS = { @@ -180,23 +184,49 @@ async def create_index_templates() -> None: None """ - client = AsyncElasticsearchSettings().create_client - await client.indices.put_template( - name=f"template_{COLLECTIONS_INDEX}", - body={ - "index_patterns": [f"{COLLECTIONS_INDEX}*"], - "mappings": ES_COLLECTIONS_MAPPINGS, - }, - ) - await client.indices.put_template( - name=f"template_{ITEMS_INDEX_PREFIX}", - body={ - "index_patterns": [f"{ITEMS_INDEX_PREFIX}*"], - "settings": ES_ITEMS_SETTINGS, - "mappings": ES_ITEMS_MAPPINGS, - }, - ) - await client.close() + # https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-component-template.html + # no asynch is provided for index management over serverless + # the body also changes and the name can't start with underscode '_' + if serverless: + client = SyncElasticsearchSettings().create_client + client.indices.put_index_template( + name=f"template_{COLLECTIONS_INDEX}", + body={ + "index_patterns": [f"{COLLECTIONS_INDEX}*"], + "template": { + "mappings": ES_COLLECTIONS_MAPPINGS, + }, + }, + ) + client.indices.put_index_template( + name=f"index_template_{ITEMS_INDEX_PREFIX}", + body={ + "index_patterns": [f"{ITEMS_INDEX_PREFIX}*"], + "template": { + "settings": ES_ITEMS_SETTINGS, + "mappings": ES_ITEMS_MAPPINGS, + }, + }, + ) + client.close() + else: + client = AsyncElasticsearchSettings().create_client + await client.indices.put_template( + name=f"template_{COLLECTIONS_INDEX}", + body={ + "index_patterns": [f"{COLLECTIONS_INDEX}*"], + "mappings": ES_COLLECTIONS_MAPPINGS, + }, + ) + await client.indices.put_template( + name=f"template_{ITEMS_INDEX_PREFIX}", + body={ + "index_patterns": [f"{ITEMS_INDEX_PREFIX}*"], + "settings": ES_ITEMS_SETTINGS, + "mappings": ES_ITEMS_MAPPINGS, + }, + ) + await client.close() async def create_collection_index() -> None: @@ -207,13 +237,23 @@ async def create_collection_index() -> None: None """ - client = AsyncElasticsearchSettings().create_client + index = f"{COLLECTIONS_INDEX}-000001" + if serverless: + client = SyncElasticsearchSettings().create_client - await client.options(ignore_status=400).indices.create( - index=f"{COLLECTIONS_INDEX}-000001", - aliases={COLLECTIONS_INDEX: {}}, - ) - await client.close() + client.options(ignore_status=400).indices.create( + index=index, + aliases={COLLECTIONS_INDEX: {}}, + ) + client.close() + else: + client = AsyncElasticsearchSettings().create_client + + await client.options(ignore_status=400).indices.create( + index=index, + aliases={COLLECTIONS_INDEX: {}}, + ) + await client.close() async def create_item_index(collection_id: str): @@ -227,14 +267,25 @@ async def create_item_index(collection_id: str): None """ - client = AsyncElasticsearchSettings().create_client index_name = index_by_collection_id(collection_id) + index = f"{index_by_collection_id(collection_id)}-000001" - await client.options(ignore_status=400).indices.create( - index=f"{index_by_collection_id(collection_id)}-000001", - aliases={index_name: {}}, - ) - await client.close() + if serverless: + client = SyncElasticsearchSettings().create_client + + client.options(ignore_status=400).indices.create( + index=index, + aliases={index_name: {}}, + ) + client.close() + else: + client = AsyncElasticsearchSettings().create_client + + await client.options(ignore_status=400).indices.create( + index=index, + aliases={index_name: {}}, + ) + await client.close() async def delete_item_index(collection_id: str): @@ -243,17 +294,32 @@ async def delete_item_index(collection_id: str): Args: collection_id (str): The ID of the collection whose items index will be deleted. """ - client = AsyncElasticsearchSettings().create_client - name = index_by_collection_id(collection_id) - resolved = await client.indices.resolve_index(name=name) - if "aliases" in resolved and resolved["aliases"]: - [alias] = resolved["aliases"] - await client.indices.delete_alias(index=alias["indices"], name=alias["name"]) - await client.indices.delete(index=alias["indices"]) + + if serverless: + client = SyncElasticsearchSettings().create_client + + resolved = client.indices.resolve_index(name=name) + if "aliases" in resolved and resolved["aliases"]: + [alias] = resolved["aliases"] + client.indices.delete_alias(index=alias["indices"], name=alias["name"]) + client.indices.delete(index=alias["indices"]) + else: + client.indices.delete(index=name) + client.close() else: - await client.indices.delete(index=name) - await client.close() + client = AsyncElasticsearchSettings().create_client + + resolved = await client.indices.resolve_index(name=name) + if "aliases" in resolved and resolved["aliases"]: + [alias] = resolved["aliases"] + await client.indices.delete_alias( + index=alias["indices"], name=alias["name"] + ) + await client.indices.delete(index=alias["indices"]) + else: + await client.indices.delete(index=name) + await client.close() def mk_item_id(item_id: str, collection_id: str): @@ -329,20 +395,20 @@ async def get_all_collections( Returns: A tuple of (collections, next pagination token if any). """ + body = { + "sort": [{"id": {"order": "asc"}}], + "size": limit, + } search_after = None if token: search_after = [token] + body["search_after"] = search_after - response = await self.client.search( - index=COLLECTIONS_INDEX, - body={ - "sort": [{"id": {"order": "asc"}}], - "size": limit, - "search_after": search_after, - }, - ) - - hits = response["hits"]["hits"] + response = await self.client.search(index=COLLECTIONS_INDEX, body=body) + if serverless: + hits = response.body["hits"]["hits"] + else: + hits = response["hits"]["hits"] collections = [ self.collection_serializer.db_to_stac( collection=hit["_source"], request=request, extensions=self.extensions