e2e test for heterogeneous cluster #718

Merged
4 changes: 4 additions & 0 deletions .github/workflows/e2e_tests.yaml
@@ -64,6 +64,8 @@ jobs:

- name: Setup and start KinD cluster
uses: ./common/github-actions/kind
with:
worker-nodes: 1
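# one extra worker gives two distinct node pools (control-plane + worker),
# one per ResourceFlavor created by the heterogeneous cluster tests below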

- name: Install NVidia GPU operator for KinD
uses: ./common/github-actions/nvidia-gpu-operator
@@ -102,6 +104,8 @@ jobs:
kubectl create clusterrolebinding sdk-user-localqueue-creator --clusterrole=localqueue-creator --user=sdk-user
kubectl create clusterrole list-secrets --verb=get,list --resource=secrets
kubectl create clusterrolebinding sdk-user-list-secrets --clusterrole=list-secrets --user=sdk-user
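# get/list on pods lets the sdk-user resolve which node a Ray pod landed on
# (used by get_pod_node in tests/e2e/support.py)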
kubectl create clusterrole pod-creator --verb=get,list --resource=pods
kubectl create clusterrolebinding sdk-user-pod-creator --clusterrole=pod-creator --user=sdk-user
kubectl config use-context sdk-user

- name: Run e2e tests
74 changes: 74 additions & 0 deletions tests/e2e/heterogeneous_clusters_kind_test.py
@@ -0,0 +1,74 @@
from time import sleep
from codeflare_sdk import (
Cluster,
ClusterConfiguration,
)

from codeflare_sdk.common.kueue.kueue import list_local_queues

import pytest

from support import *


@pytest.mark.skip(reason="Skipping heterogeneous cluster kind test")
@pytest.mark.kind
class TestHeterogeneousClustersKind:
def setup_method(self):
initialize_kubernetes_client(self)

def teardown_method(self):
delete_namespace(self)
delete_kueue_resources(self)

@pytest.mark.nvidia_gpu
def test_heterogeneous_clusters(self):
create_namespace(self)
create_kueue_resources(self, 2)
self.run_heterogeneous_clusters()

def run_heterogeneous_clusters(
self, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0
):
for flavor in self.resource_flavors:
node_labels = (
get_flavor_spec(self, flavor).get("spec", {}).get("nodeLabels", {})
)
expected_nodes = get_nodes_by_label(self, node_labels)

print(f"Expected nodes: {expected_nodes}")
cluster_name = f"test-ray-cluster-li-{flavor[-5:]}"
queues = list_local_queues(namespace=self.namespace, flavors=[flavor])
queue_name = queues[0]["name"] if queues else None
print(f"Using flavor: {flavor}, Queue: {queue_name}")
cluster = Cluster(
ClusterConfiguration(
name=cluster_name,
namespace=self.namespace,
num_workers=1,
head_cpu_requests="500m",
head_cpu_limits="500m",
head_memory_requests=2,
head_memory_limits=2,
worker_cpu_requests="500m",
worker_cpu_limits=1,
worker_memory_requests=1,
worker_memory_limits=4,
worker_extended_resource_requests={
gpu_resource_name: number_of_gpus
},
write_to_file=True,
verify_tls=False,
local_queue=queue_name,
)
)
cluster.up()
sleep(5)
node_name = get_pod_node(self, self.namespace, cluster_name)
print(f"Cluster {cluster_name}-{flavor} is running on node: {node_name}")
sleep(5)
assert (
node_name in expected_nodes
), f"Node {node_name} is not in the expected nodes for flavor {flavor}."
cluster.down()
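The flavor-to-queue lookup above is the heart of the heterogeneous setup. As a minimal standalone sketch (the helper name queue_for_flavor is hypothetical; the return shape of list_local_queues, a list of dicts with a "name" key, is inferred from its use in this test):

from typing import Optional

from codeflare_sdk.common.kueue.kueue import list_local_queues


def queue_for_flavor(namespace: str, flavor: str) -> Optional[str]:
    # Ask Kueue for the LocalQueues in this namespace whose ClusterQueue
    # admits the given ResourceFlavor, then take the first match.
    queues = list_local_queues(namespace=namespace, flavors=[flavor])
    return queues[0]["name"] if queues else None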
77 changes: 77 additions & 0 deletions tests/e2e/heterogeneous_clusters_oauth_test.py
@@ -0,0 +1,77 @@
from time import sleep
from codeflare_sdk import (
Cluster,
ClusterConfiguration,
TokenAuthentication,
)

from codeflare_sdk.common.kueue.kueue import list_local_queues

import pytest

from support import *


@pytest.mark.openshift
class TestHeterogeneousClustersOauth:
def setup_method(self):
initialize_kubernetes_client(self)

def teardown_method(self):
delete_namespace(self)
delete_kueue_resources(self)

def test_heterogeneous_clusters(self):
create_namespace(self)
create_kueue_resources(self, 2)
self.run_heterogeneous_clusters()

def run_heterogeneous_clusters(
self, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0
):
ray_image = get_ray_image()

auth = TokenAuthentication(
token=run_oc_command(["whoami", "--show-token=true"]),
server=run_oc_command(["whoami", "--show-server=true"]),
skip_tls=True,
)
auth.login()

for flavor in self.resource_flavors:
node_labels = (
get_flavor_spec(self, flavor).get("spec", {}).get("nodeLabels", {})
)
expected_nodes = get_nodes_by_label(self, node_labels)

print(f"Expected nodes: {expected_nodes}")
cluster_name = f"test-ray-cluster-li-{flavor[-5:]}"
queues = list_local_queues(namespace=self.namespace, flavors=[flavor])
queue_name = queues[0]["name"] if queues else None
print(f"Using flavor: {flavor}, Queue: {queue_name}")
cluster = Cluster(
ClusterConfiguration(
namespace=self.namespace,
name=cluster_name,
num_workers=1,
head_cpu_requests="500m",
head_cpu_limits="500m",
worker_cpu_requests="500m",
worker_cpu_limits=1,
worker_memory_requests=1,
worker_memory_limits=4,
image=ray_image,
verify_tls=False,
local_queue=queue_name,
)
)
cluster.up()
sleep(5)
node_name = get_pod_node(self, self.namespace, cluster_name)
print(f"Cluster {cluster_name}-{flavor} is running on node: {node_name}")
sleep(5)
assert (
node_name in expected_nodes
), f"Node {node_name} is not in the expected nodes for flavor {flavor}."
cluster.down()
143 changes: 110 additions & 33 deletions tests/e2e/support.py
@@ -1,3 +1,4 @@
import json
import os
import random
import string
@@ -65,19 +66,30 @@ def create_namespace(self):
return RuntimeError(e)


def create_new_resource_flavor(self, num_flavors):
self.resource_flavors = []
for i in range(num_flavors):
default = i == 0
resource_flavor = f"test-resource-flavor-{random_choice()}"
create_resource_flavor(self, resource_flavor, default)
self.resource_flavors.append(resource_flavor)


def create_new_cluster_queue(self, num_queues):
self.cluster_queues = []
for i in range(num_queues):
cluster_queue_name = f"test-cluster-queue-{random_choice()}"
create_cluster_queue(self, cluster_queue_name, self.resource_flavors[i])
self.cluster_queues.append(cluster_queue_name)


def create_new_local_queue(self, num_queues):
self.local_queues = []
for i in range(num_queues):
is_default = i == 0
local_queue_name = f"test-local-queue-{random_choice()}"
create_local_queue(self, self.cluster_queues[i], local_queue_name, is_default)
self.local_queues.append(local_queue_name)


def create_namespace_with_name(self, namespace_name):
@@ -132,7 +144,7 @@ def create_cluster_queue(self, cluster_queue, flavor):
{"name": "memory", "nominalQuota": "36Gi"},
{"name": "nvidia.com/gpu", "nominalQuota": 1},
],
},
],
}
],
@@ -161,11 +173,33 @@ def create_cluster_queue(self, cluster_queue, flavor):
self.cluster_queue = cluster_queue


def create_resource_flavor(self, flavor, default=True):
worker_label, worker_value = os.getenv("WORKER_LABEL", "worker-1=true").split("=")
control_label, control_value = os.getenv(
"CONTROL_LABEL", "ingress-ready=true"
).split("=")
toleration_key = os.getenv(
"TOLERATION_KEY", "node-role.kubernetes.io/control-plane"
)

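# The default flavor targets worker nodes; every other flavor targets
# control-plane nodes, so each flavor maps to a distinct set of nodes.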
node_labels = (
{worker_label: worker_value} if default else {control_label: control_value}
)

resource_flavor_json = {
"apiVersion": "kueue.x-k8s.io/v1beta1",
"kind": "ResourceFlavor",
"metadata": {"name": flavor},
"spec": {
"nodeLabels": node_labels,
"tolerations": [
{
"key": toleration_key,
"operator": "Exists",
"effect": "NoSchedule",
}
],
},
}

try:
@@ -190,14 +224,14 @@ def create_resource_flavor(self, flavor):
self.resource_flavor = flavor


def create_local_queue(self, cluster_queue, local_queue, is_default=True):
local_queue_json = {
"apiVersion": "kueue.x-k8s.io/v1beta1",
"kind": "LocalQueue",
"metadata": {
"namespace": self.namespace,
"name": local_queue,
"annotations": {"kueue.x-k8s.io/default-queue": "true"},
"annotations": {"kueue.x-k8s.io/default-queue": str(is_default).lower()},
},
"spec": {"clusterQueue": cluster_queue},
}
@@ -226,34 +260,77 @@ def create_local_queue(self, cluster_queue, local_queue):
self.local_queue = local_queue


def create_kueue_resources(self, resource_amount=1):
print("creating Kueue resources ...")
create_new_resource_flavor(self, resource_amount)
create_new_cluster_queue(self, resource_amount)
create_new_local_queue(self, resource_amount)
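As a usage sketch (assuming a test instance self, as in the tests above), a count of 2 yields three index-aligned lists that the heterogeneous tests iterate over:

# inside a test method, after create_namespace(self)
create_kueue_resources(self, resource_amount=2)

# self.resource_flavors[i] -> ResourceFlavor (i == 0 is the default)
# self.cluster_queues[i]   -> ClusterQueue bound to that flavor
# self.local_queues[i]     -> LocalQueue bound to that ClusterQueue
for flavor, cq, lq in zip(
    self.resource_flavors, self.cluster_queues, self.local_queues
):
    print(f"{flavor} -> {cq} -> {lq}")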


def delete_kueue_resources(self):
# Delete if given cluster-queue exists
for cq in self.cluster_queues:
try:
self.custom_api.delete_cluster_custom_object(
group="kueue.x-k8s.io",
plural="clusterqueues",
version="v1beta1",
name=cq,
)
print(f"\n'{cq}' cluster-queue deleted")
except Exception as e:
print(f"\nError deleting cluster-queue '{cq}' : {e}")

# Delete if given resource-flavor exists
for flavor in self.resource_flavors:
try:
self.custom_api.delete_cluster_custom_object(
group="kueue.x-k8s.io",
plural="resourceflavors",
version="v1beta1",
name=flavor,
)
print(f"'{flavor}' resource-flavor deleted")
except Exception as e:
print(f"\nError deleting resource-flavor '{flavor}': {e}")


def get_pod_node(self, namespace, name):
label_selector = f"ray.io/cluster={name}"
pods = self.api_instance.list_namespaced_pod(
namespace, label_selector=label_selector
)
if not pods.items:
raise ValueError(
f"Unable to retrieve node name for pod '{name}' in namespace '{namespace}'"
)
pod = pods.items[0]
node_name = pod.spec.node_name
if node_name is None:
raise ValueError(
f"No node selected for pod '{name}' in namespace '{namespace}'"
)
return node_name


def get_flavor_spec(self, flavor_name):
try:
flavor = self.custom_api.get_cluster_custom_object(
group="kueue.x-k8s.io",
version="v1beta1",
plural="resourceflavors",
name=flavor_name,
)
return flavor
except client.exceptions.ApiException as e:
if e.status == 404:
print(f"ResourceFlavor '{flavor_name}' not found.")
else:
print(f"Error retrieving ResourceFlavor '{flavor_name}': {e}")
raise


def get_nodes_by_label(self, node_labels):
label_selector = ",".join(f"{k}={v}" for k, v in node_labels.items())
nodes = self.api_instance.list_node(label_selector=label_selector)
return [node.metadata.name for node in nodes.items]
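Taken together, the placement check in both test files reduces to three of the helpers above; a condensed sketch (the wrapper name is hypothetical):

def assert_pod_on_flavor_nodes(self, namespace, cluster_name, flavor):
    # nodeLabels recorded in the ResourceFlavor spec select the target nodes
    node_labels = get_flavor_spec(self, flavor).get("spec", {}).get("nodeLabels", {})
    expected_nodes = get_nodes_by_label(self, node_labels)
    # the Ray pod's actual node must be one of the flavor's nodes
    node_name = get_pod_node(self, namespace, cluster_name)
    assert (
        node_name in expected_nodes
    ), f"Node {node_name} is not in the expected nodes for flavor {flavor}."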