From 9e31cb4206b707f2ce680def7b55e951bd21f53e Mon Sep 17 00:00:00 2001
From: omiranda
Date: Thu, 30 Jan 2025 15:15:01 -0600
Subject: [PATCH 1/2] fix: Reuse online store for materialization writes

---
 .../contrib/spark/spark_materialization_engine.py | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
index cf30a42d729..6fa4a987c9c 100644
--- a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
+++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
@@ -233,6 +233,7 @@ def _map_by_partition(
     iterator,
     spark_serialized_artifacts: _SparkSerializedArtifacts,
 ):
+    feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize()
     """Load pandas df to online store"""
     for pdf in iterator:
         pdf_row_count = pdf.shape[0]
@@ -240,17 +241,10 @@ def _map_by_partition(
         # convert to pyarrow table
         if pdf_row_count == 0:
             print("INFO!!! Dataframe has 0 records to process")
-            return
+            continue
 
         table = pyarrow.Table.from_pandas(pdf)
 
-        # unserialize artifacts
-        (
-            feature_view,
-            online_store,
-            repo_config,
-        ) = spark_serialized_artifacts.unserialize()
-
         if feature_view.batch_source.field_mapping is not None:
             # Spark offline store does the field mapping during pull_latest_from_table_or_query
             # This is for the case where the offline store is not spark

From 891ab1207e8f1c1ad56c908619b1860c023fbf86 Mon Sep 17 00:00:00 2001
From: omiranda
Date: Fri, 31 Jan 2025 14:25:03 -0600
Subject: [PATCH 2/2] revert to returning on empty pdf

---
 .../contrib/spark/spark_materialization_engine.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
index 6fa4a987c9c..320ccc6161d 100644
--- a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
+++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
@@ -241,7 +241,7 @@ def _map_by_partition(
         # convert to pyarrow table
         if pdf_row_count == 0:
             print("INFO!!! Dataframe has 0 records to process")
-            continue
+            return
 
         table = pyarrow.Table.from_pandas(pdf)
 
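Note (not part of the patches): the net effect of the two commits is that the serialized
artifacts are now unserialized once per Spark partition instead of once per pandas chunk,
while the empty-chunk case goes back to returning early. A minimal sketch of the resulting
_map_by_partition body is below, assuming it still lives in the upstream module with its
existing imports (pyarrow, _SparkSerializedArtifacts) and that the write logic following
the field-mapping check is unchanged; that trailing portion is elided with a comment.

def _map_by_partition(
    iterator,
    spark_serialized_artifacts: _SparkSerializedArtifacts,
):
    # Patch 1: unserialize once per partition so the online store instance is
    # reused for every pandas chunk in this partition.
    feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize()
    """Load pandas df to online store"""
    for pdf in iterator:
        pdf_row_count = pdf.shape[0]

        # convert to pyarrow table
        if pdf_row_count == 0:
            print("INFO!!! Dataframe has 0 records to process")
            # Patch 2: revert to returning (not continuing) on an empty dataframe.
            return

        table = pyarrow.Table.from_pandas(pdf)

        if feature_view.batch_source.field_mapping is not None:
            # Spark offline store does the field mapping during pull_latest_from_table_or_query
            # This is for the case where the offline store is not spark
            ...  # field-mapping and online-store write logic unchanged (not shown in the diff)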