From 7d666e82c8b7223f2500e972177da9a36006bce1 Mon Sep 17 00:00:00 2001
From: catpineapple <1391869588@qq.com>
Date: Wed, 4 Sep 2024 17:55:47 +0800
Subject: [PATCH] add suspendReplicas to check replicas when resuming a cluster
---
api/disaggregated/cluster/v1/types.go | 2 +
config/crd/bases/crds.yaml | 5 ++
....doris.com_dorisdisaggregatedclusters.yaml | 5 ++
doc/api.md | 2 +-
doc/disaggregated_cluster_api.md | 13 +++-
.../utils/disaggregated_ms/ms_http/http.go | 23 +++++-
.../disaggregated_ms/ms_http/metaservice.go | 6 +-
.../disaggregated_cluster_controller.go | 18 ++---
.../computeclusters/controller.go | 76 ++++++++++++-------
pkg/controller/sub_controller/events.go | 1 +
10 files changed, 106 insertions(+), 45 deletions(-)
diff --git a/api/disaggregated/cluster/v1/types.go b/api/disaggregated/cluster/v1/types.go
index 10627140..f96b6eb6 100644
--- a/api/disaggregated/cluster/v1/types.go
+++ b/api/disaggregated/cluster/v1/types.go
@@ -329,6 +329,8 @@ type ComputeClusterStatus struct {
AvailableStatus AvailableStatus `json:"availableStatus,omitempty"`
//ClusterId display the clusterId of compute cluster in meta.
ClusterId string `json:"clusterId,omitempty"`
+ //suspend replicas display the replicas of compute cluster before resume.
+ SuspendReplicas int32 `json:"suspendReplicas,omitempty"`
// replicas is the number of Pods created by the StatefulSet controller.
Replicas int32 `json:"replicas,omitempty"`
diff --git a/config/crd/bases/crds.yaml b/config/crd/bases/crds.yaml
index ed7419aa..9a0d5b0d 100644
--- a/config/crd/bases/crds.yaml
+++ b/config/crd/bases/crds.yaml
@@ -14081,6 +14081,11 @@ spec:
description: the statefulset of control this compute cluster
pods.
type: string
+ suspendReplicas:
+ description: suspend replicas display the replicas of compute
+ cluster before resume.
+ format: int32
+ type: integer
type: object
type: array
feStatus:
diff --git a/config/crd/bases/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml b/config/crd/bases/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml
index c87fdc00..688f76f3 100644
--- a/config/crd/bases/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml
+++ b/config/crd/bases/disaggregated.cluster.doris.com_dorisdisaggregatedclusters.yaml
@@ -5635,6 +5635,11 @@ spec:
description: the statefulset of control this compute cluster
pods.
type: string
+ suspendReplicas:
+ description: suspend replicas display the replicas of compute
+ cluster before resume.
+ format: int32
+ type: integer
type: object
type: array
feStatus:
diff --git a/doc/api.md b/doc/api.md
index 835eeb1b..89e564bc 100644
--- a/doc/api.md
+++ b/doc/api.md
@@ -2593,5 +2593,5 @@ string
+
replicas
int32
@@ -1435,5 +1446,5 @@ string
Generated with gen-crd-api-reference-docs
-on git commit 1d205b5 .
+on git commit 711e5a4 .
diff --git a/pkg/common/utils/disaggregated_ms/ms_http/http.go b/pkg/common/utils/disaggregated_ms/ms_http/http.go
index 415283b5..dba150c1 100644
--- a/pkg/common/utils/disaggregated_ms/ms_http/http.go
+++ b/pkg/common/utils/disaggregated_ms/ms_http/http.go
@@ -25,6 +25,7 @@ import (
"io"
"k8s.io/apimachinery/pkg/util/json"
"net/http"
+ "strings"
)
const (
@@ -205,12 +206,26 @@ func DropBENodes(endpoint, token, instanceID string, cluster Cluster) (*MSRespon
}
// suspend cluster
-func SuspendComputeCluster(endpoint, token, instanceID, clusterID string) (*MSResponse, error) {
- return SetClusterStatus(endpoint, token, instanceID, clusterID, "SUSPENDED")
+func SuspendComputeCluster(endpoint, token, instanceID, clusterID string) error {
+ response, err := SetClusterStatus(endpoint, token, instanceID, clusterID, "SUSPENDED")
+ if err != nil {
+ return fmt.Errorf("SuspendComputeCluster SetClusterStatus failed: %w", err)
+ }
+ if response.Code != SuccessCode && !strings.Contains(response.Msg, "original cluster is SUSPENDED") {
+ return fmt.Errorf("SuspendComputeCluster SetClusterStatus failed: %s", response.Msg)
+ }
+ return nil
}
-func ResumeComputeCluster(endpoint, token, instanceID, clusterID string) (*MSResponse, error) {
- return SetClusterStatus(endpoint, token, instanceID, clusterID, "NORMAL")
+func ResumeComputeCluster(endpoint, token, instanceID, clusterID string) error {
+ response, err := SetClusterStatus(endpoint, token, instanceID, clusterID, "NORMAL")
+ if err != nil {
+ return fmt.Errorf("ResumeComputeCluster SetClusterStatus failed: %w", err)
+ }
+ if response.Code != SuccessCode && !strings.Contains(response.Msg, "original cluster is NORMAL") {
+ return fmt.Errorf("ResumeComputeCluster SetClusterStatus failed: %s", response.Msg)
+ }
+ return nil
}
// SetClusterStatus resume cluster
diff --git a/pkg/common/utils/disaggregated_ms/ms_http/metaservice.go b/pkg/common/utils/disaggregated_ms/ms_http/metaservice.go
index 8bc57528..1e0584a3 100644
--- a/pkg/common/utils/disaggregated_ms/ms_http/metaservice.go
+++ b/pkg/common/utils/disaggregated_ms/ms_http/metaservice.go
@@ -65,9 +65,13 @@ type MSRequest struct {
func (mr *MSResponse) MSResponseResultNodesToNodeInfos() ([]*NodeInfo, error) {
+ if mr.Code != SuccessCode {
+ return nil, errors.New("MSResponseResultNodesToNodeInfos response code is not OK，code is: " + mr.Code + ", msg: " + mr.Msg + "")
+ }
+
nodes, ok := mr.Result["nodes"]
if !ok {
- return nil, errors.New("MSResponseResultNodes is not exist")
+ return nil, errors.New("MSResponseResultNodesToNodeInfos get nodes failed")
}
jsonStr, err := json.Marshal(nodes)
diff --git a/pkg/controller/disaggregated_cluster_controller.go b/pkg/controller/disaggregated_cluster_controller.go
index 950fd5f2..ae0096d0 100644
--- a/pkg/controller/disaggregated_cluster_controller.go
+++ b/pkg/controller/disaggregated_cluster_controller.go
@@ -299,6 +299,7 @@ func (dc *DisaggregatedClusterReconciler) reconcileSub(ctx context.Context, ddc
// when spec revert by operator should update cr or directly update status.
func (dc *DisaggregatedClusterReconciler) updateObjectORStatus(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster, preHv string) (ctrl.Result, error) {
postHv := hash.HashObject(ddc.Spec)
+ deepCopyDDC := ddc.DeepCopy()
if preHv != postHv {
var eddc dv1.DorisDisaggregatedCluster
if err := dc.Get(ctx, types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name}, &eddc); err == nil || !apierrors.IsNotFound(err) {
@@ -308,24 +309,19 @@ func (dc *DisaggregatedClusterReconciler) updateObjectORStatus(ctx context.Conte
}
if err := dc.Update(ctx, ddc); err != nil {
klog.Errorf("disaggreatedClusterReconciler update DorisDisaggregatedCluster namespace %s name %s failed, err=%s", ddc.Namespace, ddc.Name, err.Error())
- return ctrl.Result{}, err
+ //return ctrl.Result{}, err
}
-
- //if cr updated, update cr. or update status.
- return ctrl.Result{}, nil
}
-
- return dc.updateDorisDisaggregatedClusterStatus(ctx, ddc)
+ return dc.updateDorisDisaggregatedClusterStatus(ctx, deepCopyDDC)
}
func (dc *DisaggregatedClusterReconciler) updateDorisDisaggregatedClusterStatus(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) {
var eddc dv1.DorisDisaggregatedCluster
- if err := dc.Get(ctx, types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name}, &eddc); err != nil {
- return ctrl.Result{}, err
- }
-
- ddc.Status.DeepCopyInto(&eddc.Status)
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+ if err := dc.Get(ctx, types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name}, &eddc); err != nil {
+ return err
+ }
+ ddc.Status.DeepCopyInto(&eddc.Status)
return dc.Status().Update(ctx, &eddc)
}); err != nil {
klog.Errorf("updateDorisDisaggregatedClusterStatus update status failed err: %s", err.Error())
diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computeclusters/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/computeclusters/controller.go
index dc16f74f..1b68c1ab 100644
--- a/pkg/controller/sub_controller/disaggregated_cluster/computeclusters/controller.go
+++ b/pkg/controller/sub_controller/disaggregated_cluster/computeclusters/controller.go
@@ -165,13 +165,6 @@ func (dccs *DisaggregatedComputeClustersController) reconcileStatefulset(ctx con
return nil, err
}
- if err := k8s.ApplyStatefulSet(ctx, dccs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
- return resource.StatefulsetDeepEqualWithOmitKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, true, false)
- }); err != nil {
- klog.Errorf("disaggregatedComputeClustersController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
- return &sc.Event{Type: sc.EventWarning, Reason: sc.CCApplyResourceFailed, Message: err.Error()}, err
- }
-
var ccStatus *dv1.ComputeClusterStatus
for i := range cluster.Status.ComputeClusterStatuses {
@@ -180,25 +173,41 @@ func (dccs *DisaggregatedComputeClustersController) reconcileStatefulset(ctx con
break
}
}
+ scaleType := getScaleType(st, &est, ccStatus.Phase)
+
+ if scaleType == "resume" {
+ if ccStatus.SuspendReplicas != *(st.Spec.Replicas) {
+ errMessage := fmt.Sprintf("ResumeComputeCluster configuration is abnormal. The replicas of resumes(%d) is not equal to the replicas of suspends(%d).", *st.Spec.Replicas, ccStatus.SuspendReplicas)
+ return &sc.Event{
+ Type: sc.EventNormal,
+ Reason: sc.CCResumeReplicasInconsistency,
+ Message: errMessage,
+ }, errors.New(errMessage)
+ }
+ }
+
+ if err := k8s.ApplyStatefulSet(ctx, dccs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
+ return resource.StatefulsetDeepEqualWithOmitKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, true, false)
+ }); err != nil {
+ klog.Errorf("disaggregatedComputeClustersController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
+ return &sc.Event{Type: sc.EventWarning, Reason: sc.CCApplyResourceFailed, Message: err.Error()}, err
+ }
- //scaleType = "resume"
- if (*(st.Spec.Replicas) > *(est.Spec.Replicas) && *(est.Spec.Replicas) == 0) || ccStatus.Phase == dv1.ResumeFailed {
- response, err := ms_http.ResumeComputeCluster(cluster.Status.MetaServiceStatus.MetaServiceEndpoint, cluster.Status.MetaServiceStatus.MsToken, cluster.Status.InstanceId, cc.ClusterId)
- if response.Code != ms_http.SuccessCode {
+ switch scaleType {
+ case "resume":
+ // resume: ask the meta service to set the compute cluster status back to NORMAL.
+ err := ms_http.ResumeComputeCluster(cluster.Status.MetaServiceStatus.MetaServiceEndpoint, cluster.Status.MetaServiceStatus.MsToken, cluster.Status.InstanceId, cc.ClusterId)
+ if err != nil {
ccStatus.Phase = dv1.ResumeFailed
- jsonData, _ := json.Marshal(response)
- klog.Errorf("computeClusterSync ResumeComputeCluster response failed , response: %s", jsonData)
+ klog.Errorf("computeClusterSync ResumeComputeCluster response failed , err: %s", err.Error())
return &sc.Event{
Type: sc.EventNormal,
Reason: sc.CCResumeStatusRequestFailed,
- Message: "ResumeComputeCluster request of disaggregated BE failed: " + response.Msg,
+ Message: "ResumeComputeCluster request of disaggregated BE failed: " + err.Error(),
}, err
}
+ // Clear SuspendReplicas only after the resume request succeeded. On failure the phase is ResumeFailed,
+ // which makes getScaleType return "resume" again, and that retry compares SuspendReplicas with the
+ // desired replicas before applying the statefulset; resetting it earlier would make every retry fail that check.
+ ccStatus.SuspendReplicas = 0
ccStatus.Phase = dv1.Scaling
- }
-
- //scaleType = "scaleDown"
- if (*(st.Spec.Replicas) < *(est.Spec.Replicas) && *(st.Spec.Replicas) > 0) || ccStatus.Phase == dv1.ScaleDownFailed {
+ case "scaleDown":
if err := dccs.dropCCFromHttpClient(cluster, cc); err != nil {
ccStatus.Phase = dv1.ScaleDownFailed
klog.Errorf("ScaleDownBE failed, err:%s ", err.Error())
@@ -206,27 +215,39 @@ func (dccs *DisaggregatedComputeClustersController) reconcileStatefulset(ctx con
err
}
ccStatus.Phase = dv1.Scaling
- }
-
- //scaleType = "suspend"
- if (*(st.Spec.Replicas) < *(est.Spec.Replicas) && *(st.Spec.Replicas) == 0) || ccStatus.Phase == dv1.SuspendFailed {
- response, err := ms_http.SuspendComputeCluster(cluster.Status.MetaServiceStatus.MetaServiceEndpoint, cluster.Status.MetaServiceStatus.MsToken, cluster.Status.InstanceId, cc.ClusterId)
- if response.Code != ms_http.SuccessCode {
+ case "suspend":
+ err := ms_http.SuspendComputeCluster(cluster.Status.MetaServiceStatus.MetaServiceEndpoint, cluster.Status.MetaServiceStatus.MsToken, cluster.Status.InstanceId, cc.ClusterId)
+ if err != nil {
ccStatus.Phase = dv1.SuspendFailed
- jsonData, _ := json.Marshal(response)
- klog.Errorf("computeClusterSync SuspendComputeCluster response failed , response: %s", jsonData)
+ klog.Errorf("computeClusterSync SuspendComputeCluster response failed , err: %s", err.Error())
return &sc.Event{
Type: sc.EventNormal,
Reason: sc.CCSuspendStatusRequestFailed,
- Message: "SuspendComputeCluster request of disaggregated BE failed: " + response.Msg,
+ Message: "SuspendComputeCluster request of disaggregated BE failed: " + err.Error(),
}, err
}
+ ccStatus.SuspendReplicas = *est.Spec.Replicas
ccStatus.Phase = dv1.Suspended
}
return nil, nil
}
+// getScaleType classifies the replica change between two statefulsets of a compute cluster.
+// st is the statefulset about to be applied, est the one it is compared against (the caller passes
+// the existing statefulset), and phase is the current phase of the compute cluster status.
+// It returns "resume", "scaleDown" or "suspend", or "" when no condition matches. The conditions are
+// tested in that order, and a ResumeFailed/ScaleDownFailed/SuspendFailed phase selects the matching
+// operation regardless of the replica counts.
+// Both Spec.Replicas pointers are dereferenced unconditionally.
+func getScaleType(st, est *appv1.StatefulSet, phase dv1.Phase) string {
+	desired, existing := *(st.Spec.Replicas), *(est.Spec.Replicas)
+	shrinking := desired < existing
+
+	switch {
+	case (desired > existing && existing == 0) || phase == dv1.ResumeFailed:
+		return "resume"
+	case (shrinking && desired > 0) || phase == dv1.ScaleDownFailed:
+		return "scaleDown"
+	case (shrinking && desired == 0) || phase == dv1.SuspendFailed:
+		return "suspend"
+	}
+	return ""
+}
+
// initial compute cluster status before sync resources. status changing with sync steps, and generate the last status by classify pods.
func (dccs *DisaggregatedComputeClustersController) initialCCStatus(ddc *dv1.DorisDisaggregatedCluster, cc *dv1.ComputeCluster) {
ccss := ddc.Status.ComputeClusterStatuses
@@ -247,6 +268,7 @@ func (dccs *DisaggregatedComputeClustersController) initialCCStatus(ddc *dv1.Dor
ccss[i].Phase == dv1.Scaling {
defaultStatus.Phase = ccss[i].Phase
}
+ defaultStatus.SuspendReplicas = ccss[i].SuspendReplicas
ccss[i] = defaultStatus
return
}
diff --git a/pkg/controller/sub_controller/events.go b/pkg/controller/sub_controller/events.go
index f302cdad..61f5de4c 100644
--- a/pkg/controller/sub_controller/events.go
+++ b/pkg/controller/sub_controller/events.go
@@ -55,6 +55,7 @@ var (
ComputeClustersEmpty EventReason = "CCsEmpty"
CCHTTPFailed EventReason = "CCHTTPResponseFailed"
CCSuspendStatusRequestFailed EventReason = "CCSuspendStatusRequestFailed"
+ // CCResumeReplicasInconsistency is reported when a resume is requested with replicas that differ from the recorded suspend replicas.
+ CCResumeReplicasInconsistency EventReason = "CCResumeReplicasInconsistency"
CCResumeStatusRequestFailed EventReason = "CCResumeStatusRequestFailed"
CCUniqueIdentifierDuplicate EventReason = "CCUniqueIdentifierDuplicate"
CCUniqueIdentifierNotMatchRegex EventReason = "CCUniqueIdentifierNotMatchRegex"
|