-
Notifications
You must be signed in to change notification settings - Fork 56
feat(RHOAIENG-26480): Run RayJobs against existing RayClusters #867
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,7 @@ name: Coverage Badge | |
|
||
on: | ||
push: | ||
branches: [ main ] | ||
branches: [ main, ray-jobs-feature ] | ||
|
||
jobs: | ||
report: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ on: | |
branches: | ||
- main | ||
- 'release-*' | ||
- ray-jobs-feature | ||
paths-ignore: | ||
- 'docs/**' | ||
- '**.adoc' | ||
|
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ | |
AWManager, | ||
AppWrapperStatus, | ||
RayJobClient, | ||
RayJob, | ||
) | ||
|
||
from .common.widgets import view_clusters | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .rayjob import RayJob |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
""" | ||
RayJob client for submitting and managing Ray jobs using the odh-kuberay-client. | ||
""" | ||
|
||
import logging | ||
from typing import Dict, Any, Optional | ||
from odh_kuberay_client.kuberay_job_api import RayjobApi | ||
|
||
# Set up logging | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class RayJob: | ||
""" | ||
A client for managing Ray jobs using the KubeRay operator. | ||
This class provides a simplified interface for submitting and managing | ||
Ray jobs in a Kubernetes cluster with the KubeRay operator installed. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
job_name: str, | ||
cluster_name: str, | ||
namespace: str = "default", | ||
entrypoint: str = "None", | ||
runtime_env: Optional[Dict[str, Any]] = None, | ||
): | ||
""" | ||
Initialize a RayJob instance. | ||
Args: | ||
name: The name for the Ray job | ||
namespace: The Kubernetes namespace to submit the job to (default: "default") | ||
cluster_name: The name of the Ray cluster to submit the job to | ||
**kwargs: Additional configuration options | ||
""" | ||
self.name = job_name | ||
self.namespace = namespace | ||
self.cluster_name = cluster_name | ||
self.entrypoint = entrypoint | ||
self.runtime_env = runtime_env | ||
|
||
# Initialize the KubeRay job API client | ||
self._api = RayjobApi() | ||
|
||
logger.info(f"Initialized RayJob: {self.name} in namespace: {self.namespace}") | ||
|
||
def submit( | ||
self, | ||
) -> str: | ||
""" | ||
Submit the Ray job to the Kubernetes cluster. | ||
Args: | ||
entrypoint: The Python script or command to run | ||
runtime_env: Ray runtime environment configuration (optional) | ||
Returns: | ||
The job ID/name if submission was successful | ||
Raises: | ||
RuntimeError: If the job has already been submitted or submission fails | ||
""" | ||
# Build the RayJob custom resource | ||
rayjob_cr = self._build_rayjob_cr( | ||
entrypoint=self.entrypoint, | ||
runtime_env=self.runtime_env, | ||
) | ||
|
||
# Submit the job | ||
logger.info( | ||
f"Submitting RayJob {self.name} to RayCluster {self.cluster_name} in namespace {self.namespace}" | ||
) | ||
result = self._api.submit_job(k8s_namespace=self.namespace, job=rayjob_cr) | ||
|
||
if result: | ||
logger.info(f"Successfully submitted RayJob {self.name}") | ||
return self.name | ||
else: | ||
raise RuntimeError(f"Failed to submit RayJob {self.name}") | ||
|
||
def _build_rayjob_cr( | ||
self, | ||
entrypoint: str, | ||
runtime_env: Optional[Dict[str, Any]] = None, | ||
) -> Dict[str, Any]: | ||
""" | ||
Build the RayJob custom resource specification. | ||
This creates a minimal RayJob CR that can be extended later. | ||
""" | ||
# Basic RayJob custom resource structure | ||
rayjob_cr = { | ||
"apiVersion": "ray.io/v1", | ||
"kind": "RayJob", | ||
"metadata": { | ||
"name": self.name, | ||
"namespace": self.namespace, | ||
}, | ||
"spec": { | ||
"entrypoint": entrypoint, | ||
"clusterSelector": {"ray.io/cluster": self.cluster_name}, | ||
}, | ||
} | ||
|
||
# Add runtime environment if specified | ||
if runtime_env: | ||
rayjob_cr["spec"]["runtimeEnvYAML"] = str(runtime_env) | ||
|
||
return rayjob_cr |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
# Copyright 2024 IBM, Red Hat | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import pytest | ||
from unittest.mock import MagicMock | ||
from codeflare_sdk.ray.rayjobs.rayjob import RayJob | ||
|
||
|
||
def test_rayjob_submit_success(mocker): | ||
"""Test successful RayJob submission.""" | ||
# Mock kubernetes config loading | ||
mocker.patch("kubernetes.config.load_kube_config") | ||
|
||
# Mock the RayjobApi class entirely | ||
mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") | ||
mock_api_instance = MagicMock() | ||
mock_api_class.return_value = mock_api_instance | ||
|
||
# Configure the mock to return success when submit is called | ||
mock_api_instance.submit.return_value = {"metadata": {"name": "test-rayjob"}} | ||
|
||
# Create RayJob instance | ||
rayjob = RayJob( | ||
job_name="test-rayjob", | ||
cluster_name="test-ray-cluster", | ||
namespace="test-namespace", | ||
entrypoint="python -c 'print(\"hello world\")'", | ||
runtime_env={"pip": ["requests"]}, | ||
) | ||
|
||
# Submit the job | ||
job_id = rayjob.submit() | ||
|
||
# Assertions | ||
assert job_id == "test-rayjob" | ||
|
||
# Verify the API was called with correct parameters | ||
mock_api_instance.submit_job.assert_called_once() | ||
call_args = mock_api_instance.submit_job.call_args | ||
|
||
# Check the namespace parameter | ||
assert call_args.kwargs["k8s_namespace"] == "test-namespace" | ||
|
||
# Check the job custom resource | ||
job_cr = call_args.kwargs["job"] | ||
assert job_cr["metadata"]["name"] == "test-rayjob" | ||
assert job_cr["metadata"]["namespace"] == "test-namespace" | ||
assert job_cr["spec"]["entrypoint"] == "python -c 'print(\"hello world\")'" | ||
assert job_cr["spec"]["clusterSelector"]["ray.io/cluster"] == "test-ray-cluster" | ||
assert job_cr["spec"]["runtimeEnvYAML"] == "{'pip': ['requests']}" | ||
|
||
|
||
def test_rayjob_submit_failure(mocker): | ||
"""Test RayJob submission failure.""" | ||
# Mock kubernetes config loading | ||
mocker.patch("kubernetes.config.load_kube_config") | ||
|
||
# Mock the RayjobApi class entirely | ||
mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") | ||
mock_api_instance = MagicMock() | ||
mock_api_class.return_value = mock_api_instance | ||
|
||
# Configure the mock to return failure (False/None) when submit_job is called | ||
mock_api_instance.submit_job.return_value = None | ||
|
||
# Create a RayJob instance | ||
rayjob = RayJob( | ||
job_name="test-rayjob", | ||
cluster_name="test-ray-cluster", | ||
namespace="default", | ||
entrypoint="python script.py", | ||
runtime_env={"pip": ["numpy"]}, | ||
) | ||
|
||
# Test that RuntimeError is raised on failure | ||
with pytest.raises(RuntimeError, match="Failed to submit RayJob test-rayjob"): | ||
rayjob.submit() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ spec: | |
controller-tools.k8s.io: '1.0' | ||
key1: value1 | ||
key2: value2 | ||
ray.io/cluster: aw-all-params | ||
name: aw-all-params | ||
namespace: ns | ||
spec: | ||
|
@@ -38,6 +39,7 @@ spec: | |
rayStartParams: | ||
block: 'true' | ||
dashboard-host: 0.0.0.0 | ||
dashboard-port: '8265' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think this one is being removed elsewhere? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's added in the new RayJob logic so it has to be here for the unit test assertions I think! |
||
num-gpus: '1' | ||
resources: '"{\"TPU\": 2}"' | ||
serviceType: ClusterIP | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this log probably needs an update or to be moved. the RayJob doesn't get initialized it get's submitted/applied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I just have that for initial clarity as we've initialised the object but not submitted it. I can probably remove the logger stuff as is really for now until we decide on an output format?