Merge pull request #4343 from jwcesign/clean-eps-when-mcs-changes
feat: clean up the EndpointSlice if the current cluster is not in the MCS consumption clusters
karmada-bot authored Nov 30, 2023
2 parents 8ef8911 + ffed3f3 commit 68d937e
Showing 4 changed files with 123 additions and 13 deletions.
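For context, the change hinges on one convention: a Work that carries the util.EndpointSliceProvisionClusterAnnotation annotation was dispatched by Karmada to a consumer cluster, while a Work without it was collected from a provision cluster. A minimal sketch of that distinction (hypothetical helper name; import paths assumed from the Karmada repo layout, not part of this commit):

package multiclusterservice

import (
	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"

	"github.com/karmada-io/karmada/pkg/util"
)

// isDispatchedWork reports whether a Work was produced by the dispatch
// controller for a consumer cluster: such Works carry the provision-cluster
// annotation, while Works collected from provision clusters do not.
func isDispatchedWork(work *workv1alpha1.Work) bool {
	return util.GetAnnotationValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) != ""
}

Both the collect controller (skipping dispatched Works on MCS deletion) and the dispatch controller (its predicates and orphan cleanup below) apply this same check.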
@@ -140,6 +140,11 @@ func (c *EndpointSliceCollectController) cleanWorkWithMCSDelete(work *workv1alph
if !helper.IsWorkContains(work.Spec.Workload.Manifests, endpointSliceGVK) {
continue
}
// We only care about the EndpointSlice work in provision clusters
if util.GetAnnotationValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) != "" {
continue
}

if err := c.Delete(context.TODO(), work.DeepCopy()); err != nil {
klog.Errorf("Failed to delete work(%s/%s), Error: %v", work.Namespace, work.Name, err)
errs = append(errs, err)
@@ -334,7 +339,7 @@ func (c *EndpointSliceCollectController) handleEndpointSliceEvent(endpointSliceK
return err
}

if util.GetLabelValue(endpointSliceObj.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceControllerLabelValue {
if util.GetLabelValue(endpointSliceObj.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceDispatchControllerLabelValue {
return nil
}

@@ -393,7 +398,7 @@ func (c *EndpointSliceCollectController) collectTargetEndpointSlice(work *workv1
klog.Errorf("Failed to convert object to EndpointSlice, error: %v", err)
return err
}
if util.GetLabelValue(eps.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceControllerLabelValue {
if util.GetLabelValue(eps.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceDispatchControllerLabelValue {
continue
}
epsUnstructured, err := helper.ToUnstructured(eps)
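The two hunks above also make the collect controller ignore EndpointSlices that the dispatch controller itself created in member clusters, now recognized by the renamed managed-by label value. A sketch of that check under the same assumptions (hypothetical helper, not part of the commit):

package multiclusterservice

import (
	discoveryv1 "k8s.io/api/discovery/v1"

	"github.com/karmada-io/karmada/pkg/util"
)

// isDispatchedEndpointSlice reports whether an EndpointSlice found in a member
// cluster was stamped by the dispatch controller, so the collect controller
// can skip it instead of reporting it back as if it originated there.
func isDispatchedEndpointSlice(eps *discoveryv1.EndpointSlice) bool {
	return util.GetLabelValue(eps.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceDispatchControllerLabelValue
}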
@@ -18,14 +18,14 @@ package multiclusterservice

import (
"context"
"strconv"

corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
@@ -36,7 +36,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
@@ -79,7 +81,7 @@ func (c *EndpointsliceDispatchController) Reconcile(ctx context.Context, req con

if !work.DeletionTimestamp.IsZero() {
if err := c.cleanupEndpointSliceFromConsumerClusters(ctx, work); err != nil {
klog.Errorf("Failed to cleanup EndpointSlice from consumer clusters for work %s/%s", work.Namespace, work.Name)
klog.Errorf("Failed to cleanup EndpointSlice from consumer clusters for work %s/%s:%v", work.Namespace, work.Name, err)
return controllerruntime.Result{Requeue: true}, err
}
return controllerruntime.Result{}, nil
@@ -107,6 +109,10 @@ func (c *EndpointsliceDispatchController) Reconcile(ctx context.Context, req con
c.EventRecorder.Eventf(mcs, corev1.EventTypeNormal, events.EventReasonDispatchEndpointSliceSucceed, "EndpointSlices are synced successfully")
}()

if err = c.cleanOrphanDispatchedEndpointSlice(ctx, mcs); err != nil {
return controllerruntime.Result{Requeue: true}, err
}

if err = c.syncEndpointSlice(ctx, work.DeepCopy(), mcs); err != nil {
return controllerruntime.Result{Requeue: true}, err
}
@@ -143,19 +149,104 @@ func (c *EndpointsliceDispatchController) updateEndpointSliceSynced(mcs *network
func (c *EndpointsliceDispatchController) SetupWithManager(mgr controllerruntime.Manager) error {
workPredicateFun := predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return util.GetLabelValue(createEvent.Object.GetLabels(), util.ServiceNameLabel) != ""
// We only care about the EndpointSlice work from provision clusters
return util.GetLabelValue(createEvent.Object.GetLabels(), util.ServiceNameLabel) != "" &&
util.GetAnnotationValue(createEvent.Object.GetAnnotations(), util.EndpointSliceProvisionClusterAnnotation) == ""
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
return util.GetLabelValue(updateEvent.ObjectNew.GetLabels(), util.ServiceNameLabel) != ""
// We only care about the EndpointSlice work from provision clusters
return util.GetLabelValue(updateEvent.ObjectNew.GetLabels(), util.ServiceNameLabel) != "" &&
util.GetAnnotationValue(updateEvent.ObjectNew.GetAnnotations(), util.EndpointSliceProvisionClusterAnnotation) == ""
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return util.GetLabelValue(deleteEvent.Object.GetLabels(), util.ServiceNameLabel) != ""
// We only care about the EndpointSlice work from provision clusters
return util.GetLabelValue(deleteEvent.Object.GetLabels(), util.ServiceNameLabel) != "" &&
util.GetAnnotationValue(deleteEvent.Object.GetAnnotations(), util.EndpointSliceProvisionClusterAnnotation) == ""
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
},
}
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}, builder.WithPredicates(workPredicateFun)).Complete(c)
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}, builder.WithPredicates(workPredicateFun)).
Watches(&networkingv1alpha1.MultiClusterService{}, handler.EnqueueRequestsFromMapFunc(c.newMultiClusterServiceFunc())).
Complete(c)
}
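The three predicate branches above share one condition; a possible factoring (hypothetical helper, not part of the commit) that also documents the intent:

package multiclusterservice

import (
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/karmada-io/karmada/pkg/util"
)

// isCollectedEndpointSliceWork mirrors the condition shared by the three
// predicate funcs above: the Work must belong to a service (ServiceNameLabel
// set) and must not itself be a dispatched copy (no provision-cluster
// annotation), i.e. it was collected from a provision cluster.
func isCollectedEndpointSliceWork(obj client.Object) bool {
	return util.GetLabelValue(obj.GetLabels(), util.ServiceNameLabel) != "" &&
		util.GetAnnotationValue(obj.GetAnnotations(), util.EndpointSliceProvisionClusterAnnotation) == ""
}

With such a helper, the Create/Update/Delete funcs reduce to one-line calls.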

func (c *EndpointsliceDispatchController) newMultiClusterServiceFunc() handler.MapFunc {
return func(ctx context.Context, a client.Object) []reconcile.Request {
var mcsName, mcsNamespace string
switch t := a.(type) {
case *networkingv1alpha1.MultiClusterService:
mcsNamespace = t.Namespace
mcsName = t.Name
default:
return nil
}

workList := &workv1alpha1.WorkList{}
if err := c.Client.List(context.TODO(), workList, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
util.ServiceNameLabel: mcsName,
util.ServiceNamespaceLabel: mcsNamespace,
}),
}); err != nil {
klog.Errorf("Failed to list work, error: %v", err)
return nil
}

var requests []reconcile.Request
for _, work := range workList.Items {
// We only care about the EndpointSlice work from provision clusters
if util.GetAnnotationValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) != "" {
continue
}

requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: work.Namespace, Name: work.Name}})
}
return requests
}
}

func (c *EndpointsliceDispatchController) cleanOrphanDispatchedEndpointSlice(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
workList := &workv1alpha1.WorkList{}
if err := c.Client.List(ctx, workList, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
util.ServiceNameLabel: mcs.Name,
util.ServiceNamespaceLabel: mcs.Namespace,
})}); err != nil {
klog.Errorf("Failed to list works, error is: %v", err)
return err
}

for _, work := range workList.Items {
// We only care about the EndpointSlice work in consumption clusters
if util.GetAnnotationValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) == "" {
continue
}

consumptionClusters, err := helper.GetConsumptionClusters(c.Client, mcs)
if err != nil {
klog.Errorf("Failed to get consumption clusters, error is: %v", err)
return err
}

cluster, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get cluster name for work %s/%s", work.Namespace, work.Name)
return err
}

if consumptionClusters.Has(cluster) {
continue
}

if err := c.Client.Delete(ctx, work.DeepCopy()); err != nil {
klog.Errorf("Failed to delete work %s/%s, error is: %v", work.Namespace, work.Name, err)
return err
}
}

return nil
}
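The loop above boils down to one decision per Work; a condensed sketch (hypothetical helper, with the consumption-cluster set resolved by the caller):

package multiclusterservice

import (
	"k8s.io/apimachinery/pkg/util/sets"

	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
	"github.com/karmada-io/karmada/pkg/util"
	"github.com/karmada-io/karmada/pkg/util/names"
)

// shouldRemoveDispatchedWork reports whether a dispatched EndpointSlice Work is
// orphaned: it carries the provision-cluster annotation but sits in the
// execution namespace of a cluster that no longer consumes the service.
func shouldRemoveDispatchedWork(work *workv1alpha1.Work, consumptionClusters sets.Set[string]) (bool, error) {
	if util.GetAnnotationValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) == "" {
		return false, nil // collected from a provision cluster, not a dispatched copy
	}
	cluster, err := names.GetClusterName(work.Namespace)
	if err != nil {
		return false, err
	}
	return !consumptionClusters.Has(cluster), nil
}

Since the consumption-cluster set does not depend on the individual Work, the helper.GetConsumptionClusters call in the loop above could equally be resolved once before iterating.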

func (c *EndpointsliceDispatchController) syncEndpointSlice(ctx context.Context, work *workv1alpha1.Work, mcs *networkingv1alpha1.MultiClusterService) error {
@@ -205,12 +296,11 @@ func (c *EndpointsliceDispatchController) syncEndpointSlice(ctx context.Context,
workv1alpha1.WorkNamespaceLabel: clusterNamespace,
workv1alpha1.WorkNameLabel: work.Name,
util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue,
discoveryv1.LabelManagedBy: util.EndpointSliceControllerLabelValue,
discoveryv1.LabelManagedBy: util.EndpointSliceDispatchControllerLabelValue,
}
endpointSlice.Annotations = map[string]string{
// This annotation is used to identify the source cluster of the EndpointSlice
util.EndpointSliceProvisionClusterAnnotation: epsSourceCluster,
util.EndPointSliceProvisionGenerationAnnotation: strconv.FormatInt(endpointSlice.Generation, 10),
util.EndpointSliceProvisionClusterAnnotation: epsSourceCluster,
}

workMeta := metav1.ObjectMeta{
@@ -222,6 +312,8 @@ },
},
Labels: map[string]string{
util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue,
util.ServiceNameLabel: mcs.Name,
util.ServiceNamespaceLabel: mcs.Namespace,
},
}
unstructuredEPS, err := helper.ToUnstructured(endpointSlice)
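The two service labels added to workMeta are what make dispatched Works addressable by service: cleanOrphanDispatchedEndpointSlice lists them with exactly this selector. A minimal sketch of the reverse lookup they enable (hypothetical helper, not part of the commit):

package multiclusterservice

import (
	"k8s.io/apimachinery/pkg/types"

	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
	"github.com/karmada-io/karmada/pkg/util"
)

// mcsKeyForWork resolves which MultiClusterService a dispatched EndpointSlice
// Work belongs to, using the service name/namespace labels stamped onto
// workMeta in syncEndpointSlice above.
func mcsKeyForWork(work *workv1alpha1.Work) types.NamespacedName {
	return types.NamespacedName{
		Namespace: util.GetLabelValue(work.Labels, util.ServiceNamespaceLabel),
		Name:      util.GetLabelValue(work.Labels, util.ServiceNameLabel),
	}
}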
4 changes: 2 additions & 2 deletions pkg/util/constants.go
@@ -51,8 +51,8 @@ const (
// ManagedByKarmadaLabelValue indicates that resources are managed by karmada controllers.
ManagedByKarmadaLabelValue = "true"

// EndpointSliceControllerLabelValue indicates the endpointSlice are controlled by karmada-mcs-endpointslice-controller
EndpointSliceControllerLabelValue = "karmada-mcs-endpointslice-controller"
// EndpointSliceDispatchControllerLabelValue indicates that the EndpointSlice is controlled by Karmada's endpointslice-dispatch controller
EndpointSliceDispatchControllerLabelValue = "endpointslice-dispatch-controller.karmada.io"

// RetainReplicasLabel is a reserved label to indicate whether the replicas should be retained. e.g:
// resourcetemplate.karmada.io/retain-replicas: true // with value `true` indicates retain
13 changes: 13 additions & 0 deletions pkg/util/helper/mcs.go
@@ -118,3 +118,16 @@ func GetProvisionClusters(client client.Client, mcs *networkingv1alpha1.MultiClu
}
return provisionClusters, nil
}

// GetConsumptionClusters returns the set of clusters that consume the given
// MultiClusterService; an empty spec.serviceConsumptionClusters means all
// registered clusters consume it.
func GetConsumptionClusters(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
consumptionClusters := sets.New[string](mcs.Spec.ServiceConsumptionClusters...)
if len(consumptionClusters) == 0 {
var err error
consumptionClusters, err = util.GetClusterSet(client)
if err != nil {
klog.Errorf("Failed to get cluster set, Error: %v", err)
return nil, err
}
}
return consumptionClusters, nil
}
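A brief usage sketch of the helper (hypothetical wrapper in the same package, not part of the commit), mirroring how the dispatch controller consumes it above:

package helper

import (
	"k8s.io/klog/v2"
	"sigs.k8s.io/controller-runtime/pkg/client"

	networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
)

// clusterConsumesService shows the intended use of GetConsumptionClusters:
// an empty spec.serviceConsumptionClusters means every registered cluster
// consumes the service, so membership is checked against the resolved set.
func clusterConsumesService(c client.Client, mcs *networkingv1alpha1.MultiClusterService, cluster string) (bool, error) {
	consumptionClusters, err := GetConsumptionClusters(c, mcs)
	if err != nil {
		klog.Errorf("Failed to get consumption clusters for %s/%s: %v", mcs.Namespace, mcs.Name, err)
		return false, err
	}
	return consumptionClusters.Has(cluster), nil
}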
