Skip to content

Commit

Permalink
Serialize query strings to avoid Ray Dataset column imputation
Browse files (browse the repository at this point in the history)
  • Loading branch information
austintlee committed Feb 11, 2025
1 parent deb175f commit f9be43f
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
import json
import logging
from copy import deepcopy

Expand Down Expand Up @@ -288,7 +289,7 @@ def __init__(
logger.info(f"OpenSearchReader using PIT: {self.use_pit}")

@timetrace("OpenSearchReader")
def _to_parent_doc(self, slice_query: dict[str, Any]) -> List[dict[str, Any]]:
def _to_parent_doc(self, doc: dict[str, Any]) -> List[dict[str, Any]]:
"""
Get all parent documents from a given slice.
"""
Expand All @@ -304,6 +305,7 @@ def _to_parent_doc(self, slice_query: dict[str, Any]) -> List[dict[str, Any]]:
raise ValueError("Target is not present\n" f"Parameters: {self._query_params}\n")

os_client = client._client
slice_query = json.loads(doc["doc"])

assert (
get_doc_count_for_slice(os_client, slice_query) < 10000
Expand Down Expand Up @@ -341,7 +343,7 @@ def _to_parent_doc(self, slice_query: dict[str, Any]) -> List[dict[str, Any]]:
logger.info(f"Read {len(results)} documents from {self._query_params.index_name}")

except Exception as e:
raise ValueError(f"Error reading from target: {e}")
raise ValueError(f"Error reading from target: {e}, query: {slice_query}")
finally:
if client is not None:
client.close()
Expand Down Expand Up @@ -547,7 +549,8 @@ def _execute_pit(self, **kwargs) -> "Dataset":
}
if "query" in query:
_query["query"] = query["query"]
docs.append(_query)

docs.append({"doc": json.dumps(_query)})
logger.debug(f"Added slice {i} to the query {_query}")
except Exception as e:
raise ValueError(f"Error reading from target: {e}")
Expand Down

0 comments on commit f9be43f

Please sign in to comment.