diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index a08c8380a0f..77ffc67875a 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - // "os" + "os" "strings" "github.com/apache/arrow/go/v8/arrow/memory" @@ -59,11 +59,8 @@ func NewFeatureStore(config *registry.RepoConfig, callback transformation.Transf return nil, err } sanitizedProjectName := strings.Replace(config.Project, "_", "-", -1) - // productName := os.Getenv("PRODUCT") - // fmt.Println("===========PRODUCT============") - // fmt.Println(productName) - // fmt.Println("===========PRODUCT============") - endpoint := fmt.Sprintf("%s-transformations.unified-feature-store.cluster.local:80", sanitizedProjectName) + productName := os.Getenv("PRODUCT") + endpoint := fmt.Sprintf("%s-transformations.%s.cluster.local:80", sanitizedProjectName, productName) transformationService, _ := transformation.NewGrpcTransformationService(config, endpoint) return &FeatureStore{ diff --git a/sdk/python/feast/transformation_server.py b/sdk/python/feast/transformation_server.py index 9e83bebc7ae..784bb43a9a8 100644 --- a/sdk/python/feast/transformation_server.py +++ b/sdk/python/feast/transformation_server.py @@ -2,7 +2,6 @@ import logging import sys import threading -import time from concurrent import futures from ddtrace import Pin, patch, Tracer @@ -186,62 +185,23 @@ def GetTransformationServiceInfo(self, request, context): return response def TransformFeatures(self, request, context): - function_start_time = time.process_time() - function_timer = time.process_time() try: - funtion_timer = time.process_time() odfv = self.fs.get_on_demand_feature_view( name=request.on_demand_feature_view_name, allow_cache=True ) - odfv_get_duration = time.process_time() - function_timer except OnDemandFeatureViewNotFoundException: context.set_code(grpc.StatusCode.INVALID_ARGUMENT) raise - funtion_timer = time.process_time() df = pa.ipc.open_file(request.transformation_input.arrow_value).read_pandas() - df_file_duration = time.process_time() - funtion_timer - - function_timer = time.process_time() result_df = odfv.get_transformed_features_df(df, True) - transform_func_duration = time.process_time() - function_timer - - function_timer = time.process_time() result_arrow = pa.Table.from_pandas(result_df) - pandas_to_arrow_duration = time.process_time() - function_timer - - function_timer = time.process_time() sink = pa.BufferOutputStream() - buffer_output_duration = time.process_time() - function_timer - - function_timer = time.process_time() writer = pa.ipc.new_file(sink, result_arrow.schema) - writer_declaration_duration = time.process_time() - function_timer - - function_timer = time.process_time() writer.write_table(result_arrow) - writer_write_duration = time.process_time() - function_timer - - function_timer = time.process_time() writer.close() - writer_close_duration = time.process_time() - function_timer - - function_timer = time.process_time() buf = sink.getvalue().to_pybytes() - to_pybytes_duration = time.process_time() - function_timer - - print(f'Time to get odfv: {odfv_get_duration}') - print(f'Time for ipc.open_file(): {df_file_duration}') - print(f'Time for transformation callback: {transform_func_duration}') - print(f'Time for arrow Table.from_pandas(): {pandas_to_arrow_duration}') - print(f'Time for pa.BufferOutputStream(): {buffer_output_duration}') - print(f'Time for ipc.new_file(): {writer_declaration_duration}') - print(f'Time for write_table(): {writer_write_duration}') - print(f'Time for writer.close(): {writer_close_duration}') - print(f'Time for to_pybytes(): {to_pybytes_duration}') - print("-----------------------------------") - print(f'Elapsed function time: {(time.process_time() - function_start_time)}') return TransformFeaturesResponse( transformation_output=ValueType(arrow_value=buf)