Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: TTL support for ScyllaDB config and FeatureViews #154

Merged
merged 12 commits into from
Nov 8, 2024
19 changes: 19 additions & 0 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,3 +494,22 @@ def most_recent_end_time(self) -> Optional[datetime]:
if len(self.materialization_intervals) == 0:
return None
return max([interval[1] for interval in self.materialization_intervals])

@property
def online_store_ttl(self) -> Optional[int]:
"""
Retrieves the online store TTL from the FeatureView's tags.

Returns:
An integer representing the TTL in seconds, or None if not set.
"""
ttl_str = self.tags.get("online_store_ttl")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the tag name to online_store_key_ttl_seconds. For easy to remember and align with feature_store.yaml configuration.

if ttl_str:
try:
return int(ttl_str)
except ValueError:
raise ValueError(
f"Invalid online_store_ttl value '{ttl_str}' in tags. It must be an integer representing seconds."
)
else:
return None
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
event_ts TIMESTAMP,
created_ts TIMESTAMP,
PRIMARY KEY ((entity_key), feature_name)
) WITH CLUSTERING ORDER BY (feature_name ASC);
) WITH CLUSTERING ORDER BY (feature_name ASC) AND default_time_to_live={ttl};
"""

DROP_TABLE_CQL_TEMPLATE = "DROP TABLE IF EXISTS {fqtable};"
Expand Down Expand Up @@ -159,6 +159,9 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel):
Table deletion is not currently supported in this mode.
"""

key_ttl_seconds: Optional[StrictInt] = None
"""Default TTL (in seconds) to apply to all tables if not specified in FeatureView."""

class CassandraLoadBalancingPolicy(FeastConfigBaseModel):
"""
Configuration block related to the Cluster's load-balancing policy.
Expand Down Expand Up @@ -566,8 +569,9 @@ def _create_table(self, config: RepoConfig, project: str, table: FeatureView):
session: Session = self._get_session(config)
keyspace: str = self._keyspace
fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table)
create_cql = self._get_cql_statement(config, "create", fqtable)
logger.info(f"Creating table {fqtable}.")
ttl = table.online_store_ttl or config.online_store.key_ttl_seconds or 0
create_cql = self._get_cql_statement(config, "create", fqtable, ttl=ttl)
logger.info(f"Creating table {fqtable} with TTL {ttl}.")
session.execute(create_cql)

def _get_cql_statement(
Expand Down
11 changes: 7 additions & 4 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,13 @@ def online_write_batch(

pipe.hset(redis_key_bin, mapping=entity_hset)

if online_store_config.key_ttl_seconds:
pipe.expire(
name=redis_key_bin, time=online_store_config.key_ttl_seconds
)
ttl = (
table.online_store_ttl
or online_store_config.key_ttl_seconds
or None
)
if ttl:
pipe.expire(name=redis_key_bin, time=ttl)
results = pipe.execute()
if progress:
progress(len(results))
Expand Down
20 changes: 20 additions & 0 deletions sdk/python/tests/integration/feature_repos/repo_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@
from tests.integration.feature_repos.universal.online_store.bigtable import (
BigtableOnlineStoreCreator,
)
from tests.integration.feature_repos.universal.online_store.cassandra import (
CassandraOnlineStoreCreator,
)
from tests.integration.feature_repos.universal.online_store.datastore import (
DatastoreOnlineStoreCreator,
)
Expand Down Expand Up @@ -110,6 +113,21 @@
"instance": os.getenv("BIGTABLE_INSTANCE_ID", "feast-integration-tests"),
}

SCYLLADB_CONFIG = {
"type": "scylladb",
"hosts": [os.getenv("SCYLLADB_HOSTNAME", "")],
"username": os.getenv("SCYLLADB_USERNAME", ""),
"password": os.getenv("SCYLLADB_PASSWORD", ""),
"keyspace": os.getenv("SCYLLADB_KEYSPACE", ""),
"protocol_version": 4,
"load_balancing": {
"load_balancing_policy": "TokenAwarePolicy(DCAwareRoundRobinPolicy)",
"local_dc": "aws-us-west-2",
},
"lazy_table_creation": True,
"key_ttl_seconds": 86400,
}

IKV_CONFIG = {
"type": "ikv",
"account_id": os.getenv("IKV_ACCOUNT_ID", ""),
Expand Down Expand Up @@ -164,6 +182,7 @@
AVAILABLE_ONLINE_STORES["datastore"] = ("datastore", None)
AVAILABLE_ONLINE_STORES["snowflake"] = (SNOWFLAKE_CONFIG, None)
AVAILABLE_ONLINE_STORES["bigtable"] = (BIGTABLE_CONFIG, None)
AVAILABLE_ONLINE_STORES["scylladb"] = (SCYLLADB_CONFIG, None)

# Uncomment to test using private IKV account. Currently not enabled as
# there is no dedicated IKV instance for CI testing and there is no
Expand Down Expand Up @@ -214,6 +233,7 @@
"dynamodb": (DYNAMO_CONFIG, DynamoDBOnlineStoreCreator),
"datastore": ("datastore", DatastoreOnlineStoreCreator),
"bigtable": ("bigtable", BigtableOnlineStoreCreator),
"scylladb": (SCYLLADB_CONFIG, CassandraOnlineStoreCreator),
}

for key, replacement in replacements.items():
Expand Down
43 changes: 43 additions & 0 deletions sdk/python/tests/unit/test_feature_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,46 @@ def test_update_materialization_intervals():
second_updated_feature_view.materialization_intervals[0][1]
== updated_feature_view.materialization_intervals[0][1]
)


def test_online_store_ttl_retrieval():
# Test when TTL is set as a valid integer in tags
file_source = FileSource(name="my-file-source", path="test.parquet")
feature_view = FeatureView(
name="feature_view_with_ttl",
entities=[],
schema=[Field(name="feature1", dtype=Float32)],
source=file_source,
tags={"online_store_ttl": "3600"},
)
assert feature_view.online_store_ttl == 3600


def test_online_store_ttl_none_when_not_set():
# Test when TTL is not set in tags, expecting None
file_source = FileSource(name="my-file-source", path="test.parquet")
feature_view = FeatureView(
name="feature_view_without_ttl",
entities=[],
schema=[Field(name="feature1", dtype=Float32)],
source=file_source,
tags={},
)
assert feature_view.online_store_ttl is None


def test_online_store_ttl_invalid_value():
# Test when TTL is set as a non-integer string, expecting a ValueError
file_source = FileSource(name="my-file-source", path="test.parquet")
feature_view = FeatureView(
name="feature_view_invalid_ttl",
entities=[],
schema=[Field(name="feature1", dtype=Float32)],
source=file_source,
tags={"online_store_ttl": "invalid_ttl"},
)
with pytest.raises(
ValueError,
match="Invalid online_store_ttl value 'invalid_ttl' in tags. It must be an integer representing seconds.",
):
_ = feature_view.online_store_ttl
Loading