52
52
import requests
53
53
54
54
from kubernetes import config
55
+ from kubernetes .dynamic import DynamicClient
56
+ from kubernetes import client as k8s_client
57
+ from kubernetes .client .rest import ApiException
58
+
55
59
from kubernetes .client .rest import ApiException
56
60
import warnings
57
61
@@ -84,6 +88,14 @@ def __init__(self, config: ClusterConfiguration):
84
88
if is_notebook ():
85
89
cluster_up_down_buttons (self )
86
90
91
+ def get_dynamic_client (self ): # pragma: no cover
92
+ """Return a dynamic client, optionally mocked in tests."""
93
+ return DynamicClient (get_api_client ())
94
+
95
+ def config_check (self ):
96
+ """Return a dynamic client, optionally mocked in tests."""
97
+ return config_check ()
98
+
87
99
@property
88
100
def _client_headers (self ):
89
101
k8_client = get_api_client ()
@@ -95,9 +107,7 @@ def _client_headers(self):
95
107
96
108
@property
97
109
def _client_verify_tls (self ):
98
- if not _is_openshift_cluster or not self .config .verify_tls :
99
- return False
100
- return True
110
+ return _is_openshift_cluster and self .config .verify_tls
101
111
102
112
@property
103
113
def job_client (self ):
@@ -121,7 +131,6 @@ def create_resource(self):
121
131
Called upon cluster object creation, creates an AppWrapper yaml based on
122
132
the specifications of the ClusterConfiguration.
123
133
"""
124
-
125
134
if self .config .namespace is None :
126
135
self .config .namespace = get_current_namespace ()
127
136
if self .config .namespace is None :
@@ -130,7 +139,6 @@ def create_resource(self):
130
139
raise TypeError (
131
140
f"Namespace { self .config .namespace } is of type { type (self .config .namespace )} . Check your Kubernetes Authentication."
132
141
)
133
-
134
142
return build_ray_cluster (self )
135
143
136
144
# creates a new cluster with the provided or default spec
@@ -139,10 +147,11 @@ def up(self):
139
147
Applies the Cluster yaml, pushing the resource request onto
140
148
the Kueue localqueue.
141
149
"""
142
-
150
+ print (
151
+ "WARNING: The up() function is planned for deprecation in favor of apply()."
152
+ )
143
153
# check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
144
154
self ._throw_for_no_raycluster ()
145
-
146
155
namespace = self .config .namespace
147
156
148
157
try :
@@ -176,6 +185,52 @@ def up(self):
176
185
except Exception as e : # pragma: no cover
177
186
return _kube_api_error_handling (e )
178
187
188
+ # Applies a new cluster with the provided or default spec
189
+ def apply (self , force = False ):
190
+ """
191
+ Applies the Cluster yaml using server-side apply.
192
+ If 'force' is set to True, conflicts will be forced.
193
+ """
194
+ # check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
195
+ self ._throw_for_no_raycluster ()
196
+ namespace = self .config .namespace
197
+
198
+ try :
199
+ self .config_check ()
200
+ api_instance = client .CustomObjectsApi (get_api_client ())
201
+ crds = self .get_dynamic_client ().resources
202
+ api_instance = crds .get (
203
+ api_version = "workload.codeflare.dev/v1beta2" , kind = "AppWrapper"
204
+ )
205
+ if self .config .appwrapper :
206
+ if self .config .write_to_file :
207
+ with open (self .resource_yaml ) as f :
208
+ aw = yaml .load (f , Loader = yaml .FullLoader )
209
+ api_instance .server_side_apply (
210
+ group = "workload.codeflare.dev" ,
211
+ version = "v1beta2" ,
212
+ namespace = namespace ,
213
+ plural = "appwrappers" ,
214
+ body = aw ,
215
+ )
216
+ else :
217
+ api_instance .server_side_apply (
218
+ group = "workload.codeflare.dev" ,
219
+ version = "v1beta2" ,
220
+ namespace = namespace ,
221
+ plural = "appwrappers" ,
222
+ body = self .resource_yaml ,
223
+ )
224
+ print (f"AppWrapper: '{ self .config .name } ' has successfully been created" )
225
+ else :
226
+ api_instance = crds .get (api_version = "ray.io/v1" , kind = "RayCluster" )
227
+ self ._component_resources_apply (namespace , api_instance )
228
+ print (
229
+ f"Ray Cluster: '{ self .config .name } ' has successfully been applied"
230
+ )
231
+ except Exception as e : # pragma: no cover
232
+ return _kube_api_error_handling (e )
233
+
179
234
def _throw_for_no_raycluster (self ):
180
235
api_instance = client .CustomObjectsApi (get_api_client ())
181
236
try :
@@ -204,7 +259,7 @@ def down(self):
204
259
resource_name = self .config .name
205
260
self ._throw_for_no_raycluster ()
206
261
try :
207
- config_check ()
262
+ self . config_check ()
208
263
api_instance = client .CustomObjectsApi (get_api_client ())
209
264
if self .config .appwrapper :
210
265
api_instance .delete_namespaced_custom_object (
@@ -507,6 +562,16 @@ def _component_resources_up(
507
562
else :
508
563
_create_resources (self .resource_yaml , namespace , api_instance )
509
564
565
+ def _component_resources_apply (
566
+ self , namespace : str , api_instance : client .CustomObjectsApi
567
+ ):
568
+ if self .config .write_to_file :
569
+ with open (self .resource_yaml ) as f :
570
+ ray_cluster = yaml .safe_load (f )
571
+ _apply_resources (ray_cluster , namespace , api_instance )
572
+ else :
573
+ _apply_resources (self .resource_yaml , namespace , api_instance )
574
+
510
575
def _component_resources_down (
511
576
self , namespace : str , api_instance : client .CustomObjectsApi
512
577
):
@@ -744,6 +809,20 @@ def _create_resources(yamls, namespace: str, api_instance: client.CustomObjectsA
744
809
)
745
810
746
811
812
+ def _apply_resources (
813
+ yamls , namespace : str , api_instance : client .CustomObjectsApi , force = False
814
+ ):
815
+ api_instance .server_side_apply (
816
+ field_manager = "cluster-manager" ,
817
+ group = "ray.io" ,
818
+ version = "v1" ,
819
+ namespace = namespace ,
820
+ plural = "rayclusters" ,
821
+ body = yamls ,
822
+ force_conflicts = force , # Allow forcing conflicts if needed
823
+ )
824
+
825
+
747
826
def _check_aw_exists (name : str , namespace : str ) -> bool :
748
827
try :
749
828
config_check ()
0 commit comments