fix: add configuration write batch size to avoid timeout issues.
piket committed Jul 29, 2024
1 parent b3cd2ca commit 648a237
Showing 2 changed files with 27 additions and 12 deletions.
@@ -71,6 +71,9 @@ class ElasticsearchOnlineStoreConfig(FeastConfigBaseModel):
     password: str
     """ password to connect to Elasticsearch """
 
+    write_batch_size: Optional[int] = 40
+    """ The number of rows to write in a single batch """
+
 
 class ElasticsearchConnectionManager:
     def __init__(self, online_config: RepoConfig):
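
For context, the new field rides on the store's pydantic-based config model, so it defaults to 40 and can be overridden per deployment. A minimal standalone sketch of that behavior, using a stand-in BaseModel rather than Feast's actual FeastConfigBaseModel (which this page does not show):

    from typing import Optional

    from pydantic import BaseModel

    class SketchConfig(BaseModel):
        # mirrors the field added above: rows per bulk request, default 40
        write_batch_size: Optional[int] = 40

    assert SketchConfig().write_batch_size == 40                   # default applies
    assert SketchConfig(write_batch_size=5).write_batch_size == 5  # override, as in the test below
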
@@ -94,6 +97,14 @@ def __exit__(self, exc_type, exc_value, traceback):
 
 
 class ElasticsearchOnlineStore(OnlineStore):
+    def _get_bulk_documents(self, index_name, data):
+        for entity_key, values, timestamp, created_ts in data:
+            id_val = self._get_value_from_value_proto(entity_key.entity_values[0])
+            document = {entity_key.join_keys[0]: id_val}
+            for feature_name, val in values.items():
+                document[feature_name] = self._get_value_from_value_proto(val)
+            yield {"_index": index_name, "_id": id_val, "_source": document}
+
     def online_write_batch(
         self,
         config: RepoConfig,
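
The loop body that used to build one large `bulk_documents` list is now factored into `_get_bulk_documents`, a generator, so bulk actions are produced lazily one batch at a time. A standalone sketch of the same pattern (the index name and sample rows are illustrative, not from the commit):

    from typing import Any, Dict, Iterable, Iterator

    def actions(index_name: str, rows: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
        # yield one bulk action per row; nothing is materialized up front
        for row in rows:
            yield {"_index": index_name, "_id": row["driver_id"], "_source": row}

    rows = [{"driver_id": i, "conv_rate": i * 0.1} for i in range(3)]
    for action in actions("driver_stats", rows):
        print(action["_id"], action["_source"])
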
@@ -107,20 +118,23 @@ def online_write_batch(
             resp = es.indices.exists(index=table.name)
             if not resp.body:
                 self._create_index(es, table)
-            bulk_documents = []
-            for entity_key, values, timestamp, created_ts in data:
-                id_val = self._get_value_from_value_proto(entity_key.entity_values[0])
-                document = {entity_key.join_keys[0]: id_val}
-                for feature_name, val in values.items():
-                    document[feature_name] = self._get_value_from_value_proto(val)
-                bulk_documents.append(
-                    {"_index": table.name, "_id": id_val, "_source": document}
-                )
-
-            successes, errors = helpers.bulk(client=es, actions=bulk_documents)
+            successes = 0
+            errors: List[Any] = []
+            error_count = 0
+            for i in range(0, len(data), config.online_store.write_batch_size):
+                batch = data[i : i + config.online_store.write_batch_size]
+                count, errs = helpers.bulk(
+                    client=es, actions=self._get_bulk_documents(table.name, batch)
+                )
+                successes += count
+                if type(errs) is int:
+                    error_count += errs
+                elif type(errs) is list:
+                    errors.extend(errs)
             logger.info(f"bulk write completed with {successes} successes")
+            if error_count:
+                logger.error(f"bulk write encountered {error_count} errors")
             if errors:
-                logger.error(f"bulk write return errors: {errors}")
+                logger.error(f"bulk write returned errors: {errors}")
 
     def online_read(
         self,
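
elasticsearch-py's `helpers.bulk` returns a `(success_count, errors)` tuple whose second element is normally a list of per-document errors but is an int when the helper is called with `stats_only=True`; the `type(errs)` checks above defensively cover both shapes. A pure-Python sketch of that tallying logic (the fake results are illustrative):

    def tally(results):
        # results: (count, errs) pairs as returned by elasticsearch.helpers.bulk
        successes, errors, error_count = 0, [], 0
        for count, errs in results:
            successes += count
            if type(errs) is int:     # stats_only=True shape
                error_count += errs
            elif type(errs) is list:  # default shape
                errors.extend(errs)
        return successes, error_count, errors

    print(tally([(5, []), (4, [{"index": {"error": "mapping error"}}]), (3, 2)]))
    # -> (12, 2, [{'index': {'error': 'mapping error'}}])
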
@@ -58,6 +58,7 @@ def repo_config(embedded_elasticsearch):
         endpoint=f"http://{embedded_elasticsearch['host']}:{embedded_elasticsearch['port']}",
         username=embedded_elasticsearch["username"],
         password=embedded_elasticsearch["password"],
+        write_batch_size=5,
     ),
     offline_store=DaskOfflineStoreConfig(),
     entity_key_serialization_version=2,
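
Setting `write_batch_size=5` in the test fixture forces the batching path with little data: `range(0, len(data), 5)` slices the rows into chunks of at most five, so even a small dataset exercises multiple bulk requests. A quick sketch of the slicing (the row count of 12 is illustrative; the test's actual dataset size is not shown here):

    data = list(range(12))  # stand-in for 12 feature rows
    write_batch_size = 5
    batches = [data[i : i + write_batch_size] for i in range(0, len(data), write_batch_size)]
    print([len(b) for b in batches])  # [5, 5, 2] -> three bulk requests
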
