11// Package test contains e2e tests for the ext proc while faking the backend pods.
2- package test
2+ package integration
33
44import (
55 "bufio"
@@ -24,11 +24,11 @@ import (
2424 "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
2525 "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
2626 runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
27+ extprocutils "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/test"
2728 "k8s.io/apimachinery/pkg/runtime"
2829 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2930 k8syaml "k8s.io/apimachinery/pkg/util/yaml"
3031 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
31- "k8s.io/client-go/rest"
3232 klog "k8s.io/klog/v2"
3333 k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
3434 "sigs.k8s.io/controller-runtime/pkg/envtest"
@@ -40,6 +40,7 @@ const (
4040)
4141
4242var (
43+ runner * runserver.ExtProcServerRunner
4344 k8sClient k8sclient.Client
4445 testEnv * envtest.Environment
4546 scheme = runtime .NewScheme ()
@@ -57,7 +58,7 @@ func SKIPTestHandleRequestBody(t *testing.T) {
5758 }{
5859 {
5960 name : "success" ,
60- req : GenerateRequest ("my-model" ),
61+ req : extprocutils . GenerateRequest ("my-model" ),
6162 models : map [string ]* v1alpha1.InferenceModel {
6263 "my-model" : {
6364 Spec : v1alpha1.InferenceModelSpec {
@@ -75,7 +76,7 @@ func SKIPTestHandleRequestBody(t *testing.T) {
7576 // model being active, and has low KV cache.
7677 pods : []* backend.PodMetrics {
7778 {
78- Pod : FakePod (0 ),
79+ Pod : extprocutils . FakePod (0 ),
7980 Metrics : backend.Metrics {
8081 WaitingQueueSize : 0 ,
8182 KVCacheUsagePercent : 0.2 ,
@@ -86,7 +87,7 @@ func SKIPTestHandleRequestBody(t *testing.T) {
8687 },
8788 },
8889 {
89- Pod : FakePod (1 ),
90+ Pod : extprocutils . FakePod (1 ),
9091 Metrics : backend.Metrics {
9192 WaitingQueueSize : 0 ,
9293 KVCacheUsagePercent : 0.1 ,
@@ -97,7 +98,7 @@ func SKIPTestHandleRequestBody(t *testing.T) {
9798 },
9899 },
99100 {
100- Pod : FakePod (2 ),
101+ Pod : extprocutils . FakePod (2 ),
101102 Metrics : backend.Metrics {
102103 WaitingQueueSize : 10 ,
103104 KVCacheUsagePercent : 0.2 ,
@@ -172,7 +173,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
172173 }{
173174 {
174175 name : "success" ,
175- req : GenerateRequest ("sql-lora" ),
176+ req : extprocutils . GenerateRequest ("sql-lora" ),
176177 // pod-1 will be picked because it has relatively low queue size, with the requested
177178 // model being active, and has low KV cache.
178179 wantHeaders : []* configPb.HeaderValueOption {
@@ -194,29 +195,9 @@ func TestKubeInferenceModelRequest(t *testing.T) {
194195 },
195196 }
196197
197- // Set up mock k8s API Client
198- testEnv = & envtest.Environment {
199- CRDDirectoryPaths : []string {filepath .Join (".." , ".." , ".." , "config" , "crd" , "bases" )},
200- ErrorIfCRDPathMissing : true ,
201- }
202- cfg , err := testEnv .Start ()
203- if err != nil {
204- log .Fatalf ("Failed to start test environment, cfg: %v error: %v" , cfg , err )
205- }
206-
207- utilruntime .Must (clientgoscheme .AddToScheme (scheme ))
208- utilruntime .Must (v1alpha1 .AddToScheme (scheme ))
209-
210- k8sClient , err = k8sclient .New (cfg , k8sclient.Options {Scheme : scheme })
211- if err != nil {
212- log .Fatalf ("Failed to start k8s Client: %v" , err )
213- } else if k8sClient == nil {
214- log .Fatalf ("No error, but returned kubernetes client is nil, cfg: %v" , cfg )
215- }
216-
217198 pods := []* backend.PodMetrics {
218199 {
219- Pod : FakePod (0 ),
200+ Pod : extprocutils . FakePod (0 ),
220201 Metrics : backend.Metrics {
221202 WaitingQueueSize : 0 ,
222203 KVCacheUsagePercent : 0.2 ,
@@ -227,7 +208,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
227208 },
228209 },
229210 {
230- Pod : FakePod (1 ),
211+ Pod : extprocutils . FakePod (1 ),
231212 Metrics : backend.Metrics {
232213 WaitingQueueSize : 0 ,
233214 KVCacheUsagePercent : 0.1 ,
@@ -238,7 +219,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
238219 },
239220 },
240221 {
241- Pod : FakePod (2 ),
222+ Pod : extprocutils . FakePod (2 ),
242223 Metrics : backend.Metrics {
243224 WaitingQueueSize : 10 ,
244225 KVCacheUsagePercent : 0.2 ,
@@ -248,9 +229,13 @@ func TestKubeInferenceModelRequest(t *testing.T) {
248229 },
249230 },
250231 }
232+
233+ // Set up global k8sclient and extproc server runner with test environment config
234+ BeforeSuit ()
235+
251236 for _ , test := range tests {
252237 t .Run (test .name , func (t * testing.T ) {
253- client , cleanup := setUpHermeticServer (t , cfg , pods )
238+ client , cleanup := setUpHermeticServer (t , pods )
254239 t .Cleanup (cleanup )
255240 want := & extProcPb.ProcessingResponse {
256241 Response : & extProcPb.ProcessingResponse_RequestBody {
@@ -283,7 +268,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
283268
284269func setUpServer (t * testing.T , pods []* backend.PodMetrics , models map [string ]* v1alpha1.InferenceModel ) (client extProcPb.ExternalProcessor_ProcessClient , cleanup func ()) {
285270 t .Logf ("Setting up ExtProc server" )
286- server := StartExtProc (port , time .Second , time .Second , pods , models )
271+ server := extprocutils . StartExtProc (port , time .Second , time .Second , pods , models )
287272
288273 address := fmt .Sprintf ("localhost:%v" , port )
289274 // Create a grpc connection
@@ -304,33 +289,13 @@ func setUpServer(t *testing.T, pods []*backend.PodMetrics, models map[string]*v1
304289 }
305290}
306291
307- func setUpHermeticServer (t * testing.T , cfg * rest. Config , pods []* backend.PodMetrics ) (client extProcPb.ExternalProcessor_ProcessClient , cleanup func ()) {
292+ func setUpHermeticServer (t * testing.T , pods []* backend.PodMetrics ) (client extProcPb.ExternalProcessor_ProcessClient , cleanup func ()) {
308293 t .Logf ("Setting up hermetic ExtProc server" )
309294 klog .InitFlags (nil )
310295 flag .Parse ()
311296 // Configure klog verbosity levels to print ext proc logs.
312297 _ = flag .Lookup ("v" ).Value .Set ("3" )
313298
314- runner := & runserver.ExtProcServerRunner {
315- GrpcPort : port ,
316- TargetPodHeader : "target-pod" ,
317- PoolName : "vllm-llama2-7b-pool" ,
318- PoolNamespace : "default" ,
319- ServiceName : "" ,
320- Zone : "" ,
321- RefreshPodsInterval : 10 * time .Second ,
322- RefreshMetricsInterval : 50 * time .Millisecond ,
323- Scheme : scheme ,
324- Config : cfg ,
325- Datastore : backend .NewK8sDataStore (),
326- }
327- runner .Setup ()
328-
329- // Start the controller manager in go routine, not blocking
330- go func () {
331- runner .StartManager ()
332- }()
333-
334299 // Unmarshal CRDs from file into structs
335300 manifestsPath := filepath .Join ("." , "artifacts" , "inferencepool-with-model-hermetic.yaml" )
336301 docs , err := readDocuments (manifestsPath )
@@ -392,6 +357,51 @@ func setUpHermeticServer(t *testing.T, cfg *rest.Config, pods []*backend.PodMetr
392357 }
393358}
394359
360+ // Sets up a test environment and returns the runner struct
361+ func BeforeSuit () {
362+ // Set up mock k8s API Client
363+ testEnv = & envtest.Environment {
364+ CRDDirectoryPaths : []string {filepath .Join (".." , ".." , "config" , "crd" , "bases" )},
365+ ErrorIfCRDPathMissing : true ,
366+ }
367+ cfg , err := testEnv .Start ()
368+
369+ if err != nil {
370+ log .Fatalf ("Failed to start test environment, cfg: %v error: %v" , cfg , err )
371+ }
372+
373+ utilruntime .Must (clientgoscheme .AddToScheme (scheme ))
374+ utilruntime .Must (v1alpha1 .AddToScheme (scheme ))
375+
376+ k8sClient , err = k8sclient .New (cfg , k8sclient.Options {Scheme : scheme })
377+ if err != nil {
378+ log .Fatalf ("Failed to start k8s Client: %v" , err )
379+ } else if k8sClient == nil {
380+ log .Fatalf ("No error, but returned kubernetes client is nil, cfg: %v" , cfg )
381+ }
382+
383+ runner = & runserver.ExtProcServerRunner {
384+ GrpcPort : port ,
385+ TargetPodHeader : "target-pod" ,
386+ PoolName : "vllm-llama2-7b-pool" ,
387+ PoolNamespace : "default" ,
388+ ServiceName : "" ,
389+ Zone : "" ,
390+ RefreshPodsInterval : 10 * time .Second ,
391+ RefreshMetricsInterval : 50 * time .Millisecond ,
392+ Scheme : scheme ,
393+ Config : cfg ,
394+ Datastore : backend .NewK8sDataStore (),
395+ }
396+
397+ runner .Setup ()
398+
399+ // Start the controller manager in go routine, not blocking
400+ go func () {
401+ runner .StartManager ()
402+ }()
403+ }
404+
395405func sendRequest (t * testing.T , client extProcPb.ExternalProcessor_ProcessClient , req * extProcPb.ProcessingRequest ) (* extProcPb.ProcessingResponse , error ) {
396406 t .Logf ("Sending request: %v" , req )
397407 if err := client .Send (req ); err != nil {
0 commit comments