Commit 5076bd6

Author: Bhargav Dodla (committed)

fix: Added sleep to test rate limiting

1 parent 51dfd59 · commit 5076bd6

File tree: 2 files changed (+20 −231 lines)


sdk/python/feast/infra/contrib/spark_kafka_processor.py

Lines changed: 3 additions & 215 deletions
@@ -1,17 +1,13 @@
-import time
-from datetime import datetime
 from types import MethodType
 from typing import List, Optional, Set, Union, no_type_check
 
 import pandas as pd
-import pyarrow
 from pyspark import SparkContext
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.avro.functions import from_avro
 from pyspark.sql.column import Column, _to_java_column
-from pyspark.sql.functions import col, from_json, udf
+from pyspark.sql.functions import col, from_json
 from pyspark.sql.streaming import StreamingQuery
-from pyspark.sql.types import BinaryType
 
 from feast import FeatureView
 from feast.data_format import AvroFormat, ConfluentAvroFormat, JsonFormat, StreamFormat
@@ -27,7 +23,6 @@
 )
 from feast.infra.provider import get_provider
 from feast.stream_feature_view import StreamFeatureView
-from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping
 
 
 class SparkProcessorConfig(ProcessorConfig):
@@ -142,10 +137,7 @@ def ingest_stream_feature_view(
         self._create_infra_if_necessary()
         ingested_stream_df = self._ingest_stream_data()
         transformed_df = self._construct_transformation_plan(ingested_stream_df)
-        if self.fs.config.provider == "expedia":
-            online_store_query = self._write_stream_data_expedia(transformed_df, to)
-        else:
-            online_store_query = self._write_stream_data(transformed_df, to)
+        online_store_query = self._write_stream_data(transformed_df, to)
         return online_store_query
 
     # In the line 116 of __init__(), the "data_source" is assigned a stream_source (and has to be KafkaSource as in line 80).
@@ -247,208 +239,17 @@ def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
 
             if len(drop_list) > 0:
                 print(
-                    f"INFO!!! Dropping extra columns in the DataFrame: {drop_list}. Avoid unnecessary columns in the dataframe."
+                    f"INFO: Dropping extra columns in the DataFrame: {drop_list}. Avoid unnecessary columns in the dataframe."
                 )
                 return df.drop(*drop_list)
             else:
                 raise Exception(f"Stream source is not defined for {self.sfv.name}")
         elif isinstance(self.sfv, StreamFeatureView):
             return self.sfv.udf.__call__(df) if self.sfv.udf else df
 
-    def _write_stream_data_expedia(self, df: StreamTable, to: PushMode):
-        """
-        Ensures materialization logic in sync with stream ingestion.
-        Support only write to online store. No support for preprocess_fn also.
-        In Spark 3.2.2, toPandas() is throwing error when the dataframe has Boolean columns.
-        To fix this error, we need spark 3.4.0 or numpy < 1.20.0 but feast needs numpy >= 1.22.
-        Switching to use mapInPandas to solve the problem for boolean columns and
-        toPandas() also load all data into driver's memory.
-        Error Message:
-        AttributeError: module 'numpy' has no attribute 'bool'.
-        `np.bool` was a deprecated alias for the builtin `bool`.
-        To avoid this error in existing code, use `bool` by itself.
-        Doing this will not modify any behavior and is safe.
-        If you specifically wanted the numpy scalar type, use `np.bool_` here.
-        """
-
-        # TODO: Support writing to offline store and preprocess_fn. Remove _write_stream_data method
-
-        # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema.
-        def batch_write_pandas_df(
-            iterator, spark_serialized_artifacts, join_keys, batch_id
-        ):
-            current_datetime_with_ms = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[
-                :-3
-            ]
-            print(f"{current_datetime_with_ms} Started processing batch {batch_id}")
-            for pdf in iterator:
-                (
-                    feature_view,
-                    online_store,
-                    repo_config,
-                ) = spark_serialized_artifacts.unserialize()
-
-                if isinstance(feature_view, StreamFeatureView):
-                    ts_field = feature_view.timestamp_field
-                else:
-                    ts_field = feature_view.stream_source.timestamp_field
-
-                # Extract the latest feature values for each unique entity row (i.e. the join keys).
-                pdf = (
-                    pdf.sort_values(by=[*join_keys, ts_field], ascending=False)
-                    .groupby(join_keys)
-                    .nth(0)
-                )
-
-                table = pyarrow.Table.from_pandas(pdf)
-                current_datetime_with_ms = datetime.now().strftime(
-                    "%Y-%m-%d %H:%M:%S.%f"
-                )[:-3]
-                print(
-                    f"{current_datetime_with_ms} Started processing _run_pyarrow_field_mapping {batch_id}"
-                )
-                if feature_view.batch_source.field_mapping is not None:
-                    table = _run_pyarrow_field_mapping(
-                        table, feature_view.batch_source.field_mapping
-                    )
-
-                join_key_to_value_type = {
-                    entity.name: entity.dtype.to_value_type()
-                    for entity in feature_view.entity_columns
-                }
-                current_datetime_with_ms = datetime.now().strftime(
-                    "%Y-%m-%d %H:%M:%S.%f"
-                )[:-3]
-                print(
-                    f"{current_datetime_with_ms} Started processing _convert_arrow_to_proto {batch_id}"
-                )
-                rows_to_write = _convert_arrow_to_proto(
-                    table, feature_view, join_key_to_value_type
-                )
-                online_store.online_write_batch(
-                    repo_config,
-                    feature_view,
-                    rows_to_write,
-                    lambda x: None,
-                )
-                # data_list = online_store.online_write_batch_connector(
-                #     repo_config,
-                #     feature_view,
-                #     rows_to_write,
-                #     lambda x: None,
-                # )
-
-                # keyspace = repo_config.online_store.keyspace
-
-                # fqtable = CassandraOnlineStore._fq_table_name(
-                #     keyspace, repo_config.project, feature_view
-                # )
-
-                # schema = StructType(
-                #     [
-                #         StructField("feature_name", StringType(), False),
-                #         StructField("value", BinaryType(), True),
-                #         StructField("entity_key", StringType(), False),
-                #         StructField("event_ts", TimestampType(), True),
-                #     ]
-                # )
-
-                # df = self.spark.createDataFrame(
-                #     data_list,
-                #     schema=schema,
-                # )
-
-                # df.write.format("org.apache.spark.sql.cassandra").mode(
-                #     "append"
-                # ).options(table=fqtable, keyspace=keyspace).save()
-
-            yield pd.DataFrame([pd.Series(range(1, 2))])  # dummy result
-
-        def batch_write(
-            sdf: DataFrame,
-            batch_id: int,
-            spark_serialized_artifacts,
-            join_keys,
-            feature_view,
-        ):
-            start_time = time.time()
-            current_datetime_with_ms = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[
-                :-3
-            ]
-            print(f"{current_datetime_with_ms} Started batch write..")
-            sdf.mapInPandas(
-                lambda x: batch_write_pandas_df(
-                    x, spark_serialized_artifacts, join_keys, batch_id
-                ),
-                "status int",
-            ).count()  # dummy action to force evaluation
-            current_datetime_with_ms = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[
-                :-3
-            ]
-            print(
-                f"{current_datetime_with_ms} Time taken to write batch {batch_id} is: {(time.time() - start_time) * 1000:.2f} ms"
-            )
-
-        def batch_write_with_connector(
-            sdf: DataFrame,
-            batch_id: int,
-        ):
-            start_time = time.time()
-            convert_to_blob = udf(lambda s: s.encode("utf-8"), BinaryType())
-            sdf = sdf.withColumn("value", convert_to_blob(col("feature_value"))).drop(
-                "event_header",
-                "feature_value",
-            )
-            sdf.write.format("org.apache.spark.sql.cassandra").mode("append").options(
-                table="mlpfs_scylladb_perf_test_cc_stream_fv", keyspace="feast"
-            ).save()
-            print(
-                f"Time taken to write batch {batch_id} is: {(time.time() - start_time) * 1000:.2f} ms"
-            )
-
-        query = None
-        if self.sfv.name != "cc_stream_fv":
-            query = (
-                df.writeStream.outputMode("update")
-                .option("checkpointLocation", self.checkpoint_location)
-                .trigger(processingTime=self.processing_time)
-                .foreachBatch(
-                    lambda df, batch_id: batch_write(
-                        df,
-                        batch_id,
-                        self.spark_serialized_artifacts,
-                        self.join_keys,
-                        self.sfv,
-                    )
-                )
-                .start()
-            )
-        else:
-            query = (
-                df.writeStream.outputMode("update")
-                .option("checkpointLocation", self.checkpoint_location)
-                .trigger(processingTime=self.processing_time)
-                .foreachBatch(
-                    lambda df, batch_id: batch_write_with_connector(
-                        df,
-                        batch_id,
-                    )
-                )
-                .start()
-            )
-
-        query.awaitTermination(timeout=self.query_timeout)
-        return query
-
     def _write_stream_data(self, df: StreamTable, to: PushMode) -> StreamingQuery:
         # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema.
         def batch_write(row: DataFrame, batch_id: int):
-            current_datetime_with_ms = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[
-                :-3
-            ]
-            print(
-                f"{current_datetime_with_ms} Started batch write for batch_id: {batch_id}"
-            )
             rows: pd.DataFrame = row.toPandas()
 
             # Extract the latest feature values for each unique entity row (i.e. the join keys).
@@ -463,7 +264,6 @@ def batch_write(row: DataFrame, batch_id: int):
                 .nth(0)
             )
             # Created column is not used anywhere in the code, but it is added to the dataframe.
-            # Expedia provider drops the unused columns from dataframe
             # Commenting this out as it is not used anywhere in the code
             # rows["created"] = pd.to_datetime("now", utc=True)
 
@@ -477,19 +277,7 @@ def batch_write(row: DataFrame, batch_id: int):
             # Finally persist the data to the online store and/or offline store.
             if rows.size > 0:
                 if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE:
-                    current_datetime_with_ms = datetime.now().strftime(
-                        "%Y-%m-%d %H:%M:%S.%f"
-                    )[:-3]
-                    print(
-                        f"{current_datetime_with_ms} Started write_to_online_store for batch_id: {batch_id}"
-                    )
                     self.fs.write_to_online_store(self.sfv.name, rows)
-                    current_datetime_with_ms = datetime.now().strftime(
-                        "%Y-%m-%d %H:%M:%S.%f"
-                    )[:-3]
-                    print(
-                        f"{current_datetime_with_ms} Completed write_to_online_store for batch_id: {batch_id}"
-                    )
                 if to == PushMode.OFFLINE or to == PushMode.ONLINE_AND_OFFLINE:
                     self.fs.write_to_offline_store(self.sfv.name, rows)
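
Note on the retained write path: after this change every provider goes through the plain _write_stream_data method, whose foreachBatch callback collects each micro-batch with toPandas(), keeps only the latest row per entity, and hands the result to fs.write_to_online_store. A minimal sketch of that per-batch dedupe step is shown below; the column names, join key, and sample data are hypothetical, while the real code derives join_keys and the timestamp field from the stream feature view.

    # Sketch of the "latest value per entity" reduction used in batch_write.
    # "driver_id", "event_timestamp", and "conv_rate" are assumed example columns.
    import pandas as pd

    join_keys = ["driver_id"]      # assumed entity join key
    ts_field = "event_timestamp"   # assumed timestamp field

    rows = pd.DataFrame(
        {
            "driver_id": [1, 1, 2],
            "event_timestamp": pd.to_datetime(
                ["2024-01-01 00:00:00", "2024-01-01 00:05:00", "2024-01-01 00:01:00"]
            ),
            "conv_rate": [0.1, 0.2, 0.3],
        }
    )

    # Mirror the sort_values(...).groupby(...).nth(0) pattern from the diff above:
    # sort newest-first, then take the first row of each entity group.
    latest = (
        rows.sort_values(by=[*join_keys, ts_field], ascending=False)
        .groupby(join_keys)
        .nth(0)
    )
    print(latest)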

sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py

Lines changed: 17 additions & 16 deletions
@@ -19,6 +19,7 @@
 """
 
 import logging
+import time
 from datetime import datetime
 from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple
 
@@ -352,6 +353,7 @@ def online_write_batch(
                 display progress.
         """
         write_concurrency = config.online_store.write_concurrency
+        write_limit = 1.0 / write_concurrency
         project = config.project
         ttl = (
             table.online_store_key_ttl_seconds
@@ -386,39 +388,38 @@ def online_write_batch(
                 )
                 batch.add(insert_cql, params)
             futures.append(session.execute_async(batch))
+            time.sleep(write_limit)
 
             # TODO: Make this efficient by leveraging continuous writes rather
             # than blocking until all writes are done. We may need to rate limit
            # the writes to reduce the impact on read performance.
             if len(futures) >= write_concurrency:
                 # Raises exception if at least one of the batch fails
-                try:
-                    for future in futures:
-                        future.result()
-                    futures = []
-                except Exception as exc:
-                    logger.error(f"Error writing a batch: {exc}")
-                    print(f"Error writing a batch: {exc}")
-                    raise Exception("Error writing a batch") from exc
+                self._wait_for_futures(futures)
+                futures.clear()
 
             # this happens N-1 times, will be corrected outside:
             if progress:
                 progress(1)
 
         if len(futures) > 0:
-            try:
-                for future in futures:
-                    future.result()
-                futures = []
-            except Exception as exc:
-                logger.error(f"Error writing a batch: {exc}")
-                print(f"Error writing a batch: {exc}")
-                raise Exception("Error writing a batch") from exc
+            self._wait_for_futures(futures)
+            futures.clear()
 
         # correction for the last missing call to `progress`:
         if progress:
            progress(1)
 
+    def _wait_for_futures(self, futures):
+        try:
+            for future in futures:
+                future.result()
+            futures = []
+        except Exception as exc:
+            logger.error(f"Error writing a batch: {exc}")
+            print(f"Error writing a batch: {exc}")
+            raise Exception("Error writing a batch") from exc
+
     def online_read(
         self,
         config: RepoConfig,
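
Note on the throttle introduced here: sleeping 1.0 / write_concurrency seconds after each execute_async call caps submissions at roughly write_concurrency batches per second, and the duplicated future-draining logic moves into a _wait_for_futures helper. Below is a self-contained sketch of the same pattern under stated assumptions: a ThreadPoolExecutor and a fake_write function stand in for the Cassandra session and its async writes, and are not part of feast.

    # Illustrative sketch of the sleep-based rate limit plus future draining.
    import time
    from concurrent.futures import Future, ThreadPoolExecutor
    from typing import List

    write_concurrency = 10                 # assumed config.online_store.write_concurrency
    write_limit = 1.0 / write_concurrency  # seconds to sleep between submissions


    def wait_for_futures(futures: List[Future]) -> None:
        """Block until every outstanding write finishes, surfacing the first failure."""
        try:
            for future in futures:
                future.result()
        except Exception as exc:
            raise Exception("Error writing a batch") from exc


    def fake_write(batch_id: int) -> int:
        time.sleep(0.01)  # pretend to talk to the database
        return batch_id


    with ThreadPoolExecutor(max_workers=write_concurrency) as executor:
        futures: List[Future] = []
        for batch_id in range(50):
            futures.append(executor.submit(fake_write, batch_id))
            time.sleep(write_limit)  # naive throttle, mirroring time.sleep(write_limit)

            # Drain outstanding writes once write_concurrency of them have piled up,
            # mirroring the _wait_for_futures(...) / futures.clear() calls in the diff.
            if len(futures) >= write_concurrency:
                wait_for_futures(futures)
                futures.clear()

        if futures:
            wait_for_futures(futures)
            futures.clear()

The trade-off is simplicity over precision: the sleep happens even when the cluster has spare capacity, so this throttle is a blunt instrument intended (per the commit message) to test the effect of rate limiting rather than to be a final design.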
