52
52
import requests
53
53
54
54
from kubernetes import config
55
+ from kubernetes .dynamic import DynamicClient
56
+ from kubernetes import client as k8s_client
57
+ from kubernetes .client .rest import ApiException
58
+
55
59
from kubernetes .client .rest import ApiException
56
60
import warnings
57
61
@@ -84,6 +88,14 @@ def __init__(self, config: ClusterConfiguration):
84
88
if is_notebook ():
85
89
cluster_up_down_buttons (self )
86
90
91
+ def get_dynamic_client (self ):
92
+ """Return a dynamic client, optionally mocked in tests."""
93
+ return DynamicClient (get_api_client ())
94
+
95
+ def config_check (self ):
96
+ """Return a dynamic client, optionally mocked in tests."""
97
+ return config_check ()
98
+
87
99
@property
88
100
def _client_headers (self ):
89
101
k8_client = get_api_client ()
@@ -95,9 +107,7 @@ def _client_headers(self):
95
107
96
108
@property
97
109
def _client_verify_tls (self ):
98
- if not _is_openshift_cluster or not self .config .verify_tls :
99
- return False
100
- return True
110
+ return _is_openshift_cluster and self .config .verify_tls
101
111
102
112
@property
103
113
def job_client (self ):
@@ -130,7 +140,6 @@ def create_resource(self):
130
140
raise TypeError (
131
141
f"Namespace { self .config .namespace } is of type { type (self .config .namespace )} . Check your Kubernetes Authentication."
132
142
)
133
-
134
143
return build_ray_cluster (self )
135
144
136
145
# creates a new cluster with the provided or default spec
@@ -139,10 +148,11 @@ def up(self):
139
148
Applies the Cluster yaml, pushing the resource request onto
140
149
the Kueue localqueue.
141
150
"""
142
-
151
+ print (
152
+ "WARNING: The up() function is planned for deprecation in favor of apply()."
153
+ )
143
154
# check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
144
155
self ._throw_for_no_raycluster ()
145
-
146
156
namespace = self .config .namespace
147
157
148
158
try :
@@ -176,6 +186,52 @@ def up(self):
176
186
except Exception as e : # pragma: no cover
177
187
return _kube_api_error_handling (e )
178
188
189
+ # Applies a new cluster with the provided or default spec
190
+ def apply (self , force = False ):
191
+ """
192
+ Applies the Cluster yaml using server-side apply.
193
+ If 'force' is set to True, conflicts will be forced.
194
+ """
195
+ # check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
196
+ self ._throw_for_no_raycluster ()
197
+ namespace = self .config .namespace
198
+
199
+ try :
200
+ self .config_check ()
201
+ api_instance = client .CustomObjectsApi (get_api_client ())
202
+ crds = self .get_dynamic_client ().resources
203
+ api_instance = crds .get (
204
+ api_version = "workload.codeflare.dev/v1beta2" , kind = "AppWrapper"
205
+ )
206
+ if self .config .appwrapper :
207
+ if self .config .write_to_file :
208
+ with open (self .resource_yaml ) as f :
209
+ aw = yaml .load (f , Loader = yaml .FullLoader )
210
+ api_instance .server_side_apply (
211
+ group = "workload.codeflare.dev" ,
212
+ version = "v1beta2" ,
213
+ namespace = namespace ,
214
+ plural = "appwrappers" ,
215
+ body = aw ,
216
+ )
217
+ else :
218
+ api_instance .server_side_apply (
219
+ group = "workload.codeflare.dev" ,
220
+ version = "v1beta2" ,
221
+ namespace = namespace ,
222
+ plural = "appwrappers" ,
223
+ body = self .resource_yaml ,
224
+ )
225
+ print (f"AppWrapper: '{ self .config .name } ' has successfully been created" )
226
+ else :
227
+ api_instance = crds .get (api_version = "ray.io/v1" , kind = "RayCluster" )
228
+ self ._component_resources_apply (namespace , api_instance )
229
+ print (
230
+ f"Ray Cluster: '{ self .config .name } ' has successfully been applied"
231
+ )
232
+ except Exception as e : # pragma: no cover
233
+ return _kube_api_error_handling (e )
234
+
179
235
def _throw_for_no_raycluster (self ):
180
236
api_instance = client .CustomObjectsApi (get_api_client ())
181
237
try :
@@ -204,7 +260,7 @@ def down(self):
204
260
resource_name = self .config .name
205
261
self ._throw_for_no_raycluster ()
206
262
try :
207
- config_check ()
263
+ self . config_check ()
208
264
api_instance = client .CustomObjectsApi (get_api_client ())
209
265
if self .config .appwrapper :
210
266
api_instance .delete_namespaced_custom_object (
@@ -507,6 +563,16 @@ def _component_resources_up(
507
563
else :
508
564
_create_resources (self .resource_yaml , namespace , api_instance )
509
565
566
+ def _component_resources_apply (
567
+ self , namespace : str , api_instance : client .CustomObjectsApi
568
+ ):
569
+ if self .config .write_to_file :
570
+ with open (self .resource_yaml ) as f :
571
+ ray_cluster = yaml .safe_load (f )
572
+ _apply_resources (ray_cluster , namespace , api_instance )
573
+ else :
574
+ _apply_resources (self .resource_yaml , namespace , api_instance )
575
+
510
576
def _component_resources_down (
511
577
self , namespace : str , api_instance : client .CustomObjectsApi
512
578
):
@@ -744,6 +810,20 @@ def _create_resources(yamls, namespace: str, api_instance: client.CustomObjectsA
744
810
)
745
811
746
812
813
+ def _apply_resources (
814
+ yamls , namespace : str , api_instance : client .CustomObjectsApi , force = False
815
+ ):
816
+ api_instance .server_side_apply (
817
+ field_manager = "cluster-manager" ,
818
+ group = "ray.io" ,
819
+ version = "v1" ,
820
+ namespace = namespace ,
821
+ plural = "rayclusters" ,
822
+ body = yamls ,
823
+ force_conflicts = force , # Allow forcing conflicts if needed
824
+ )
825
+
826
+
747
827
def _check_aw_exists (name : str , namespace : str ) -> bool :
748
828
try :
749
829
config_check ()
0 commit comments