From fdfb6c76373e54a92eaddfe0bcf324a42af59bdd Mon Sep 17 00:00:00 2001 From: David Xia Date: Fri, 29 Mar 2024 13:36:31 -0400 Subject: [PATCH] feat: record last state transition times ## Problem Statement My ML platform team runs the kuberay ray-operator. We want to measure the time it takes for RayClusters to transition from their initial "unhealthy" state to some other state. This metric is important for us because our users want their RayClusters to start in a timely manner. It seems like neither the ray-operator nor RayClusters provide this info currently. ## Design Add a new `.status.stateTransitionTimes` field to the `RayCluster` custom resource. This field is a `map[ClusterState]*metav1.Time` that indicates the time of the last state transition for each state. This field is updated whenever the `.status.state` changes. * [original discussion doc](https://docs.google.com/document/d/14yPSZ9iLk7a0qEg14rNWr60Btz0HEeQ3oWKP-GN9QTM) * [related Slack thread](https://ray-distributed.slack.com/archives/C01CKH05XBN/p1709321264762029) * [example input and output RayClusters](https://gist.github.com/davidxia/205d2b23202356a2d3172c51e0912f35) --- .../crds/ray.io_rayclusters.yaml | 5 ++ .../kuberay-operator/crds/ray.io_rayjobs.yaml | 5 ++ .../crds/ray.io_rayservices.yaml | 10 ++++ ray-operator/apis/ray/v1/raycluster_types.go | 2 + .../apis/ray/v1/zz_generated.deepcopy.go | 8 +++ .../config/crd/bases/ray.io_rayclusters.yaml | 5 ++ .../config/crd/bases/ray.io_rayjobs.yaml | 5 ++ .../config/crd/bases/ray.io_rayservices.yaml | 10 ++++ .../controllers/ray/raycluster_controller.go | 7 +++ .../ray/raycluster_controller_test.go | 9 ++++ .../ray/raycluster_controller_unit_test.go | 52 +++++++++++++++++++ .../controllers/ray/suite_helpers_test.go | 10 ++++ .../ray/v1/rayclusterstatus.go | 43 ++++++++++----- 13 files changed, 157 insertions(+), 14 deletions(-) diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml 
b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml index 8e37ee7b6e4..ca1269e8fec 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml @@ -7122,6 +7122,11 @@ spec: type: string state: type: string + stateTransitionTimes: + additionalProperties: + format: date-time + type: string + type: object type: object type: object served: true diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml index f4703dd49d0..23c2d172955 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml @@ -10430,6 +10430,11 @@ spec: type: string state: type: string + stateTransitionTimes: + additionalProperties: + format: date-time + type: string + type: object type: object reason: type: string diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml index 260c597697f..d96cb93584a 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml @@ -7324,6 +7324,11 @@ spec: type: string state: type: string + stateTransitionTimes: + additionalProperties: + format: date-time + type: string + type: object type: object type: object lastUpdateTime: @@ -7425,6 +7430,11 @@ spec: type: string state: type: string + stateTransitionTimes: + additionalProperties: + format: date-time + type: string + type: object type: object type: object serviceStatus: diff --git a/ray-operator/apis/ray/v1/raycluster_types.go b/ray-operator/apis/ray/v1/raycluster_types.go index e66f95375f9..033eea4642a 100644 --- a/ray-operator/apis/ray/v1/raycluster_types.go +++ b/ray-operator/apis/ray/v1/raycluster_types.go @@ -140,6 +140,8 @@ type RayClusterStatus struct { // LastUpdateTime indicates last update timestamp for this cluster status. 
// +nullable LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"` + // StateTransitionTimes indicates the time of the last state transition for each state. + StateTransitionTimes map[ClusterState]*metav1.Time `json:"stateTransitionTimes,omitempty"` // Service Endpoints Endpoints map[string]string `json:"endpoints,omitempty"` // Head info diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go index 75cd1cfeb5c..5cb947a4e0f 100644 --- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go +++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go @@ -6,6 +6,7 @@ package v1 import ( corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -264,6 +265,13 @@ func (in *RayClusterStatus) DeepCopyInto(out *RayClusterStatus) { in, out := &in.LastUpdateTime, &out.LastUpdateTime *out = (*in).DeepCopy() } + if in.StateTransitionTimes != nil { + in, out := &in.StateTransitionTimes, &out.StateTransitionTimes + *out = make(map[ClusterState]*metav1.Time, len(*in)) + for key, val := range *in { + (*out)[key] = val.DeepCopy() + } + } if in.Endpoints != nil { in, out := &in.Endpoints, &out.Endpoints *out = make(map[string]string, len(*in)) diff --git a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml index 8e37ee7b6e4..ca1269e8fec 100644 --- a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml @@ -7122,6 +7122,11 @@ spec: type: string state: type: string + stateTransitionTimes: + additionalProperties: + format: date-time + type: string + type: object type: object type: object served: true diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml index f4703dd49d0..23c2d172955 100644 --- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml +++ 
b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml @@ -10430,6 +10430,11 @@ spec: type: string state: type: string + stateTransitionTimes: + additionalProperties: + format: date-time + type: string + type: object type: object reason: type: string diff --git a/ray-operator/config/crd/bases/ray.io_rayservices.yaml b/ray-operator/config/crd/bases/ray.io_rayservices.yaml index 260c597697f..d96cb93584a 100644 --- a/ray-operator/config/crd/bases/ray.io_rayservices.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayservices.yaml @@ -7324,6 +7324,11 @@ spec: type: string state: type: string + stateTransitionTimes: + additionalProperties: + format: date-time + type: string + type: object type: object type: object lastUpdateTime: @@ -7425,6 +7430,11 @@ spec: type: string state: type: string + stateTransitionTimes: + additionalProperties: + format: date-time + type: string + type: object type: object type: object serviceStatus: diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index ece82dbba00..d6a37f4b961 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -1252,6 +1252,13 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra timeNow := metav1.Now() newInstance.Status.LastUpdateTime = &timeNow + if instance.Status.State != newInstance.Status.State { + if newInstance.Status.StateTransitionTimes == nil { + newInstance.Status.StateTransitionTimes = make(map[rayv1.ClusterState]*metav1.Time) + } + newInstance.Status.StateTransitionTimes[newInstance.Status.State] = &timeNow + } + return newInstance, nil } diff --git a/ray-operator/controllers/ray/raycluster_controller_test.go b/ray-operator/controllers/ray/raycluster_controller_test.go index 191b9e9fa1e..e88be9af2d6 100644 --- a/ray-operator/controllers/ray/raycluster_controller_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_test.go 
@@ -470,6 +470,15 @@ var _ = Context("Inside the default namespace", func() { getClusterState(ctx, namespace, rayCluster.Name), time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready)) }) + + It("RayCluster's .status.stateTransitionTimes should include a time for ready state", func() { + Eventually( + func() *metav1.Time { + status := getClusterStatus(ctx, namespace, rayCluster.Name)() + return status.StateTransitionTimes[rayv1.Ready] + }, + time.Second*3, time.Millisecond*500).Should(Not(BeNil())) + }) }) Describe("RayCluster with a multi-host worker group", func() { diff --git a/ray-operator/controllers/ray/raycluster_controller_unit_test.go b/ray-operator/controllers/ray/raycluster_controller_unit_test.go index 2b67435a5b9..e3f5af0a58e 100644 --- a/ray-operator/controllers/ray/raycluster_controller_unit_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_unit_test.go @@ -1629,6 +1629,7 @@ func TestCalculateStatus(t *testing.T) { }, Status: corev1.PodStatus{ PodIP: headNodeIP, + Phase: corev1.PodRunning, }, } runtimeObjects := []runtime.Object{headPod, headService} @@ -1650,6 +1651,57 @@ func TestCalculateStatus(t *testing.T) { assert.Equal(t, headNodeIP, newInstance.Status.Head.PodIP) assert.Equal(t, headServiceIP, newInstance.Status.Head.ServiceIP) assert.Equal(t, headService.Name, newInstance.Status.Head.ServiceName) + assert.NotNil(t, newInstance.Status.StateTransitionTimes, "Cluster state transition timestamp should be created") + assert.Equal(t, newInstance.Status.LastUpdateTime, newInstance.Status.StateTransitionTimes[rayv1.Ready]) +} + +func TestStateTransitionTimes_NoStateChange(t *testing.T) { + setupTest(t) + + // Create a new scheme with CRDs, Pod, Service schemes. 
+ newScheme := runtime.NewScheme() + _ = rayv1.AddToScheme(newScheme) + _ = corev1.AddToScheme(newScheme) + + // Mock data + headServiceIP := "aaa.bbb.ccc.ddd" + headService, err := common.BuildServiceForHeadPod(context.Background(), *testRayCluster, nil, nil) + assert.Nil(t, err, "Failed to build head service.") + headService.Spec.ClusterIP = headServiceIP + // headService.Spec.cont + headPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "headNode", + Namespace: namespaceStr, + Labels: map[string]string{ + utils.RayClusterLabelKey: instanceName, + utils.RayNodeTypeLabelKey: string(rayv1.HeadNode), + }, + }, + Status: corev1.PodStatus{ + PodIP: headNodeIP, + Phase: corev1.PodRunning, + }, + } + runtimeObjects := []runtime.Object{headPod, headService} + + // Initialize a fake client with newScheme and runtimeObjects. + fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build() + ctx := context.Background() + + // Initialize a RayCluster reconciler. 
+ r := &RayClusterReconciler{ + Client: fakeClient, + Recorder: &record.FakeRecorder{}, + Scheme: scheme.Scheme, + } + + preUpdateTime := metav1.Now() + testRayCluster.Status.State = rayv1.Ready + testRayCluster.Status.StateTransitionTimes = map[rayv1.ClusterState]*metav1.Time{rayv1.Ready: &preUpdateTime} + newInstance, err := r.calculateStatus(ctx, testRayCluster) + assert.Nil(t, err) + assert.Equal(t, preUpdateTime, *newInstance.Status.StateTransitionTimes[rayv1.Ready], "Cluster state transition timestamp should not be updated") } func Test_TerminatedWorkers_NoAutoscaler(t *testing.T) { diff --git a/ray-operator/controllers/ray/suite_helpers_test.go b/ray-operator/controllers/ray/suite_helpers_test.go index f20ba90b3ed..025a7cf3112 100644 --- a/ray-operator/controllers/ray/suite_helpers_test.go +++ b/ray-operator/controllers/ray/suite_helpers_test.go @@ -49,6 +49,16 @@ func getClusterState(ctx context.Context, namespace string, clusterName string) } } +func getClusterStatus(ctx context.Context, namespace string, clusterName string) func() rayv1.RayClusterStatus { + return func() rayv1.RayClusterStatus { + var cluster rayv1.RayCluster + if err := k8sClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: clusterName}, &cluster); err != nil { + log.Fatal(err) + } + return cluster.Status + } +} + func isAllPodsRunningByFilters(ctx context.Context, podlist corev1.PodList, opt ...client.ListOption) bool { err := k8sClient.List(ctx, &podlist, opt...) 
gomega.Expect(err).ShouldNot(gomega.HaveOccurred(), "failed to list Pods") diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterstatus.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterstatus.go index 101e2ae84c7..e22ce2075c6 100644 --- a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterstatus.go +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterstatus.go @@ -11,20 +11,21 @@ import ( // RayClusterStatusApplyConfiguration represents an declarative configuration of the RayClusterStatus type for use // with apply. type RayClusterStatusApplyConfiguration struct { - State *v1.ClusterState `json:"state,omitempty"` - AvailableWorkerReplicas *int32 `json:"availableWorkerReplicas,omitempty"` - DesiredWorkerReplicas *int32 `json:"desiredWorkerReplicas,omitempty"` - MinWorkerReplicas *int32 `json:"minWorkerReplicas,omitempty"` - MaxWorkerReplicas *int32 `json:"maxWorkerReplicas,omitempty"` - DesiredCPU *resource.Quantity `json:"desiredCPU,omitempty"` - DesiredMemory *resource.Quantity `json:"desiredMemory,omitempty"` - DesiredGPU *resource.Quantity `json:"desiredGPU,omitempty"` - DesiredTPU *resource.Quantity `json:"desiredTPU,omitempty"` - LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"` - Endpoints map[string]string `json:"endpoints,omitempty"` - Head *HeadInfoApplyConfiguration `json:"head,omitempty"` - Reason *string `json:"reason,omitempty"` - ObservedGeneration *int64 `json:"observedGeneration,omitempty"` + State *v1.ClusterState `json:"state,omitempty"` + AvailableWorkerReplicas *int32 `json:"availableWorkerReplicas,omitempty"` + DesiredWorkerReplicas *int32 `json:"desiredWorkerReplicas,omitempty"` + MinWorkerReplicas *int32 `json:"minWorkerReplicas,omitempty"` + MaxWorkerReplicas *int32 `json:"maxWorkerReplicas,omitempty"` + DesiredCPU *resource.Quantity `json:"desiredCPU,omitempty"` + DesiredMemory *resource.Quantity `json:"desiredMemory,omitempty"` + DesiredGPU *resource.Quantity 
`json:"desiredGPU,omitempty"` + DesiredTPU *resource.Quantity `json:"desiredTPU,omitempty"` + LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"` + StateTransitionTimes map[v1.ClusterState]*metav1.Time `json:"stateTransitionTimes,omitempty"` + Endpoints map[string]string `json:"endpoints,omitempty"` + Head *HeadInfoApplyConfiguration `json:"head,omitempty"` + Reason *string `json:"reason,omitempty"` + ObservedGeneration *int64 `json:"observedGeneration,omitempty"` } // RayClusterStatusApplyConfiguration constructs an declarative configuration of the RayClusterStatus type for use with @@ -113,6 +114,20 @@ func (b *RayClusterStatusApplyConfiguration) WithLastUpdateTime(value metav1.Tim return b } +// WithStateTransitionTimes puts the entries into the StateTransitionTimes field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the StateTransitionTimes field, +// overwriting an existing map entries in StateTransitionTimes field with the same key. +func (b *RayClusterStatusApplyConfiguration) WithStateTransitionTimes(entries map[v1.ClusterState]*metav1.Time) *RayClusterStatusApplyConfiguration { + if b.StateTransitionTimes == nil && len(entries) > 0 { + b.StateTransitionTimes = make(map[v1.ClusterState]*metav1.Time, len(entries)) + } + for k, v := range entries { + b.StateTransitionTimes[k] = v + } + return b +} + // WithEndpoints puts the entries into the Endpoints field in the declarative configuration // and returns the receiver, so that objects can be build by chaining "With" function invocations. // If called multiple times, the entries provided by each call will be put on the Endpoints field,