Skip to content

Commit

Permalink
Merge pull request #4383 from jwcesign/clean-eps-pull-cluster
Browse files Browse the repository at this point in the history
feat: clean eps work of the provision cluster when pull clusters get unhealthy
  • Loading branch information
karmada-bot authored Dec 7, 2023
2 parents 58b1790 + 090c1f0 commit cb1a066
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
Expand Down Expand Up @@ -92,19 +90,17 @@ func (c *EndpointSliceCollectController) Reconcile(ctx context.Context, req cont
return controllerruntime.Result{}, nil
}

if !work.DeletionTimestamp.IsZero() {
// The Provision Clusters' EndpointSlice will be deleted by mcs_controller, let's just ignore it
return controllerruntime.Result{}, nil
}

clusterName, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get cluster name for work %s/%s", work.Namespace, work.Name)
return controllerruntime.Result{Requeue: true}, err
}

if !work.DeletionTimestamp.IsZero() {
if err := c.cleanWorkWithMCSDelete(work); err != nil {
return controllerruntime.Result{Requeue: true}, err
}
return controllerruntime.Result{}, nil
}

if err = c.buildResourceInformers(ctx, work, clusterName); err != nil {
return controllerruntime.Result{Requeue: true}, err
}
Expand All @@ -122,49 +118,6 @@ func (c *EndpointSliceCollectController) SetupWithManager(mgr controllerruntime.
For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)).Complete(c)
}

// cleanWorkWithMCSDelete deletes the EndpointSlice works associated with the
// MultiClusterService that the given work belongs to, and afterwards removes
// util.MCSEndpointSliceCollectControllerFinalizer from that work.
//
// Candidate works are listed from work's namespace, selected by the
// MultiClusterService name and namespace label values read from work. Only
// works that contain an EndpointSlice manifest and do not carry a non-empty
// util.EndpointSliceProvisionClusterAnnotation are deleted.
//
// If listing fails the error is returned immediately. Delete failures are
// collected and returned as one aggregate error, in which case the finalizer
// is left in place. A failure to persist the finalizer removal is returned too.
func (c *EndpointSliceCollectController) cleanWorkWithMCSDelete(work *workv1alpha1.Work) error {
	mcsSelector := labels.SelectorFromSet(labels.Set{
		util.MultiClusterServiceNameLabel:      util.GetLabelValue(work.Labels, util.MultiClusterServiceNameLabel),
		util.MultiClusterServiceNamespaceLabel: util.GetLabelValue(work.Labels, util.MultiClusterServiceNamespaceLabel),
	})
	listOpts := &client.ListOptions{
		Namespace:     work.Namespace,
		LabelSelector: mcsSelector,
	}

	related := &workv1alpha1.WorkList{}
	if listErr := c.List(context.TODO(), related, listOpts); listErr != nil {
		klog.Errorf("Failed to list workList reported by work(MultiClusterService)(%s/%s): %v", work.Namespace, work.Name, listErr)
		return listErr
	}

	var deleteErrs []error
	for i := range related.Items {
		candidate := &related.Items[i]

		// Works without an EndpointSlice manifest are not ours to clean.
		if !helper.IsWorkContains(candidate.Spec.Workload.Manifests, endpointSliceGVK) {
			continue
		}
		// We only care about the EndpointSlice work in provision clusters, so
		// anything carrying this annotation is skipped.
		if util.GetAnnotationValue(candidate.Annotations, util.EndpointSliceProvisionClusterAnnotation) != "" {
			continue
		}

		deleteErr := c.Delete(context.TODO(), candidate.DeepCopy())
		if deleteErr == nil {
			continue
		}
		klog.Errorf("Failed to delete work(%s/%s), Error: %v", candidate.Namespace, candidate.Name, deleteErr)
		deleteErrs = append(deleteErrs, deleteErr)
	}
	if aggregated := utilerrors.NewAggregate(deleteErrs); aggregated != nil {
		return aggregated
	}

	// RemoveFinalizer reports whether the object changed; skip the API call
	// when the finalizer was not present.
	if !controllerutil.RemoveFinalizer(work, util.MCSEndpointSliceCollectControllerFinalizer) {
		return nil
	}
	if updateErr := c.Client.Update(context.Background(), work); updateErr != nil {
		klog.Errorf("Failed to remove finalizer %s for work %s/%s: %v",
			util.MCSEndpointSliceCollectControllerFinalizer, work.Namespace, work.Name, updateErr)
		return updateErr
	}

	return nil
}

// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
func (c *EndpointSliceCollectController) RunWorkQueue() {
workerOptions := util.Options{
Expand Down Expand Up @@ -210,13 +163,6 @@ func (c *EndpointSliceCollectController) buildResourceInformers(ctx context.Cont
return err
}

if controllerutil.AddFinalizer(work, util.MCSEndpointSliceCollectControllerFinalizer) {
if err := c.Client.Update(ctx, work); err != nil {
klog.Errorf("Failed to add finalizer %s for work %s/%s: %v", util.MCSEndpointSliceCollectControllerFinalizer, work.Namespace, work.Name, err)
return err
}
}

return nil
}

Expand Down
72 changes: 63 additions & 9 deletions pkg/controllers/multiclusterservice/mcs_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
Expand All @@ -46,6 +46,7 @@ import (

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
Expand Down Expand Up @@ -197,6 +198,11 @@ func (c *MCSController) deleteMultiClusterServiceWork(mcs *networkingv1alpha1.Mu
continue
}

if err := c.cleanProvisionEndpointSliceWork(work.DeepCopy()); err != nil {
klog.Errorf("Failed to clean provision EndpointSlice work(%s/%s):%v", work.Namespace, work.Name, err)
return err
}

if err = c.Client.Delete(context.TODO(), work.DeepCopy()); err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Error while deleting work(%s/%s): %v", work.Namespace, work.Name, err)
return err
Expand All @@ -207,6 +213,50 @@ func (c *MCSController) deleteMultiClusterServiceWork(mcs *networkingv1alpha1.Mu
return nil
}

// cleanProvisionEndpointSliceWork deletes the EndpointSlice works that share
// the execution namespace and the MultiClusterService name/namespace labels of
// the given MultiClusterService work, and then removes the legacy
// util.MCSEndpointSliceCollectControllerFinalizer from that work.
//
// Only works that contain an EndpointSlice manifest and do not carry a
// non-empty util.EndpointSliceProvisionClusterAnnotation are deleted. A work
// that is already gone when the delete is issued counts as success, so the
// cleanup stays idempotent across retries and stale list results.
//
// If listing fails the error is returned immediately. Remaining delete
// failures are collected and returned as one aggregate error, in which case
// the finalizer is left in place. A failure to persist the finalizer removal
// is returned as well.
func (c *MCSController) cleanProvisionEndpointSliceWork(work *workv1alpha1.Work) error {
	workList := &workv1alpha1.WorkList{}
	if err := c.List(context.TODO(), workList, &client.ListOptions{
		Namespace: work.Namespace,
		LabelSelector: labels.SelectorFromSet(labels.Set{
			util.MultiClusterServiceNameLabel:      util.GetLabelValue(work.Labels, util.MultiClusterServiceNameLabel),
			util.MultiClusterServiceNamespaceLabel: util.GetLabelValue(work.Labels, util.MultiClusterServiceNamespaceLabel),
		}),
	}); err != nil {
		klog.Errorf("Failed to list workList reported by work(MultiClusterService)(%s/%s): %v", work.Namespace, work.Name, err)
		return err
	}

	var errs []error
	// Iterate by index so the loop does not shadow the work parameter or copy
	// each Work element.
	for i := range workList.Items {
		epsWork := &workList.Items[i]
		if !helper.IsWorkContains(epsWork.Spec.Workload.Manifests, endpointSliceGVK) {
			continue
		}
		// This annotation is only added to the EndpointSlice work in consumption clusters' execution namespace
		// Here we only care about the EndpointSlice work in provision clusters' execution namespace
		if util.GetAnnotationValue(epsWork.Annotations, util.EndpointSliceProvisionClusterAnnotation) != "" {
			continue
		}

		// NotFound means the work was removed between the list and this call;
		// the desired state is already reached, so it is not an error.
		if err := c.Delete(context.TODO(), epsWork.DeepCopy()); err != nil && !apierrors.IsNotFound(err) {
			klog.Errorf("Failed to delete work(%s/%s), Error: %v", epsWork.Namespace, epsWork.Name, err)
			errs = append(errs, err)
		}
	}
	if err := utilerrors.NewAggregate(errs); err != nil {
		return err
	}

	// TBD: This is needed because we add this finalizer in version 1.8.0, delete this in version 1.10.0
	if controllerutil.RemoveFinalizer(work, util.MCSEndpointSliceCollectControllerFinalizer) {
		if err := c.Update(context.TODO(), work); err != nil {
			klog.Errorf("Failed to update work(%s/%s), Error: %v", work.Namespace, work.Name, err)
			return err
		}
	}

	return nil
}

func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
klog.V(4).InfoS("Begin to handle MultiClusterService create or update event",
"namespace", mcs.Namespace, "name", mcs.Name)
Expand Down Expand Up @@ -261,7 +311,7 @@ func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networ
return err
}

// 7. delete MultiClusterService work not in provision clusters
// 7. delete MultiClusterService work not in provision clusters and in the unready clusters
if err = c.deleteMultiClusterServiceWork(mcs, false); err != nil {
return err
}
Expand Down Expand Up @@ -320,27 +370,27 @@ func (c *MCSController) syncSVCWorkToClusters(
svc *corev1.Service,
) (sets.Set[string], error) {
syncClusters := sets.New[string]()
clusters := &clusterv1alpha1.ClusterList{}
err := c.Client.List(ctx, clusters)
clusters, err := util.GetClusterSet(c.Client)
if err != nil {
klog.ErrorS(err, "failed to list clusters")
return syncClusters, err
}

serverLocations := sets.New[string](mcs.Spec.ServiceProvisionClusters...)
clientLocations := sets.New[string](mcs.Spec.ServiceConsumptionClusters...)
for _, cluster := range clusters.Items {
for clusterName := range clusters {
// if ServerLocations or ClientLocations are empty, we will sync work to the all clusters
if len(serverLocations) == 0 || len(clientLocations) == 0 ||
serverLocations.Has(cluster.Name) || clientLocations.Has(cluster.Name) {
syncClusters.Insert(cluster.Name)
serverLocations.Has(clusterName) || clientLocations.Has(clusterName) {
syncClusters.Insert(clusterName)
}
}

svcObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(svc)
if err != nil {
return syncClusters, err
}
Unstructured := &unstructured.Unstructured{Object: svcObj}

var errs []error
for clusterName := range syncClusters {
Expand All @@ -355,7 +405,11 @@ func (c *MCSController) syncSVCWorkToClusters(
},
}

if err = helper.CreateOrUpdateWork(c, workMeta, &unstructured.Unstructured{Object: svcObj}); err != nil {
// Add these two labels for work status synchronization
util.MergeLabel(Unstructured, workv1alpha1.WorkNameLabel, names.GenerateMCSWorkName(svc.Kind, svc.Name, svc.Namespace, clusterName))
util.MergeLabel(Unstructured, workv1alpha1.WorkNamespaceLabel, names.GenerateExecutionSpaceName(clusterName))

if err = helper.CreateOrUpdateWork(c, workMeta, Unstructured); err != nil {
klog.Errorf("Failed to create or update resource(%v/%v) in the given member cluster %s, err is %v",
workMeta.GetNamespace(), workMeta.GetName(), clusterName, err)
c.EventRecorder.Event(mcs, corev1.EventTypeWarning, events.EventReasonSyncServiceWorkFailed, fmt.Sprintf(
Expand All @@ -365,7 +419,7 @@ func (c *MCSController) syncSVCWorkToClusters(
}
}
if len(errs) != 0 {
return syncClusters, errors.NewAggregate(errs)
return syncClusters, utilerrors.NewAggregate(errs)
}

return syncClusters, nil
Expand Down

0 comments on commit cb1a066

Please sign in to comment.