Skip to content

Commit 11dd0cb

Browse files
committed
Adds RayCluster.apply()
- Adds RayCluster.apply() implementation
- Adds e2e tests for apply
- Adds unit tests for apply
1 parent be9763a commit 11dd0cb

File tree

6 files changed

+330
-10
lines changed

6 files changed

+330
-10
lines changed

CONTRIBUTING.md

+1-1
Original file line number | Diff line number | Diff line change
@@ -76,7 +76,7 @@ pytest -v src/codeflare_sdk
7676

7777
### Local e2e Testing
7878

79-
- Please follow the [e2e documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/e2e.md)
79+
- Please follow the [e2e documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/sphinx/user-docs/e2e.rst)
8080

8181
#### Code Coverage
8282

src/codeflare_sdk/common/utils/unit_test_support.py

+58-3
Original file line number | Diff line number | Diff line change
@@ -27,10 +27,15 @@
2727

2828

2929
def createClusterConfig():
    """Build the unit-test cluster configuration with the default worker count.

    Delegates to ``createClusterConfigWithNumWorkers`` without arguments, so
    the worker count is whatever that helper defaults to.
    """
    return createClusterConfigWithNumWorkers()
32+
33+
34+
def createClusterConfigWithNumWorkers(num_workers=2):
3035
config = ClusterConfiguration(
3136
name="unit-test-cluster",
3237
namespace="ns",
33-
num_workers=2,
38+
num_workers=num_workers,
3439
worker_cpu_requests=3,
3540
worker_cpu_limits=4,
3641
worker_memory_requests=5,
@@ -41,13 +46,21 @@ def createClusterConfig():
4146
return config
4247

4348

44-
def createClusterWithConfig(mocker):
49+
def createClusterWithConfigAndNumWorkers(mocker, num_workers=2, dynamic_client=None):
4550
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
4651
mocker.patch(
4752
"kubernetes.client.CustomObjectsApi.get_cluster_custom_object",
4853
return_value={"spec": {"domain": "apps.cluster.awsroute.org"}},
4954
)
50-
cluster = Cluster(createClusterConfig())
55+
cluster = Cluster(createClusterConfigWithNumWorkers(num_workers))
56+
mocker.patch.object(cluster, "get_dynamic_client", return_value=dynamic_client)
57+
mocker.patch.object(cluster, "down", return_value=None)
58+
mocker.patch.object(cluster, "config_check", return_value=None)
59+
return cluster
60+
61+
62+
def createClusterWithConfig(mock_config):
    """Create a mocked unit-test cluster using the default worker count.

    ``mock_config`` is forwarded as the first argument (the ``mocker``
    fixture) of ``createClusterWithConfigAndNumWorkers``.
    """
    return createClusterWithConfigAndNumWorkers(mock_config)
5265

5366

@@ -383,6 +396,48 @@ def mocked_ingress(port, cluster_name="unit-test-cluster", annotations: dict = N
383396
return mock_ingress
384397

385398

399+
# Global dictionary to maintain state in the mock.
# Keyed by resource name; each value records the namespace, the worker count
# extracted from the applied body, and the body itself.
cluster_state = {}


# The mock side_effect function for server_side_apply
def mock_server_side_apply(resource, body=None, name=None, namespace=None, **kwargs):
    """Simulate ``server_side_apply`` by recording the applied body in ``cluster_state``.

    Args:
        resource: The resource handle being applied (only checked for truthiness).
        body: The resource manifest as a dictionary.
        name: The resource name; used as the key in ``cluster_state``.
        namespace: The namespace the resource is applied to.
        **kwargs: Accepted and ignored, so extra apply options do not break the mock.

    Returns:
        A dictionary mimicking a successful apply, including the worker count
        read from ``body["spec"]["workerGroupSpecs"][0]["replicas"]`` or
        ``None`` when that path is absent.

    Raises:
        ValueError: If ``resource``, ``body``, ``name`` or ``namespace`` is missing.
    """
    if not resource or not body or not name or not namespace:
        raise ValueError("Missing required parameters for server_side_apply")

    # Extract worker count from the body if it exists. A missing key, an empty
    # workerGroupSpecs list, or a null/non-mapping level all mean "unknown".
    try:
        worker_count = body["spec"]["workerGroupSpecs"][0]["replicas"]
    except (KeyError, IndexError, TypeError):
        worker_count = None

    # Apply changes to the cluster_state mock (mutation only, so no `global`
    # declaration is required).
    cluster_state[name] = {
        "namespace": namespace,
        "worker_count": worker_count,
        "body": body,
    }

    # Return a response that mimics the behavior of a successful apply
    return {
        "status": "success",
        "applied": True,
        "name": name,
        "namespace": namespace,
        "worker_count": worker_count,
    }
439+
440+
386441
@patch.dict("os.environ", {"NB_PREFIX": "test-prefix"})
387442
def create_cluster_all_config_params(mocker, cluster_name, is_appwrapper) -> Cluster:
388443
mocker.patch(

src/codeflare_sdk/ray/cluster/cluster.py

+57-3
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,10 @@
5252
import requests
5353

5454
from kubernetes import config
55+
from kubernetes.dynamic import DynamicClient
56+
from kubernetes import client as k8s_client
57+
from kubernetes.client.rest import ApiException
58+
5559
from kubernetes.client.rest import ApiException
5660
import warnings
5761

@@ -84,6 +88,14 @@ def __init__(self, config: ClusterConfiguration):
8488
if is_notebook():
8589
cluster_up_down_buttons(self)
8690

91+
def get_dynamic_client(self):
    """Create a ``DynamicClient`` from the current API client.

    Kept as a method so tests can patch it on a cluster instance.
    """
    api_client = get_api_client()
    return DynamicClient(api_client)
94+
95+
def config_check(self):
    """Run the module-level ``config_check()`` and return its result.

    Wrapped in a method so tests can patch it on a cluster instance
    instead of patching the module-level function.
    """
    return config_check()
98+
8799
@property
88100
def _client_headers(self):
89101
k8_client = get_api_client()
@@ -139,10 +151,8 @@ def up(self):
139151
Applies the Cluster yaml, pushing the resource request onto
140152
the Kueue localqueue.
141153
"""
142-
143154
# check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
144155
self._throw_for_no_raycluster()
145-
146156
namespace = self.config.namespace
147157

148158
try:
@@ -176,6 +186,50 @@ def up(self):
176186
except Exception as e: # pragma: no cover
177187
return _kube_api_error_handling(e)
178188

189+
def apply(self, force=False):
    """
    Applies the Cluster yaml using server-side apply.
    If 'force' is set to True, conflicts will be forced.

    Raises:
        RuntimeError: If the RayCluster resource cannot be resolved, the
            resource YAML file is missing, or the API server rejects the apply.
        ValueError: If the YAML cannot be parsed, is not a mapping, or has
            no 'metadata.name' field.
    """
    # check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
    self._throw_for_no_raycluster()
    namespace = self.config.namespace

    # Ensure Kubernetes configuration is loaded
    try:
        self.config_check()
        # Get the RayCluster custom resource definition
        api = self.get_dynamic_client().resources.get(
            api_version="ray.io/v1", kind="RayCluster"
        )
    except Exception as e:
        raise RuntimeError("Failed to get RayCluster resource: " + str(e)) from e

    # Read the YAML file and parse it into a dictionary
    # NOTE(review): assumes self.resource_yaml is a filesystem path — confirm
    # against how the resource is created.
    try:
        with open(self.resource_yaml, "r") as f:
            resource_body = yaml.safe_load(f)
    except FileNotFoundError as e:
        raise RuntimeError(
            f"Resource YAML file '{self.resource_yaml}' not found."
        ) from e
    except yaml.YAMLError as e:
        raise ValueError(f"Failed to parse resource YAML: {e}") from e

    # An empty file parses to None and a scalar/list document has no .get();
    # fail with a clear message instead of an AttributeError.
    if not isinstance(resource_body, dict):
        raise ValueError("The resource must have a 'metadata.name' field.")

    # Extract the resource name from the metadata ('metadata: null' is
    # treated the same as a missing metadata section).
    resource_name = (resource_body.get("metadata") or {}).get("name")
    if not resource_name:
        raise ValueError("The resource must have a 'metadata.name' field.")

    try:
        # Use server-side apply
        api.server_side_apply(
            body=resource_body,
            name=resource_name,
            namespace=namespace,
            field_manager="cluster-manager",
            force_conflicts=force,  # Allow forcing conflicts if needed
        )
    except ApiException as e:
        raise RuntimeError(f"Failed to apply cluster: {e.reason}") from e
    else:
        print(f"Cluster '{self.config.name}' applied successfully.")
232+
179233
def _throw_for_no_raycluster(self):
180234
api_instance = client.CustomObjectsApi(get_api_client())
181235
try:
@@ -204,7 +258,7 @@ def down(self):
204258
resource_name = self.config.name
205259
self._throw_for_no_raycluster()
206260
try:
207-
config_check()
261+
self.config_check()
208262
api_instance = client.CustomObjectsApi(get_api_client())
209263
if self.config.appwrapper:
210264
api_instance.delete_namespaced_custom_object(

src/codeflare_sdk/ray/cluster/test_cluster.py

+42-3
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
)
2121
from codeflare_sdk.common.utils.unit_test_support import (
2222
createClusterWithConfig,
23+
createClusterWithConfigAndNumWorkers,
2324
arg_check_del_effect,
2425
ingress_retrieval,
2526
arg_check_apply_effect,
@@ -29,6 +30,7 @@
2930
get_obj_none,
3031
get_ray_obj_with_status,
3132
get_aw_obj_with_status,
33+
mock_server_side_apply,
3234
)
3335
from codeflare_sdk.ray.cluster.cluster import _is_openshift_cluster
3436
from pathlib import Path
@@ -67,11 +69,48 @@ def test_cluster_up_down(mocker):
6769
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
6870
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
6971
)
70-
cluster = cluster = createClusterWithConfig(mocker)
72+
cluster = createClusterWithConfig(mocker)
7173
cluster.up()
7274
cluster.down()
7375

7476

77+
def test_cluster_apply_scale_up_scale_down(mocker):
    """apply() issues one server-side apply per call while scaling up and down.

    The cluster is re-created with 1, then 2, then 1 workers against the same
    mocked dynamic client; each apply() must reach ``server_side_apply`` with
    the configured namespace and the default (non-forced) conflict handling.
    """
    mock_dynamic_client = mocker.Mock()
    mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster")
    mocker.patch(
        "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock
    )
    mocker.patch(
        "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource",
        return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml",
    )

    # apply() resolves the RayCluster resource through resources.get(...) on
    # the dynamic client, so this is the mock that receives the apply call.
    server_side_apply = mock_dynamic_client.resources.get.return_value.server_side_apply

    # Initialize test
    initial_num_workers = 1
    scaled_up_num_workers = 2

    # Step 1: initial workers, Step 2: scale up, Step 3: scale back down
    worker_counts = (initial_num_workers, scaled_up_num_workers, initial_num_workers)
    for expected_calls, num_workers in enumerate(worker_counts, start=1):
        cluster = createClusterWithConfigAndNumWorkers(
            mocker, num_workers, dynamic_client=mock_dynamic_client
        )
        cluster.apply()

        # Without these checks the test would pass even if apply() did nothing.
        assert server_side_apply.call_count == expected_calls
        apply_kwargs = server_side_apply.call_args[1]
        assert apply_kwargs["namespace"] == "ns"
        assert apply_kwargs["force_conflicts"] is False

    # Tear down
    cluster.down()
112+
113+
75114
def test_cluster_up_down_no_mcad(mocker):
76115
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster")
77116
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
@@ -117,7 +156,7 @@ def test_cluster_uris(mocker):
117156
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
118157
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
119158
)
120-
cluster = cluster = createClusterWithConfig(mocker)
159+
cluster = createClusterWithConfig(mocker)
121160
mocker.patch(
122161
"kubernetes.client.NetworkingV1Api.list_namespaced_ingress",
123162
return_value=ingress_retrieval(
@@ -159,7 +198,7 @@ def ray_addr(self, *args):
159198
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
160199
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
161200
)
162-
cluster = cluster = createClusterWithConfig(mocker)
201+
cluster = createClusterWithConfig(mocker)
163202
mocker.patch(
164203
"ray.job_submission.JobSubmissionClient._check_connection_and_version_with_url",
165204
return_value="None",

0 commit comments

Comments
 (0)