From 118f9473286b66a20cbeac65870fc56555577bd8 Mon Sep 17 00:00:00 2001 From: Jose Acevedo Date: Thu, 5 Dec 2024 20:20:20 -0500 Subject: [PATCH 01/13] Implement key batching for cassandra online store in go --- .../feast/onlinestore/cassandraonlinestore.go | 100 ++++++++++-------- 1 file changed, 55 insertions(+), 45 deletions(-) diff --git a/go/internal/feast/onlinestore/cassandraonlinestore.go b/go/internal/feast/onlinestore/cassandraonlinestore.go index 897b2e13a61..3ac9ff2f9d7 100644 --- a/go/internal/feast/onlinestore/cassandraonlinestore.go +++ b/go/internal/feast/onlinestore/cassandraonlinestore.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "errors" "fmt" + "math" "os" "strings" "sync" @@ -209,16 +210,22 @@ func (c *CassandraOnlineStore) getFqTableName(tableName string) string { return fmt.Sprintf(`"%s"."%s_%s"`, c.clusterConfigs.Keyspace, c.project, tableName) } -func (c *CassandraOnlineStore) getCQLStatement(tableName string, featureNames []string) string { +func (c *CassandraOnlineStore) getCQLStatement(tableName string, featureNames []string, nkeys int) string { // this prevents fetching unnecessary features quotedFeatureNames := make([]string, len(featureNames)) for i, featureName := range featureNames { quotedFeatureNames[i] = fmt.Sprintf(`'%s'`, featureName) } + keyPlaceholders := make([]string, nkeys) + for i := 0; i < nkeys; i++ { + keyPlaceholders[i] = "?" + } + return fmt.Sprintf( - `SELECT "entity_key", "feature_name", "event_ts", "value" FROM %s WHERE "entity_key" = ? AND "feature_name" IN (%s)`, + `SELECT "entity_key", "feature_name", "event_ts", "value" FROM %s WHERE "entity_key" IN (%s) AND "feature_name" IN (%s)`, tableName, + strings.Join(keyPlaceholders, ","), strings.Join(quotedFeatureNames, ","), ) } @@ -265,37 +272,34 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ // Prepare the query tableName := c.getFqTableName(featureViewName) - cqlStatement := c.getCQLStatement(tableName, featureNames) - var waitGroup sync.WaitGroup - waitGroup.Add(len(serializedEntityKeys)) + // do batching + nKeys := len(serializedEntityKeys) + batchSize := 2 + nBatches := int(math.Ceil(float64(nKeys) / float64(batchSize))) + + batches := make([][]any, nBatches) + nAssigned := 0 + for i := 0; i < nBatches; i++ { + thisBatchSize := int(math.Min(float64(batchSize), float64(nKeys-nAssigned))) + nAssigned += thisBatchSize + batches[i] = make([]any, thisBatchSize) + for j := 0; j < thisBatchSize; j++ { + batches[i][j] = serializedEntityKeys[i*batchSize+j] + } + } - errorsChannel := make(chan error, len(serializedEntityKeys)) - for _, serializedEntityKey := range serializedEntityKeys { - go func(serEntityKey any) { - defer waitGroup.Done() + var waitGroup sync.WaitGroup + waitGroup.Add(nBatches) - iter := c.session.Query(cqlStatement, serEntityKey).WithContext(ctx).Iter() + errorsChannel := make(chan error, nBatches) - rowIdx := serializedEntityKeyToIndex[serializedEntityKey.(string)] + for _, batch := range batches { + go func(keyBatch []any) { + defer waitGroup.Done() - // fill the row with nulls if not found - if iter.NumRows() == 0 { - for _, featName := range featureNames { - results[rowIdx][featureNamesToIdx[featName]] = FeatureData{ - Reference: serving.FeatureReferenceV2{ - FeatureViewName: featureViewName, - FeatureName: featName, - }, - Value: types.Value{ - Val: &types.Value_NullVal{ - NullVal: types.Null_NULL, - }, - }, - } - } - return - } + cqlStatement := c.getCQLStatement(tableName, featureNames, len(keyBatch)) + iter := c.session.Query(cqlStatement, keyBatch...).WithContext(ctx).Iter() scanner := iter.Scanner() var entityKey string @@ -303,7 +307,8 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ var eventTs time.Time var valueStr []byte var deserializedValue types.Value - rowFeatures := make(map[string]FeatureData) + // key 1: entityKey - key 2: featureName + batchFeatures := make(map[string]map[string]FeatureData) for scanner.Next() { err := scanner.Scan(&entityKey, &featureName, &eventTs, &valueStr) if err != nil { @@ -317,7 +322,10 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ if deserializedValue.Val != nil { // Convert the value to a FeatureData struct - rowFeatures[featureName] = FeatureData{ + if batchFeatures[entityKey] == nil { + batchFeatures[entityKey] = make(map[string]FeatureData) + } + batchFeatures[entityKey][featureName] = FeatureData{ Reference: serving.FeatureReferenceV2{ FeatureViewName: featureViewName, FeatureName: featureName, @@ -335,26 +343,28 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ return } - for _, featName := range featureNames { - featureData, ok := rowFeatures[featName] - if !ok { - featureData = FeatureData{ - Reference: serving.FeatureReferenceV2{ - FeatureViewName: featureViewName, - FeatureName: featName, - }, - Value: types.Value{ - Val: &types.Value_NullVal{ - NullVal: types.Null_NULL, + for _, serializedEntityKey := range keyBatch { + for _, featName := range featureNames { + keyString := serializedEntityKey.(string) + featureData, ok := batchFeatures[keyString][featName] + if !ok { + featureData = FeatureData{ + Reference: serving.FeatureReferenceV2{ + FeatureViewName: featureViewName, + FeatureName: featName, }, - }, + Value: types.Value{ + Val: &types.Value_NullVal{ + NullVal: types.Null_NULL, + }, + }, + } } + results[serializedEntityKeyToIndex[keyString]][featureNamesToIdx[featName]] = featureData } - results[rowIdx][featureNamesToIdx[featName]] = featureData } - }(serializedEntityKey) + }(batch) } - // wait until all concurrent single-key queries are done waitGroup.Wait() close(errorsChannel) From e7f7c0ff79099484dbf0f004a63471083d95ae0a Mon Sep 17 00:00:00 2001 From: Jose Acevedo Date: Fri, 6 Dec 2024 16:16:11 -0500 Subject: [PATCH 02/13] Have batches of 10 keys and cache CQL query --- go/internal/feast/onlinestore/cassandraonlinestore.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/go/internal/feast/onlinestore/cassandraonlinestore.go b/go/internal/feast/onlinestore/cassandraonlinestore.go index 3ac9ff2f9d7..36e299cf053 100644 --- a/go/internal/feast/onlinestore/cassandraonlinestore.go +++ b/go/internal/feast/onlinestore/cassandraonlinestore.go @@ -275,7 +275,7 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ // do batching nKeys := len(serializedEntityKeys) - batchSize := 2 + batchSize := 10 nBatches := int(math.Ceil(float64(nKeys) / float64(batchSize))) batches := make([][]any, nBatches) @@ -294,11 +294,17 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ errorsChannel := make(chan error, nBatches) + var prevBatchLength int + var cqlStatement string for _, batch := range batches { go func(keyBatch []any) { defer waitGroup.Done() - cqlStatement := c.getCQLStatement(tableName, featureNames, len(keyBatch)) + // this caches the previous batch query if it had the same number of keys + if len(keyBatch) != prevBatchLength { + cqlStatement = c.getCQLStatement(tableName, featureNames, len(keyBatch)) + } + iter := c.session.Query(cqlStatement, keyBatch...).WithContext(ctx).Iter() scanner := iter.Scanner() From 2c3f1de6c2d3978fc1a9e89c3b51e6f5f411f1eb Mon Sep 17 00:00:00 2001 From: Jose Acevedo Date: Wed, 11 Dec 2024 14:01:34 -0500 Subject: [PATCH 03/13] Use batch size of 20 --- go/internal/feast/onlinestore/cassandraonlinestore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/internal/feast/onlinestore/cassandraonlinestore.go b/go/internal/feast/onlinestore/cassandraonlinestore.go index 36e299cf053..79c820cf975 100644 --- a/go/internal/feast/onlinestore/cassandraonlinestore.go +++ b/go/internal/feast/onlinestore/cassandraonlinestore.go @@ -275,7 +275,7 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ // do batching nKeys := len(serializedEntityKeys) - batchSize := 10 + batchSize := 20 nBatches := int(math.Ceil(float64(nKeys) / float64(batchSize))) batches := make([][]any, nBatches) From 535741b58115e7a1a9892bd3ebc5fa7652daa0fc Mon Sep 17 00:00:00 2001 From: Jose Acevedo Date: Fri, 20 Dec 2024 15:05:49 -0500 Subject: [PATCH 04/13] Implement configurable batching + removed misleading log --- .../feast/onlinestore/cassandraonlinestore.go | 196 +++++++++++++++++- .../onlinestore/cassandraonlinestore_test.go | 15 +- 2 files changed, 200 insertions(+), 11 deletions(-) diff --git a/go/internal/feast/onlinestore/cassandraonlinestore.go b/go/internal/feast/onlinestore/cassandraonlinestore.go index 79c820cf975..ec988ea691b 100644 --- a/go/internal/feast/onlinestore/cassandraonlinestore.go +++ b/go/internal/feast/onlinestore/cassandraonlinestore.go @@ -33,6 +33,9 @@ type CassandraOnlineStore struct { session *gocql.Session config *registry.RepoConfig + + // The number of keys to include in a single CQL query for retrieval from the database + keyBatchSize int } type CassandraConfig struct { @@ -44,6 +47,7 @@ type CassandraConfig struct { loadBalancingPolicy gocql.HostSelectionPolicy connectionTimeoutMillis int64 requestTimeoutMillis int64 + keyBatchSize int } func parseStringField(config map[string]any, fieldName string, defaultValue string) (string, error) { @@ -156,6 +160,13 @@ func extractCassandraConfig(onlineStoreConfig map[string]any) (*CassandraConfig, } cassandraConfig.requestTimeoutMillis = int64(requestTimeoutMillis.(float64)) + keyBatchSize, ok := onlineStoreConfig["key_batch_size"] + if !ok { + keyBatchSize = 10.0 + log.Warn().Msg("key_batch_size not specified, defaulting to batches of size 10") + } + cassandraConfig.keyBatchSize = int(keyBatchSize.(float64)) + return &cassandraConfig, nil } @@ -176,8 +187,9 @@ func NewCassandraOnlineStore(project string, config *registry.RepoConfig, online store.clusterConfigs.PoolConfig.HostSelectionPolicy = cassandraConfig.loadBalancingPolicy - if cassandraConfig.username != "" && cassandraConfig.password != "" { - log.Warn().Msg("username/password not defined, will not be using authentication") + if cassandraConfig.username == "" || cassandraConfig.password == "" { + log.Warn().Msg("username and/or password not defined, will not be using authentication") + } else { store.clusterConfigs.Authenticator = gocql.PasswordAuthenticator{ Username: cassandraConfig.username, Password: cassandraConfig.password, @@ -203,6 +215,16 @@ func NewCassandraOnlineStore(project string, config *registry.RepoConfig, online return nil, fmt.Errorf("unable to connect to the ScyllaDB database") } store.session = createdSession + + if cassandraConfig.keyBatchSize <= 0 { + return nil, fmt.Errorf("key_batch_size must be greater than zero") + } else if cassandraConfig.keyBatchSize == 1 { + log.Info().Msg("key batching is disabled") + } else { + log.Info().Msgf("key batching is enabled with a batch size of %d", cassandraConfig.keyBatchSize) + } + store.keyBatchSize = cassandraConfig.keyBatchSize + return &store, nil } @@ -210,7 +232,21 @@ func (c *CassandraOnlineStore) getFqTableName(tableName string) string { return fmt.Sprintf(`"%s"."%s_%s"`, c.clusterConfigs.Keyspace, c.project, tableName) } -func (c *CassandraOnlineStore) getCQLStatement(tableName string, featureNames []string, nkeys int) string { +func (c *CassandraOnlineStore) getSingleKeyCQLStatement(tableName string, featureNames []string) string { + // this prevents fetching unnecessary features + quotedFeatureNames := make([]string, len(featureNames)) + for i, featureName := range featureNames { + quotedFeatureNames[i] = fmt.Sprintf(`'%s'`, featureName) + } + + return fmt.Sprintf( + `SELECT "entity_key", "feature_name", "event_ts", "value" FROM %s WHERE "entity_key" = ? AND "feature_name" IN (%s)`, + tableName, + strings.Join(quotedFeatureNames, ","), + ) +} + +func (c *CassandraOnlineStore) getMultiKeyCQLStatement(tableName string, featureNames []string, nkeys int) string { // this prevents fetching unnecessary features quotedFeatureNames := make([]string, len(featureNames)) for i, featureName := range featureNames { @@ -244,7 +280,143 @@ func (c *CassandraOnlineStore) buildCassandraEntityKeys(entityKeys []*types.Enti } return cassandraKeys, cassandraKeyToEntityIndex, nil } -func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) { + +func (c *CassandraOnlineStore) UnbatchedKeysOnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) { + uniqueNames := make(map[string]int32) + for _, fvName := range featureViewNames { + uniqueNames[fvName] = 0 + } + if len(uniqueNames) != 1 { + return nil, fmt.Errorf("rejecting OnlineRead as more than 1 feature view was tried to be read at once") + } + + serializedEntityKeys, serializedEntityKeyToIndex, err := c.buildCassandraEntityKeys(entityKeys) + + if err != nil { + return nil, fmt.Errorf("error when serializing entity keys for Cassandra") + } + results := make([][]FeatureData, len(entityKeys)) + for i := range results { + results[i] = make([]FeatureData, len(featureNames)) + } + + featureNamesToIdx := make(map[string]int) + for idx, name := range featureNames { + featureNamesToIdx[name] = idx + } + + featureViewName := featureViewNames[0] + + // Prepare the query + tableName := c.getFqTableName(featureViewName) + cqlStatement := c.getSingleKeyCQLStatement(tableName, featureNames) + + var waitGroup sync.WaitGroup + waitGroup.Add(len(serializedEntityKeys)) + + errorsChannel := make(chan error, len(serializedEntityKeys)) + for _, serializedEntityKey := range serializedEntityKeys { + go func(serEntityKey any) { + defer waitGroup.Done() + + iter := c.session.Query(cqlStatement, serEntityKey).WithContext(ctx).Iter() + + rowIdx := serializedEntityKeyToIndex[serializedEntityKey.(string)] + + // fill the row with nulls if not found + if iter.NumRows() == 0 { + for _, featName := range featureNames { + results[rowIdx][featureNamesToIdx[featName]] = FeatureData{ + Reference: serving.FeatureReferenceV2{ + FeatureViewName: featureViewName, + FeatureName: featName, + }, + Value: types.Value{ + Val: &types.Value_NullVal{ + NullVal: types.Null_NULL, + }, + }, + } + } + return + } + + scanner := iter.Scanner() + var entityKey string + var featureName string + var eventTs time.Time + var valueStr []byte + var deserializedValue types.Value + rowFeatures := make(map[string]FeatureData) + for scanner.Next() { + err := scanner.Scan(&entityKey, &featureName, &eventTs, &valueStr) + if err != nil { + errorsChannel <- errors.New("could not read row in query for (entity key, feature name, value, event ts)") + return + } + if err := proto.Unmarshal(valueStr, &deserializedValue); err != nil { + errorsChannel <- errors.New("error converting parsed Cassandra Value to types.Value") + return + } + + if deserializedValue.Val != nil { + // Convert the value to a FeatureData struct + rowFeatures[featureName] = FeatureData{ + Reference: serving.FeatureReferenceV2{ + FeatureViewName: featureViewName, + FeatureName: featureName, + }, + Timestamp: timestamppb.Timestamp{Seconds: eventTs.Unix(), Nanos: int32(eventTs.Nanosecond())}, + Value: types.Value{ + Val: deserializedValue.Val, + }, + } + } + } + + if err := scanner.Err(); err != nil { + errorsChannel <- errors.New("failed to scan features: " + err.Error()) + return + } + + for _, featName := range featureNames { + featureData, ok := rowFeatures[featName] + if !ok { + featureData = FeatureData{ + Reference: serving.FeatureReferenceV2{ + FeatureViewName: featureViewName, + FeatureName: featName, + }, + Value: types.Value{ + Val: &types.Value_NullVal{ + NullVal: types.Null_NULL, + }, + }, + } + } + results[rowIdx][featureNamesToIdx[featName]] = featureData + } + }(serializedEntityKey) + } + + // wait until all concurrent single-key queries are done + waitGroup.Wait() + close(errorsChannel) + + var collectedErrors []error + for err := range errorsChannel { + if err != nil { + collectedErrors = append(collectedErrors, err) + } + } + if len(collectedErrors) > 0 { + return nil, errors.Join(collectedErrors...) + } + + return results, nil +} + +func (c *CassandraOnlineStore) BatchedKeysOnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) { uniqueNames := make(map[string]int32) for _, fvName := range featureViewNames { uniqueNames[fvName] = 0 @@ -273,9 +445,9 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ // Prepare the query tableName := c.getFqTableName(featureViewName) - // do batching + // Key batching nKeys := len(serializedEntityKeys) - batchSize := 20 + batchSize := c.keyBatchSize nBatches := int(math.Ceil(float64(nKeys) / float64(batchSize))) batches := make([][]any, nBatches) @@ -293,7 +465,6 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ waitGroup.Add(nBatches) errorsChannel := make(chan error, nBatches) - var prevBatchLength int var cqlStatement string for _, batch := range batches { @@ -302,7 +473,7 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ // this caches the previous batch query if it had the same number of keys if len(keyBatch) != prevBatchLength { - cqlStatement = c.getCQLStatement(tableName, featureNames, len(keyBatch)) + cqlStatement = c.getMultiKeyCQLStatement(tableName, featureNames, len(keyBatch)) } iter := c.session.Query(cqlStatement, keyBatch...).WithContext(ctx).Iter() @@ -327,7 +498,6 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ } if deserializedValue.Val != nil { - // Convert the value to a FeatureData struct if batchFeatures[entityKey] == nil { batchFeatures[entityKey] = make(map[string]FeatureData) } @@ -388,6 +558,14 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ return results, nil } +func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) { + if c.keyBatchSize == 1 { + return c.UnbatchedKeysOnlineRead(ctx, entityKeys, featureViewNames, featureNames) + } else { + return c.BatchedKeysOnlineRead(ctx, entityKeys, featureViewNames, featureNames) + } +} + func (c *CassandraOnlineStore) Destruct() { c.session.Close() } diff --git a/go/internal/feast/onlinestore/cassandraonlinestore_test.go b/go/internal/feast/onlinestore/cassandraonlinestore_test.go index 67a9eea5487..19c53506b3f 100644 --- a/go/internal/feast/onlinestore/cassandraonlinestore_test.go +++ b/go/internal/feast/onlinestore/cassandraonlinestore_test.go @@ -60,17 +60,28 @@ func TestGetFqTableName(t *testing.T) { assert.Equal(t, `"scylladb"."dummy_project_dummy_fv"`, fqTableName) } -func TestGetCQLStatement(t *testing.T) { +func TestGetSingleKeyCQLStatement(t *testing.T) { store := CassandraOnlineStore{} fqTableName := `"scylladb"."dummy_project_dummy_fv"` - cqlStatement := store.getCQLStatement(fqTableName, []string{"feat1", "feat2"}) + cqlStatement := store.getSingleKeyCQLStatement(fqTableName, []string{"feat1", "feat2"}) assert.Equal(t, `SELECT "entity_key", "feature_name", "event_ts", "value" FROM "scylladb"."dummy_project_dummy_fv" WHERE "entity_key" = ? AND "feature_name" IN ('feat1','feat2')`, cqlStatement, ) } +func TestGetMultiKeyCQLStatement(t *testing.T) { + store := CassandraOnlineStore{} + fqTableName := `"scylladb"."dummy_project_dummy_fv"` + + cqlStatement := store.getMultiKeyCQLStatement(fqTableName, []string{"feat1", "feat2"}, 5) + assert.Equal(t, + `SELECT "entity_key", "feature_name", "event_ts", "value" FROM "scylladb"."dummy_project_dummy_fv" WHERE "entity_key" IN (?,?,?,?,?) AND "feature_name" IN ('feat1','feat2')`, + cqlStatement, + ) +} + func TestOnlineRead_RejectsDifferentFeatureViewsInSameRead(t *testing.T) { store := CassandraOnlineStore{} _, err := store.OnlineRead(context.TODO(), nil, []string{"fv1", "fv2"}, []string{"feat1", "feat2"}) From 4f4302f35eec8773b5147482a90cea509dd8f389 Mon Sep 17 00:00:00 2001 From: vanitabhagwat <92561664+vanitabhagwat@users.noreply.github.com> Date: Tue, 21 Jan 2025 13:12:23 -0800 Subject: [PATCH 05/13] fix: Dummy commit to generate new version From e4ca43898c79f77a9ce5a80a2875d524a104c1e3 Mon Sep 17 00:00:00 2001 From: vanitabhagwat <92561664+vanitabhagwat@users.noreply.github.com> Date: Wed, 22 Jan 2025 11:39:59 -0800 Subject: [PATCH 06/13] fix: Update the default batch size from 10 to 5. --- go/internal/feast/onlinestore/cassandraonlinestore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/internal/feast/onlinestore/cassandraonlinestore.go b/go/internal/feast/onlinestore/cassandraonlinestore.go index ec988ea691b..795fbeba918 100644 --- a/go/internal/feast/onlinestore/cassandraonlinestore.go +++ b/go/internal/feast/onlinestore/cassandraonlinestore.go @@ -162,8 +162,8 @@ func extractCassandraConfig(onlineStoreConfig map[string]any) (*CassandraConfig, keyBatchSize, ok := onlineStoreConfig["key_batch_size"] if !ok { - keyBatchSize = 10.0 - log.Warn().Msg("key_batch_size not specified, defaulting to batches of size 10") + keyBatchSize = 5.0 + log.Warn().Msg("key_batch_size not specified, defaulting to batches of size 5") } cassandraConfig.keyBatchSize = int(keyBatchSize.(float64)) From 7f9427ecd0f767fde68a57daef7fb8b8635cead3 Mon Sep 17 00:00:00 2001 From: vanitabhagwat <92561664+vanitabhagwat@users.noreply.github.com> Date: Thu, 23 Jan 2025 20:25:30 -0800 Subject: [PATCH 07/13] fix: Limiting the keyBatchSize max to 100 Limiting the keyBatchSize max to 100 since maximum clustering-key cartesian product size is 100 --- go/internal/feast/onlinestore/cassandraonlinestore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/internal/feast/onlinestore/cassandraonlinestore.go b/go/internal/feast/onlinestore/cassandraonlinestore.go index 795fbeba918..b81bcd147b1 100644 --- a/go/internal/feast/onlinestore/cassandraonlinestore.go +++ b/go/internal/feast/onlinestore/cassandraonlinestore.go @@ -216,8 +216,8 @@ func NewCassandraOnlineStore(project string, config *registry.RepoConfig, online } store.session = createdSession - if cassandraConfig.keyBatchSize <= 0 { - return nil, fmt.Errorf("key_batch_size must be greater than zero") + if cassandraConfig.keyBatchSize <= 0 || cassandraConfig.keyBatchSize > 100 { + return nil, fmt.Errorf("key_batch_size must be greater than zero and less than 100") } else if cassandraConfig.keyBatchSize == 1 { log.Info().Msg("key batching is disabled") } else { From d7335a5a88526a879d0798c6bfbbb5530272100b Mon Sep 17 00:00:00 2001 From: vanitabhagwat <92561664+vanitabhagwat@users.noreply.github.com> Date: Tue, 28 Jan 2025 21:25:19 -0800 Subject: [PATCH 08/13] feat: Add the key_batch_size to python cassandra config --- .../contrib/cassandra_online_store/cassandra_online_store.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 1998de464ab..d019ca44ece 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -161,6 +161,9 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel): key_ttl_seconds: Optional[StrictInt] = None """Default TTL (in seconds) to apply to all tables if not specified in FeatureView. Value 0 or None means No TTL.""" + + key_batch_size: Optional[StrictInt] = 1 + """Default value of key batch. Value 1 means key batching is disabled""" class CassandraLoadBalancingPolicy(FeastConfigBaseModel): """ From ba060c799c8a39e31362a09c9076131462089db8 Mon Sep 17 00:00:00 2001 From: vanitabhagwat <92561664+vanitabhagwat@users.noreply.github.com> Date: Tue, 28 Jan 2025 21:38:04 -0800 Subject: [PATCH 09/13] fix: Formatting From 749eb1c6eb00da483f4d4808b5c02df8e9e2b1df Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Tue, 28 Jan 2025 21:51:10 -0800 Subject: [PATCH 10/13] fix: Formatting --- .../contrib/cassandra_online_store/cassandra_online_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index d019ca44ece..6715c40d614 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -161,7 +161,7 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel): key_ttl_seconds: Optional[StrictInt] = None """Default TTL (in seconds) to apply to all tables if not specified in FeatureView. Value 0 or None means No TTL.""" - + key_batch_size: Optional[StrictInt] = 1 """Default value of key batch. Value 1 means key batching is disabled""" From f36aa4c0a3bfc249ec2bef44fdb242c84f53cff4 Mon Sep 17 00:00:00 2001 From: vanitabhagwat <92561664+vanitabhagwat@users.noreply.github.com> Date: Wed, 29 Jan 2025 11:17:53 -0800 Subject: [PATCH 11/13] Updating key batch size to 10 Co-authored-by: Bhargav Dodla <13788369+EXPEbdodla@users.noreply.github.com> --- go/internal/feast/onlinestore/cassandraonlinestore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/internal/feast/onlinestore/cassandraonlinestore.go b/go/internal/feast/onlinestore/cassandraonlinestore.go index b81bcd147b1..989add6f981 100644 --- a/go/internal/feast/onlinestore/cassandraonlinestore.go +++ b/go/internal/feast/onlinestore/cassandraonlinestore.go @@ -162,7 +162,7 @@ func extractCassandraConfig(onlineStoreConfig map[string]any) (*CassandraConfig, keyBatchSize, ok := onlineStoreConfig["key_batch_size"] if !ok { - keyBatchSize = 5.0 + keyBatchSize = 10.0 log.Warn().Msg("key_batch_size not specified, defaulting to batches of size 5") } cassandraConfig.keyBatchSize = int(keyBatchSize.(float64)) From 285a39822b68f3cecbe051c32915f933adf3ff89 Mon Sep 17 00:00:00 2001 From: vanitabhagwat <92561664+vanitabhagwat@users.noreply.github.com> Date: Wed, 29 Jan 2025 11:18:16 -0800 Subject: [PATCH 12/13] Updating key batch size to 10 Co-authored-by: Bhargav Dodla <13788369+EXPEbdodla@users.noreply.github.com> --- go/internal/feast/onlinestore/cassandraonlinestore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/internal/feast/onlinestore/cassandraonlinestore.go b/go/internal/feast/onlinestore/cassandraonlinestore.go index 989add6f981..eeaabec3e04 100644 --- a/go/internal/feast/onlinestore/cassandraonlinestore.go +++ b/go/internal/feast/onlinestore/cassandraonlinestore.go @@ -163,7 +163,7 @@ func extractCassandraConfig(onlineStoreConfig map[string]any) (*CassandraConfig, keyBatchSize, ok := onlineStoreConfig["key_batch_size"] if !ok { keyBatchSize = 10.0 - log.Warn().Msg("key_batch_size not specified, defaulting to batches of size 5") + log.Warn().Msg("key_batch_size not specified, defaulting to batches of size 10") } cassandraConfig.keyBatchSize = int(keyBatchSize.(float64)) From e3d44f7b738abc586af6cf85728c1202c03b6f8c Mon Sep 17 00:00:00 2001 From: vanitabhagwat <92561664+vanitabhagwat@users.noreply.github.com> Date: Wed, 29 Jan 2025 11:19:02 -0800 Subject: [PATCH 13/13] Updating the default to 10 Co-authored-by: Bhargav Dodla <13788369+EXPEbdodla@users.noreply.github.com> --- .../contrib/cassandra_online_store/cassandra_online_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 6715c40d614..b3c3d955f38 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -162,8 +162,8 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel): key_ttl_seconds: Optional[StrictInt] = None """Default TTL (in seconds) to apply to all tables if not specified in FeatureView. Value 0 or None means No TTL.""" - key_batch_size: Optional[StrictInt] = 1 - """Default value of key batch. Value 1 means key batching is disabled""" + key_batch_size: Optional[StrictInt] = 10 + """In Go Feature Server, this configuration is used to query tables with multiple keys at a time using IN clause based on the size specified. Value 1 means key batching is disabled. Valid values are 1 to 100.""" class CassandraLoadBalancingPolicy(FeastConfigBaseModel): """