Skip to content

Commit

Permalink
add suspendReplicas for check resume cluster replicas
Browse files Browse the repository at this point in the history
  • Loading branch information
catpineapple committed Sep 4, 2024
1 parent 711e5a4 commit 7d666e8
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 45 deletions.
2 changes: 2 additions & 0 deletions api/disaggregated/cluster/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ type ComputeClusterStatus struct {
AvailableStatus AvailableStatus `json:"availableStatus,omitempty"`
//ClusterId display the clusterId of compute cluster in meta.
ClusterId string `json:"clusterId,omitempty"`
//SuspendReplicas records the replicas the compute cluster had when it was suspended; it is compared with the requested replicas on resume.
SuspendReplicas int32 `json:"suspendReplicas,omitempty"`

// replicas is the number of Pods created by the StatefulSet controller.
Replicas int32 `json:"replicas,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14081,6 +14081,11 @@ spec:
description: the statefulset of control this compute cluster
pods.
type: string
suspendReplicas:
description: suspend replicas display the replicas of compute
cluster before resume.
format: int32
type: integer
type: object
type: array
feStatus:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5635,6 +5635,11 @@ spec:
description: the statefulset of control this compute cluster
pods.
type: string
suspendReplicas:
description: suspend replicas display the replicas of compute
cluster before resume.
format: int32
type: integer
type: object
type: array
feStatus:
Expand Down
2 changes: 1 addition & 1 deletion doc/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2593,5 +2593,5 @@ string
<hr/>
<p><em>
Generated with <code>gen-crd-api-reference-docs</code>
on git commit <code>1d205b5</code>.
on git commit <code>711e5a4</code>.
</em></p>
13 changes: 12 additions & 1 deletion doc/disaggregated_cluster_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,17 @@ string
</tr>
<tr>
<td>
<code>suspendReplicas</code><br/>
<em>
int32
</em>
</td>
<td>
<p>suspend replicas display the replicas of compute cluster before resume.</p>
</td>
</tr>
<tr>
<td>
<code>replicas</code><br/>
<em>
int32
Expand Down Expand Up @@ -1435,5 +1446,5 @@ string
<hr/>
<p><em>
Generated with <code>gen-crd-api-reference-docs</code>
on git commit <code>1d205b5</code>.
on git commit <code>711e5a4</code>.
</em></p>
23 changes: 19 additions & 4 deletions pkg/common/utils/disaggregated_ms/ms_http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"io"
"k8s.io/apimachinery/pkg/util/json"
"net/http"
"strings"
)

const (
Expand Down Expand Up @@ -205,12 +206,26 @@ func DropBENodes(endpoint, token, instanceID string, cluster Cluster) (*MSRespon
}

// suspend cluster
func SuspendComputeCluster(endpoint, token, instanceID, clusterID string) (*MSResponse, error) {
return SetClusterStatus(endpoint, token, instanceID, clusterID, "SUSPENDED")
// SuspendComputeCluster requests the "SUSPENDED" status for the given compute
// cluster through SetClusterStatus.
//
// It returns nil when the response code is SuccessCode, or when the response
// message reports that the original cluster is already SUSPENDED. Any other
// response, or an error from SetClusterStatus itself, is returned as an error.
func SuspendComputeCluster(endpoint, token, instanceID, clusterID string) error {
	resp, err := SetClusterStatus(endpoint, token, instanceID, clusterID, "SUSPENDED")
	if err != nil {
		return fmt.Errorf("SuspendComputeCluster SetClusterStatus failed: %w", err)
	}

	if resp.Code == SuccessCode {
		return nil
	}
	// A cluster that is already suspended is not treated as a failure.
	if strings.Contains(resp.Msg, "original cluster is SUSPENDED") {
		return nil
	}
	return fmt.Errorf("SuspendComputeCluster SetClusterStatus failed: %s", resp.Msg)
}

func ResumeComputeCluster(endpoint, token, instanceID, clusterID string) (*MSResponse, error) {
return SetClusterStatus(endpoint, token, instanceID, clusterID, "NORMAL")
// ResumeComputeCluster requests the "NORMAL" status for the given compute
// cluster through SetClusterStatus.
//
// It returns nil when the response code is SuccessCode, or when the response
// message reports that the original cluster is already NORMAL. Any other
// response, or an error from SetClusterStatus itself, is returned as an error.
func ResumeComputeCluster(endpoint, token, instanceID, clusterID string) error {
	resp, err := SetClusterStatus(endpoint, token, instanceID, clusterID, "NORMAL")
	if err != nil {
		return fmt.Errorf("ResumeComputeCluster SetClusterStatus failed: %w", err)
	}

	if resp.Code == SuccessCode {
		return nil
	}
	// A cluster that is already in the NORMAL state is not treated as a failure.
	if strings.Contains(resp.Msg, "original cluster is NORMAL") {
		return nil
	}
	return fmt.Errorf("ResumeComputeCluster SetClusterStatus failed: %s", resp.Msg)
}

// SetClusterStatus sets the status of a compute cluster; it is called with "SUSPENDED" to suspend and "NORMAL" to resume.
Expand Down
6 changes: 5 additions & 1 deletion pkg/common/utils/disaggregated_ms/ms_http/metaservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,13 @@ type MSRequest struct {

func (mr *MSResponse) MSResponseResultNodesToNodeInfos() ([]*NodeInfo, error) {

if mr.Code != SuccessCode {
return nil, errors.New("MSResponseResultNodesToNodeInfos response code is not OK,code is: " + mr.Code + ", msg: " + mr.Msg + "")
}

nodes, ok := mr.Result["nodes"]
if !ok {
return nil, errors.New("MSResponseResultNodes is not exist")
return nil, errors.New("MSResponseResultNodesToNodeInfos get nodes failed")
}

jsonStr, err := json.Marshal(nodes)
Expand Down
18 changes: 7 additions & 11 deletions pkg/controller/disaggregated_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func (dc *DisaggregatedClusterReconciler) reconcileSub(ctx context.Context, ddc
// when the spec has been changed (e.g. reverted) by the operator, update the cr; otherwise directly update the status.
func (dc *DisaggregatedClusterReconciler) updateObjectORStatus(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster, preHv string) (ctrl.Result, error) {
postHv := hash.HashObject(ddc.Spec)
deepCopyDDC := ddc.DeepCopy()
if preHv != postHv {
var eddc dv1.DorisDisaggregatedCluster
if err := dc.Get(ctx, types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name}, &eddc); err == nil || !apierrors.IsNotFound(err) {
Expand All @@ -308,24 +309,19 @@ func (dc *DisaggregatedClusterReconciler) updateObjectORStatus(ctx context.Conte
}
if err := dc.Update(ctx, ddc); err != nil {
klog.Errorf("disaggreatedClusterReconciler update DorisDisaggregatedCluster namespace %s name %s failed, err=%s", ddc.Namespace, ddc.Name, err.Error())
return ctrl.Result{}, err
//return ctrl.Result{}, err
}

//if cr updated, update cr. or update status.
return ctrl.Result{}, nil
}

return dc.updateDorisDisaggregatedClusterStatus(ctx, ddc)
return dc.updateDorisDisaggregatedClusterStatus(ctx, deepCopyDDC)
}

func (dc *DisaggregatedClusterReconciler) updateDorisDisaggregatedClusterStatus(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) {
var eddc dv1.DorisDisaggregatedCluster
if err := dc.Get(ctx, types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name}, &eddc); err != nil {
return ctrl.Result{}, err
}

ddc.Status.DeepCopyInto(&eddc.Status)
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := dc.Get(ctx, types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name}, &eddc); err != nil {
return err
}
ddc.Status.DeepCopyInto(&eddc.Status)
return dc.Status().Update(ctx, &eddc)
}); err != nil {
klog.Errorf("updateDorisDisaggregatedClusterStatus update status failed err: %s", err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,6 @@ func (dccs *DisaggregatedComputeClustersController) reconcileStatefulset(ctx con
return nil, err
}

if err := k8s.ApplyStatefulSet(ctx, dccs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
return resource.StatefulsetDeepEqualWithOmitKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, true, false)
}); err != nil {
klog.Errorf("disaggregatedComputeClustersController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CCApplyResourceFailed, Message: err.Error()}, err
}

var ccStatus *dv1.ComputeClusterStatus

for i := range cluster.Status.ComputeClusterStatuses {
Expand All @@ -180,53 +173,81 @@ func (dccs *DisaggregatedComputeClustersController) reconcileStatefulset(ctx con
break
}
}
scaleType := getScaleType(st, &est, ccStatus.Phase)

if scaleType == "resume" {
if ccStatus.SuspendReplicas != *(st.Spec.Replicas) {
errMessage := fmt.Sprintf("ResumeComputeCluster configuration is abnormal. The replicas of resumes(%d) is not equal to the replicas of suspends(%d).", *st.Spec.Replicas, ccStatus.SuspendReplicas)
return &sc.Event{
Type: sc.EventNormal,
Reason: sc.CCResumeReplicasInconsistency,
Message: errMessage,
}, errors.New(errMessage)
}
}

if err := k8s.ApplyStatefulSet(ctx, dccs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
return resource.StatefulsetDeepEqualWithOmitKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, true, false)
}); err != nil {
klog.Errorf("disaggregatedComputeClustersController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CCApplyResourceFailed, Message: err.Error()}, err
}

//scaleType = "resume"
if (*(st.Spec.Replicas) > *(est.Spec.Replicas) && *(est.Spec.Replicas) == 0) || ccStatus.Phase == dv1.ResumeFailed {
response, err := ms_http.ResumeComputeCluster(cluster.Status.MetaServiceStatus.MetaServiceEndpoint, cluster.Status.MetaServiceStatus.MsToken, cluster.Status.InstanceId, cc.ClusterId)
if response.Code != ms_http.SuccessCode {
switch scaleType {
case "resume":
err := ms_http.ResumeComputeCluster(cluster.Status.MetaServiceStatus.MetaServiceEndpoint, cluster.Status.MetaServiceStatus.MsToken, cluster.Status.InstanceId, cc.ClusterId)
ccStatus.SuspendReplicas = 0
if err != nil {
ccStatus.Phase = dv1.ResumeFailed
jsonData, _ := json.Marshal(response)
klog.Errorf("computeClusterSync ResumeComputeCluster response failed , response: %s", jsonData)
klog.Errorf("computeClusterSync ResumeComputeCluster response failed , err: %s", err.Error())
return &sc.Event{
Type: sc.EventNormal,
Reason: sc.CCResumeStatusRequestFailed,
Message: "ResumeComputeCluster request of disaggregated BE failed: " + response.Msg,
Message: "ResumeComputeCluster request of disaggregated BE failed: " + err.Error(),
}, err
}
ccStatus.Phase = dv1.Scaling
}

//scaleType = "scaleDown"
if (*(st.Spec.Replicas) < *(est.Spec.Replicas) && *(st.Spec.Replicas) > 0) || ccStatus.Phase == dv1.ScaleDownFailed {
case "scaleDown":
if err := dccs.dropCCFromHttpClient(cluster, cc); err != nil {
ccStatus.Phase = dv1.ScaleDownFailed
klog.Errorf("ScaleDownBE failed, err:%s ", err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CCHTTPFailed, Message: err.Error()},
err
}
ccStatus.Phase = dv1.Scaling
}

//scaleType = "suspend"
if (*(st.Spec.Replicas) < *(est.Spec.Replicas) && *(st.Spec.Replicas) == 0) || ccStatus.Phase == dv1.SuspendFailed {
response, err := ms_http.SuspendComputeCluster(cluster.Status.MetaServiceStatus.MetaServiceEndpoint, cluster.Status.MetaServiceStatus.MsToken, cluster.Status.InstanceId, cc.ClusterId)
if response.Code != ms_http.SuccessCode {
case "suspend":
err := ms_http.SuspendComputeCluster(cluster.Status.MetaServiceStatus.MetaServiceEndpoint, cluster.Status.MetaServiceStatus.MsToken, cluster.Status.InstanceId, cc.ClusterId)
if err != nil {
ccStatus.Phase = dv1.SuspendFailed
jsonData, _ := json.Marshal(response)
klog.Errorf("computeClusterSync SuspendComputeCluster response failed , response: %s", jsonData)
klog.Errorf("computeClusterSync SuspendComputeCluster response failed , err: %s", err.Error())
return &sc.Event{
Type: sc.EventNormal,
Reason: sc.CCSuspendStatusRequestFailed,
Message: "SuspendComputeCluster request of disaggregated BE failed: " + response.Msg,
Message: "SuspendComputeCluster request of disaggregated BE failed: " + err.Error(),
}, err
}
ccStatus.SuspendReplicas = *est.Spec.Replicas
ccStatus.Phase = dv1.Suspended
}

return nil, nil
}

// getScaleType classifies the replica change between st and est, together with
// the current phase, into one of "resume", "scaleDown", "suspend" or "" (none).
//
//   - "resume":    st asks for more replicas than est and est has 0, or phase is ResumeFailed.
//   - "scaleDown": st asks for fewer replicas than est but more than 0, or phase is ScaleDownFailed.
//   - "suspend":   st asks for 0 replicas while est has more, or phase is SuspendFailed.
//
// The cases are checked in that order and the first match wins.
// NOTE(review): est is presumably the StatefulSet that currently exists in the
// cluster and st the newly built one; confirm against the caller.
// Both Spec.Replicas pointers are dereferenced unconditionally, so they must be non-nil.
func getScaleType(st, est *appv1.StatefulSet, phase dv1.Phase) string {
	desired := *(st.Spec.Replicas)
	current := *(est.Spec.Replicas)

	switch {
	case (desired > current && current == 0) || phase == dv1.ResumeFailed:
		return "resume"
	case (desired < current && desired > 0) || phase == dv1.ScaleDownFailed:
		return "scaleDown"
	case (desired < current && desired == 0) || phase == dv1.SuspendFailed:
		return "suspend"
	default:
		return ""
	}
}

// initialCCStatus initializes the compute cluster status before syncing resources. The status changes as the sync steps run, and the final status is generated by classifying pods.
func (dccs *DisaggregatedComputeClustersController) initialCCStatus(ddc *dv1.DorisDisaggregatedCluster, cc *dv1.ComputeCluster) {
ccss := ddc.Status.ComputeClusterStatuses
Expand All @@ -247,6 +268,7 @@ func (dccs *DisaggregatedComputeClustersController) initialCCStatus(ddc *dv1.Dor
ccss[i].Phase == dv1.Scaling {
defaultStatus.Phase = ccss[i].Phase
}
defaultStatus.SuspendReplicas = ccss[i].SuspendReplicas
ccss[i] = defaultStatus
return
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/sub_controller/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
ComputeClustersEmpty EventReason = "CCsEmpty"
CCHTTPFailed EventReason = "CCHTTPResponseFailed"
CCSuspendStatusRequestFailed EventReason = "CCSuspendStatusRequestFailed"
CCResumeReplicasInconsistency EventReason = "CCSuspendReplicasInconsistency"
CCResumeStatusRequestFailed EventReason = "CCResumeStatusRequestFailed"
CCUniqueIdentifierDuplicate EventReason = "CCUniqueIdentifierDuplicate"
CCUniqueIdentifierNotMatchRegex EventReason = "CCUniqueIdentifierNotMatchRegex"
Expand Down

0 comments on commit 7d666e8

Please sign in to comment.