|
1 | 1 | import requests |
| 2 | +import subprocess |
2 | 3 |
|
3 | 4 | from time import sleep |
4 | 5 |
|
|
16 | 17 | class TestRayClusterSDKKind: |
def setup_method(self):
    """Per-test setup hook.

    Initializes the Kubernetes client state on this test instance and
    records that no port-forward subprocess has been started yet.
    """
    initialize_kubernetes_client(self)
    # Start every test with no port-forward handle; cleanup_port_forward
    # reads this attribute and treats a falsy value as "nothing to stop".
    self.port_forward_process = None
| 21 | + |
def cleanup_port_forward(self):
    """Stop the background port-forward subprocess, if one is running.

    The process is first asked to terminate and given 10 seconds to exit.
    If it does not exit in time it is killed and reaped, so this method
    never leaves the child process running and never raises
    ``subprocess.TimeoutExpired``.

    Calling this when no port-forward is active is a no-op, so it is safe
    to call more than once.
    """
    process = self.port_forward_process
    if not process:
        return

    # Drop the handle before doing anything that can fail, so a later
    # cleanup call never acts on the same process a second time.
    self.port_forward_process = None

    process.terminate()
    try:
        process.wait(timeout=10)
    except subprocess.TimeoutExpired:
        # Graceful termination was ignored; force the process down and
        # reap it so it does not linger as a zombie holding the local port.
        process.kill()
        process.wait()
19 | 27 |
|
def teardown_method(self):
    """Per-test teardown hook: release everything the test created.

    Runs three cleanup steps in order: stop the port-forward subprocess,
    delete the test namespace, delete the Kueue resources. Each step is
    attempted even if an earlier one raises, so a failure while stopping
    the port-forward cannot leave the namespace or Kueue resources behind.
    Any exception raised by a step still propagates after the remaining
    steps have run.
    """
    try:
        self.cleanup_port_forward()
    finally:
        try:
            delete_namespace(self)
        finally:
            delete_kueue_resources(self)
23 | 32 |
|
@@ -67,46 +76,74 @@ def run_mnist_raycluster_sdk_kind( |
67 | 76 |
|
68 | 77 | self.assert_jobsubmit_withoutlogin_kind(cluster, accelerator, number_of_gpus) |
69 | 78 |
|
70 | | - assert_get_cluster_and_jobsubmit( |
71 | | - self, "mnist", accelerator="gpu", number_of_gpus=1 |
72 | | - ) |
| 79 | + # Note: assert_get_cluster_and_jobsubmit uses cluster.job_client which requires |
| 80 | + # the dashboard URL (HTTPRoute/Route). Since this is not available on KinD, |
| 81 | + # we skip this call. Job submission is already tested above with port-forwarding. |
| 82 | + # This function is tested on OpenShift in mnist_raycluster_sdk_test.py |
| 83 | + |
| 84 | + cluster.down() |
73 | 85 |
|
74 | 86 | # Assertions |
75 | 87 |
|
def assert_jobsubmit_withoutlogin_kind(self, cluster, accelerator, number_of_gpus):
    """Submit the MNIST job through a port-forwarded dashboard and assert it completes.

    The dashboard is reached via ``kubectl port-forward`` because
    HTTPRoute/Route is not available on KinD.

    Args:
        cluster: Cluster object; ``cluster.config.name`` is used to derive
            the ``<name>-head-svc`` service that is port-forwarded.
        accelerator: Value passed as ``ACCELERATOR`` when building the
            job's environment variables.
        number_of_gpus: Passed to the job as ``entrypoint_num_gpus``.

    Raises:
        RuntimeError: If ``kubectl port-forward`` exits before the local
            port becomes reachable.
        TimeoutError: If the local port does not become reachable in time,
            or the job does not reach a terminal state within 900 seconds.
    """
    # Local import: only this method needs it, and the file's import block
    # is left untouched.
    import socket

    local_port = "8265"
    dashboard_port = "8265"
    cluster_name = cluster.config.name

    port_forward_cmd = [
        "kubectl",
        "port-forward",
        "-n",
        self.namespace,
        f"svc/{cluster_name}-head-svc",
        f"{local_port}:{dashboard_port}",
    ]
    self.port_forward_process = subprocess.Popen(
        port_forward_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
    )

    try:
        # Poll until the forwarded port accepts connections instead of
        # sleeping a fixed time: this is faster when kubectl is quick and
        # fails with a clear error when kubectl dies or never comes up.
        ready_attempts = 30
        for _ in range(ready_attempts):
            exit_code = self.port_forward_process.poll()
            if exit_code is not None:
                raise RuntimeError(
                    f"kubectl port-forward exited early with code {exit_code}"
                )
            try:
                with socket.create_connection(
                    ("localhost", int(local_port)), timeout=1
                ):
                    break
            except OSError:
                sleep(1)
        else:
            raise TimeoutError(
                f"port-forward on localhost:{local_port} was not ready "
                f"after {ready_attempts} attempts"
            )

        ray_dashboard = f"http://localhost:{local_port}"
        client = RayJobClient(address=ray_dashboard, verify=False)

        submission_id = client.submit_job(
            entrypoint="python mnist.py",
            runtime_env={
                "working_dir": "./tests/e2e/",
                "pip": "./tests/e2e/mnist_pip_requirements.txt",
                "env_vars": get_setup_env_variables(ACCELERATOR=accelerator),
            },
            entrypoint_num_gpus=number_of_gpus,
        )
        print(f"Submitted job with ID: {submission_id}")

        # Poll the job every 5 seconds until it reaches a terminal state.
        elapsed = 0
        timeout = 900
        while True:
            status = client.get_job_status(submission_id)
            if status.is_terminal():
                break
            print(status)
            if elapsed >= timeout:
                raise TimeoutError(f"job has timed out after waiting {timeout}s")
            sleep(5)
            elapsed += 5

        logs = client.get_job_logs(submission_id)
        print(logs)

        self.assert_job_completion(status)

        client.delete_job(submission_id)
    finally:
        # Always stop the port-forward so the local port is released even
        # when submission, polling, or the assertion fails.
        self.cleanup_port_forward()
110 | 147 |
|
111 | 148 | def assert_job_completion(self, status): |
112 | 149 | if status == "SUCCEEDED": |
|
0 commit comments