diff --git a/go/internal/feast/onlinestore/cassandraonlinestore.go b/go/internal/feast/onlinestore/cassandraonlinestore.go index 844f809c85..ac62e450a2 100644 --- a/go/internal/feast/onlinestore/cassandraonlinestore.go +++ b/go/internal/feast/onlinestore/cassandraonlinestore.go @@ -17,8 +17,8 @@ import ( "github.com/feast-dev/feast/go/protos/feast/serving" "github.com/feast-dev/feast/go/protos/feast/types" "github.com/gocql/gocql" - "github.com/golang/protobuf/proto" "github.com/rs/zerolog/log" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" gocqltrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/gocql/gocql" @@ -380,7 +380,7 @@ func (c *CassandraOnlineStore) UnbatchedKeysOnlineRead(ctx context.Context, enti var eventTs time.Time var valueStr []byte var deserializedValue types.Value - rowFeatures := make(map[string]FeatureData) + rowFeatures := make(map[string]*FeatureData) for scanner.Next() { err := scanner.Scan(&entityKey, &featureName, &eventTs, &valueStr) if err != nil { @@ -394,7 +394,7 @@ func (c *CassandraOnlineStore) UnbatchedKeysOnlineRead(ctx context.Context, enti if deserializedValue.Val != nil { // Convert the value to a FeatureData struct - rowFeatures[featureName] = FeatureData{ + rowFeatures[featureName] = &FeatureData{ Reference: serving.FeatureReferenceV2{ FeatureViewName: featureViewName, FeatureName: featureName, @@ -413,9 +413,20 @@ func (c *CassandraOnlineStore) UnbatchedKeysOnlineRead(ctx context.Context, enti } for _, featName := range featureNames { - featureData, ok := rowFeatures[featName] - if !ok { - featureData = FeatureData{ + + if featureData, exists := rowFeatures[featName]; exists { + results[rowIdx][featureNamesToIdx[featName]] = FeatureData{ + Reference: serving.FeatureReferenceV2{ + FeatureViewName: featureData.Reference.FeatureViewName, + FeatureName: featureData.Reference.FeatureName, + }, + Timestamp: timestamppb.Timestamp{Seconds: featureData.Timestamp.Seconds, Nanos: featureData.Timestamp.Nanos}, + Value: types.Value{ + Val: featureData.Value.Val, + }, + } + } else { + results[rowIdx][featureNamesToIdx[featName]] = FeatureData{ Reference: serving.FeatureReferenceV2{ FeatureViewName: featureViewName, FeatureName: featName, @@ -427,7 +438,7 @@ func (c *CassandraOnlineStore) UnbatchedKeysOnlineRead(ctx context.Context, enti }, } } - results[rowIdx][featureNamesToIdx[featName]] = featureData + } }(serializedEntityKey) } @@ -516,7 +527,7 @@ func (c *CassandraOnlineStore) BatchedKeysOnlineRead(ctx context.Context, entity var valueStr []byte var deserializedValue types.Value // key 1: entityKey - key 2: featureName - batchFeatures := make(map[string]map[string]FeatureData) + batchFeatures := make(map[string]map[string]*FeatureData) for scanner.Next() { err := scanner.Scan(&entityKey, &featureName, &eventTs, &valueStr) if err != nil { @@ -530,9 +541,9 @@ func (c *CassandraOnlineStore) BatchedKeysOnlineRead(ctx context.Context, entity if deserializedValue.Val != nil { if batchFeatures[entityKey] == nil { - batchFeatures[entityKey] = make(map[string]FeatureData) + batchFeatures[entityKey] = make(map[string]*FeatureData) } - batchFeatures[entityKey][featureName] = FeatureData{ + batchFeatures[entityKey][featureName] = &FeatureData{ Reference: serving.FeatureReferenceV2{ FeatureViewName: featureViewName, FeatureName: featureName, @@ -553,9 +564,20 @@ func (c *CassandraOnlineStore) BatchedKeysOnlineRead(ctx context.Context, entity for _, serializedEntityKey := range keyBatch { for _, featName := range featureNames { keyString := serializedEntityKey.(string) - featureData, ok := batchFeatures[keyString][featName] - if !ok { - featureData = FeatureData{ + + if featureData, exists := batchFeatures[keyString][featName]; exists { + results[serializedEntityKeyToIndex[keyString]][featureNamesToIdx[featName]] = FeatureData{ + Reference: serving.FeatureReferenceV2{ + FeatureViewName: featureData.Reference.FeatureViewName, + FeatureName: featureData.Reference.FeatureName, + }, + Timestamp: timestamppb.Timestamp{Seconds: featureData.Timestamp.Seconds, Nanos: featureData.Timestamp.Nanos}, + Value: types.Value{ + Val: featureData.Value.Val, + }, + } + } else { + results[serializedEntityKeyToIndex[keyString]][featureNamesToIdx[featName]] = FeatureData{ Reference: serving.FeatureReferenceV2{ FeatureViewName: featureViewName, FeatureName: featName, @@ -567,7 +589,6 @@ func (c *CassandraOnlineStore) BatchedKeysOnlineRead(ctx context.Context, entity }, } } - results[serializedEntityKeyToIndex[keyString]][featureNamesToIdx[featName]] = featureData } } }(batch, cqlStatement)