|
52 | 52 | import requests
|
53 | 53 |
|
54 | 54 | from kubernetes import config
|
| 55 | +from kubernetes.dynamic import DynamicClient |
| 56 | +from kubernetes import client as k8s_client |
| 57 | +from kubernetes.client.rest import ApiException |
| 58 | + |
55 | 59 | from kubernetes.client.rest import ApiException
|
56 | 60 | import warnings
|
57 | 61 |
|
@@ -84,6 +88,14 @@ def __init__(self, config: ClusterConfiguration):
|
84 | 88 | if is_notebook():
|
85 | 89 | cluster_up_down_buttons(self)
|
86 | 90 |
|
| 91 | + def get_dynamic_client(self): |
| 92 | + """Return a dynamic client, optionally mocked in tests.""" |
| 93 | + return DynamicClient(get_api_client()) |
| 94 | + |
| 95 | + def config_check(self): |
| 96 | + """Return a dynamic client, optionally mocked in tests.""" |
| 97 | + return config_check() |
| 98 | + |
87 | 99 | @property
|
88 | 100 | def _client_headers(self):
|
89 | 101 | k8_client = get_api_client()
|
@@ -139,10 +151,11 @@ def up(self):
|
139 | 151 | Applies the Cluster yaml, pushing the resource request onto
|
140 | 152 | the Kueue localqueue.
|
141 | 153 | """
|
142 |
| - |
| 154 | + print( |
| 155 | + "WARNING: The up() function is planned for deprecation in favor of apply()." |
| 156 | + ) |
143 | 157 | # check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
|
144 | 158 | self._throw_for_no_raycluster()
|
145 |
| - |
146 | 159 | namespace = self.config.namespace
|
147 | 160 |
|
148 | 161 | try:
|
@@ -176,6 +189,52 @@ def up(self):
|
176 | 189 | except Exception as e: # pragma: no cover
|
177 | 190 | return _kube_api_error_handling(e)
|
178 | 191 |
|
| 192 | + # Applies a new cluster with the provided or default spec |
| 193 | + def apply(self, force=False): |
| 194 | + """ |
| 195 | + Applies the Cluster yaml using server-side apply. |
| 196 | + If 'force' is set to True, conflicts will be forced. |
| 197 | + """ |
| 198 | + # check if RayCluster CustomResourceDefinition exists if not throw RuntimeError |
| 199 | + self._throw_for_no_raycluster() |
| 200 | + namespace = self.config.namespace |
| 201 | + |
| 202 | + try: |
| 203 | + self.config_check() |
| 204 | + api_instance = client.CustomObjectsApi(get_api_client()) |
| 205 | + crds = self.get_dynamic_client().resources |
| 206 | + api_instance = crds.get( |
| 207 | + api_version="workload.codeflare.dev/v1beta2", kind="AppWrapper" |
| 208 | + ) |
| 209 | + if self.config.appwrapper: |
| 210 | + if self.config.write_to_file: |
| 211 | + with open(self.resource_yaml) as f: |
| 212 | + aw = yaml.load(f, Loader=yaml.FullLoader) |
| 213 | + api_instance.server_side_apply( |
| 214 | + group="workload.codeflare.dev", |
| 215 | + version="v1beta2", |
| 216 | + namespace=namespace, |
| 217 | + plural="appwrappers", |
| 218 | + body=aw, |
| 219 | + ) |
| 220 | + else: |
| 221 | + api_instance.server_side_apply( |
| 222 | + group="workload.codeflare.dev", |
| 223 | + version="v1beta2", |
| 224 | + namespace=namespace, |
| 225 | + plural="appwrappers", |
| 226 | + body=self.resource_yaml, |
| 227 | + ) |
| 228 | + print(f"AppWrapper: '{self.config.name}' has successfully been created") |
| 229 | + else: |
| 230 | + api_instance = crds.get(api_version="ray.io/v1", kind="RayCluster") |
| 231 | + self._component_resources_apply(namespace, api_instance, force) |
| 232 | + print( |
| 233 | + f"Ray Cluster: '{self.config.name}' has successfully been applied" |
| 234 | + ) |
| 235 | + except Exception as e: # pragma: no cover |
| 236 | + return _kube_api_error_handling(e) |
| 237 | + |
179 | 238 | def _throw_for_no_raycluster(self):
|
180 | 239 | api_instance = client.CustomObjectsApi(get_api_client())
|
181 | 240 | try:
|
@@ -204,7 +263,7 @@ def down(self):
|
204 | 263 | resource_name = self.config.name
|
205 | 264 | self._throw_for_no_raycluster()
|
206 | 265 | try:
|
207 |
| - config_check() |
| 266 | + self.config_check() |
208 | 267 | api_instance = client.CustomObjectsApi(get_api_client())
|
209 | 268 | if self.config.appwrapper:
|
210 | 269 | api_instance.delete_namespaced_custom_object(
|
@@ -507,6 +566,16 @@ def _component_resources_up(
|
507 | 566 | else:
|
508 | 567 | _create_resources(self.resource_yaml, namespace, api_instance)
|
509 | 568 |
|
| 569 | + def _component_resources_apply( |
| 570 | + self, namespace: str, api_instance: client.CustomObjectsApi |
| 571 | + ): |
| 572 | + if self.config.write_to_file: |
| 573 | + with open(self.resource_yaml) as f: |
| 574 | + ray_cluster = yaml.safe_load(f) |
| 575 | + _apply_resources(ray_cluster, namespace, api_instance) |
| 576 | + else: |
| 577 | + _apply_resources(self.resource_yaml, namespace, api_instance) |
| 578 | + |
510 | 579 | def _component_resources_down(
|
511 | 580 | self, namespace: str, api_instance: client.CustomObjectsApi
|
512 | 581 | ):
|
@@ -744,6 +813,20 @@ def _create_resources(yamls, namespace: str, api_instance: client.CustomObjectsA
|
744 | 813 | )
|
745 | 814 |
|
746 | 815 |
|
| 816 | +def _apply_resources( |
| 817 | + yamls, namespace: str, api_instance: client.CustomObjectsApi, force=False |
| 818 | +): |
| 819 | + api_instance.server_side_apply( |
| 820 | + field_manager="cluster-manager", |
| 821 | + group="ray.io", |
| 822 | + version="v1", |
| 823 | + namespace=namespace, |
| 824 | + plural="rayclusters", |
| 825 | + body=yamls, |
| 826 | + force_conflicts=force, # Allow forcing conflicts if needed |
| 827 | + ) |
| 828 | + |
| 829 | + |
747 | 830 | def _check_aw_exists(name: str, namespace: str) -> bool:
|
748 | 831 | try:
|
749 | 832 | config_check()
|
|
0 commit comments