Skip to content

Commit

Permalink
Add OpenSearch shard related logging (#1145)
Browse files Browse the repository at this point in the history
* add opensearch.search shard related logging
  • Loading branch information
baitsguy authored Jan 31, 2025
1 parent 0ebf836 commit 1a33a86
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ def __init__(self, client: "OpenSearch"):
@classmethod
@requires_modules("opensearchpy", extra="opensearch")
def from_client_params(cls, params: BaseDBReader.ClientParams) -> "OpenSearchReaderClient":
from opensearchpy import OpenSearch
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging

assert isinstance(params, OpenSearchReaderClientParams)
client = OpenSearch(**params.os_client_args)
client = OpenSearchClientWithLogging(**params.os_client_args)
return OpenSearchReaderClient(client)

def read_records(self, query_params: BaseDBReader.QueryParams) -> "OpenSearchReaderQueryResponse":
Expand Down Expand Up @@ -276,7 +276,7 @@ def __init__(
):
assert isinstance(
query_params, OpenSearchReaderQueryParams
), f"Wrong kind of query parameters found: {self._query_params}"
), f"Wrong kind of query parameters found: {query_params}"

super().__init__(client_params, query_params, **kwargs)
self._client_params = client_params
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ def __init__(self, os_client: "OpenSearch"):
@classmethod
@requires_modules(["opensearchpy", "opensearchpy.helpers"], extra="opensearch")
def from_client_params(cls, params: BaseDBWriter.ClientParams) -> "OpenSearchWriterClient":
from opensearchpy import OpenSearch
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging

assert isinstance(
params, OpenSearchWriterClientParams
), f"Provided params was not of type OpenSearchWriterClientParams:\n{params}"
paramsdict = asdict(params)
os_client = OpenSearch(**paramsdict)
os_client = OpenSearchClientWithLogging(**paramsdict)
os_client.ping()
return OpenSearchWriterClient(os_client)

Expand Down
18 changes: 17 additions & 1 deletion lib/sycamore/sycamore/connectors/opensearch/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
from typing import Optional
import logging
from typing import Optional, Any

from opensearchpy import OpenSearch

from sycamore import Context
from sycamore.context import context_params
from sycamore.transforms import Embedder


logger = logging.getLogger("opensearch")


class OpenSearchClientWithLogging(OpenSearch):
    """OpenSearch client whose ``search`` logs an error on partial shard success."""

    def search(self, *args: Any, **kwargs: Any) -> Any:
        """Run a search and log an error if not every shard answered successfully.

        All positional and keyword arguments are forwarded unchanged to
        ``OpenSearch.search``, so this class stays a drop-in replacement for
        the parent client.

        The response is never altered and no exception is raised here for a
        partial result; the shard mismatch is only reported through the
        module-level ``logger``.

        Returns:
            Whatever ``OpenSearch.search`` returned.
        """
        response = super().search(*args, **kwargs)

        # Only dict responses are inspected; any other response type is
        # handed back untouched instead of failing on a missing ``.get``.
        shards = response.get("_shards") if isinstance(response, dict) else None

        # A missing or empty "_shards" section means there is nothing to compare.
        if shards and shards.get("total") != shards.get("successful"):
            # Lazy %-formatting: the (possibly large) response is only
            # rendered when the record is actually emitted.
            logger.error("OpenSearch query skipped shards: %s", response)
        return response


@context_params("opensearch")
def get_knn_query(
text_embedder: Embedder,
Expand Down
5 changes: 3 additions & 2 deletions lib/sycamore/sycamore/query/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import structlog
import yaml
from rich.console import Console

from sycamore.schema import Schema

import sycamore
Expand Down Expand Up @@ -131,7 +132,7 @@ def __init__(
llm: Optional[Union[LLM, str]] = None,
query_plan_strategy: Optional[QueryPlanStrategy] = None,
):
from opensearchpy import OpenSearch
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging

self.llm_cache_dir = llm_cache_dir
self.os_config = os_config
Expand All @@ -155,7 +156,7 @@ def __init__(

assert self.context.params, "Could not find required params in Context"
self.os_client_args = self.context.params.get("opensearch", {}).get("os_client_args", os_client_args)
self._os_client = OpenSearch(**self.os_client_args)
self._os_client = OpenSearchClientWithLogging(**self.os_client_args)
self._os_query_executor = OpenSearchQueryExecutor(self.os_client_args)

def get_opensearch_indices(self) -> List[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import os
import tempfile

from opensearchpy import OpenSearch
import sycamore
from sycamore.connectors.file.file_scan import JsonManifestMetadataProvider
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging
from sycamore.tests.config import TEST_DIR
from sycamore.transforms.embed import SentenceTransformerEmbedder
from sycamore.transforms.partition import HtmlPartitioner
Expand Down Expand Up @@ -75,4 +75,4 @@ def test_html_to_opensearch(exec_mode):
ds.write.opensearch(os_client_args=os_client_args, index_name="toyindex", index_settings=index_settings)
finally:
tmp_manifest.close()
OpenSearch(**os_client_args).indices.delete("toyindex")
OpenSearchClientWithLogging(**os_client_args).indices.delete("toyindex")
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from typing import Optional, Dict, Any

import pytest
from opensearchpy import OpenSearch

import sycamore
from sycamore import EXEC_LOCAL, ExecMode
Expand All @@ -21,7 +20,9 @@

@pytest.fixture(scope="class")
def os_client():
client = OpenSearch(**TestOpenSearchRead.OS_CLIENT_ARGS)
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging

client = OpenSearchClientWithLogging(**TestOpenSearchRead.OS_CLIENT_ARGS)
yield client


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

import boto3
from urllib.parse import urlparse
from opensearchpy import OpenSearch

import sycamore
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging
from sycamore.context import OperationTypes, ExecMode
from sycamore.functions import HuggingFaceTokenizer
from sycamore.llms import OpenAIModels, OpenAI
Expand Down Expand Up @@ -143,7 +143,7 @@ def test_pdf_to_opensearch_with_llm_caching():
)
ds.write.opensearch()

OpenSearch(**os_client_args).indices.delete("toyindex")
OpenSearchClientWithLogging(**os_client_args).indices.delete("toyindex")

# validate caching

Expand Down
4 changes: 2 additions & 2 deletions lib/sycamore/sycamore/tests/integration/query/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import os
import pytest
from opensearchpy import OpenSearch

import sycamore
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging
from sycamore.functions import HuggingFaceTokenizer
from sycamore.tests.config import TEST_DIR
from sycamore.transforms.embed import SentenceTransformerEmbedder
Expand Down Expand Up @@ -69,7 +69,7 @@ def query_integration_test_index():
index_name=QUERY_INTEGRATION_TEST_INDEX_NAME,
index_settings=index_settings,
)
osc = OpenSearch(**OS_CLIENT_ARGS)
osc = OpenSearchClientWithLogging(**OS_CLIENT_ARGS)
osc.indices.refresh(QUERY_INTEGRATION_TEST_INDEX_NAME)
yield QUERY_INTEGRATION_TEST_INDEX_NAME
osc.indices.delete(QUERY_INTEGRATION_TEST_INDEX_NAME)
5 changes: 2 additions & 3 deletions lib/sycamore/sycamore/tests/integration/query/test_planner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from opensearchpy import OpenSearch

from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging
from sycamore.tests.integration.query.conftest import OS_CLIENT_ARGS, OS_CONFIG
from sycamore.query.planner import LlmPlanner
from sycamore.query.schema import OpenSearchSchema, OpenSearchSchemaField
Expand All @@ -10,7 +9,7 @@ def test_simple_llm_planner(query_integration_test_index: str):
Simple test ensuring nodes are being created and dependencies are being set.
Using a simple query here for consistent query plans.
"""
os_client = OpenSearch(OS_CLIENT_ARGS)
os_client = OpenSearchClientWithLogging(OS_CLIENT_ARGS)

schema = OpenSearchSchema(
fields={
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import pytest
import sycamore
from sycamore import ExecMode
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging
from sycamore.data.document import OpenSearchQuery
from sycamore.tests.config import TEST_DIR
from sycamore.transforms.embed import SentenceTransformerEmbedder
from sycamore.transforms.partition import UnstructuredPdfPartitioner
from sycamore.transforms.query import OpenSearchQueryExecutor
from opensearchpy import OpenSearch


@pytest.fixture(scope="class")
Expand Down Expand Up @@ -47,7 +47,7 @@ def setup_index():
index_name=TestQueryOpenSearch.INDEX,
index_settings=index_settings,
)
osc = OpenSearch(**TestQueryOpenSearch.OS_CLIENT_ARGS)
osc = OpenSearchClientWithLogging(**TestQueryOpenSearch.OS_CLIENT_ARGS)
osc.indices.refresh(TestQueryOpenSearch.INDEX)
yield TestQueryOpenSearch.INDEX
osc.indices.delete(TestQueryOpenSearch.INDEX)
Expand Down
5 changes: 3 additions & 2 deletions lib/sycamore/sycamore/transforms/query.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from abc import abstractmethod, ABC
from typing import Any

from sycamore.utils.import_utils import requires_modules

from sycamore.data import OpenSearchQueryResult, Element, OpenSearchQuery
Expand All @@ -26,10 +27,10 @@ def __init__(self, os_client_args: dict) -> None:

@requires_modules("opensearchpy", extra="opensearch")
def query(self, query: OpenSearchQuery) -> OpenSearchQueryResult:
from opensearchpy import OpenSearch
from sycamore.connectors.opensearch.utils import OpenSearchClientWithLogging

logger.debug("Executing OS query: " + str(query))
client = OpenSearch(**self._os_client_args)
client = OpenSearchClientWithLogging(**self._os_client_args)

os_result = client.transport.perform_request(
"POST",
Expand Down

0 comments on commit 1a33a86

Please sign in to comment.