36
36
RayClusterStatus ,
37
37
)
38
38
39
+ from kubernetes import client , config
40
+
41
+ import yaml
42
+
39
43
40
44
class Cluster :
41
45
"""
@@ -110,8 +114,17 @@ def up(self):
110
114
"""
111
115
namespace = self .config .namespace
112
116
try :
113
- with oc .project (namespace ):
114
- oc .invoke ("apply" , ["-f" , self .app_wrapper_yaml ])
117
+ config .load_kube_config ()
118
+ api_instance = client .CustomObjectsApi ()
119
+ with open (self .app_wrapper_yaml ) as f :
120
+ aw = yaml .load (f , Loader = yaml .FullLoader )
121
+ api_instance .create_namespaced_custom_object (
122
+ group = "mcad.ibm.com" ,
123
+ version = "v1beta1" ,
124
+ namespace = namespace ,
125
+ plural = "appwrappers" ,
126
+ body = aw ,
127
+ )
115
128
except oc .OpenShiftPythonException as osp : # pragma: no cover
116
129
error_msg = osp .result .err ()
117
130
if "Unauthorized" in error_msg :
@@ -127,8 +140,15 @@ def down(self):
127
140
"""
128
141
namespace = self .config .namespace
129
142
try :
130
- with oc .project (namespace ):
131
- oc .invoke ("delete" , ["AppWrapper" , self .app_wrapper_name ])
143
+ config .load_kube_config ()
144
+ api_instance = client .CustomObjectsApi ()
145
+ api_instance .delete_namespaced_custom_object (
146
+ group = "mcad.ibm.com" ,
147
+ version = "v1beta1" ,
148
+ namespace = namespace ,
149
+ plural = "appwrappers" ,
150
+ name = self .app_wrapper_name ,
151
+ )
132
152
except oc .OpenShiftPythonException as osp : # pragma: no cover
133
153
error_msg = osp .result .err ()
134
154
if (
@@ -322,14 +342,16 @@ def list_all_queued(namespace: str, print_to_console: bool = True):
322
342
323
343
324
344
def _app_wrapper_status (name , namespace = "default" ) -> Optional [AppWrapper ]:
325
- cluster = None
326
345
try :
327
- with oc .project (namespace ), oc .timeout (10 * 60 ):
328
- cluster = oc .selector (f"appwrapper/{ name } " ).object ()
346
+ config .load_kube_config ()
347
+ api_instance = client .CustomObjectsApi ()
348
+ aws = api_instance .list_namespaced_custom_object (
349
+ group = "mcad.ibm.com" ,
350
+ version = "v1beta1" ,
351
+ namespace = namespace ,
352
+ plural = "appwrappers" ,
353
+ )
329
354
except oc .OpenShiftPythonException as osp : # pragma: no cover
330
- msg = osp .msg
331
- if "Expected a single object, but selected 0" in msg :
332
- return cluster
333
355
error_msg = osp .result .err ()
334
356
if not (
335
357
'the server doesn\' t have a resource type "appwrapper"' in error_msg
@@ -339,21 +361,23 @@ def _app_wrapper_status(name, namespace="default") -> Optional[AppWrapper]:
339
361
):
340
362
raise osp
341
363
342
- if cluster :
343
- return _map_to_app_wrapper ( cluster )
344
-
345
- return cluster
364
+ for aw in aws [ "items" ] :
365
+ if aw [ "metadata" ][ "name" ] == name :
366
+ return _map_to_app_wrapper ( aw )
367
+ return None
346
368
347
369
348
370
def _ray_cluster_status (name , namespace = "default" ) -> Optional [RayCluster ]:
349
- cluster = None
350
371
try :
351
- with oc .project (namespace ), oc .timeout (10 * 60 ):
352
- cluster = oc .selector (f"rayclusters/{ name } " ).object ()
372
+ config .load_kube_config ()
373
+ api_instance = client .CustomObjectsApi ()
374
+ rcs = api_instance .list_namespaced_custom_object (
375
+ group = "ray.io" ,
376
+ version = "v1alpha1" ,
377
+ namespace = namespace ,
378
+ plural = "rayclusters" ,
379
+ )
353
380
except oc .OpenShiftPythonException as osp : # pragma: no cover
354
- msg = osp .msg
355
- if "Expected a single object, but selected 0" in msg :
356
- return cluster
357
381
error_msg = osp .result .err ()
358
382
if not (
359
383
'the server doesn\' t have a resource type "rayclusters"' in error_msg
@@ -363,17 +387,23 @@ def _ray_cluster_status(name, namespace="default") -> Optional[RayCluster]:
363
387
):
364
388
raise osp
365
389
366
- if cluster :
367
- return _map_to_ray_cluster ( cluster )
368
-
369
- return cluster
390
+ for rc in rcs [ "items" ] :
391
+ if rc [ "metadata" ][ "name" ] == name :
392
+ return _map_to_ray_cluster ( rc )
393
+ return None
370
394
371
395
372
396
def _get_ray_clusters (namespace = "default" ) -> List [RayCluster ]:
373
397
list_of_clusters = []
374
398
try :
375
- with oc .project (namespace ), oc .timeout (10 * 60 ):
376
- ray_clusters = oc .selector ("rayclusters" ).objects ()
399
+ config .load_kube_config ()
400
+ api_instance = client .CustomObjectsApi ()
401
+ rcs = api_instance .list_namespaced_custom_object (
402
+ group = "ray.io" ,
403
+ version = "v1alpha1" ,
404
+ namespace = namespace ,
405
+ plural = "rayclusters" ,
406
+ )
377
407
except oc .OpenShiftPythonException as osp : # pragma: no cover
378
408
error_msg = osp .result .err ()
379
409
if (
@@ -388,8 +418,8 @@ def _get_ray_clusters(namespace="default") -> List[RayCluster]:
388
418
else :
389
419
raise osp
390
420
391
- for cluster in ray_clusters :
392
- list_of_clusters .append (_map_to_ray_cluster (cluster ))
421
+ for rc in rcs [ "items" ] :
422
+ list_of_clusters .append (_map_to_ray_cluster (rc ))
393
423
return list_of_clusters
394
424
395
425
@@ -399,8 +429,14 @@ def _get_app_wrappers(
399
429
list_of_app_wrappers = []
400
430
401
431
try :
402
- with oc .project (namespace ), oc .timeout (10 * 60 ):
403
- app_wrappers = oc .selector ("appwrappers" ).objects ()
432
+ config .load_kube_config ()
433
+ api_instance = client .CustomObjectsApi ()
434
+ aws = api_instance .list_namespaced_custom_object (
435
+ group = "mcad.ibm.com" ,
436
+ version = "v1beta1" ,
437
+ namespace = namespace ,
438
+ plural = "appwrappers" ,
439
+ )
404
440
except oc .OpenShiftPythonException as osp : # pragma: no cover
405
441
error_msg = osp .result .err ()
406
442
if (
@@ -415,7 +451,7 @@ def _get_app_wrappers(
415
451
else :
416
452
raise osp
417
453
418
- for item in app_wrappers :
454
+ for item in aws [ "items" ] :
419
455
app_wrapper = _map_to_app_wrapper (item )
420
456
if filter and app_wrapper .status in filter :
421
457
list_of_app_wrappers .append (app_wrapper )
@@ -425,48 +461,46 @@ def _get_app_wrappers(
425
461
return list_of_app_wrappers
426
462
427
463
428
- def _map_to_ray_cluster (cluster ) -> Optional [RayCluster ]:
429
- cluster_model = cluster .model
430
- if type (cluster_model .status .state ) == oc .model .MissingModel :
431
- status = RayClusterStatus .UNKNOWN
464
+ def _map_to_ray_cluster (rc ) -> Optional [RayCluster ]:
465
+ if "state" in rc ["status" ]:
466
+ status = RayClusterStatus (rc ["status" ]["state" ].lower ())
432
467
else :
433
- status = RayClusterStatus ( cluster_model . status . state . lower ())
468
+ status = RayClusterStatus . UNKNOWN
434
469
435
- with oc .project (cluster . namespace () ), oc .timeout (10 * 60 ):
470
+ with oc .project (rc [ "metadata" ][ " namespace" ] ), oc .timeout (10 * 60 ):
436
471
route = (
437
- oc .selector (f"route/ray-dashboard-{ cluster . name () } " )
472
+ oc .selector (f"route/ray-dashboard-{ rc [ 'metadata' ][ ' name' ] } " )
438
473
.object ()
439
474
.model .spec .host
440
475
)
441
476
442
477
return RayCluster (
443
- name = cluster . name () ,
478
+ name = rc [ "metadata" ][ " name" ] ,
444
479
status = status ,
445
480
# for now we are not using autoscaling so same replicas is fine
446
- min_workers = cluster_model . spec . workerGroupSpecs [0 ]. replicas ,
447
- max_workers = cluster_model . spec . workerGroupSpecs [0 ]. replicas ,
448
- worker_mem_max = cluster_model . spec . workerGroupSpecs [0 ]
449
- . template . spec . containers [ 0 ]
450
- . resources . limits . memory ,
451
- worker_mem_min = cluster_model . spec . workerGroupSpecs [0 ]
452
- . template . spec . containers [ 0 ]
453
- . resources . requests . memory ,
454
- worker_cpu = cluster_model . spec . workerGroupSpecs [0 ]
455
- . template . spec . containers [ 0 ]
456
- . resources . limits . cpu ,
481
+ min_workers = rc [ " spec" ][ " workerGroupSpecs" ] [0 ][ " replicas" ] ,
482
+ max_workers = rc [ " spec" ][ " workerGroupSpecs" ] [0 ][ " replicas" ] ,
483
+ worker_mem_max = rc [ " spec" ][ " workerGroupSpecs" ] [0 ][ "template" ][ "spec" ][
484
+ " containers"
485
+ ][ 0 ][ " resources" ][ " limits" ][ " memory" ] ,
486
+ worker_mem_min = rc [ " spec" ][ " workerGroupSpecs" ] [0 ][ "template" ][ "spec" ][
487
+ " containers"
488
+ ][ 0 ][ " resources" ][ " requests" ][ " memory" ] ,
489
+ worker_cpu = rc [ " spec" ][ " workerGroupSpecs" ] [0 ][ "template" ][ "spec" ][ "containers" ][
490
+ 0
491
+ ][ " resources" ][ " limits" ][ " cpu" ] ,
457
492
worker_gpu = 0 , # hard to detect currently how many gpus, can override it with what the user asked for
458
- namespace = cluster . namespace () ,
493
+ namespace = rc [ "metadata" ][ " namespace" ] ,
459
494
dashboard = route ,
460
495
)
461
496
462
497
463
- def _map_to_app_wrapper (cluster ) -> AppWrapper :
464
- cluster_model = cluster .model
498
+ def _map_to_app_wrapper (aw ) -> AppWrapper :
465
499
return AppWrapper (
466
- name = cluster . name () ,
467
- status = AppWrapperStatus (cluster_model . status . state .lower ()),
468
- can_run = cluster_model . status . canrun ,
469
- job_state = cluster_model . status . queuejobstate ,
500
+ name = aw [ "metadata" ][ " name" ] ,
501
+ status = AppWrapperStatus (aw [ " status" ][ " state" ] .lower ()),
502
+ can_run = aw [ " status" ][ " canrun" ] ,
503
+ job_state = aw [ " status" ][ " queuejobstate" ] ,
470
504
)
471
505
472
506
0 commit comments