From f9be43fbdd643db699d617d5b7395e3f7c078e77 Mon Sep 17 00:00:00 2001 From: Austin Lee Date: Mon, 10 Feb 2025 17:44:31 -0800 Subject: [PATCH 1/2] Serialize query strings to avoid Ray Dataset column imputation --- .../sycamore/connectors/opensearch/opensearch_reader.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/sycamore/sycamore/connectors/opensearch/opensearch_reader.py b/lib/sycamore/sycamore/connectors/opensearch/opensearch_reader.py index 0e5ebb431..a24d7a13f 100644 --- a/lib/sycamore/sycamore/connectors/opensearch/opensearch_reader.py +++ b/lib/sycamore/sycamore/connectors/opensearch/opensearch_reader.py @@ -1,3 +1,4 @@ +import json import logging from copy import deepcopy @@ -288,7 +289,7 @@ def __init__( logger.info(f"OpenSearchReader using PIT: {self.use_pit}") @timetrace("OpenSearchReader") - def _to_parent_doc(self, slice_query: dict[str, Any]) -> List[dict[str, Any]]: + def _to_parent_doc(self, doc: dict[str, Any]) -> List[dict[str, Any]]: """ Get all parent documents from a given slice. """ @@ -304,6 +305,7 @@ def _to_parent_doc(self, slice_query: dict[str, Any]) -> List[dict[str, Any]]: raise ValueError("Target is not present\n" f"Parameters: {self._query_params}\n") os_client = client._client + slice_query = json.loads(doc["doc"]) assert ( get_doc_count_for_slice(os_client, slice_query) < 10000 @@ -341,7 +343,7 @@ def _to_parent_doc(self, slice_query: dict[str, Any]) -> List[dict[str, Any]]: logger.info(f"Read {len(results)} documents from {self._query_params.index_name}") except Exception as e: - raise ValueError(f"Error reading from target: {e}") + raise ValueError(f"Error reading from target: {e}, query: {slice_query}") finally: if client is not None: client.close() @@ -547,7 +549,8 @@ def _execute_pit(self, **kwargs) -> "Dataset": } if "query" in query: _query["query"] = query["query"] - docs.append(_query) + + docs.append({"doc": json.dumps(_query)}) logger.debug(f"Added slice {i} to the query {_query}") except Exception as e: raise ValueError(f"Error reading from target: {e}") From c4372fda6e619f995fa7dfcbed82475d5dc26746 Mon Sep 17 00:00:00 2001 From: Austin Lee Date: Mon, 10 Feb 2025 18:05:23 -0800 Subject: [PATCH 2/2] Fix non reconstruct case, ensure all integ tests pass --- .../sycamore/connectors/opensearch/opensearch_reader.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/sycamore/sycamore/connectors/opensearch/opensearch_reader.py b/lib/sycamore/sycamore/connectors/opensearch/opensearch_reader.py index a24d7a13f..de150337e 100644 --- a/lib/sycamore/sycamore/connectors/opensearch/opensearch_reader.py +++ b/lib/sycamore/sycamore/connectors/opensearch/opensearch_reader.py @@ -352,7 +352,7 @@ def _to_parent_doc(self, doc: dict[str, Any]) -> List[dict[str, Any]]: return ret @timetrace("OpenSearchReader") - def _to_doc(self, slice_query: dict[str, Any]) -> List[dict[str, Any]]: + def _to_doc(self, doc: dict[str, Any]) -> List[dict[str, Any]]: """ Get all documents from a given slice. """ @@ -370,6 +370,7 @@ def _to_doc(self, slice_query: dict[str, Any]) -> List[dict[str, Any]]: raise ValueError("Target is not present\n" f"Parameters: {self._query_params}\n") os_client = client._client + slice_query = json.loads(doc["doc"]) slice_count = get_doc_count_for_slice(os_client, slice_query) assert slice_count <= 10000, f"Slice count ({slice_count}) should return <= 10,000 documents"