Skip to content

Commit

Permalink
fix: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zabarn committed Nov 7, 2024
1 parent f756730 commit 56759e6
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 20 deletions.
3 changes: 1 addition & 2 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,7 @@ def online_store_ttl(self) -> Optional[int]:
ttl_str = self.tags.get("online_store_ttl")
if ttl_str:
try:
ttl_seconds = int(ttl_str)
return ttl_seconds
return int(ttl_str)
except ValueError:
raise ValueError(
f"Invalid online_store_ttl value '{ttl_str}' in tags. It must be an integer representing seconds."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@
event_ts TIMESTAMP,
created_ts TIMESTAMP,
PRIMARY KEY ((entity_key), feature_name)
) WITH CLUSTERING ORDER BY (feature_name ASC)
{table_options};
) WITH CLUSTERING ORDER BY (feature_name ASC) AND default_time_to_live={ttl};
"""

DROP_TABLE_CQL_TEMPLATE = "DROP TABLE IF EXISTS {fqtable};"
Expand Down Expand Up @@ -160,7 +159,7 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel):
Table deletion is not currently supported in this mode.
"""

ttl: Optional[StrictInt] = None
key_ttl_seconds: Optional[StrictInt] = None
"""Default TTL (in seconds) to apply to all tables if not specified in FeatureView."""

class CassandraLoadBalancingPolicy(FeastConfigBaseModel):
Expand Down Expand Up @@ -570,17 +569,8 @@ def _create_table(self, config: RepoConfig, project: str, table: FeatureView):
session: Session = self._get_session(config)
keyspace: str = self._keyspace
fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table)

ttl = (
table.online_store_ttl
if table.online_store_ttl is not None
else config.online_store.ttl
)
table_options = f" AND default_time_to_live = {ttl}" if ttl is not None else ""

create_cql = self._get_cql_statement(
config, "create", fqtable, table_options=table_options
)
ttl = table.online_store_ttl or config.online_store.key_ttl_seconds or 0
create_cql = self._get_cql_statement(config, "create", fqtable, ttl=ttl)
logger.info(f"Creating table {fqtable} with TTL {ttl}.")
session.execute(create_cql)

Expand Down
11 changes: 7 additions & 4 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,13 @@ def online_write_batch(

pipe.hset(redis_key_bin, mapping=entity_hset)

if online_store_config.key_ttl_seconds:
pipe.expire(
name=redis_key_bin, time=online_store_config.key_ttl_seconds
)
ttl = (
table.online_store_ttl
or online_store_config.key_ttl_seconds
or None
)
if ttl:
pipe.expire(name=redis_key_bin, time=ttl)
results = pipe.execute()
if progress:
progress(len(results))
Expand Down
20 changes: 20 additions & 0 deletions sdk/python/tests/integration/feature_repos/repo_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@
from tests.integration.feature_repos.universal.online_store.bigtable import (
BigtableOnlineStoreCreator,
)
from tests.integration.feature_repos.universal.online_store.cassandra import (
CassandraOnlineStoreCreator,
)
from tests.integration.feature_repos.universal.online_store.datastore import (
DatastoreOnlineStoreCreator,
)
Expand Down Expand Up @@ -110,6 +113,21 @@
"instance": os.getenv("BIGTABLE_INSTANCE_ID", "feast-integration-tests"),
}

SCYLLADB_CONFIG = {
"type": "scylladb",
"hosts": [os.getenv("SCYLLADB_HOSTNAME", "")],
"username": os.getenv("SCYLLADB_USERNAME", ""),
"password": os.getenv("SCYLLADB_PASSWORD", ""),
"keyspace": os.getenv("SCYLLADB_KEYSPACE", ""),
"protocol_version": 4,
"load_balancing": {
"load_balancing_policy": "TokenAwarePolicy(DCAwareRoundRobinPolicy)",
"local_dc": "aws-us-west-2",
},
"lazy_table_creation": True,
"key_ttl_seconds": 86400,
}

IKV_CONFIG = {
"type": "ikv",
"account_id": os.getenv("IKV_ACCOUNT_ID", ""),
Expand Down Expand Up @@ -164,6 +182,7 @@
AVAILABLE_ONLINE_STORES["datastore"] = ("datastore", None)
AVAILABLE_ONLINE_STORES["snowflake"] = (SNOWFLAKE_CONFIG, None)
AVAILABLE_ONLINE_STORES["bigtable"] = (BIGTABLE_CONFIG, None)
AVAILABLE_ONLINE_STORES["scylladb"] = (SCYLLADB_CONFIG, None)

# Uncomment to test using private IKV account. Currently not enabled as
# there is no dedicated IKV instance for CI testing and there is no
Expand Down Expand Up @@ -214,6 +233,7 @@
"dynamodb": (DYNAMO_CONFIG, DynamoDBOnlineStoreCreator),
"datastore": ("datastore", DatastoreOnlineStoreCreator),
"bigtable": ("bigtable", BigtableOnlineStoreCreator),
"scylladb": (SCYLLADB_CONFIG, CassandraOnlineStoreCreator),
}

for key, replacement in replacements.items():
Expand Down
43 changes: 43 additions & 0 deletions sdk/python/tests/unit/test_feature_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,46 @@ def test_update_materialization_intervals():
second_updated_feature_view.materialization_intervals[0][1]
== updated_feature_view.materialization_intervals[0][1]
)


def test_online_store_ttl_retrieval():
# Test when TTL is set as a valid integer in tags
file_source = FileSource(name="my-file-source", path="test.parquet")
feature_view = FeatureView(
name="feature_view_with_ttl",
entities=[],
schema=[Field(name="feature1", dtype=Float32)],
source=file_source,
tags={"online_store_ttl": "3600"},
)
assert feature_view.online_store_ttl == 3600


def test_online_store_ttl_none_when_not_set():
# Test when TTL is not set in tags, expecting None
file_source = FileSource(name="my-file-source", path="test.parquet")
feature_view = FeatureView(
name="feature_view_without_ttl",
entities=[],
schema=[Field(name="feature1", dtype=Float32)],
source=file_source,
tags={},
)
assert feature_view.online_store_ttl is None


def test_online_store_ttl_invalid_value():
# Test when TTL is set as a non-integer string, expecting a ValueError
file_source = FileSource(name="my-file-source", path="test.parquet")
feature_view = FeatureView(
name="feature_view_invalid_ttl",
entities=[],
schema=[Field(name="feature1", dtype=Float32)],
source=file_source,
tags={"online_store_ttl": "invalid_ttl"},
)
with pytest.raises(
ValueError,
match="Invalid online_store_ttl value 'invalid_ttl' in tags. It must be an integer representing seconds.",
):
_ = feature_view.online_store_ttl

0 comments on commit 56759e6

Please sign in to comment.