diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index cb60f32d9..77ee5b2a5 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -7,20 +7,16 @@ import ( "net" "net/http" "strconv" - "time" - extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" healthPb "google.golang.org/grpc/health/grpc_health_v1" "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm" - "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers" "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics" - "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling" + runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -37,7 +33,7 @@ const ( var ( grpcPort = flag.Int( "grpcPort", - 9002, + runserver.DefaultGrpcPort, "The gRPC port used for communicating with Envoy proxy") grpcHealthPort = flag.Int( "grpcHealthPort", @@ -47,31 +43,31 @@ var ( "metricsPort", 9090, "The metrics port") targetPodHeader = flag.String( "targetPodHeader", - "target-pod", + runserver.DefaultTargetPodHeader, "Header key used by Envoy to route to the appropriate pod. This must match Envoy configuration.") poolName = flag.String( "poolName", - "", + runserver.DefaultPoolName, "Name of the InferencePool this Endpoint Picker is associated with.") poolNamespace = flag.String( "poolNamespace", - "default", + runserver.DefaultPoolNamespace, "Namespace of the InferencePool this Endpoint Picker is associated with.") serviceName = flag.String( "serviceName", - "", + runserver.DefaultServiceName, "Name of the Service that will be used to read EndpointSlices from") zone = flag.String( "zone", - "", + runserver.DefaultZone, "The zone that this instance is created in. Will be passed to the corresponding endpointSlice. 
") refreshPodsInterval = flag.Duration( "refreshPodsInterval", - 10*time.Second, + runserver.DefaultRefreshPodsInterval, "interval to refresh pods") refreshMetricsInterval = flag.Duration( "refreshMetricsInterval", - 50*time.Millisecond, + runserver.DefaultRefreshMetricsInterval, "interval to refresh metrics") scheme = runtime.NewScheme() @@ -103,71 +99,34 @@ func main() { }) klog.Info(flags) - // Create a new manager to manage controllers - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme}) - if err != nil { - klog.Fatalf("Failed to create controller manager: %v", err) - } - - // Create the data store used to cache watched resources datastore := backend.NewK8sDataStore() - // Create the controllers and register them with the manager - if err := (&backend.InferencePoolReconciler{ - Datastore: datastore, - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - PoolNamespacedName: types.NamespacedName{ - Name: *poolName, - Namespace: *poolNamespace, - }, - Record: mgr.GetEventRecorderFor("InferencePool"), - }).SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err) - } - - if err := (&backend.InferenceModelReconciler{ - Datastore: datastore, - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - PoolNamespacedName: types.NamespacedName{ - Name: *poolName, - Namespace: *poolNamespace, - }, - Record: mgr.GetEventRecorderFor("InferenceModel"), - }).SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err) - } - - if err := (&backend.EndpointSliceReconciler{ - Datastore: datastore, - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - Record: mgr.GetEventRecorderFor("endpointslice"), - ServiceName: *serviceName, - Zone: *zone, - }).SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err) + serverRunner := &runserver.ExtProcServerRunner{ + GrpcPort: *grpcPort, + TargetPodHeader: *targetPodHeader, + PoolName: *poolName, + PoolNamespace: *poolNamespace, + ServiceName: *serviceName, + Zone: *zone, + RefreshPodsInterval: *refreshPodsInterval, + RefreshMetricsInterval: *refreshMetricsInterval, + Scheme: scheme, + Config: ctrl.GetConfigOrDie(), + Datastore: datastore, } + serverRunner.Setup() // Start health and ext-proc servers in goroutines healthSvr := startHealthServer(datastore, *grpcHealthPort) - extProcSvr := startExternalProcessorServer( + extProcSvr := serverRunner.Start( datastore, - *grpcPort, - *refreshPodsInterval, - *refreshMetricsInterval, - *targetPodHeader, + &vllm.PodMetricsClientImpl{}, ) // Start metrics handler metricsSvr := startMetricsHandler(*metricsPort, cfg) - // Start the controller manager. Blocking and will return when shutdown is complete. - klog.Infof("Starting controller manager") - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { - klog.Fatalf("Error starting controller manager: %v", err) - } - klog.Info("Controller manager shutting down") + // Start manager, blocking + serverRunner.StartManager() // Gracefully shutdown servers if healthSvr != nil { @@ -209,43 +168,6 @@ func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server { return svr } -// startExternalProcessorServer starts the Envoy external processor server in a goroutine. 
-func startExternalProcessorServer(
-	datastore *backend.K8sDatastore,
-	port int,
-	refreshPodsInterval, refreshMetricsInterval time.Duration,
-	targetPodHeader string,
-) *grpc.Server {
-	svr := grpc.NewServer()
-
-	go func() {
-		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
-		if err != nil {
-			klog.Fatalf("Ext-proc server failed to listen: %v", err)
-		}
-		klog.Infof("Ext-proc server listening on port: %d", port)
-
-		// Initialize backend provider
-		pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
-		if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
-			klog.Fatalf("Failed to initialize backend provider: %v", err)
-		}
-
-		// Register ext_proc handlers
-		extProcPb.RegisterExternalProcessorServer(
-			svr,
-			handlers.NewServer(pp, scheduling.NewScheduler(pp), targetPodHeader, datastore),
-		)
-
-		// Blocking and will return when shutdown is complete.
-		if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
-			klog.Fatalf("Ext-proc server failed: %v", err)
-		}
-		klog.Info("Ext-proc server shutting down")
-	}()
-	return svr
-}
-
 func startMetricsHandler(port int, cfg *rest.Config) *http.Server {
 	metrics.Register()
diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go
new file mode 100644
index 000000000..94c6078c8
--- /dev/null
+++ b/pkg/ext-proc/server/runserver.go
@@ -0,0 +1,156 @@
+package server
+
+import (
+	"fmt"
+	"net"
+	"time"
+
+	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
+	"google.golang.org/grpc"
+	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
+	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
+	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/rest"
+	klog "k8s.io/klog/v2"
+	ctrl "sigs.k8s.io/controller-runtime"
+)
+
+// ExtProcServerRunner provides methods to manage an external processor server.
+type ExtProcServerRunner struct {
+	GrpcPort               int
+	TargetPodHeader        string
+	PoolName               string
+	PoolNamespace          string
+	ServiceName            string
+	Zone                   string
+	RefreshPodsInterval    time.Duration
+	RefreshMetricsInterval time.Duration
+	Scheme                 *runtime.Scheme
+	Config                 *rest.Config
+	Datastore              *backend.K8sDatastore
+	manager                ctrl.Manager
+}
+
+// Default values for CLI flags in main
+const (
+	DefaultGrpcPort               = 9002                  // default for --grpcPort
+	DefaultTargetPodHeader        = "target-pod"          // default for --targetPodHeader
+	DefaultPoolName               = ""                    // required but no default
+	DefaultPoolNamespace          = "default"             // default for --poolNamespace
+	DefaultServiceName            = ""                    // required but no default
+	DefaultZone                   = ""                    // default for --zone
+	DefaultRefreshPodsInterval    = 10 * time.Second      // default for --refreshPodsInterval
+	DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
+)
+
+func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
+	return &ExtProcServerRunner{
+		GrpcPort:               DefaultGrpcPort,
+		TargetPodHeader:        DefaultTargetPodHeader,
+		PoolName:               DefaultPoolName,
+		PoolNamespace:          DefaultPoolNamespace,
+		ServiceName:            DefaultServiceName,
+		Zone:                   DefaultZone,
+		RefreshPodsInterval:    DefaultRefreshPodsInterval,
+		RefreshMetricsInterval: DefaultRefreshMetricsInterval,
+		// Scheme, Config, and Datastore can be assigned later.
+	}
+}
+
+// Setup creates the reconcilers for pools, models, and endpointSlices and registers them with the manager.
+func (r *ExtProcServerRunner) Setup() {
+	// Create a new manager to manage controllers
+	mgr, err := ctrl.NewManager(r.Config, ctrl.Options{Scheme: r.Scheme})
+	if err != nil {
+		klog.Fatalf("Failed to create controller manager: %v", err)
+	}
+	r.manager = mgr
+
+	// Create the controllers and register them with the manager
+	if err := (&backend.InferencePoolReconciler{
+		Datastore: r.Datastore,
+		Scheme:    mgr.GetScheme(),
+		Client:    mgr.GetClient(),
+		PoolNamespacedName: types.NamespacedName{
+			Name:      r.PoolName,
+			Namespace: r.PoolNamespace,
+		},
+		Record: mgr.GetEventRecorderFor("InferencePool"),
+	}).SetupWithManager(mgr); err != nil {
+		klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
+	}
+
+	if err := (&backend.InferenceModelReconciler{
+		Datastore: r.Datastore,
+		Scheme:    mgr.GetScheme(),
+		Client:    mgr.GetClient(),
+		PoolNamespacedName: types.NamespacedName{
+			Name:      r.PoolName,
+			Namespace: r.PoolNamespace,
+		},
+		Record: mgr.GetEventRecorderFor("InferenceModel"),
+	}).SetupWithManager(mgr); err != nil {
+		klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
+	}
+
+	if err := (&backend.EndpointSliceReconciler{
+		Datastore:   r.Datastore,
+		Scheme:      mgr.GetScheme(),
+		Client:      mgr.GetClient(),
+		Record:      mgr.GetEventRecorderFor("endpointslice"),
+		ServiceName: r.ServiceName,
+		Zone:        r.Zone,
+	}).SetupWithManager(mgr); err != nil {
+		klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
+	}
+}
+
+// Start starts the Envoy external processor server in a goroutine.
+func (r *ExtProcServerRunner) Start(
+	podDatastore *backend.K8sDatastore,
+	podMetricsClient backend.PodMetricsClient,
+) *grpc.Server {
+	svr := grpc.NewServer()
+
+	go func() {
+		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", r.GrpcPort))
+		if err != nil {
+			klog.Fatalf("Ext-proc server failed to listen: %v", err)
+		}
+		klog.Infof("Ext-proc server listening on port: %d", r.GrpcPort)
+
+		// Initialize backend provider
+		pp := backend.NewProvider(podMetricsClient, podDatastore)
+		if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
+			klog.Fatalf("Failed to initialize backend provider: %v", err)
+		}
+
+		// Register ext_proc handlers
+		extProcPb.RegisterExternalProcessorServer(
+			svr,
+			handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetPodHeader, r.Datastore),
+		)
+
+		// Blocking and will return when shutdown is complete.
+		if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
+			klog.Fatalf("Ext-proc server failed: %v", err)
+		}
+		klog.Info("Ext-proc server shutting down")
+	}()
+	return svr
+}
+
+func (r *ExtProcServerRunner) StartManager() {
+	if r.manager == nil {
+		klog.Fatalf("Runner has no manager set up to run: %v", r)
+	}
+	// Start the controller manager. Blocking and will return when shutdown is complete.
+	klog.Infof("Starting controller manager")
+	mgr := r.manager
+	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
+		klog.Fatalf("Error starting controller manager: %v", err)
+	}
+	klog.Info("Controller manager shutting down")
+}
diff --git a/pkg/ext-proc/test/hermetic_test.go b/pkg/ext-proc/test/hermetic_test.go
deleted file mode 100644
index acbd74a94..000000000
--- a/pkg/ext-proc/test/hermetic_test.go
+++ /dev/null
@@ -1,178 +0,0 @@
-// Package test contains e2e tests for the ext proc while faking the backend pods.
-package test - -import ( - "context" - "fmt" - "testing" - "time" - - configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/protobuf/testing/protocmp" - "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" - "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" -) - -const ( - port = 9002 -) - -func TestHandleRequestBody(t *testing.T) { - tests := []struct { - name string - req *extProcPb.ProcessingRequest - pods []*backend.PodMetrics - models map[string]*v1alpha1.InferenceModel - wantHeaders []*configPb.HeaderValueOption - wantBody []byte - wantErr bool - }{ - { - name: "success", - req: GenerateRequest("my-model"), - models: map[string]*v1alpha1.InferenceModel{ - "my-model": { - Spec: v1alpha1.InferenceModelSpec{ - ModelName: "my-model", - TargetModels: []v1alpha1.TargetModel{ - { - Name: "my-model-v1", - Weight: pointer(100), - }, - }, - }, - }, - }, - // pod-1 will be picked because it has relatively low queue size, with the requested - // model being active, and has low KV cache. - pods: []*backend.PodMetrics{ - { - Pod: FakePod(0), - Metrics: backend.Metrics{ - WaitingQueueSize: 0, - KVCacheUsagePercent: 0.2, - ActiveModels: map[string]int{ - "foo": 1, - "bar": 1, - }, - }, - }, - { - Pod: FakePod(1), - Metrics: backend.Metrics{ - WaitingQueueSize: 0, - KVCacheUsagePercent: 0.1, - ActiveModels: map[string]int{ - "foo": 1, - "my-model-v1": 1, - }, - }, - }, - { - Pod: FakePod(2), - Metrics: backend.Metrics{ - WaitingQueueSize: 10, - KVCacheUsagePercent: 0.2, - ActiveModels: map[string]int{ - "foo": 1, - }, - }, - }, - }, - wantHeaders: []*configPb.HeaderValueOption{ - { - Header: &configPb.HeaderValue{ - Key: "target-pod", - RawValue: []byte("address-1"), - }, - }, - { - Header: &configPb.HeaderValue{ - Key: "Content-Length", - RawValue: []byte("73"), - }, - }, - }, - wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-v1\",\"prompt\":\"hello\",\"temperature\":0}"), - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - client, cleanup := setUpServer(t, test.pods, test.models) - t.Cleanup(cleanup) - want := &extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_RequestBody{ - RequestBody: &extProcPb.BodyResponse{ - Response: &extProcPb.CommonResponse{ - HeaderMutation: &extProcPb.HeaderMutation{ - SetHeaders: test.wantHeaders, - }, - BodyMutation: &extProcPb.BodyMutation{ - Mutation: &extProcPb.BodyMutation_Body{ - Body: test.wantBody, - }, - }, - }, - }, - }, - } - res, err := sendRequest(t, client, test.req) - - if (err != nil) != test.wantErr { - t.Fatalf("Unexpected error, got %v, want %v", err, test.wantErr) - } - - if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" { - t.Errorf("Unexpected response, (-want +got): %v", diff) - } - }) - } - -} - -func setUpServer(t *testing.T, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) { - server := StartExtProc(port, time.Second, time.Second, pods, models) - - address := fmt.Sprintf("localhost:%v", port) - // Create a grpc connection - conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Fatalf("Failed to connect to %v: %v", address, err) - } - - 
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx) - if err != nil { - t.Fatalf("Failed to create client: %v", err) - } - return client, func() { - cancel() - conn.Close() - server.GracefulStop() - } -} - -func sendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) { - t.Logf("Sending request: %v", req) - if err := client.Send(req); err != nil { - t.Logf("Failed to send request %+v: %v", req, err) - return nil, err - } - - res, err := client.Recv() - if err != nil { - t.Logf("Failed to receive: %v", err) - return nil, err - } - t.Logf("Received request %+v", res) - return res, err -} - -func pointer(v int32) *int32 { - return &v -} diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go new file mode 100644 index 000000000..1379285f3 --- /dev/null +++ b/test/integration/hermetic_test.go @@ -0,0 +1,435 @@ +// Package test contains e2e tests for the ext proc while faking the backend pods. +package integration + +import ( + "bufio" + "bytes" + "context" + "errors" + "flag" + "fmt" + "io" + "log" + "os" + "path/filepath" + "testing" + "time" + + configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/testing/protocmp" + "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" + "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server" + extprocutils "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/test" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + k8syaml "k8s.io/apimachinery/pkg/util/yaml" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + klog "k8s.io/klog/v2" + k8sclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/yaml" +) + +const ( + port = runserver.DefaultGrpcPort +) + +var ( + serverRunner *runserver.ExtProcServerRunner + k8sClient k8sclient.Client + testEnv *envtest.Environment + scheme = runtime.NewScheme() +) + +func SKIPTestHandleRequestBody(t *testing.T) { + tests := []struct { + name string + req *extProcPb.ProcessingRequest + pods []*backend.PodMetrics + models map[string]*v1alpha1.InferenceModel + wantHeaders []*configPb.HeaderValueOption + wantBody []byte + wantErr bool + }{ + { + name: "success", + req: extprocutils.GenerateRequest("my-model"), + models: map[string]*v1alpha1.InferenceModel{ + "my-model": { + Spec: v1alpha1.InferenceModelSpec{ + ModelName: "my-model", + TargetModels: []v1alpha1.TargetModel{ + { + Name: "my-model-v1", + Weight: pointer(100), + }, + }, + }, + }, + }, + // pod-1 will be picked because it has relatively low queue size, with the requested + // model being active, and has low KV cache. 
+ pods: []*backend.PodMetrics{ + { + Pod: extprocutils.FakePod(0), + Metrics: backend.Metrics{ + WaitingQueueSize: 0, + KVCacheUsagePercent: 0.2, + ActiveModels: map[string]int{ + "foo": 1, + "bar": 1, + }, + }, + }, + { + Pod: extprocutils.FakePod(1), + Metrics: backend.Metrics{ + WaitingQueueSize: 0, + KVCacheUsagePercent: 0.1, + ActiveModels: map[string]int{ + "foo": 1, + "my-model-v1": 1, + }, + }, + }, + { + Pod: extprocutils.FakePod(2), + Metrics: backend.Metrics{ + WaitingQueueSize: 10, + KVCacheUsagePercent: 0.2, + ActiveModels: map[string]int{ + "foo": 1, + }, + }, + }, + }, + wantHeaders: []*configPb.HeaderValueOption{ + { + Header: &configPb.HeaderValue{ + Key: "target-pod", + RawValue: []byte("address-1"), + }, + }, + { + Header: &configPb.HeaderValue{ + Key: "Content-Length", + RawValue: []byte("73"), + }, + }, + }, + wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-v1\",\"prompt\":\"hello\",\"temperature\":0}"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client, cleanup := setUpServer(t, test.pods, test.models) + t.Cleanup(cleanup) + want := &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestBody{ + RequestBody: &extProcPb.BodyResponse{ + Response: &extProcPb.CommonResponse{ + HeaderMutation: &extProcPb.HeaderMutation{ + SetHeaders: test.wantHeaders, + }, + BodyMutation: &extProcPb.BodyMutation{ + Mutation: &extProcPb.BodyMutation_Body{ + Body: test.wantBody, + }, + }, + }, + }, + }, + } + res, err := sendRequest(t, client, test.req) + + if (err != nil) != test.wantErr { + t.Fatalf("Unexpected error, got %v, want %v", err, test.wantErr) + } + + if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" { + t.Errorf("Unexpected response, (-want +got): %v", diff) + } + }) + } + +} + +func TestKubeInferenceModelRequest(t *testing.T) { + tests := []struct { + name string + req *extProcPb.ProcessingRequest + wantHeaders []*configPb.HeaderValueOption + wantBody []byte + wantErr bool + }{ + { + name: "success", + req: extprocutils.GenerateRequest("sql-lora"), + // pod-1 will be picked because it has relatively low queue size, with the requested + // model being active, and has low KV cache. 
+ wantHeaders: []*configPb.HeaderValueOption{ + { + Header: &configPb.HeaderValue{ + Key: "target-pod", + RawValue: []byte("address-1"), + }, + }, + { + Header: &configPb.HeaderValue{ + Key: "Content-Length", + RawValue: []byte("76"), + }, + }, + }, + wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"hello\",\"temperature\":0}"), + wantErr: false, + }, + } + + pods := []*backend.PodMetrics{ + { + Pod: extprocutils.FakePod(0), + Metrics: backend.Metrics{ + WaitingQueueSize: 0, + KVCacheUsagePercent: 0.2, + ActiveModels: map[string]int{ + "foo": 1, + "bar": 1, + }, + }, + }, + { + Pod: extprocutils.FakePod(1), + Metrics: backend.Metrics{ + WaitingQueueSize: 0, + KVCacheUsagePercent: 0.1, + ActiveModels: map[string]int{ + "foo": 1, + "sql-lora-1fdg2": 1, + }, + }, + }, + { + Pod: extprocutils.FakePod(2), + Metrics: backend.Metrics{ + WaitingQueueSize: 10, + KVCacheUsagePercent: 0.2, + ActiveModels: map[string]int{ + "foo": 1, + }, + }, + }, + } + + // Set up global k8sclient and extproc server runner with test environment config + BeforeSuit() + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client, cleanup := setUpHermeticServer(t, pods) + t.Cleanup(cleanup) + want := &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestBody{ + RequestBody: &extProcPb.BodyResponse{ + Response: &extProcPb.CommonResponse{ + HeaderMutation: &extProcPb.HeaderMutation{ + SetHeaders: test.wantHeaders, + }, + BodyMutation: &extProcPb.BodyMutation{ + Mutation: &extProcPb.BodyMutation_Body{ + Body: test.wantBody, + }, + }, + }, + }, + }, + } + res, err := sendRequest(t, client, test.req) + + if err != nil { + if !test.wantErr { + t.Errorf("Unexpected error, got: %v, want error: %v", err, test.wantErr) + } + } else if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" { + t.Errorf("Unexpected response, (-want +got): %v", diff) + } + }) + } +} + +func setUpServer(t *testing.T, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) { + t.Logf("Setting up ExtProc server") + server := extprocutils.StartExtProc(port, time.Second, time.Second, pods, models) + + address := fmt.Sprintf("localhost:%v", port) + // Create a grpc connection + conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("Failed to connect to %v: %v", address, err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx) + if err != nil { + log.Fatalf("Failed to create client: %v", err) + } + return client, func() { + cancel() + conn.Close() + server.GracefulStop() + } +} + +func setUpHermeticServer(t *testing.T, pods []*backend.PodMetrics) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) { + t.Logf("Setting up hermetic ExtProc server") + klog.InitFlags(nil) + flag.Parse() + // Configure klog verbosity levels to print ext proc logs. 
+	_ = flag.Lookup("v").Value.Set("3")
+
+	// Unmarshal CRDs from file into structs
+	manifestsPath := filepath.Join("..", "testdata", "inferencepool-with-model-hermetic.yaml")
+	docs, err := readDocuments(manifestsPath)
+	if err != nil {
+		log.Fatalf("Can't read object manifests at path %v, %v", manifestsPath, err)
+	}
+
+	inferenceModels := make([]*v1alpha1.InferenceModel, 0)
+	for _, doc := range docs {
+		inferenceModel := &v1alpha1.InferenceModel{}
+		if err = yaml.Unmarshal(doc, inferenceModel); err != nil {
+			log.Fatalf("Can't unmarshal object: %v", doc)
+		}
+		if inferenceModel.Kind != "InferenceModel" {
+			continue
+		}
+		inferenceModels = append(inferenceModels, inferenceModel)
+	}
+	t.Logf("Inference models to add: %+v", inferenceModels)
+	for _, model := range inferenceModels {
+		t.Logf("Creating inference model: %+v", model)
+		if err := k8sClient.Create(context.Background(), model); err != nil {
+			log.Fatalf("unable to create inferenceModel %v: %v", model.GetName(), err)
+		}
+	}
+
+	ps := make(backend.PodSet)
+	pms := make(map[backend.Pod]*backend.PodMetrics)
+	for _, pod := range pods {
+		ps[pod.Pod] = true
+		pms[pod.Pod] = pod
+	}
+	pmc := &backend.FakePodMetricsClient{Res: pms}
+
+	server := serverRunner.Start(backend.NewK8sDataStore(backend.WithPods(pods)), pmc)
+
+	// Wait for the reconciler to populate the datastore.
+	time.Sleep(10 * time.Second)
+
+	address := fmt.Sprintf("localhost:%v", port)
+	// Create a grpc connection
+	conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		log.Fatalf("Failed to connect to %v: %v", address, err)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx)
+	if err != nil {
+		log.Fatalf("Failed to create client: %v", err)
+	}
+	return client, func() {
+		cancel()
+		conn.Close()
+		server.GracefulStop()
+	}
+}
+
+// Sets up the test environment and initializes the global k8s client and ext-proc server runner.
+func BeforeSuit() {
+	// Set up mock k8s API Client
+	testEnv = &envtest.Environment{
+		CRDDirectoryPaths:     []string{filepath.Join("..", "..", "config", "crd", "bases")},
+		ErrorIfCRDPathMissing: true,
+	}
+	cfg, err := testEnv.Start()
+
+	if err != nil {
+		log.Fatalf("Failed to start test environment, cfg: %v error: %v", cfg, err)
+	}
+
+	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
+	utilruntime.Must(v1alpha1.AddToScheme(scheme))
+
+	k8sClient, err = k8sclient.New(cfg, k8sclient.Options{Scheme: scheme})
+	if err != nil {
+		log.Fatalf("Failed to start k8s Client: %v", err)
+	} else if k8sClient == nil {
+		log.Fatalf("No error, but returned kubernetes client is nil, cfg: %v", cfg)
+	}
+
+	serverRunner = runserver.NewDefaultExtProcServerRunner()
+	// Adjust from defaults
+	serverRunner.PoolName = "vllm-llama2-7b-pool"
+	serverRunner.Scheme = scheme
+	serverRunner.Config = cfg
+	serverRunner.Datastore = backend.NewK8sDataStore()
+
+	serverRunner.Setup()
+
+	// Start the controller manager in a goroutine, not blocking
+	go func() {
+		serverRunner.StartManager()
+	}()
+}
+
+func sendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
+	t.Logf("Sending request: %v", req)
+	if err := client.Send(req); err != nil {
+		t.Logf("Failed to send request %+v: %v", req, err)
+		return nil, err
+	}
+
+	res, err := client.Recv()
+	if err != nil {
+		t.Logf("Failed to receive: %v", err)
+		return nil, err
+	}
+	t.Logf("Received response %+v", res)
+	return res, err
+}
+
+// readDocuments reads documents from file.
+func readDocuments(fp string) ([][]byte, error) {
+	b, err := os.ReadFile(fp)
+	if err != nil {
+		return nil, err
+	}
+
+	docs := [][]byte{}
+	reader := k8syaml.NewYAMLReader(bufio.NewReader(bytes.NewReader(b)))
+	for {
+		// Read document
+		doc, err := reader.Read()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			return nil, err
+		}
+		docs = append(docs, doc)
+	}
+	return docs, nil
+}
+
+func pointer(v int32) *int32 {
+	return &v
+}
diff --git a/test/testdata/inferencepool-with-model-hermetic.yaml b/test/testdata/inferencepool-with-model-hermetic.yaml
new file mode 100644
index 000000000..8703c37af
--- /dev/null
+++ b/test/testdata/inferencepool-with-model-hermetic.yaml
@@ -0,0 +1,30 @@
+apiVersion: inference.networking.x-k8s.io/v1alpha1
+kind: InferencePool
+metadata:
+  labels:
+  name: vllm-llama2-7b-pool
+spec:
+  targetPortNumber: 8000
+  selector:
+    app: vllm-llama2-7b-pool
+---
+apiVersion: inference.networking.x-k8s.io/v1alpha1
+kind: InferenceModel
+metadata:
+  labels:
+    app.kubernetes.io/name: api
+    app.kubernetes.io/managed-by: kustomize
+  name: inferencemodel-sample
+  namespace: default
+spec:
+  modelName: sql-lora
+  criticality: Critical
+  poolRef:
+    # this is the default val:
+    group: inference.networking.x-k8s.io
+    # this is the default val:
+    kind: InferencePool
+    name: vllm-llama2-7b-pool
+  targetModels:
+  - name: sql-lora-1fdg2
+    weight: 100
\ No newline at end of file