Skip to content

Commit

Permalink
Transformation service (#62)
Browse files Browse the repository at this point in the history
* implement transformation service in go based on java implementation

* integrate transformation service into feature store

* temporary transformations server endpoint

* rough work to add health endpoints to transformation server

* small changes

* noop

* noop

* noop

* add healthcheck to transformation service/proto

* fix health check import

* add third party health service to transformation service

* rename route

* idek anymore

* another approach

* grpc

* take out watch, setup.py

* add watch function

* add watch to health check service

* properly-formatted transformation server endpoint

* error out of transformation service doesnt work

* use insecure connection

* add error logging

* add schema to recordValueWriter

* take port out of grpc url; sanitize app name

* rework grpc url

* switch to port 80

* close arrow.FileWriter

* print input features df for debugging

* switch things up

* take out print statement

* fix go compilation

* add test file

* fix go unit tests

* take out last grpcio-health-checking reference

* test some dummy code

* fix and print in test

* writing to a bytes buffer?

* maybe fix byteslicewriter

* printing stuff...

* printing more stuff

* try reading bytestream differently; remove debug logging

* get rid of compilation warning

* debug log transformation server return

* more debug logging

* more debug output

* i am optimistic this will work

* more debug logging

* stupid output mistake

* i mean

* i didnt mean

* more and more and more debug logging

* i meaan please

* take out debugging print statements

* clean up

* fix linting

* fix linting and indenting problems

* fix linting

* fix black linting

* fix black linting again

---------

Co-authored-by: William Parsley <[email protected]>
  • Loading branch information
piket and William Parsley authored Jan 4, 2024
1 parent 380d9ea commit d248fd7
Show file tree
Hide file tree
Showing 7 changed files with 405 additions and 16 deletions.
2 changes: 1 addition & 1 deletion go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (s *OnlineFeatureService) GetOnlineFeatures(

outputFields := make([]arrow.Field, 0)
outputColumns := make([]arrow.Array, 0)
pool := memory.NewCgoArrowAllocator()
pool := memory.NewGoAllocator()
for _, featureVector := range resp {
outputFields = append(outputFields,
arrow.Field{
Expand Down
15 changes: 13 additions & 2 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package feast
import (
"context"
"errors"
"fmt"
"os"
"strings"

"github.com/apache/arrow/go/v8/arrow/memory"

Expand All @@ -20,6 +23,7 @@ type FeatureStore struct {
registry *registry.Registry
onlineStore onlinestore.OnlineStore
transformationCallback transformation.TransformationCallback
transformationService *transformation.GrpcTransformationService
}

// A Features struct specifies a list of features to be retrieved from the online store. These features
Expand Down Expand Up @@ -54,12 +58,17 @@ func NewFeatureStore(config *registry.RepoConfig, callback transformation.Transf
if err != nil {
return nil, err
}
sanitizedProjectName := strings.Replace(config.Project, "_", "-", -1)
productName := os.Getenv("PRODUCT")
endpoint := fmt.Sprintf("%s-transformations.%s.svc.cluster.local:80", sanitizedProjectName, productName)
transformationService, _ := transformation.NewGrpcTransformationService(config, endpoint)

return &FeatureStore{
config: config,
registry: registry,
onlineStore: onlineStore,
transformationCallback: callback,
transformationService: transformationService,
}, nil
}

Expand Down Expand Up @@ -116,7 +125,7 @@ func (fs *FeatureStore) GetOnlineFeatures(
}

result := make([]*onlineserving.FeatureVector, 0)
arrowMemory := memory.NewCgoArrowAllocator()
arrowMemory := memory.NewGoAllocator()
featureViews := make([]*model.FeatureView, len(requestedFeatureViews))
index := 0
for _, featuresAndView := range requestedFeatureViews {
Expand Down Expand Up @@ -164,13 +173,15 @@ func (fs *FeatureStore) GetOnlineFeatures(
result = append(result, vectors...)
}

if fs.transformationCallback != nil {
if fs.transformationCallback != nil || fs.transformationService != nil {
onDemandFeatures, err := transformation.AugmentResponseWithOnDemandTransforms(
ctx,
requestedOnDemandFeatureViews,
requestData,
joinKeyToEntityValues,
result,
fs.transformationCallback,
fs.transformationService,
arrowMemory,
numRows,
fullFeatureNames,
Expand Down
41 changes: 30 additions & 11 deletions go/internal/feast/transformation/transformation.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package transformation

import (
"context"
"errors"
"fmt"
"runtime"
Expand Down Expand Up @@ -32,11 +33,13 @@ Python function is expected to return number of rows added to the output record
type TransformationCallback func(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int

func AugmentResponseWithOnDemandTransforms(
ctx context.Context,
onDemandFeatureViews []*model.OnDemandFeatureView,
requestData map[string]*prototypes.RepeatedValue,
entityRows map[string]*prototypes.RepeatedValue,
features []*onlineserving.FeatureVector,
transformationCallback TransformationCallback,
transformationService *GrpcTransformationService,
arrowMemory memory.Allocator,
numRows int,
fullFeatureNames bool,
Expand Down Expand Up @@ -68,17 +71,33 @@ func AugmentResponseWithOnDemandTransforms(
retrievedFeatures[vector.Name] = vector.Values
}

onDemandFeatures, err := CallTransformations(
odfv,
retrievedFeatures,
requestContextArrow,
transformationCallback,
numRows,
fullFeatureNames,
)
if err != nil {
ReleaseArrowContext(requestContextArrow)
return nil, err
var onDemandFeatures []*onlineserving.FeatureVector
if transformationService != nil {
onDemandFeatures, err = transformationService.GetTransformation(
ctx,
odfv,
retrievedFeatures,
requestContextArrow,
numRows,
fullFeatureNames,
)
if err != nil {
ReleaseArrowContext(requestContextArrow)
return nil, err
}
} else {
onDemandFeatures, err = CallTransformations(
odfv,
retrievedFeatures,
requestContextArrow,
transformationCallback,
numRows,
fullFeatureNames,
)
if err != nil {
ReleaseArrowContext(requestContextArrow)
return nil, err
}
}
result = append(result, onDemandFeatures...)

Expand Down
204 changes: 204 additions & 0 deletions go/internal/feast/transformation/transformation_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package transformation

import (
"bytes"
"context"
"fmt"
"strings"

"github.com/feast-dev/feast/go/internal/feast/registry"
"google.golang.org/protobuf/types/known/timestamppb"
"io"

"github.com/apache/arrow/go/v8/arrow"
"github.com/apache/arrow/go/v8/arrow/array"
"github.com/apache/arrow/go/v8/arrow/ipc"
"github.com/apache/arrow/go/v8/arrow/memory"
"github.com/feast-dev/feast/go/internal/feast/model"
"github.com/feast-dev/feast/go/internal/feast/onlineserving"
"github.com/feast-dev/feast/go/protos/feast/serving"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type GrpcTransformationService struct {
project string
conn *grpc.ClientConn
client *serving.TransformationServiceClient
}

func NewGrpcTransformationService(config *registry.RepoConfig, endpoint string) (*GrpcTransformationService, error) {
opts := make([]grpc.DialOption, 0)
opts = append(opts, grpc.WithDefaultCallOptions(), grpc.WithTransportCredentials(insecure.NewCredentials()))

conn, err := grpc.Dial(endpoint, opts...)
if err != nil {
return nil, err
}
client := serving.NewTransformationServiceClient(conn)
return &GrpcTransformationService{ config.Project, conn, &client }, nil
}

func (s *GrpcTransformationService) Close() error {
return s.conn.Close()
}

func (s *GrpcTransformationService) GetTransformation(
ctx context.Context,
featureView *model.OnDemandFeatureView,
retrievedFeatures map[string]arrow.Array,
requestContext map[string]arrow.Array,
numRows int,
fullFeatureNames bool,
) ([]*onlineserving.FeatureVector, error) {
var err error

inputFields := make([]arrow.Field, 0)
inputColumns := make([]arrow.Array, 0)
for name, arr := range retrievedFeatures {
inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()})
inputColumns = append(inputColumns, arr)
}
for name, arr := range requestContext {
inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()})
inputColumns = append(inputColumns, arr)
}

inputSchema := arrow.NewSchema(inputFields, nil)
inputRecord := array.NewRecord(inputSchema, inputColumns, int64(numRows))
defer inputRecord.Release()

recordValueWriter := new(ByteSliceWriter)
arrowWriter, err := ipc.NewFileWriter(recordValueWriter, ipc.WithSchema(inputSchema))
if err != nil {
return nil, err
}

err = arrowWriter.Write(inputRecord)
if err != nil {
return nil, err
}

err = arrowWriter.Close()
if err != nil {
return nil, err
}

arrowInput := serving.ValueType_ArrowValue{ArrowValue: recordValueWriter.buf}
transformationInput := serving.ValueType{Value: &arrowInput}

req := serving.TransformFeaturesRequest{
OnDemandFeatureViewName: featureView.Base.Name,
Project: s.project,
TransformationInput: &transformationInput,
}

res, err := (*s.client).TransformFeatures(ctx, &req)
if err != nil {
return nil, err
}

arrowBytes := res.TransformationOutput.GetArrowValue()
return ExtractTransformationResponse(featureView, arrowBytes, numRows, false)
}

func ExtractTransformationResponse(
featureView *model.OnDemandFeatureView,
arrowBytes []byte,
numRows int,
fullFeatureNames bool,
) ([]*onlineserving.FeatureVector, error) {
arrowMemory := memory.NewGoAllocator()
arrowReader, err := ipc.NewFileReader(bytes.NewReader(arrowBytes), ipc.WithAllocator(arrowMemory))
if err != nil {
return nil, err
}

outRecord, err := arrowReader.Read()
if err != nil {
return nil, err
}
result := make([]*onlineserving.FeatureVector, 0)
for idx, field := range outRecord.Schema().Fields() {
dropFeature := true

featureName := strings.Split(field.Name, "__")[1]
if featureView.Base.Projection != nil {

for _, feature := range featureView.Base.Projection.Features {
if featureName == feature.Name {
dropFeature = false
}
}
} else {
dropFeature = false
}

if dropFeature {
continue
}

statuses := make([]serving.FieldStatus, numRows)
timestamps := make([]*timestamppb.Timestamp, numRows)

for idx := 0; idx < numRows; idx++ {
statuses[idx] = serving.FieldStatus_PRESENT
timestamps[idx] = timestamppb.Now()
}

result = append(result, &onlineserving.FeatureVector{
Name: featureName,
Values: outRecord.Column(idx),
Statuses: statuses,
Timestamps: timestamps,
})
}

return result, nil
}

type ByteSliceWriter struct {
buf []byte
offset int64
}

func (w *ByteSliceWriter) Write(p []byte) (n int, err error) {
minCap := int(w.offset) + len(p)
if minCap > cap(w.buf) { // Make sure buf has enough capacity:
buf2 := make([]byte, len(w.buf), minCap+len(p)) // add some extra
copy(buf2, w.buf)
w.buf = buf2
}
if minCap > len(w.buf) {
w.buf = w.buf[:minCap]
}
copy(w.buf[w.offset:], p)
w.offset += int64(len(p))
return len(p), nil
}

func (w *ByteSliceWriter) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
if w.offset != offset && (offset < 0 || offset > int64(len(w.buf))) {
return 0, fmt.Errorf("invalid seek: new offset %d out of range [0 %d]", offset, len(w.buf))
}
w.offset = offset
return offset, nil
case io.SeekCurrent:
newOffset := w.offset + offset
if newOffset != offset && (newOffset < 0 || newOffset > int64(len(w.buf))) {
return 0, fmt.Errorf("invalid seek: new offset %d out of range [0 %d]", offset, len(w.buf))
}
w.offset += offset
return w.offset, nil
case io.SeekEnd:
newOffset := int64(len(w.buf)) + offset
if newOffset != offset && (newOffset < 0 || newOffset > int64(len(w.buf))) {
return 0, fmt.Errorf("invalid seek: new offset %d out of range [0 %d]", offset, len(w.buf))
}
w.offset = offset
return w.offset, nil
}
return 0, fmt.Errorf("unsupported seek mode %d", whence)
}
4 changes: 3 additions & 1 deletion protos/feast/third_party/grpc/health/v1/HealthService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3;
}

message HealthCheckResponse {
Expand All @@ -21,4 +22,5 @@ message HealthCheckResponse {

service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
}
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
Loading

0 comments on commit d248fd7

Please sign in to comment.