From d248fd73bd31a053d0dfac6b60d5fa63709b31ed Mon Sep 17 00:00:00 2001 From: piket Date: Thu, 4 Jan 2024 14:29:35 -0800 Subject: [PATCH] Transformation service (#62) * implement transformation service in go based on java implementation * integrate transformation service into feature store * temporary transformations server endpoint * rough work to add health endpoints to transformation server * small changes * noop * noop * noop * add healthcheck to transformation service/proto * fix health check import * add third party health service to transformation service * rename route * idek anymore * another approach * grpc * take out watch, setup.py * add watch function * add watch to health check service * properly-formatted transformation server endpoint * error out of transformation service doesnt work * use insecure connection * add error logging * add schema to recordValueWriter * take port out of grpc url; sanitize app name * rework grpc url * switch to port 80 * close arrow.FileWriter * print input features df for debugging * switch things up * take out print statement * fix go compilation * add test file * fix go unit tests * take out last grpcio-health-checking reference * test some dummy code * fix and print in test * writing to a bytes buffer? * maybe fix byteslicewriter * printing stuff... * printing more stuff * try reading bytestream differently; remove debug logging * get rid of compilation warning * debug log transformation server return * more debug logging * more debug output * i am optimistic this will work * more debug logging * stupid output mistake * i mean * i didnt mean * more and more and more debug logging * i meaan please * take out debugging print statements * clean up * fix linting * fix linting and indenting problems * fix linting * fix black linting * fix black linting again --------- Co-authored-by: William Parsley --- go/embedded/online_features.go | 2 +- go/internal/feast/featurestore.go | 15 +- .../feast/transformation/transformation.go | 41 +++- .../transformation/transformation_service.go | 204 ++++++++++++++++++ .../grpc/health/v1/HealthService.proto | 4 +- sdk/python/feast/transformation_server.py | 152 +++++++++++++ setup.py | 3 +- 7 files changed, 405 insertions(+), 16 deletions(-) create mode 100644 go/internal/feast/transformation/transformation_service.go diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index 66187d9a009..49e3716266b 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -210,7 +210,7 @@ func (s *OnlineFeatureService) GetOnlineFeatures( outputFields := make([]arrow.Field, 0) outputColumns := make([]arrow.Array, 0) - pool := memory.NewCgoArrowAllocator() + pool := memory.NewGoAllocator() for _, featureVector := range resp { outputFields = append(outputFields, arrow.Field{ diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index 00389feb781..650cebfc23c 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -3,6 +3,9 @@ package feast import ( "context" "errors" + "fmt" + "os" + "strings" "github.com/apache/arrow/go/v8/arrow/memory" @@ -20,6 +23,7 @@ type FeatureStore struct { registry *registry.Registry onlineStore onlinestore.OnlineStore transformationCallback transformation.TransformationCallback + transformationService *transformation.GrpcTransformationService } // A Features struct specifies a list of features to be retrieved from the online store. These features @@ -54,12 +58,17 @@ func NewFeatureStore(config *registry.RepoConfig, callback transformation.Transf if err != nil { return nil, err } + sanitizedProjectName := strings.Replace(config.Project, "_", "-", -1) + productName := os.Getenv("PRODUCT") + endpoint := fmt.Sprintf("%s-transformations.%s.svc.cluster.local:80", sanitizedProjectName, productName) + transformationService, _ := transformation.NewGrpcTransformationService(config, endpoint) return &FeatureStore{ config: config, registry: registry, onlineStore: onlineStore, transformationCallback: callback, + transformationService: transformationService, }, nil } @@ -116,7 +125,7 @@ func (fs *FeatureStore) GetOnlineFeatures( } result := make([]*onlineserving.FeatureVector, 0) - arrowMemory := memory.NewCgoArrowAllocator() + arrowMemory := memory.NewGoAllocator() featureViews := make([]*model.FeatureView, len(requestedFeatureViews)) index := 0 for _, featuresAndView := range requestedFeatureViews { @@ -164,13 +173,15 @@ func (fs *FeatureStore) GetOnlineFeatures( result = append(result, vectors...) } - if fs.transformationCallback != nil { + if fs.transformationCallback != nil || fs.transformationService != nil { onDemandFeatures, err := transformation.AugmentResponseWithOnDemandTransforms( + ctx, requestedOnDemandFeatureViews, requestData, joinKeyToEntityValues, result, fs.transformationCallback, + fs.transformationService, arrowMemory, numRows, fullFeatureNames, diff --git a/go/internal/feast/transformation/transformation.go b/go/internal/feast/transformation/transformation.go index 9690be97ed8..9d986abb899 100644 --- a/go/internal/feast/transformation/transformation.go +++ b/go/internal/feast/transformation/transformation.go @@ -1,6 +1,7 @@ package transformation import ( + "context" "errors" "fmt" "runtime" @@ -32,11 +33,13 @@ Python function is expected to return number of rows added to the output record type TransformationCallback func(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int func AugmentResponseWithOnDemandTransforms( + ctx context.Context, onDemandFeatureViews []*model.OnDemandFeatureView, requestData map[string]*prototypes.RepeatedValue, entityRows map[string]*prototypes.RepeatedValue, features []*onlineserving.FeatureVector, transformationCallback TransformationCallback, + transformationService *GrpcTransformationService, arrowMemory memory.Allocator, numRows int, fullFeatureNames bool, @@ -68,17 +71,33 @@ func AugmentResponseWithOnDemandTransforms( retrievedFeatures[vector.Name] = vector.Values } - onDemandFeatures, err := CallTransformations( - odfv, - retrievedFeatures, - requestContextArrow, - transformationCallback, - numRows, - fullFeatureNames, - ) - if err != nil { - ReleaseArrowContext(requestContextArrow) - return nil, err + var onDemandFeatures []*onlineserving.FeatureVector + if transformationService != nil { + onDemandFeatures, err = transformationService.GetTransformation( + ctx, + odfv, + retrievedFeatures, + requestContextArrow, + numRows, + fullFeatureNames, + ) + if err != nil { + ReleaseArrowContext(requestContextArrow) + return nil, err + } + } else { + onDemandFeatures, err = CallTransformations( + odfv, + retrievedFeatures, + requestContextArrow, + transformationCallback, + numRows, + fullFeatureNames, + ) + if err != nil { + ReleaseArrowContext(requestContextArrow) + return nil, err + } } result = append(result, onDemandFeatures...) diff --git a/go/internal/feast/transformation/transformation_service.go b/go/internal/feast/transformation/transformation_service.go new file mode 100644 index 00000000000..51dc5d29908 --- /dev/null +++ b/go/internal/feast/transformation/transformation_service.go @@ -0,0 +1,204 @@ +package transformation + +import ( + "bytes" + "context" + "fmt" + "strings" + + "github.com/feast-dev/feast/go/internal/feast/registry" + "google.golang.org/protobuf/types/known/timestamppb" + "io" + + "github.com/apache/arrow/go/v8/arrow" + "github.com/apache/arrow/go/v8/arrow/array" + "github.com/apache/arrow/go/v8/arrow/ipc" + "github.com/apache/arrow/go/v8/arrow/memory" + "github.com/feast-dev/feast/go/internal/feast/model" + "github.com/feast-dev/feast/go/internal/feast/onlineserving" + "github.com/feast-dev/feast/go/protos/feast/serving" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type GrpcTransformationService struct { + project string + conn *grpc.ClientConn + client *serving.TransformationServiceClient +} + +func NewGrpcTransformationService(config *registry.RepoConfig, endpoint string) (*GrpcTransformationService, error) { + opts := make([]grpc.DialOption, 0) + opts = append(opts, grpc.WithDefaultCallOptions(), grpc.WithTransportCredentials(insecure.NewCredentials())) + + conn, err := grpc.Dial(endpoint, opts...) + if err != nil { + return nil, err + } + client := serving.NewTransformationServiceClient(conn) + return &GrpcTransformationService{ config.Project, conn, &client }, nil +} + +func (s *GrpcTransformationService) Close() error { + return s.conn.Close() +} + +func (s *GrpcTransformationService) GetTransformation( + ctx context.Context, + featureView *model.OnDemandFeatureView, + retrievedFeatures map[string]arrow.Array, + requestContext map[string]arrow.Array, + numRows int, + fullFeatureNames bool, +) ([]*onlineserving.FeatureVector, error) { + var err error + + inputFields := make([]arrow.Field, 0) + inputColumns := make([]arrow.Array, 0) + for name, arr := range retrievedFeatures { + inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()}) + inputColumns = append(inputColumns, arr) + } + for name, arr := range requestContext { + inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()}) + inputColumns = append(inputColumns, arr) + } + + inputSchema := arrow.NewSchema(inputFields, nil) + inputRecord := array.NewRecord(inputSchema, inputColumns, int64(numRows)) + defer inputRecord.Release() + + recordValueWriter := new(ByteSliceWriter) + arrowWriter, err := ipc.NewFileWriter(recordValueWriter, ipc.WithSchema(inputSchema)) + if err != nil { + return nil, err + } + + err = arrowWriter.Write(inputRecord) + if err != nil { + return nil, err + } + + err = arrowWriter.Close() + if err != nil { + return nil, err + } + + arrowInput := serving.ValueType_ArrowValue{ArrowValue: recordValueWriter.buf} + transformationInput := serving.ValueType{Value: &arrowInput} + + req := serving.TransformFeaturesRequest{ + OnDemandFeatureViewName: featureView.Base.Name, + Project: s.project, + TransformationInput: &transformationInput, + } + + res, err := (*s.client).TransformFeatures(ctx, &req) + if err != nil { + return nil, err + } + + arrowBytes := res.TransformationOutput.GetArrowValue() + return ExtractTransformationResponse(featureView, arrowBytes, numRows, false) +} + +func ExtractTransformationResponse( + featureView *model.OnDemandFeatureView, + arrowBytes []byte, + numRows int, + fullFeatureNames bool, +) ([]*onlineserving.FeatureVector, error) { + arrowMemory := memory.NewGoAllocator() + arrowReader, err := ipc.NewFileReader(bytes.NewReader(arrowBytes), ipc.WithAllocator(arrowMemory)) + if err != nil { + return nil, err + } + + outRecord, err := arrowReader.Read() + if err != nil { + return nil, err + } + result := make([]*onlineserving.FeatureVector, 0) + for idx, field := range outRecord.Schema().Fields() { + dropFeature := true + + featureName := strings.Split(field.Name, "__")[1] + if featureView.Base.Projection != nil { + + for _, feature := range featureView.Base.Projection.Features { + if featureName == feature.Name { + dropFeature = false + } + } + } else { + dropFeature = false + } + + if dropFeature { + continue + } + + statuses := make([]serving.FieldStatus, numRows) + timestamps := make([]*timestamppb.Timestamp, numRows) + + for idx := 0; idx < numRows; idx++ { + statuses[idx] = serving.FieldStatus_PRESENT + timestamps[idx] = timestamppb.Now() + } + + result = append(result, &onlineserving.FeatureVector{ + Name: featureName, + Values: outRecord.Column(idx), + Statuses: statuses, + Timestamps: timestamps, + }) + } + + return result, nil +} + +type ByteSliceWriter struct { + buf []byte + offset int64 +} + +func (w *ByteSliceWriter) Write(p []byte) (n int, err error) { + minCap := int(w.offset) + len(p) + if minCap > cap(w.buf) { // Make sure buf has enough capacity: + buf2 := make([]byte, len(w.buf), minCap+len(p)) // add some extra + copy(buf2, w.buf) + w.buf = buf2 + } + if minCap > len(w.buf) { + w.buf = w.buf[:minCap] + } + copy(w.buf[w.offset:], p) + w.offset += int64(len(p)) + return len(p), nil +} + +func (w *ByteSliceWriter) Seek(offset int64, whence int) (int64, error) { + switch whence { + case io.SeekStart: + if w.offset != offset && (offset < 0 || offset > int64(len(w.buf))) { + return 0, fmt.Errorf("invalid seek: new offset %d out of range [0 %d]", offset, len(w.buf)) + } + w.offset = offset + return offset, nil + case io.SeekCurrent: + newOffset := w.offset + offset + if newOffset != offset && (newOffset < 0 || newOffset > int64(len(w.buf))) { + return 0, fmt.Errorf("invalid seek: new offset %d out of range [0 %d]", offset, len(w.buf)) + } + w.offset += offset + return w.offset, nil + case io.SeekEnd: + newOffset := int64(len(w.buf)) + offset + if newOffset != offset && (newOffset < 0 || newOffset > int64(len(w.buf))) { + return 0, fmt.Errorf("invalid seek: new offset %d out of range [0 %d]", offset, len(w.buf)) + } + w.offset = offset + return w.offset, nil + } + return 0, fmt.Errorf("unsupported seek mode %d", whence) +} diff --git a/protos/feast/third_party/grpc/health/v1/HealthService.proto b/protos/feast/third_party/grpc/health/v1/HealthService.proto index 3c5504e9827..50a0271562d 100644 --- a/protos/feast/third_party/grpc/health/v1/HealthService.proto +++ b/protos/feast/third_party/grpc/health/v1/HealthService.proto @@ -13,6 +13,7 @@ enum ServingStatus { UNKNOWN = 0; SERVING = 1; NOT_SERVING = 2; + SERVICE_UNKNOWN = 3; } message HealthCheckResponse { @@ -21,4 +22,5 @@ message HealthCheckResponse { service Health { rpc Check(HealthCheckRequest) returns (HealthCheckResponse); -} \ No newline at end of file + rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); +} diff --git a/sdk/python/feast/transformation_server.py b/sdk/python/feast/transformation_server.py index 83f4af749e3..47d5659c934 100644 --- a/sdk/python/feast/transformation_server.py +++ b/sdk/python/feast/transformation_server.py @@ -1,5 +1,7 @@ +import collections import logging import sys +import threading from concurrent import futures import grpc @@ -19,11 +21,156 @@ TransformationServiceServicer, add_TransformationServiceServicer_to_server, ) +from feast.protos.feast.third_party.grpc.health.v1.HealthService_pb2 import ( + HealthCheckResponse, + ServingStatus, +) +from feast.protos.feast.third_party.grpc.health.v1.HealthService_pb2_grpc import ( + HealthServicer, + add_HealthServicer_to_server, +) from feast.version import get_version log = logging.getLogger(__name__) +class _Watcher: + def __init__(self): + self._condition = threading.Condition() + self._responses = collections.deque() + self._open = True + + def __iter__(self): + return self + + def _next(self): + with self._condition: + while not self._responses and self._open: + self._condition.wait() + if self._responses: + return self._responses.popleft() + else: + raise StopIteration() + + def next(self): + return self._next() + + def __next__(self): + return self._next() + + def add(self, response): + with self._condition: + self._responses.append(response) + self._condition.notify() + + def close(self): + with self._condition: + self._open = False + self._condition.notify() + + +def _watcher_to_send_response_callback_adapter(watcher): + def send_response_callback(response): + if response is None: + watcher.close() + else: + watcher.add(response) + + return send_response_callback + + +class HealthServer(HealthServicer): + """Servicer handling RPCs for service statuses.""" + + def __init__(self, experimental_non_blocking=True, experimental_thread_pool=None): + self._lock = threading.RLock() + self._server_status = {"": ServingStatus.SERVING} + self._send_response_callbacks = {} + self.Watch.__func__.experimental_non_blocking = experimental_non_blocking + self.Watch.__func__.experimental_thread_pool = experimental_thread_pool + self._gracefully_shutting_down = False + + def _on_close_callback(self, send_response_callback, service): + def callback(): + with self._lock: + self._send_response_callbacks[service].remove(send_response_callback) + send_response_callback(None) + + return callback + + def Check(self, request, context): + with self._lock: + status = self._server_status.get(request.service) + if status is None: + context.set_code(grpc.StatusCode.NOT_FOUND) + return HealthCheckResponse() + else: + return HealthCheckResponse(status=status) + + # pylint: disable=arguments-differ + def Watch(self, request, context, send_response_callback=None): + blocking_watcher = None + if send_response_callback is None: + # The server does not support the experimental_non_blocking + # parameter. For backwards compatibility, return a blocking response + # generator. + blocking_watcher = _Watcher() + send_response_callback = _watcher_to_send_response_callback_adapter( + blocking_watcher + ) + service = request.service + with self._lock: + status = self._server_status.get(service) + if status is None: + status = ServingStatus.SERVICE_UNKNOWN # pylint: disable=no-member + send_response_callback(HealthCheckResponse(status=status)) + if service not in self._send_response_callbacks: + self._send_response_callbacks[service] = set() + self._send_response_callbacks[service].add(send_response_callback) + context.add_callback( + self._on_close_callback(send_response_callback, service) + ) + return blocking_watcher + + def set(self, service, status): + """Sets the status of a service. + + Args: + service: string, the name of the service. + status: HealthCheckResponse.status enum value indicating the status of + the service + """ + with self._lock: + if self._gracefully_shutting_down: + return + else: + self._server_status[service] = status + if service in self._send_response_callbacks: + for send_response_callback in self._send_response_callbacks[ + service + ]: + send_response_callback(HealthCheckResponse(status=status)) + + def enter_graceful_shutdown(self): + """Permanently sets the status of all services to NOT_SERVING. + + This should be invoked when the server is entering a graceful shutdown + period. After this method is invoked, future attempts to set the status + of a service will be ignored. + + This is an EXPERIMENTAL API. + """ + with self._lock: + if self._gracefully_shutting_down: + return + else: + for service in self._server_status: + self.set( + service, ServingStatus.NOT_SERVING + ) # pylint: disable=no-member + self._gracefully_shutting_down = True + + class TransformationServer(TransformationServiceServicer): def __init__(self, fs: FeatureStore) -> None: super().__init__() @@ -62,8 +209,13 @@ def TransformFeatures(self, request, context): def start_server(store: FeatureStore, port: int): + log.info("Starting server..") server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) add_TransformationServiceServicer_to_server(TransformationServer(store), server) + + # Add health check service to server + add_HealthServicer_to_server(HealthServer(), server) + service_names_available_for_reflection = ( DESCRIPTOR.services_by_name["TransformationService"].full_name, reflection.SERVICE_NAME, diff --git a/setup.py b/setup.py index c563cade344..3a0a2c43145 100644 --- a/setup.py +++ b/setup.py @@ -248,7 +248,7 @@ else: use_scm_version = None -PROTO_SUBDIRS = ["core", "serving", "types", "storage"] +PROTO_SUBDIRS = ["core", "serving", "types", "storage", "third_party/grpc/health/v1"] PYTHON_CODE_PREFIX = "sdk/python" @@ -315,6 +315,7 @@ def run(self): for path in Path(self.python_folder).rglob("*.py"): for folder in self.sub_folders: + folder = folder.replace("/", ".") # Read in the file with open(path, "r") as file: filedata = file.read()