7
7
from codeflare_sdk .common .kubernetes_cluster .kube_api_helpers import (
8
8
_kube_api_error_handling ,
9
9
)
10
+ from codeflare_sdk .ray .cluster .generate_yaml import is_openshift_cluster
10
11
11
12
12
13
def get_ray_image ():
@@ -65,19 +66,30 @@ def create_namespace(self):
65
66
return RuntimeError (e )
66
67
67
68
68
- def create_new_resource_flavor (self ):
69
- self .resource_flavor = f"test-resource-flavor-{ random_choice ()} "
70
- create_resource_flavor (self , self .resource_flavor )
69
+ def create_new_resource_flavor (self , num_flavors = 2 ):
70
+ self .resource_flavors = []
71
+ for i in range (num_flavors ):
72
+ default = i < 1
73
+ resource_flavor = f"test-resource-flavor-{ random_choice ()} "
74
+ create_resource_flavor (self , resource_flavor , default )
75
+ self .resource_flavors .append (resource_flavor )
71
76
72
77
73
- def create_new_cluster_queue (self ):
74
- self .cluster_queue = f"test-cluster-queue-{ random_choice ()} "
75
- create_cluster_queue (self , self .cluster_queue , self .resource_flavor )
78
+ def create_new_cluster_queue (self , num_queues = 2 ):
79
+ self .cluster_queues = []
80
+ for i in range (num_queues ):
81
+ cluster_queue_name = f"test-cluster-queue-{ random_choice ()} "
82
+ create_cluster_queue (self , cluster_queue_name , self .resource_flavors [i ])
83
+ self .cluster_queues .append (cluster_queue_name )
76
84
77
85
78
- def create_new_local_queue (self ):
79
- self .local_queue = f"test-local-queue-{ random_choice ()} "
80
- create_local_queue (self , self .cluster_queue , self .local_queue )
86
+ def create_new_local_queue (self , num_queues = 2 ):
87
+ self .local_queues = []
88
+ for i in range (num_queues ):
89
+ is_default = i == 0
90
+ local_queue_name = f"test-local-queue-{ random_choice ()} "
91
+ create_local_queue (self , self .cluster_queues [i ], local_queue_name , is_default )
92
+ self .local_queues .append (local_queue_name )
81
93
82
94
83
95
def create_namespace_with_name (self , namespace_name ):
@@ -132,7 +144,7 @@ def create_cluster_queue(self, cluster_queue, flavor):
132
144
{"name" : "memory" , "nominalQuota" : "36Gi" },
133
145
{"name" : "nvidia.com/gpu" , "nominalQuota" : 1 },
134
146
],
135
- }
147
+ },
136
148
],
137
149
}
138
150
],
@@ -161,11 +173,21 @@ def create_cluster_queue(self, cluster_queue, flavor):
161
173
self .cluster_queue = cluster_queue
162
174
163
175
164
- def create_resource_flavor (self , flavor ):
176
+ def create_resource_flavor (self , flavor , default = True ):
165
177
resource_flavor_json = {
166
178
"apiVersion" : "kueue.x-k8s.io/v1beta1" ,
167
179
"kind" : "ResourceFlavor" ,
168
180
"metadata" : {"name" : flavor },
181
+ "spec" : {
182
+ "nodeLabels" : {"worker-1" if default else "ingress-ready" : "true" },
183
+ "tolerations" : [
184
+ {
185
+ "key" : "node-role.kubernetes.io/control-plane" ,
186
+ "operator" : "Exists" ,
187
+ "effect" : "NoSchedule" ,
188
+ }
189
+ ],
190
+ },
169
191
}
170
192
171
193
try :
@@ -190,14 +212,14 @@ def create_resource_flavor(self, flavor):
190
212
self .resource_flavor = flavor
191
213
192
214
193
- def create_local_queue (self , cluster_queue , local_queue ):
215
+ def create_local_queue (self , cluster_queue , local_queue , is_default = True ):
194
216
local_queue_json = {
195
217
"apiVersion" : "kueue.x-k8s.io/v1beta1" ,
196
218
"kind" : "LocalQueue" ,
197
219
"metadata" : {
198
220
"namespace" : self .namespace ,
199
221
"name" : local_queue ,
200
- "annotations" : {"kueue.x-k8s.io/default-queue" : "true" },
222
+ "annotations" : {"kueue.x-k8s.io/default-queue" : str ( is_default ). lower () },
201
223
},
202
224
"spec" : {"clusterQueue" : cluster_queue },
203
225
}
@@ -235,25 +257,70 @@ def create_kueue_resources(self):
235
257
236
258
def delete_kueue_resources (self ):
237
259
# Delete if given cluster-queue exists
238
- try :
239
- self .custom_api .delete_cluster_custom_object (
240
- group = "kueue.x-k8s.io" ,
241
- plural = "clusterqueues" ,
242
- version = "v1beta1" ,
243
- name = self .cluster_queue ,
244
- )
245
- print (f"\n '{ self .cluster_queue } ' cluster-queue deleted" )
246
- except Exception as e :
247
- print (f"\n Error deleting cluster-queue '{ self .cluster_queue } ' : { e } " )
260
+ for cq in self .cluster_queues :
261
+ try :
262
+ self .custom_api .delete_cluster_custom_object (
263
+ group = "kueue.x-k8s.io" ,
264
+ plural = "clusterqueues" ,
265
+ version = "v1beta1" ,
266
+ name = cq ,
267
+ )
268
+ print (f"\n '{ cq } ' cluster-queue deleted" )
269
+ except Exception as e :
270
+ print (f"\n Error deleting cluster-queue '{ cq } ' : { e } " )
248
271
249
272
# Delete if given resource-flavor exists
273
+ for flavor in self .resource_flavors :
274
+ try :
275
+ self .custom_api .delete_cluster_custom_object (
276
+ group = "kueue.x-k8s.io" ,
277
+ plural = "resourceflavors" ,
278
+ version = "v1beta1" ,
279
+ name = flavor ,
280
+ )
281
+ print (f"'{ flavor } ' resource-flavor deleted" )
282
+ except Exception as e :
283
+ print (f"\n Error deleting resource-flavor '{ flavor } ': { e } " )
284
+
285
+
286
+ def get_pod_node (self , namespace , name ):
287
+ label_selector = f"ray.io/cluster={ name } "
288
+ if is_openshift_cluster ():
289
+ cluster_type = "openshift"
290
+ else :
291
+ cluster_type = "kind"
250
292
try :
251
- self .custom_api .delete_cluster_custom_object (
252
- group = "kueue.x-k8s.io" ,
253
- plural = "resourceflavors" ,
254
- version = "v1beta1" ,
255
- name = self .resource_flavor ,
256
- )
257
- print (f"'{ self .resource_flavor } ' resource-flavor deleted" )
293
+ if cluster_type == "openshift" :
294
+ args = [
295
+ "get" ,
296
+ "pod" ,
297
+ "-l" ,
298
+ label_selector ,
299
+ "-n" ,
300
+ namespace ,
301
+ "-o" ,
302
+ "jsonpath={.items[0].spec.nodeName}" ,
303
+ ]
304
+ node_name = run_oc_command (args )
305
+ if not node_name :
306
+ raise ValueError (
307
+ f"Unable to retrieve node name for pod '{ name } ' in namespace '{ namespace } '"
308
+ )
309
+ else :
310
+ pods = self .api_instance .list_namespaced_pod (
311
+ namespace , label_selector = label_selector
312
+ )
313
+ if not pods .items :
314
+ raise ValueError (
315
+ f"Unable to retrieve node name for pod '{ name } ' in namespace '{ namespace } '"
316
+ )
317
+ pod = pods .items [0 ]
318
+ node_name = pod .spec .node_name
319
+
320
+ return node_name
321
+
258
322
except Exception as e :
259
- print (f"\n Error deleting resource-flavor '{ self .resource_flavor } ' : { e } " )
323
+ print (
324
+ f"Error retrieving node name for pod '{ name } ' in namespace '{ namespace } ': { e } "
325
+ )
326
+ return None
0 commit comments