@@ -33,6 +33,7 @@ import (
33
33
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34
34
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
35
35
"k8s.io/apimachinery/pkg/runtime"
36
+ "sigs.k8s.io/kueue/apis/kueue/v1beta1"
36
37
)
37
38
38
39
// Trains the MNIST dataset as a RayJob, executed by a Ray cluster
@@ -49,9 +50,15 @@ func TestMnistRayJobRayClusterGpu(t *testing.T) {
49
50
func runMnistRayJobRayCluster (t * testing.T , accelerator string , numberOfGpus int ) {
50
51
test := With (t )
51
52
52
- // Create a namespace and localqueue in that namespace
53
+ // Create a namespace
53
54
namespace := test .NewTestNamespace ()
54
- localQueue := CreateKueueLocalQueue (test , namespace .Name , "e2e-cluster-queue" )
55
+
56
+ // Create Kueue resources
57
+ resourceFlavor := CreateKueueResourceFlavor (test , v1beta1.ResourceFlavorSpec {})
58
+ defer test .Client ().Kueue ().KueueV1beta1 ().ResourceFlavors ().Delete (test .Ctx (), resourceFlavor .Name , metav1.DeleteOptions {})
59
+ clusterQueue := createClusterQueue (test , resourceFlavor , numberOfGpus )
60
+ defer test .Client ().Kueue ().KueueV1beta1 ().ClusterQueues ().Delete (test .Ctx (), clusterQueue .Name , metav1.DeleteOptions {})
61
+ CreateKueueLocalQueue (test , namespace .Name , clusterQueue .Name , AsDefaultQueue )
55
62
56
63
// Create MNIST training script
57
64
mnist := constructMNISTConfigMap (test , namespace )
@@ -61,7 +68,6 @@ func runMnistRayJobRayCluster(t *testing.T, accelerator string, numberOfGpus int
61
68
62
69
// Create RayCluster and assign it to the localqueue
63
70
rayCluster := constructRayCluster (test , namespace , mnist , numberOfGpus )
64
- AssignToLocalQueue (rayCluster , localQueue )
65
71
rayCluster , err = test .Client ().Ray ().RayV1 ().RayClusters (namespace .Name ).Create (test .Ctx (), rayCluster , metav1.CreateOptions {})
66
72
test .Expect (err ).NotTo (HaveOccurred ())
67
73
test .T ().Logf ("Created RayCluster %s/%s successfully" , rayCluster .Namespace , rayCluster .Name )
@@ -78,8 +84,8 @@ func runMnistRayJobRayCluster(t *testing.T, accelerator string, numberOfGpus int
78
84
79
85
rayDashboardURL := getRayDashboardURL (test , rayCluster .Namespace , rayCluster .Name )
80
86
81
- test .T ().Logf ("Connecting to Ray cluster at: %s" , rayDashboardURL . String () )
82
- rayClient := NewRayClusterClient ( rayDashboardURL )
87
+ test .T ().Logf ("Connecting to Ray cluster at: %s" , rayDashboardURL )
88
+ rayClient := GetRayClusterClient ( test , rayDashboardURL , test . Config (). BearerToken )
83
89
84
90
test .T ().Logf ("Waiting for RayJob %s/%s to complete" , rayJob .Namespace , rayJob .Name )
85
91
test .Eventually (RayJob (test , rayJob .Namespace , rayJob .Name ), TestTimeoutLong ).
@@ -111,9 +117,15 @@ func TestMnistRayJobRayClusterAppWrapperGpu(t *testing.T) {
111
117
func runMnistRayJobRayClusterAppWrapper (t * testing.T , accelerator string , numberOfGpus int ) {
112
118
test := With (t )
113
119
114
- // Create a namespace and localqueue in that namespace
120
+ // Create a namespace
115
121
namespace := test .NewTestNamespace ()
116
- localQueue := CreateKueueLocalQueue (test , namespace .Name , "e2e-cluster-queue" )
122
+
123
+ // Create Kueue resources
124
+ resourceFlavor := CreateKueueResourceFlavor (test , v1beta1.ResourceFlavorSpec {})
125
+ defer test .Client ().Kueue ().KueueV1beta1 ().ResourceFlavors ().Delete (test .Ctx (), resourceFlavor .Name , metav1.DeleteOptions {})
126
+ clusterQueue := createClusterQueue (test , resourceFlavor , numberOfGpus )
127
+ defer test .Client ().Kueue ().KueueV1beta1 ().ClusterQueues ().Delete (test .Ctx (), clusterQueue .Name , metav1.DeleteOptions {})
128
+ localQueue := CreateKueueLocalQueue (test , namespace .Name , clusterQueue .Name , AsDefaultQueue )
117
129
118
130
// Create MNIST training script
119
131
mnist := constructMNISTConfigMap (test , namespace )
@@ -167,8 +179,8 @@ func runMnistRayJobRayClusterAppWrapper(t *testing.T, accelerator string, number
167
179
168
180
rayDashboardURL := getRayDashboardURL (test , rayCluster .Namespace , rayCluster .Name )
169
181
170
- test .T ().Logf ("Connecting to Ray cluster at: %s" , rayDashboardURL . String () )
171
- rayClient := NewRayClusterClient ( rayDashboardURL )
182
+ test .T ().Logf ("Connecting to Ray cluster at: %s" , rayDashboardURL )
183
+ rayClient := GetRayClusterClient ( test , rayDashboardURL , test . Config (). BearerToken )
172
184
173
185
test .T ().Logf ("Waiting for RayJob %s/%s to complete" , rayJob .Namespace , rayJob .Name )
174
186
test .Eventually (RayJob (test , rayJob .Namespace , rayJob .Name ), TestTimeoutLong ).
@@ -374,7 +386,7 @@ func constructRayJob(_ Test, namespace *corev1.Namespace, rayCluster *rayv1.RayC
374
386
}
375
387
}
376
388
377
- func getRayDashboardURL (test Test , namespace , rayClusterName string ) url. URL {
389
+ func getRayDashboardURL (test Test , namespace , rayClusterName string ) string {
378
390
dashboardName := "ray-dashboard-" + rayClusterName
379
391
380
392
if IsOpenShift (test ) {
@@ -396,10 +408,10 @@ func getRayDashboardURL(test Test, namespace, rayClusterName string) url.URL {
396
408
return resp .StatusCode , nil
397
409
}, TestTimeoutShort ).Should (Not (Equal (503 )))
398
410
399
- return url.URL {
400
- Scheme : "https" ,
401
- Host : hostname ,
402
- }
411
+ dashboardUrl , _ := url .Parse ( "https://" + hostname )
412
+ test . T (). Logf ( "Ray-dashboard route : %s \n " , dashboardUrl . String ())
413
+
414
+ return dashboardUrl . String ()
403
415
}
404
416
405
417
ingress := GetIngress (test , namespace , dashboardName )
@@ -408,8 +420,41 @@ func getRayDashboardURL(test Test, namespace, rayClusterName string) url.URL {
408
420
test .Eventually (Ingress (test , ingress .Namespace , ingress .Name ), TestTimeoutShort ).
409
421
Should (WithTransform (LoadBalancerIngresses , HaveLen (1 )))
410
422
411
- return url.URL {
412
- Scheme : "http" ,
413
- Host : ingress .Spec .Rules [0 ].Host ,
423
+ hostname := ingress .Spec .Rules [0 ].Host
424
+ dashboardUrl , _ := url .Parse ("http://" + hostname )
425
+ test .T ().Logf ("Ray-dashboard route : %s\n " , dashboardUrl .String ())
426
+
427
+ return dashboardUrl .String ()
428
+ }
429
+
430
+ // Create ClusterQueue
431
+ func createClusterQueue (test Test , resourceFlavor * v1beta1.ResourceFlavor , numberOfGpus int ) * v1beta1.ClusterQueue {
432
+ cqSpec := v1beta1.ClusterQueueSpec {
433
+ NamespaceSelector : & metav1.LabelSelector {},
434
+ ResourceGroups : []v1beta1.ResourceGroup {
435
+ {
436
+ CoveredResources : []corev1.ResourceName {corev1 .ResourceName ("cpu" ), corev1 .ResourceName ("memory" ), corev1 .ResourceName ("nvidia.com/gpu" )},
437
+ Flavors : []v1beta1.FlavorQuotas {
438
+ {
439
+ Name : v1beta1 .ResourceFlavorReference (resourceFlavor .Name ),
440
+ Resources : []v1beta1.ResourceQuota {
441
+ {
442
+ Name : corev1 .ResourceCPU ,
443
+ NominalQuota : resource .MustParse ("8" ),
444
+ },
445
+ {
446
+ Name : corev1 .ResourceMemory ,
447
+ NominalQuota : resource .MustParse ("12Gi" ),
448
+ },
449
+ {
450
+ Name : corev1 .ResourceName ("nvidia.com/gpu" ),
451
+ NominalQuota : resource .MustParse (fmt .Sprint (numberOfGpus )),
452
+ },
453
+ },
454
+ },
455
+ },
456
+ },
457
+ },
414
458
}
459
+ return CreateKueueClusterQueue (test , cqSpec )
415
460
}
0 commit comments