diff --git a/api/doris/v1/doriscluster_util.go b/api/doris/v1/doriscluster_util.go index 3ecfc74c..ecadd4e7 100644 --- a/api/doris/v1/doriscluster_util.go +++ b/api/doris/v1/doriscluster_util.go @@ -28,6 +28,9 @@ import ( const ( //ComponentsResourceHash the component hash ComponentResourceHash string = "app.doris.components/hash" + + FERestartAt string = "apache.doris.fe/restartedAt" + BERestartAt string = "apache.doris.be/restartedAt" ) // the labels key diff --git a/config/crd/bases/crds.yaml b/config/crd/bases/crds.yaml index 147a9ef6..4cd2c6eb 100644 --- a/config/crd/bases/crds.yaml +++ b/config/crd/bases/crds.yaml @@ -1094,6 +1094,14 @@ spec: type: string type: object type: object + enableWorkloadGroup: + description: |- + EnableWorkloadGroup is a switch that determines whether the doris cluster enables the workload group. + Default value is 'false'. + Enabling it means that the container must be started in privileged mode. + Please confirm whether the host machine and k8s cluster allow it. + Doris workloadgroup reference document: https://doris.apache.org/docs/admin-manual/resource-admin/workload-group + type: boolean envVars: description: cnEnvVars is a slice of environment variables that are added to the pods, the default is empty. diff --git a/config/crd/bases/doris.selectdb.com_dorisclusters.yaml b/config/crd/bases/doris.selectdb.com_dorisclusters.yaml index 6ba9a760..d7a17403 100644 --- a/config/crd/bases/doris.selectdb.com_dorisclusters.yaml +++ b/config/crd/bases/doris.selectdb.com_dorisclusters.yaml @@ -1094,6 +1094,14 @@ spec: type: string type: object type: object + enableWorkloadGroup: + description: |- + EnableWorkloadGroup is a switch that determines whether the doris cluster enables the workload group. + Default value is 'false'. + Enabling it means that the container must be started in privileged mode. + Please confirm whether the host machine and k8s cluster allow it. + Doris workloadgroup reference document: https://doris.apache.org/docs/admin-manual/resource-admin/workload-group + type: boolean envVars: description: cnEnvVars is a slice of environment variables that are added to the pods, the default is empty. diff --git a/doc/api.md b/doc/api.md index 917fcf63..5adda542 100644 --- a/doc/api.md +++ b/doc/api.md @@ -617,6 +617,21 @@ BaseSpec

the foundation spec for creating be software services.

+ + +enableWorkloadGroup
+ +bool + + + +

EnableWorkloadGroup is a switch that determines whether the doris cluster enables the workload group. +Default value is ‘false’. +Enabling it means that the container must be started in privileged mode. +Please confirm whether the host machine and k8s cluster allow it. +Doris workloadgroup reference document: https://doris.apache.org/docs/admin-manual/resource-admin/workload-group

+ +

BrokerSpec @@ -2604,5 +2619,5 @@ string

Generated with gen-crd-api-reference-docs -on git commit 54d1acc. +on git commit 1a70fe9.

diff --git a/doc/disaggregated_api.md b/doc/disaggregated_api.md index 7a7453c3..4b63b9b0 100644 --- a/doc/disaggregated_api.md +++ b/doc/disaggregated_api.md @@ -1441,5 +1441,5 @@ string

Generated with gen-crd-api-reference-docs -on git commit 54d1acc. +on git commit 7d27da0.

diff --git a/helm-charts/doris-operator/crds/doris.selectdb.com_dorisclusters.yaml b/helm-charts/doris-operator/crds/doris.selectdb.com_dorisclusters.yaml index 6ba9a760..d7a17403 100644 --- a/helm-charts/doris-operator/crds/doris.selectdb.com_dorisclusters.yaml +++ b/helm-charts/doris-operator/crds/doris.selectdb.com_dorisclusters.yaml @@ -1094,6 +1094,14 @@ spec: type: string type: object type: object + enableWorkloadGroup: + description: |- + EnableWorkloadGroup is a switch that determines whether the doris cluster enables the workload group. + Default value is 'false'. + Enabling it means that the container must be started in privileged mode. + Please confirm whether the host machine and k8s cluster allow it. + Doris workloadgroup reference document: https://doris.apache.org/docs/admin-manual/resource-admin/workload-group + type: boolean envVars: description: cnEnvVars is a slice of environment variables that are added to the pods, the default is empty. diff --git a/pkg/controller/sub_controller/be/controller.go b/pkg/controller/sub_controller/be/controller.go index 22c171d5..d6fcfa3f 100644 --- a/pkg/controller/sub_controller/be/controller.go +++ b/pkg/controller/sub_controller/be/controller.go @@ -54,6 +54,11 @@ func (be *Controller) Sync(ctx context.Context, dcr *v1.DorisCluster) error { if dcr.Spec.BeSpec == nil { return nil } + + var oldStatus v1.ComponentStatus + if dcr.Status.BEStatus != nil { + oldStatus = *(dcr.Status.BEStatus.DeepCopy()) + } be.InitStatus(dcr, v1.Component_BE) if !be.FeAvailable(dcr) { return nil @@ -83,6 +88,10 @@ func (be *Controller) Sync(ctx context.Context, dcr *v1.DorisCluster) error { return err } + if err = be.prepareStatefulsetApply(ctx, dcr, oldStatus); err != nil { + return err + } + st := be.buildBEStatefulSet(dcr) if !be.PrepareReconcileResources(ctx, dcr, v1.Component_BE) { klog.Infof("be controller sync preparing resource for reconciling namespace %s name %s!", dcr.Namespace, dcr.Name) @@ -109,7 +118,7 @@ func (be *Controller) UpdateComponentStatus(cluster *v1.DorisCluster) error { return nil } - return be.ClassifyPodsByStatus(cluster.Namespace, cluster.Status.BEStatus, v1.GenerateStatefulSetSelector(cluster, v1.Component_BE), *cluster.Spec.BeSpec.Replicas) + return be.ClassifyPodsByStatus(cluster.Namespace, cluster.Status.BEStatus, v1.GenerateStatefulSetSelector(cluster, v1.Component_BE), *cluster.Spec.BeSpec.Replicas, v1.Component_BE) } func (be *Controller) ClearResources(ctx context.Context, dcr *v1.DorisCluster) (bool, error) { diff --git a/pkg/controller/sub_controller/be/prepare_modify.go b/pkg/controller/sub_controller/be/prepare_modify.go new file mode 100644 index 00000000..dc606e66 --- /dev/null +++ b/pkg/controller/sub_controller/be/prepare_modify.go @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package be + +import ( + "context" + v1 "github.com/apache/doris-operator/api/doris/v1" +) + +// prepareStatefulsetApply means Pre-operation and status control on the client side +func (be *Controller) prepareStatefulsetApply(ctx context.Context, dcr *v1.DorisCluster, oldStatus v1.ComponentStatus) error { + + // be rolling restart + // check 1: be Phase is Available + // check 2: be RestartTime is not empty and useful + // check 3: be RestartTime different from old(This condition does not need to be checked here. If it is allowed to pass, it will be processed idempotent when applying sts.) + if oldStatus.ComponentCondition.Phase == v1.Available && be.CheckRestartTimeAndInject(dcr, v1.Component_BE) { + dcr.Status.BEStatus.ComponentCondition.Phase = v1.Restarting + } + + //TODO check upgrade + + return nil +} diff --git a/pkg/controller/sub_controller/broker/controller.go b/pkg/controller/sub_controller/broker/controller.go index da90a17f..0e87a25a 100644 --- a/pkg/controller/sub_controller/broker/controller.go +++ b/pkg/controller/sub_controller/broker/controller.go @@ -113,7 +113,7 @@ func (bk *Controller) UpdateComponentStatus(cluster *v1.DorisCluster) error { cluster.Status.BrokerStatus = bs bs.AccessService = v1.GenerateExternalServiceName(cluster, v1.Component_Broker) - return bk.ClassifyPodsByStatus(cluster.Namespace, bs, v1.GenerateStatefulSetSelector(cluster, v1.Component_Broker), *cluster.Spec.BrokerSpec.Replicas) + return bk.ClassifyPodsByStatus(cluster.Namespace, bs, v1.GenerateStatefulSetSelector(cluster, v1.Component_Broker), *cluster.Spec.BrokerSpec.Replicas, v1.Component_Broker) } diff --git a/pkg/controller/sub_controller/cn/controller.go b/pkg/controller/sub_controller/cn/controller.go index 202b0c4f..27cfd936 100644 --- a/pkg/controller/sub_controller/cn/controller.go +++ b/pkg/controller/sub_controller/cn/controller.go @@ -145,7 +145,7 @@ func (cn *Controller) UpdateComponentStatus(cluster *dorisv1.DorisCluster) error replicas := *est.Spec.Replicas cs.AccessService = dorisv1.GenerateExternalServiceName(cluster, dorisv1.Component_CN) - return cn.ClassifyPodsByStatus(cluster.Namespace, &cs.ComponentStatus, dorisv1.GenerateStatefulSetSelector(cluster, dorisv1.Component_CN), replicas) + return cn.ClassifyPodsByStatus(cluster.Namespace, &cs.ComponentStatus, dorisv1.GenerateStatefulSetSelector(cluster, dorisv1.Component_CN), replicas, dorisv1.Component_CN) } // autoscaler represents start autoscaler or not. diff --git a/pkg/controller/sub_controller/events.go b/pkg/controller/sub_controller/events.go index d890adb3..3e3c5993 100644 --- a/pkg/controller/sub_controller/events.go +++ b/pkg/controller/sub_controller/events.go @@ -70,6 +70,7 @@ var ( MSServiceDeletedFailed EventReason = "MSServiceDeletedFailed" MSStatefulsetDeleteFailed EventReason = "MSStatefulsetDeleteFailed" FDBAddressNotConfiged EventReason = "FDBAddressNotConfiged" + RestartTimeInvalid EventReason = "RestartTimeInvalid" ) type Event struct { diff --git a/pkg/controller/sub_controller/fe/controller.go b/pkg/controller/sub_controller/fe/controller.go index 347dcf03..1f9f586c 100644 --- a/pkg/controller/sub_controller/fe/controller.go +++ b/pkg/controller/sub_controller/fe/controller.go @@ -57,7 +57,7 @@ func (fc *Controller) UpdateComponentStatus(cluster *v1.DorisCluster) error { return nil } - return fc.ClassifyPodsByStatus(cluster.Namespace, cluster.Status.FEStatus, v1.GenerateStatefulSetSelector(cluster, v1.Component_FE), *cluster.Spec.FeSpec.Replicas) + return fc.ClassifyPodsByStatus(cluster.Namespace, cluster.Status.FEStatus, v1.GenerateStatefulSetSelector(cluster, v1.Component_FE), *cluster.Spec.FeSpec.Replicas, v1.Component_FE) } // New construct a FeController. @@ -80,6 +80,10 @@ func (fc *Controller) Sync(ctx context.Context, cluster *v1.DorisCluster) error klog.Info("fe Controller Sync ", "the fe component is not needed ", "namespace ", cluster.Namespace, " doris cluster name ", cluster.Name) return nil } + var oldStatus v1.ComponentStatus + if cluster.Status.FEStatus != nil { + oldStatus = *(cluster.Status.FEStatus.DeepCopy()) + } fc.InitStatus(cluster, v1.Component_FE) feSpec := cluster.Spec.FeSpec @@ -111,7 +115,7 @@ func (fc *Controller) Sync(ctx context.Context, cluster *v1.DorisCluster) error return nil } - if err = fc.prepareStatefulsetApply(ctx, cluster); err != nil { + if err = fc.prepareStatefulsetApply(ctx, cluster, oldStatus); err != nil { return err } diff --git a/pkg/controller/sub_controller/fe/prepare_modify.go b/pkg/controller/sub_controller/fe/prepare_modify.go index c484c5dd..715c01c0 100644 --- a/pkg/controller/sub_controller/fe/prepare_modify.go +++ b/pkg/controller/sub_controller/fe/prepare_modify.go @@ -33,7 +33,7 @@ import ( ) // prepareStatefulsetApply means Pre-operation and status control on the client side -func (fc *Controller) prepareStatefulsetApply(ctx context.Context, cluster *v1.DorisCluster) error { +func (fc *Controller) prepareStatefulsetApply(ctx context.Context, cluster *v1.DorisCluster, oldStatus v1.ComponentStatus) error { var oldSt appv1.StatefulSet err := fc.K8sclient.Get(ctx, types.NamespacedName{Namespace: cluster.Namespace, Name: v1.GenerateComponentStatefulSetName(cluster, v1.Component_FE)}, &oldSt) if err != nil { @@ -63,7 +63,15 @@ func (fc *Controller) prepareStatefulsetApply(ctx context.Context, cluster *v1.D return nil } - //TODO check upgrade ,restart + // fe rolling restart + // check 1: fe Phase is Available + // check 2: fe RestartTime is not empty and useful + // check 3: fe RestartTime different from old(This condition does not need to be checked here. If it is allowed to pass, it will be processed idempotent when applying sts.) + if oldStatus.ComponentCondition.Phase == v1.Available && fc.CheckRestartTimeAndInject(cluster, v1.Component_FE) { + cluster.Status.FEStatus.ComponentCondition.Phase = v1.Restarting + } + + //TODO check upgrade return nil } diff --git a/pkg/controller/sub_controller/sub_controller.go b/pkg/controller/sub_controller/sub_controller.go index 91385939..4c93716e 100644 --- a/pkg/controller/sub_controller/sub_controller.go +++ b/pkg/controller/sub_controller/sub_controller.go @@ -56,21 +56,86 @@ type SubDefaultController struct { K8srecorder record.EventRecorder } +func (d *SubDefaultController) CheckRestartTimeAndInject(dcr *dorisv1.DorisCluster, componentType dorisv1.ComponentType) bool { + var baseSpec *dorisv1.BaseSpec + var restartedAt string + var restartAnnotationsKey string + switch componentType { + case dorisv1.Component_FE: + baseSpec = &dcr.Spec.FeSpec.BaseSpec + restartedAt = dcr.Annotations[dorisv1.FERestartAt] + restartAnnotationsKey = dorisv1.FERestartAt + case dorisv1.Component_BE: + baseSpec = &dcr.Spec.BeSpec.BaseSpec + restartedAt = dcr.Annotations[dorisv1.BERestartAt] + restartAnnotationsKey = dorisv1.BERestartAt + default: + klog.Errorf("CheckRestartTimeAndInject dorisClusterName %s, namespace %s componentType %s not supported.", dcr.Name, dcr.Namespace, componentType) + } + + if restartedAt == "" { + return false + } + + // run shell: date +"%Y-%m-%dT%H:%M:%S%:z" + // "2024-11-21T11:08:56+08:00" + parseTime, err := time.Parse(time.RFC3339, restartedAt) + if err != nil { + checkErr := fmt.Errorf("CheckRestartTimeAndInject error: time format is incorrect. dorisClusterName: %s, namespace: %s, componentType %s, wrong parse 'restartedAt': %s , error: %s", dcr.Name, dcr.Namespace, componentType, restartedAt, err.Error()) + klog.Error(checkErr.Error()) + d.K8srecorder.Event(dcr, string(EventWarning), string(RestartTimeInvalid), checkErr.Error()) + return false + } + + effectiveStartTime := time.Now().Add(-10 * time.Minute) + + if effectiveStartTime.After(parseTime) { + klog.Errorf("CheckRestartTimeAndInject The time has expired, dorisClusterName: %s, namespace: %s, componentType %s, wrong parse 'restartedAt': %s : The time has expired, if you want to restart doris, please set a future time", dcr.Name, dcr.Namespace, componentType, restartedAt) + d.K8srecorder.Event(dcr, string(EventWarning), string(RestartTimeInvalid), fmt.Sprintf("the %s restart time is not effective. the 'restartedAt' %s can't be earlier than 10 minutes before the current time", componentType, restartedAt)) + return false + } + + // check passed, set annotations to doriscluster baseSpec + if baseSpec.Annotations == nil { + baseSpec.Annotations = make(map[string]string) + } + baseSpec.Annotations[restartAnnotationsKey] = restartedAt + return true +} + // UpdateStatus update the component status on src. -func (d *SubDefaultController) UpdateStatus(namespace string, status *dorisv1.ComponentStatus, labels map[string]string, replicas int32) error { - return d.ClassifyPodsByStatus(namespace, status, labels, replicas) +func (d *SubDefaultController) UpdateStatus(namespace string, status *dorisv1.ComponentStatus, labels map[string]string, replicas int32, componentType dorisv1.ComponentType) error { + return d.ClassifyPodsByStatus(namespace, status, labels, replicas, componentType) } -func (d *SubDefaultController) ClassifyPodsByStatus(namespace string, status *dorisv1.ComponentStatus, labels map[string]string, replicas int32) error { +func (d *SubDefaultController) ClassifyPodsByStatus(namespace string, status *dorisv1.ComponentStatus, labels map[string]string, replicas int32, componentType dorisv1.ComponentType) error { var podList corev1.PodList if err := d.K8sclient.List(context.Background(), &podList, client.InNamespace(namespace), client.MatchingLabels(labels)); err != nil { return err } var creatings, readys, faileds []string + var firstRestartAnnotation, restartAnnotationsKey string podmap := make(map[string]corev1.Pod) + + if len(podList.Items) == 0 { + return nil + } + + switch componentType { + case dorisv1.Component_FE: + restartAnnotationsKey = dorisv1.FERestartAt + case dorisv1.Component_BE: + restartAnnotationsKey = dorisv1.BERestartAt + } + firstRestartAnnotation = podList.Items[0].Annotations[restartAnnotationsKey] + //get all pod status that controlled by st. + stsRollingRestartAnnotationsSameCheck := true for _, pod := range podList.Items { + if pod.Annotations[restartAnnotationsKey] != firstRestartAnnotation { + stsRollingRestartAnnotationsSameCheck = false + } podmap[pod.Name] = pod if ready := k8s.PodIsReady(&pod.Status); ready { readys = append(readys, pod.Name) @@ -81,7 +146,7 @@ func (d *SubDefaultController) ClassifyPodsByStatus(namespace string, status *do } } - if len(readys) == int(replicas) { + if len(readys) == int(replicas) && stsRollingRestartAnnotationsSameCheck { status.ComponentCondition.Phase = dorisv1.Available } else if len(faileds) != 0 { status.ComponentCondition.Phase = dorisv1.HaveMemberFailed