52
52
import requests
53
53
54
54
from kubernetes import config
55
+ from kubernetes .dynamic import DynamicClient
56
+ from kubernetes import client as k8s_client
57
+ from kubernetes .client .rest import ApiException
58
+
55
59
from kubernetes .client .rest import ApiException
56
60
import warnings
57
61
@@ -84,6 +88,14 @@ def __init__(self, config: ClusterConfiguration):
84
88
if is_notebook ():
85
89
cluster_up_down_buttons (self )
86
90
91
+ def get_dynamic_client (self ):
92
+ """Return a dynamic client, optionally mocked in tests."""
93
+ return DynamicClient (get_api_client ())
94
+
95
+ def config_check (self ):
96
+ """Return a dynamic client, optionally mocked in tests."""
97
+ return config_check ()
98
+
87
99
@property
88
100
def _client_headers (self ):
89
101
k8_client = get_api_client ()
@@ -130,7 +142,6 @@ def create_resource(self):
130
142
raise TypeError (
131
143
f"Namespace { self .config .namespace } is of type { type (self .config .namespace )} . Check your Kubernetes Authentication."
132
144
)
133
-
134
145
return build_ray_cluster (self )
135
146
136
147
# creates a new cluster with the provided or default spec
@@ -139,10 +150,11 @@ def up(self):
139
150
Applies the Cluster yaml, pushing the resource request onto
140
151
the Kueue localqueue.
141
152
"""
142
-
153
+ print (
154
+ "WARNING: The up() function is planned for deprecation in favor of apply()."
155
+ )
143
156
# check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
144
157
self ._throw_for_no_raycluster ()
145
-
146
158
namespace = self .config .namespace
147
159
148
160
try :
@@ -176,6 +188,52 @@ def up(self):
176
188
except Exception as e : # pragma: no cover
177
189
return _kube_api_error_handling (e )
178
190
191
+ # Applies a new cluster with the provided or default spec
192
+ def apply (self , force = False ):
193
+ """
194
+ Applies the Cluster yaml using server-side apply.
195
+ If 'force' is set to True, conflicts will be forced.
196
+ """
197
+ # check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
198
+ self ._throw_for_no_raycluster ()
199
+ namespace = self .config .namespace
200
+
201
+ try :
202
+ self .config_check ()
203
+ api_instance = client .CustomObjectsApi (get_api_client ())
204
+ crds = self .get_dynamic_client ().resources
205
+ api_instance = crds .get (
206
+ api_version = "workload.codeflare.dev/v1beta2" , kind = "AppWrapper"
207
+ )
208
+ if self .config .appwrapper :
209
+ if self .config .write_to_file :
210
+ with open (self .resource_yaml ) as f :
211
+ aw = yaml .load (f , Loader = yaml .FullLoader )
212
+ api_instance .server_side_apply (
213
+ group = "workload.codeflare.dev" ,
214
+ version = "v1beta2" ,
215
+ namespace = namespace ,
216
+ plural = "appwrappers" ,
217
+ body = aw ,
218
+ )
219
+ else :
220
+ api_instance .server_side_apply (
221
+ group = "workload.codeflare.dev" ,
222
+ version = "v1beta2" ,
223
+ namespace = namespace ,
224
+ plural = "appwrappers" ,
225
+ body = self .resource_yaml ,
226
+ )
227
+ print (f"AppWrapper: '{ self .config .name } ' has successfully been created" )
228
+ else :
229
+ api_instance = crds .get (api_version = "ray.io/v1" , kind = "RayCluster" )
230
+ self ._component_resources_apply (namespace , api_instance )
231
+ print (
232
+ f"Ray Cluster: '{ self .config .name } ' has successfully been applied"
233
+ )
234
+ except Exception as e : # pragma: no cover
235
+ return _kube_api_error_handling (e )
236
+
179
237
def _throw_for_no_raycluster (self ):
180
238
api_instance = client .CustomObjectsApi (get_api_client ())
181
239
try :
@@ -204,7 +262,7 @@ def down(self):
204
262
resource_name = self .config .name
205
263
self ._throw_for_no_raycluster ()
206
264
try :
207
- config_check ()
265
+ self . config_check ()
208
266
api_instance = client .CustomObjectsApi (get_api_client ())
209
267
if self .config .appwrapper :
210
268
api_instance .delete_namespaced_custom_object (
@@ -507,6 +565,16 @@ def _component_resources_up(
507
565
else :
508
566
_create_resources (self .resource_yaml , namespace , api_instance )
509
567
568
+ def _component_resources_apply (
569
+ self , namespace : str , api_instance : client .CustomObjectsApi
570
+ ):
571
+ if self .config .write_to_file :
572
+ with open (self .resource_yaml ) as f :
573
+ ray_cluster = yaml .safe_load (f )
574
+ _apply_resources (ray_cluster , namespace , api_instance )
575
+ else :
576
+ _apply_resources (self .resource_yaml , namespace , api_instance )
577
+
510
578
def _component_resources_down (
511
579
self , namespace : str , api_instance : client .CustomObjectsApi
512
580
):
@@ -744,6 +812,20 @@ def _create_resources(yamls, namespace: str, api_instance: client.CustomObjectsA
744
812
)
745
813
746
814
815
+ def _apply_resources (
816
+ yamls , namespace : str , api_instance : client .CustomObjectsApi , force = False
817
+ ):
818
+ api_instance .server_side_apply (
819
+ field_manager = "cluster-manager" ,
820
+ group = "ray.io" ,
821
+ version = "v1" ,
822
+ namespace = namespace ,
823
+ plural = "rayclusters" ,
824
+ body = yamls ,
825
+ force_conflicts = force , # Allow forcing conflicts if needed
826
+ )
827
+
828
+
747
829
def _check_aw_exists (name : str , namespace : str ) -> bool :
748
830
try :
749
831
config_check ()
0 commit comments