From ffed3f3b330c6e0a71fc4b9ea8e10f68879d36f5 Mon Sep 17 00:00:00 2001 From: jwcesign Date: Thu, 30 Nov 2023 20:10:12 +0800 Subject: [PATCH] feat: clean the endpointslice if the current cluster is not in the mcs consumption clusters Signed-off-by: jwcesign --- .../endpointslice_collect_controller.go | 9 +- .../endpointslice_dispatch_controller.go | 110 ++++++++++++++++-- pkg/util/constants.go | 4 +- pkg/util/helper/mcs.go | 13 +++ 4 files changed, 123 insertions(+), 13 deletions(-) diff --git a/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go b/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go index 9bd24b56942b..c79d430bce26 100644 --- a/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go +++ b/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go @@ -140,6 +140,11 @@ func (c *EndpointSliceCollectController) cleanWorkWithMCSDelete(work *workv1alph if !helper.IsWorkContains(work.Spec.Workload.Manifests, endpointSliceGVK) { continue } + // We only care about the EndpointSlice work in provision clusters + if util.GetAnnotationValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) != "" { + continue + } + if err := c.Delete(context.TODO(), work.DeepCopy()); err != nil { klog.Errorf("Failed to delete work(%s/%s), Error: %v", work.Namespace, work.Name, err) errs = append(errs, err) @@ -334,7 +339,7 @@ func (c *EndpointSliceCollectController) handleEndpointSliceEvent(endpointSliceK return err } - if util.GetLabelValue(endpointSliceObj.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceControllerLabelValue { + if util.GetLabelValue(endpointSliceObj.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceDispatchControllerLabelValue { return nil } @@ -393,7 +398,7 @@ func (c *EndpointSliceCollectController) collectTargetEndpointSlice(work *workv1 klog.Errorf("Failed to convert object to EndpointSlice, error: %v", err) return err } - if util.GetLabelValue(eps.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceControllerLabelValue { + if util.GetLabelValue(eps.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceDispatchControllerLabelValue { continue } epsUnstructured, err := helper.ToUnstructured(eps) diff --git a/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go b/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go index c6c3ac05515b..ed16c0dc9273 100644 --- a/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go +++ b/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go @@ -18,7 +18,6 @@ package multiclusterservice import ( "context" - "strconv" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" @@ -26,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" @@ -36,7 +36,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" @@ -79,7 +81,7 @@ func (c *EndpointsliceDispatchController) Reconcile(ctx context.Context, req con if !work.DeletionTimestamp.IsZero() { if err := c.cleanupEndpointSliceFromConsumerClusters(ctx, work); err != nil { - klog.Errorf("Failed to cleanup EndpointSlice from consumer clusters for work %s/%s", work.Namespace, work.Name) + klog.Errorf("Failed to cleanup EndpointSlice from consumer clusters for work %s/%s:%v", work.Namespace, work.Name, err) return controllerruntime.Result{Requeue: true}, err } return controllerruntime.Result{}, nil @@ -107,6 +109,10 @@ func (c *EndpointsliceDispatchController) Reconcile(ctx context.Context, req con c.EventRecorder.Eventf(mcs, corev1.EventTypeNormal, events.EventReasonDispatchEndpointSliceSucceed, "EndpointSlice are synced successfully") }() + if err = c.cleanOrphanDispatchedEndpointSlice(ctx, mcs); err != nil { + return controllerruntime.Result{Requeue: true}, err + } + if err = c.syncEndpointSlice(ctx, work.DeepCopy(), mcs); err != nil { return controllerruntime.Result{Requeue: true}, err } @@ -143,19 +149,104 @@ func (c *EndpointsliceDispatchController) updateEndpointSliceSynced(mcs *network func (c *EndpointsliceDispatchController) SetupWithManager(mgr controllerruntime.Manager) error { workPredicateFun := predicate.Funcs{ CreateFunc: func(createEvent event.CreateEvent) bool { - return util.GetLabelValue(createEvent.Object.GetLabels(), util.ServiceNameLabel) != "" + // We only care about the EndpointSlice work from provision clusters + return util.GetLabelValue(createEvent.Object.GetLabels(), util.ServiceNameLabel) != "" && + util.GetAnnotationValue(createEvent.Object.GetAnnotations(), util.EndpointSliceProvisionClusterAnnotation) == "" }, UpdateFunc: func(updateEvent event.UpdateEvent) bool { - return util.GetLabelValue(updateEvent.ObjectNew.GetLabels(), util.ServiceNameLabel) != "" + // We only care about the EndpointSlice work from provision clusters + return util.GetLabelValue(updateEvent.ObjectNew.GetLabels(), util.ServiceNameLabel) != "" && + util.GetAnnotationValue(updateEvent.ObjectNew.GetAnnotations(), util.EndpointSliceProvisionClusterAnnotation) == "" }, DeleteFunc: func(deleteEvent event.DeleteEvent) bool { - return util.GetLabelValue(deleteEvent.Object.GetLabels(), util.ServiceNameLabel) != "" + // We only care about the EndpointSlice work from provision clusters + return util.GetLabelValue(deleteEvent.Object.GetLabels(), util.ServiceNameLabel) != "" && + util.GetAnnotationValue(deleteEvent.Object.GetAnnotations(), util.EndpointSliceProvisionClusterAnnotation) == "" }, GenericFunc: func(genericEvent event.GenericEvent) bool { return false }, } - return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}, builder.WithPredicates(workPredicateFun)).Complete(c) + return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}, builder.WithPredicates(workPredicateFun)). + Watches(&networkingv1alpha1.MultiClusterService{}, handler.EnqueueRequestsFromMapFunc(c.newMultiClusterServiceFunc())). + Complete(c) +} + +func (c *EndpointsliceDispatchController) newMultiClusterServiceFunc() handler.MapFunc { + return func(ctx context.Context, a client.Object) []reconcile.Request { + var mcsName, mcsNamespace string + switch t := a.(type) { + case *networkingv1alpha1.MultiClusterService: + mcsNamespace = t.Namespace + mcsName = t.Name + default: + return nil + } + + workList := &workv1alpha1.WorkList{} + if err := c.Client.List(context.TODO(), workList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + util.ServiceNameLabel: mcsName, + util.ServiceNamespaceLabel: mcsNamespace, + }), + }); err != nil { + klog.Errorf("Failed to list work, error: %v", err) + return nil + } + + var requests []reconcile.Request + for _, work := range workList.Items { + // We only care about the EndpointSlice work from provision clusters + if util.GetLabelValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) != "" { + continue + } + + requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: work.Namespace, Name: work.Name}}) + } + return requests + } +} + +func (c *EndpointsliceDispatchController) cleanOrphanDispatchedEndpointSlice(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error { + workList := &workv1alpha1.WorkList{} + if err := c.Client.List(ctx, workList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{ + util.ServiceNameLabel: mcs.Name, + util.ServiceNamespaceLabel: mcs.Namespace, + })}); err != nil { + klog.Errorf("Failed to list works, error is: %v", err) + return err + } + + for _, work := range workList.Items { + // We only care about the EndpointSlice work in consumption clusters + if util.GetAnnotationValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) == "" { + continue + } + + consumptionClusters, err := helper.GetConsumptionClustres(c.Client, mcs) + if err != nil { + klog.Errorf("Failed to get consumption clusters, error is: %v", err) + return err + } + + cluster, err := names.GetClusterName(work.Namespace) + if err != nil { + klog.Errorf("Failed to get cluster name for work %s/%s", work.Namespace, work.Name) + return err + } + + if consumptionClusters.Has(cluster) { + continue + } + + if err := c.Client.Delete(ctx, work.DeepCopy()); err != nil { + klog.Errorf("Failed to delete work %s/%s, error is: %v", work.Namespace, work.Name, err) + return err + } + } + + return nil } func (c *EndpointsliceDispatchController) syncEndpointSlice(ctx context.Context, work *workv1alpha1.Work, mcs *networkingv1alpha1.MultiClusterService) error { @@ -205,12 +296,11 @@ func (c *EndpointsliceDispatchController) syncEndpointSlice(ctx context.Context, workv1alpha1.WorkNamespaceLabel: clusterNamespace, workv1alpha1.WorkNameLabel: work.Name, util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue, - discoveryv1.LabelManagedBy: util.EndpointSliceControllerLabelValue, + discoveryv1.LabelManagedBy: util.EndpointSliceDispatchControllerLabelValue, } endpointSlice.Annotations = map[string]string{ // This annotation is used to identify the source cluster of EndpointSlice and whether the eps are the newest version - util.EndpointSliceProvisionClusterAnnotation: epsSourceCluster, - util.EndPointSliceProvisionGenerationAnnotation: strconv.FormatInt(endpointSlice.Generation, 10), + util.EndpointSliceProvisionClusterAnnotation: epsSourceCluster, } workMeta := metav1.ObjectMeta{ @@ -222,6 +312,8 @@ func (c *EndpointsliceDispatchController) syncEndpointSlice(ctx context.Context, }, Labels: map[string]string{ util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue, + util.ServiceNameLabel: mcs.Name, + util.ServiceNamespaceLabel: mcs.Namespace, }, } unstructuredEPS, err := helper.ToUnstructured(endpointSlice) diff --git a/pkg/util/constants.go b/pkg/util/constants.go index 71dfb1fc37a1..f0e9f98721a6 100644 --- a/pkg/util/constants.go +++ b/pkg/util/constants.go @@ -51,8 +51,8 @@ const ( // ManagedByKarmadaLabelValue indicates that resources are managed by karmada controllers. ManagedByKarmadaLabelValue = "true" - // EndpointSliceControllerLabelValue indicates the endpointSlice are controlled by karmada-mcs-endpointslice-controller - EndpointSliceControllerLabelValue = "karmada-mcs-endpointslice-controller" + // EndpointSliceDispatchControllerLabelValue indicates the endpointSlice are controlled by Karmada + EndpointSliceDispatchControllerLabelValue = "endpointslice-dispatch-controller.karmada.io" // RetainReplicasLabel is a reserved label to indicate whether the replicas should be retained. e.g: // resourcetemplate.karmada.io/retain-replicas: true // with value `true` indicates retain diff --git a/pkg/util/helper/mcs.go b/pkg/util/helper/mcs.go index aeb627fe7d79..c20e293e7354 100644 --- a/pkg/util/helper/mcs.go +++ b/pkg/util/helper/mcs.go @@ -118,3 +118,16 @@ func GetProvisionClusters(client client.Client, mcs *networkingv1alpha1.MultiClu } return provisionClusters, nil } + +func GetConsumptionClustres(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) { + consumptionClusters := sets.New[string](mcs.Spec.ServiceConsumptionClusters...) + if len(consumptionClusters) == 0 { + var err error + consumptionClusters, err = util.GetClusterSet(client) + if err != nil { + klog.Errorf("Failed to get cluster set, Error: %v", err) + return nil, err + } + } + return consumptionClusters, nil +}