Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor][fix](ddc)refactor clear resource code,fix service label error,use sts replicas… #318

Merged
merged 3 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions pkg/common/utils/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,23 @@ func ApplyService(ctx context.Context, k8sclient client.Client, svc *corev1.Serv
return PatchClientObject(ctx, k8sclient, svc)
}

func ListServicesInNamespace(ctx context.Context, k8sclient client.Client, namespace string, selector map[string]string) ([]corev1.Service, error) {
var svcList corev1.ServiceList
if err := k8sclient.List(ctx, &svcList, client.InNamespace(namespace), client.MatchingLabels(selector)); err != nil {
return nil, err
}

return svcList.Items, nil
}

func ListStatefulsetInNamespace(ctx context.Context, k8sclient client.Client, namespace string, selector map[string]string) ([]appv1.StatefulSet, error) {
var stsList appv1.StatefulSetList
if err := k8sclient.List(ctx, &stsList, client.InNamespace(namespace), client.MatchingLabels(selector)); err != nil {
return nil, err
}
return stsList.Items, nil
}

// ApplyStatefulSet when the object is not exist, create object. if exist and statefulset have been updated, patch the statefulset.
func ApplyStatefulSet(ctx context.Context, k8sclient client.Client, st *appv1.StatefulSet, equal StatefulSetEqual) error {
var est appv1.StatefulSet
Expand All @@ -93,6 +110,12 @@ func ApplyStatefulSet(ctx context.Context, k8sclient client.Client, st *appv1.St
return err
}

func GetStatefulSet(ctx context.Context, k8sclient client.Client, namespace, name string) (*appv1.StatefulSet, error) {
var est appv1.StatefulSet
err := k8sclient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &est)
return &est, err
}

func CreateClientObject(ctx context.Context, k8sclient client.Client, object client.Object) error {
klog.Info("Creating resource service ", "namespace ", object.GetNamespace(), " name ", object.GetName(), " kind ", object.GetObjectKind().GroupVersionKind().Kind)
if err := k8sclient.Create(ctx, object); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,79 +259,220 @@ func (dcgs *DisaggregatedComputeGroupsController) validateRegex(cgs []dv1.Comput
func (dcgs *DisaggregatedComputeGroupsController) ClearResources(ctx context.Context, obj client.Object) (bool, error) {
ddc := obj.(*dv1.DorisDisaggregatedCluster)

if !dcgs.feAvailable(ddc) {
return false, nil
}

var clearCGs []dv1.ComputeGroupStatus
var eCGs []dv1.ComputeGroupStatus

for i, cgs := range ddc.Status.ComputeGroupStatuses {
for _, cg := range ddc.Spec.ComputeGroups {
if cgs.UniqueId == cg.UniqueId {
eCGs = append(eCGs, ddc.Status.ComputeGroupStatuses[i])
goto NoNeedAppend
break
}
}

clearCGs = append(clearCGs, ddc.Status.ComputeGroupStatuses[i])
// no need clear should not append.
NoNeedAppend:
}

sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc)
//list the svcs and stss owner reference to dorisDisaggregatedCluster.
cls := dcgs.GetCG2LayerCommonSchedulerLabels(ddc.Name)
svcs, err := k8s.ListServicesInNamespace(ctx, dcgs.K8sclient, ddc.Namespace, cls)
if err != nil {
klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient getMasterSqlClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
klog.Errorf("DisaggregatedComputeGroupsController ListServicesInNamespace failed, dorisdisaggregatedcluster name=%s", ddc.Name)
return false, err
}
stss, err := k8s.ListStatefulsetInNamespace(ctx, dcgs.K8sclient, ddc.Namespace, cls)
if err != nil {
klog.Errorf("DisaggregatedComputeGroupsController ListStatefulsetInNamespace failed, dorisdisaggregatedcluster name=%s", ddc.Name)
return false, err
}
defer sqlClient.Close()

for i := range clearCGs {
cgs := clearCGs[i]
cleared := true
if err := k8s.DeleteStatefulset(ctx, dcgs.K8sclient, ddc.Namespace, cgs.StatefulsetName); err != nil {
cleared = false
klog.Errorf("disaggregatedComputeGroupsController delete statefulset namespace %s name %s failed, err=%s", ddc.Namespace, cgs.StatefulsetName, err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGStatefulsetDeleteFailed), err.Error())
}
//clear unused service and statefulset.
delSvcNames := dcgs.findUnusedSvcs(svcs, ddc)
delStsNames, delUniqueIds := dcgs.findUnusedStssAndUniqueIds(stss, ddc)

if err = dcgs.clearCGInDorisMeta(ctx, delUniqueIds, ddc); err != nil {
return false, err
}
if err = dcgs.clearSvcs(ctx, delSvcNames, ddc); err != nil {
return false, err
}
if err = dcgs.clearStatefulsets(ctx, delStsNames, ddc); err != nil {
return false, err
}

if err := k8s.DeleteService(ctx, dcgs.K8sclient, ddc.Namespace, cgs.ServiceName); err != nil {
cleared = false
klog.Errorf("disaggregatedComputeGroupsController delete service namespace %s name %s failed, err=%s", ddc.Namespace, cgs.ServiceName, err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGServiceDeleteFailed), err.Error())
//clear unused pvc
for i := range eCGs {
err = dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, eCGs[i])
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear ComputeGroup reduced replicas PVC failed, namespace=%s, ddc name=%s, uniqueId=%s err=%s", ddc.Namespace, ddc.Name, eCGs[i].UniqueId, err.Error())
}
if !cleared {
eCGs = append(eCGs, clearCGs[i])
continue
}

for _, uniqueId := range delUniqueIds {
//new fake computeGroup status for clear all pvcs owner reference to deleted compute group.
fakeCgs := dv1.ComputeGroupStatus{
UniqueId: uniqueId,
}
// drop compute group
cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-")
cgKeepAmount := int32(0)
err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount)
err = dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, fakeCgs)
if err != nil {
klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear deleted compute group failed, namespace=%s, ddc name=%s, uniqueId=%s err=%s", ddc.Namespace, ddc.Name, uniqueId, err.Error())
}
}

ddc.Status.ComputeGroupStatuses = eCGs
return true, nil

//TODO: next pr remove the code
//sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc)
//if err != nil {
// klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient getMasterSqlClient failed: %s", err.Error())
// dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
// return false, err
//}
//defer sqlClient.Close()
//
//for i := range clearCGs {
// cgs := clearCGs[i]
// cleared := true
// if err := k8s.DeleteStatefulset(ctx, dcgs.K8sclient, ddc.Namespace, cgs.StatefulsetName); err != nil {
// cleared = false
// klog.Errorf("disaggregatedComputeGroupsController delete statefulset namespace %s name %s failed, err=%s", ddc.Namespace, cgs.StatefulsetName, err.Error())
// dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGStatefulsetDeleteFailed), err.Error())
// }
//
// if err := k8s.DeleteService(ctx, dcgs.K8sclient, ddc.Namespace, cgs.ServiceName); err != nil {
// cleared = false
// klog.Errorf("disaggregatedComputeGroupsController delete service namespace %s name %s failed, err=%s", ddc.Namespace, cgs.ServiceName, err.Error())
// dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGServiceDeleteFailed), err.Error())
// }
// if !cleared {
// eCGs = append(eCGs, clearCGs[i])
// continue
// }
// // drop compute group
// cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-")
// cgKeepAmount := int32(0)
// err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount)
// if err != nil {
// klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error())
// dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
// }
//
//}
//
//for i := range eCGs {
// err := dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, eCGs[i])
// if err != nil {
// klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear whole ComputeGroup PVC failed, err=%s", err.Error())
// }
//}
//for i := range clearCGs {
// err := dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, clearCGs[i])
// if err != nil {
// klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear part ComputeGroup PVC failed, err=%s", err.Error())
// }
//}
//
//ddc.Status.ComputeGroupStatuses = eCGs
//
//return true, nil
}

func (dcgs *DisaggregatedComputeGroupsController) clearStatefulsets(ctx context.Context, stsNames []string, ddc *dv1.DorisDisaggregatedCluster) error {
for _, name := range stsNames {
if err := k8s.DeleteStatefulset(ctx, dcgs.K8sclient, ddc.Namespace, name); err != nil {
klog.Errorf("DisaggregatedComputeGroupsController clear statefulset failed, namespace=%s, name =%s, err=%s", ddc.Namespace, name, err.Error())
return err
}
}
return nil
}

for i := range eCGs {
err := dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, eCGs[i])
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear whole ComputeGroup PVC failed, err=%s", err.Error())
func (dcgs *DisaggregatedComputeGroupsController) clearSvcs(ctx context.Context, svcNames []string, ddc *dv1.DorisDisaggregatedCluster) error {
for _, name := range svcNames {
if err := k8s.DeleteService(ctx, dcgs.K8sclient, ddc.Namespace, name); err != nil {
klog.Errorf("DisaggregatedComputeGroupsController clear service failed, namespace=%s, name =%s, err=%s", ddc.Namespace, name, err.Error())
return err
}
}
for i := range clearCGs {
err := dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, clearCGs[i])
return nil
}

func (dcgs *DisaggregatedComputeGroupsController) clearCGInDorisMeta(ctx context.Context, cgNames []string, ddc *dv1.DorisDisaggregatedCluster) error {
if len(cgNames) == 0 {
return nil
}

sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc)
if err != nil {
klog.Errorf("DisaggregatedComputeGroupsController clearCGInDorisMeta dropCGBySQLClient getMasterSqlClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
return err
}
defer sqlClient.Close()

for _, name := range cgNames {
//clear cg, the keepAmount = 0
//confirm used the right cgName, as the cgName get from the uniqueid that '-' replaced by '_'.
cgName := strings.ReplaceAll(name, "-", "_")
err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, 0)
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear part ComputeGroup PVC failed, err=%s", err.Error())
klog.Errorf("DisaggregatedComputeGroupsController clearCGInDorisMeta dropCGBySQLClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
return err
}
}

ddc.Status.ComputeGroupStatuses = eCGs
return nil
}

return true, nil
func (dcgs *DisaggregatedComputeGroupsController) findUnusedSvcs(svcs []corev1.Service, ddc *dv1.DorisDisaggregatedCluster) []string {
var unusedSvcNames []string
for i, _ := range svcs {
own := ownerReference2ddc(&svcs[i], ddc)
if !own {
//not owner reference to ddc, should skip the service.
continue
}

svcUniqueId := getUniqueIdFromClientObject(&svcs[i])
exist := false
for j := 0; j < len(ddc.Spec.ComputeGroups); j++ {
if ddc.Spec.ComputeGroups[j].UniqueId == svcUniqueId {
exist = true
break
}
}

if !exist {
unusedSvcNames = append(unusedSvcNames, svcs[i].Name)
}
}

return unusedSvcNames
}

func (dcgs *DisaggregatedComputeGroupsController) findUnusedStssAndUniqueIds(stss []appv1.StatefulSet, ddc *dv1.DorisDisaggregatedCluster) ([]string /*sts*/, []string /*cgNames*/) {
var unusedStsNames []string
var unusedUniqueIds []string
for i, _ := range stss {
own := ownerReference2ddc(&stss[i], ddc)
if !own {
//not owner reference tto ddc should skip the statefulset.
continue
}

stsUniqueId := getUniqueIdFromClientObject(&stss[i])
exist := false
for j := 0; j < len(ddc.Spec.ComputeGroups); j++ {
if ddc.Spec.ComputeGroups[j].UniqueId == stsUniqueId {
exist = true
break
}
}
if !exist {
unusedStsNames = append(unusedStsNames, stss[i].Name)
unusedUniqueIds = append(unusedUniqueIds, stsUniqueId)
}
}

return unusedStsNames, unusedUniqueIds
}

// ClearStatefulsetUnusedPVCs
Expand Down Expand Up @@ -365,8 +506,17 @@ func (dcgs *DisaggregatedComputeGroupsController) ClearStatefulsetUnusedPVCs(ctx
}

if cg != nil {
replicas := int(*cg.Replicas)
//we should use statefulset replicas for avoiding the phase=scaleDown, when phase `scaleDown` cg' replicas is less than statefuslet.
replicas := 0
stsName := ddc.GetCGStatefulsetName(cg)
sts, err := k8s.GetStatefulSet(ctx, dcgs.K8sclient, ddc.Namespace, stsName)
if err != nil {
klog.Errorf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs get statefulset namespace=%s, name=%s, failed, err=%s", ddc.Namespace, stsName, err.Error())
//waiting next reconciling.
return nil
}
replicas = int(*sts.Spec.Replicas)

cvs := dcgs.GetConfigValuesFromConfigMaps(ddc.Namespace, resource.BE_RESOLVEKEY, cg.CommonSpec.ConfigMaps)
paths, _ := dcgs.getCacheMaxSizeAndPaths(cvs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (dcgs *DisaggregatedComputeGroupsController) newService(ddc *dv1.DorisDisag

ob := &svc.ObjectMeta
ob.Name = ddc.GetCGServiceName(cg)
ob.Labels = dcgs.newCG2LayerSchedulerLabels(ddc.Namespace, uniqueId)
ob.Labels = dcgs.newCG2LayerSchedulerLabels(ddc.Name, uniqueId)

spec := &svc.Spec
spec.Selector = dcgs.newCGPodsSelector(ddc.Name, uniqueId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,17 @@ const (

// generate statefulset or service labels
func (dcgs *DisaggregatedComputeGroupsController) newCG2LayerSchedulerLabels(ddcName /*DisaggregatedClusterName*/, uniqueId string) map[string]string {
labels := dcgs.GetCG2LayerCommonSchedulerLabels(ddcName)
labels[dv1.DorisDisaggregatedComputeGroupUniqueId] = uniqueId
return labels
}

func (dcgs *DisaggregatedComputeGroupsController) GetCG2LayerCommonSchedulerLabels(ddcName string) map[string]string {
return map[string]string{
dv1.DorisDisaggregatedClusterName: ddcName,
dv1.DorisDisaggregatedComputeGroupUniqueId: uniqueId,
dv1.DorisDisaggregatedOwnerReference: ddcName,
dv1.DorisDisaggregatedClusterName: ddcName,
dv1.DorisDisaggregatedOwnerReference: ddcName,
}
}

func (dcgs *DisaggregatedComputeGroupsController) newCGPodsSelector(ddcName /*DisaggregatedClusterName*/, uniqueId string) map[string]string {
return map[string]string{
dv1.DorisDisaggregatedClusterName: ddcName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,36 @@

package computegroups

import (
dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// regex
var (
compute_group_name_regex = "[a-zA-Z](_?[0-9a-zA-Z])*"
compute_group_id_regex = "[a-zA-Z](_?[0-9a-zA-Z])*"
)

func ownerReference2ddc(obj client.Object, cluster *dv1.DorisDisaggregatedCluster) bool {
if obj == nil {
return false
}

ors := obj.GetOwnerReferences()
for _, or := range ors {
if or.Name == cluster.Name && or.UID == cluster.UID {
return true
}
}

return false
}

func getUniqueIdFromClientObject(obj client.Object) string {
if obj == nil {
return ""
}
labels := obj.GetLabels()
return labels[dv1.DorisDisaggregatedComputeGroupUniqueId]
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (dfc *DisaggregatedFEController) newService(ddc *dv1.DorisDisaggregatedClus
svc := dfc.NewDefaultService(ddc)
om := &svc.ObjectMeta
om.Name = ddc.GetFEServiceName()
om.Labels = dfc.newFESchedulerLabels(ddc.Namespace)
om.Labels = dfc.newFESchedulerLabels(ddc.Name)

spec := &svc.Spec
spec.Selector = dfc.newFEPodsSelector(ddc.Name)
Expand Down
Loading