From 090c1f0174bd6e5290575aa2aa93306c509aa3e5 Mon Sep 17 00:00:00 2001 From: jwcesign Date: Wed, 6 Dec 2023 13:44:59 +0800 Subject: [PATCH] feat: clean eps work of the provison cluster once the mcs work deleted Signed-off-by: jwcesign --- .../endpointslice_collect_controller.go | 64 ++--------------- .../multiclusterservice/mcs_controller.go | 72 ++++++++++++++++--- 2 files changed, 68 insertions(+), 68 deletions(-) diff --git a/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go b/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go index 0e1292e5c7e4..ab4a283e5b28 100644 --- a/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go +++ b/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go @@ -31,13 +31,11 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" @@ -92,19 +90,17 @@ func (c *EndpointSliceCollectController) Reconcile(ctx context.Context, req cont return controllerruntime.Result{}, nil } + if !work.DeletionTimestamp.IsZero() { + // The Provision Clusters' EndpointSlice will be deleted by mcs_controller, let's just ignore it + return controllerruntime.Result{}, nil + } + clusterName, err := names.GetClusterName(work.Namespace) if err != nil { klog.Errorf("Failed to get cluster name for work %s/%s", work.Namespace, work.Name) return controllerruntime.Result{Requeue: true}, err } - if !work.DeletionTimestamp.IsZero() { - if err := c.cleanWorkWithMCSDelete(work); err != nil { - return controllerruntime.Result{Requeue: true}, err - } - return controllerruntime.Result{}, nil - } - if err = c.buildResourceInformers(ctx, work, clusterName); err != nil { return controllerruntime.Result{Requeue: true}, err } @@ -122,49 +118,6 @@ func (c *EndpointSliceCollectController) SetupWithManager(mgr controllerruntime. For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)).Complete(c) } -func (c *EndpointSliceCollectController) cleanWorkWithMCSDelete(work *workv1alpha1.Work) error { - workList := &workv1alpha1.WorkList{} - if err := c.List(context.TODO(), workList, &client.ListOptions{ - Namespace: work.Namespace, - LabelSelector: labels.SelectorFromSet(labels.Set{ - util.MultiClusterServiceNameLabel: util.GetLabelValue(work.Labels, util.MultiClusterServiceNameLabel), - util.MultiClusterServiceNamespaceLabel: util.GetLabelValue(work.Labels, util.MultiClusterServiceNamespaceLabel), - }), - }); err != nil { - klog.Errorf("Failed to list workList reported by work(MultiClusterService)(%s/%s): %v", work.Namespace, work.Name, err) - return err - } - - var errs []error - for _, work := range workList.Items { - if !helper.IsWorkContains(work.Spec.Workload.Manifests, endpointSliceGVK) { - continue - } - // We only care about the EndpointSlice work in provision clusters - if util.GetAnnotationValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) != "" { - continue - } - - if err := c.Delete(context.TODO(), work.DeepCopy()); err != nil { - klog.Errorf("Failed to delete work(%s/%s), Error: %v", work.Namespace, work.Name, err) - errs = append(errs, err) - } - } - if err := utilerrors.NewAggregate(errs); err != nil { - return err - } - - if controllerutil.RemoveFinalizer(work, util.MCSEndpointSliceCollectControllerFinalizer) { - if err := c.Client.Update(context.Background(), work); err != nil { - klog.Errorf("Failed to remove finalizer %s for work %s/%s: %v", - util.MCSEndpointSliceCollectControllerFinalizer, work.Namespace, work.Name, err) - return err - } - } - - return nil -} - // RunWorkQueue initializes worker and run it, worker will process resource asynchronously. func (c *EndpointSliceCollectController) RunWorkQueue() { workerOptions := util.Options{ @@ -210,13 +163,6 @@ func (c *EndpointSliceCollectController) buildResourceInformers(ctx context.Cont return err } - if controllerutil.AddFinalizer(work, util.MCSEndpointSliceCollectControllerFinalizer) { - if err := c.Client.Update(ctx, work); err != nil { - klog.Errorf("Failed to add finalizer %s for work %s/%s: %v", util.MCSEndpointSliceCollectControllerFinalizer, work.Namespace, work.Name, err) - return err - } - } - return nil } diff --git a/pkg/controllers/multiclusterservice/mcs_controller.go b/pkg/controllers/multiclusterservice/mcs_controller.go index 5173bfd5be83..19c969dd9dde 100644 --- a/pkg/controllers/multiclusterservice/mcs_controller.go +++ b/pkg/controllers/multiclusterservice/mcs_controller.go @@ -29,7 +29,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/errors" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" @@ -46,6 +46,7 @@ import ( clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" @@ -197,6 +198,11 @@ func (c *MCSController) deleteMultiClusterServiceWork(mcs *networkingv1alpha1.Mu continue } + if err := c.cleanProvisionEndpointSliceWork(work.DeepCopy()); err != nil { + klog.Errorf("Failed to clean provision EndpointSlice work(%s/%s):%v", work.Namespace, work.Name, err) + return err + } + if err = c.Client.Delete(context.TODO(), work.DeepCopy()); err != nil && !apierrors.IsNotFound(err) { klog.Errorf("Error while deleting work(%s/%s): %v", work.Namespace, work.Name, err) return err @@ -207,6 +213,50 @@ func (c *MCSController) deleteMultiClusterServiceWork(mcs *networkingv1alpha1.Mu return nil } +func (c *MCSController) cleanProvisionEndpointSliceWork(work *workv1alpha1.Work) error { + workList := &workv1alpha1.WorkList{} + if err := c.List(context.TODO(), workList, &client.ListOptions{ + Namespace: work.Namespace, + LabelSelector: labels.SelectorFromSet(labels.Set{ + util.MultiClusterServiceNameLabel: util.GetLabelValue(work.Labels, util.MultiClusterServiceNameLabel), + util.MultiClusterServiceNamespaceLabel: util.GetLabelValue(work.Labels, util.MultiClusterServiceNamespaceLabel), + }), + }); err != nil { + klog.Errorf("Failed to list workList reported by work(MultiClusterService)(%s/%s): %v", work.Namespace, work.Name, err) + return err + } + + var errs []error + for _, work := range workList.Items { + if !helper.IsWorkContains(work.Spec.Workload.Manifests, endpointSliceGVK) { + continue + } + // This annotation is only added to the EndpointSlice work in consumption clusters' execution namespace + // Here we only care about the EndpointSlice work in provision clusters' execution namespace + if util.GetAnnotationValue(work.Annotations, util.EndpointSliceProvisionClusterAnnotation) != "" { + continue + } + + if err := c.Delete(context.TODO(), work.DeepCopy()); err != nil { + klog.Errorf("Failed to delete work(%s/%s), Error: %v", work.Namespace, work.Name, err) + errs = append(errs, err) + } + } + if err := utilerrors.NewAggregate(errs); err != nil { + return err + } + + // TBD: This is needed because we add this finalizer in version 1.8.0, delete this in version 1.10.0 + if controllerutil.RemoveFinalizer(work, util.MCSEndpointSliceCollectControllerFinalizer) { + if err := c.Update(context.TODO(), work); err != nil { + klog.Errorf("Failed to update work(%s/%s), Error: %v", work.Namespace, work.Name, err) + return err + } + } + + return nil +} + func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error { klog.V(4).InfoS("Begin to handle MultiClusterService create or update event", "namespace", mcs.Namespace, "name", mcs.Name) @@ -261,7 +311,7 @@ func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networ return err } - // 7. delete MultiClusterService work not in provision clusters + // 7. delete MultiClusterService work not in provision clusters and in the unready clusters if err = c.deleteMultiClusterServiceWork(mcs, false); err != nil { return err } @@ -320,8 +370,7 @@ func (c *MCSController) syncSVCWorkToClusters( svc *corev1.Service, ) (sets.Set[string], error) { syncClusters := sets.New[string]() - clusters := &clusterv1alpha1.ClusterList{} - err := c.Client.List(ctx, clusters) + clusters, err := util.GetClusterSet(c.Client) if err != nil { klog.ErrorS(err, "failed to list clusters") return syncClusters, err @@ -329,11 +378,11 @@ func (c *MCSController) syncSVCWorkToClusters( serverLocations := sets.New[string](mcs.Spec.ServiceProvisionClusters...) clientLocations := sets.New[string](mcs.Spec.ServiceConsumptionClusters...) - for _, cluster := range clusters.Items { + for clusterName := range clusters { // if ServerLocations or ClientLocations are empty, we will sync work to the all clusters if len(serverLocations) == 0 || len(clientLocations) == 0 || - serverLocations.Has(cluster.Name) || clientLocations.Has(cluster.Name) { - syncClusters.Insert(cluster.Name) + serverLocations.Has(clusterName) || clientLocations.Has(clusterName) { + syncClusters.Insert(clusterName) } } @@ -341,6 +390,7 @@ func (c *MCSController) syncSVCWorkToClusters( if err != nil { return syncClusters, err } + Unstructured := &unstructured.Unstructured{Object: svcObj} var errs []error for clusterName := range syncClusters { @@ -355,7 +405,11 @@ func (c *MCSController) syncSVCWorkToClusters( }, } - if err = helper.CreateOrUpdateWork(c, workMeta, &unstructured.Unstructured{Object: svcObj}); err != nil { + // Add these two label as for work status synchronization + util.MergeLabel(Unstructured, workv1alpha1.WorkNameLabel, names.GenerateMCSWorkName(svc.Kind, svc.Name, svc.Namespace, clusterName)) + util.MergeLabel(Unstructured, workv1alpha1.WorkNamespaceLabel, names.GenerateExecutionSpaceName(clusterName)) + + if err = helper.CreateOrUpdateWork(c, workMeta, Unstructured); err != nil { klog.Errorf("Failed to create or update resource(%v/%v) in the given member cluster %s, err is %v", workMeta.GetNamespace(), workMeta.GetName(), clusterName, err) c.EventRecorder.Event(mcs, corev1.EventTypeWarning, events.EventReasonSyncServiceWorkFailed, fmt.Sprintf( @@ -365,7 +419,7 @@ func (c *MCSController) syncSVCWorkToClusters( } } if len(errs) != 0 { - return syncClusters, errors.NewAggregate(errs) + return syncClusters, utilerrors.NewAggregate(errs) } return syncClusters, nil