Skip to content

Commit

Permalink
Merge pull request #4360 from Rains6/dev-mcs-zhc
Browse files Browse the repository at this point in the history
feat: sync mcs work when cluster joined or unjoin
  • Loading branch information
karmada-bot authored Dec 5, 2023
2 parents 9c5b977 + 8277a98 commit 0b1681e
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 19 deletions.
65 changes: 60 additions & 5 deletions pkg/controllers/multiclusterservice/mcs_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (c *MCSController) Reconcile(ctx context.Context, req controllerruntime.Req
return
}
_ = c.updateMCSStatus(mcs, metav1.ConditionTrue, "ServiceAppliedSucceed", "Service is propagated to target clusters.")
c.EventRecorder.Eventf(mcs, corev1.EventTypeNormal, events.EventReasonSyncWorkSucceed, "Service is propagated to target clusters.")
}()

if err = c.handleMCSCreateOrUpdate(ctx, mcs.DeepCopy()); err != nil {
Expand Down Expand Up @@ -191,13 +192,13 @@ func (c *MCSController) deleteMultiClusterServiceWork(mcs *networkingv1alpha1.Mu
klog.Errorf("Failed to get member cluster name for work %s/%s:%v", work.Namespace, work.Name, work)
continue
}
if !deleteAll && provisionClusters.Has(clusterName) {

if !deleteAll && provisionClusters.Has(clusterName) && c.IsClusterReady(clusterName) {
continue
}

if err = c.Client.Delete(context.TODO(), work.DeepCopy()); err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Error while updating work(%s/%s) deletion timestamp: %s",
work.Namespace, work.Name, err)
klog.Errorf("Error while deleting work(%s/%s): %v", work.Namespace, work.Name, err)
return err
}
}
Expand Down Expand Up @@ -275,13 +276,16 @@ func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networ
}

func (c *MCSController) ensureMultiClusterServiceWork(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
provisionCluster, err := helper.GetProvisionClusters(c.Client, mcs)
provisionClusters, err := helper.GetProvisionClusters(c.Client, mcs)
if err != nil {
klog.Errorf("Failed to get provision clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err)
return err
}

for clusterName := range provisionCluster {
for clusterName := range provisionClusters {
if !c.IsClusterReady(clusterName) {
continue
}
workMeta := metav1.ObjectMeta{
Name: names.GenerateMCSWorkName(mcs.Kind, mcs.Name, mcs.Namespace, clusterName),
Namespace: names.GenerateExecutionSpaceName(clusterName),
Expand Down Expand Up @@ -392,6 +396,16 @@ func (c *MCSController) updateMCSStatus(mcs *networkingv1alpha1.MultiClusterServ
})
}

func (c *MCSController) IsClusterReady(clusterName string) bool {
cluster := &clusterv1alpha1.Cluster{}
if err := c.Client.Get(context.TODO(), types.NamespacedName{Name: clusterName}, cluster); err != nil {
klog.ErrorS(err, "failed to get cluster object", "Name", clusterName)
return false
}

return util.IsClusterReady(&cluster.Status)
}

// SetupWithManager creates a controller and register to controller manager.
func (c *MCSController) SetupWithManager(mgr controllerruntime.Manager) error {
mcsPredicateFunc := predicate.Funcs{
Expand Down Expand Up @@ -460,6 +474,47 @@ func (c *MCSController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).
For(&networkingv1alpha1.MultiClusterService{}, builder.WithPredicates(mcsPredicateFunc)).
Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(svcMapFunc), builder.WithPredicates(svcPredicateFunc)).
Watches(&clusterv1alpha1.Cluster{}, handler.EnqueueRequestsFromMapFunc(c.clusterMapFunc())).
WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions)}).
Complete(c)
}

func (c *MCSController) clusterMapFunc() handler.MapFunc {
return func(ctx context.Context, a client.Object) []reconcile.Request {
var clusterName string
switch t := a.(type) {
case *clusterv1alpha1.Cluster:
clusterName = t.Name
default:
return nil
}

klog.V(4).Infof("Begin to sync mcs with cluster %s.", clusterName)
mcsList := &networkingv1alpha1.MultiClusterServiceList{}
if err := c.Client.List(context.TODO(), mcsList, &client.ListOptions{}); err != nil {
klog.Errorf("Failed to list MultiClusterService, error: %v", err)
return nil
}

var requests []reconcile.Request
for index := range mcsList.Items {
if !needSyncMultiClusterService(&mcsList.Items[index], clusterName) {
continue
}

requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: mcsList.Items[index].Namespace,
Name: mcsList.Items[index].Name}})
}

return requests
}
}

func needSyncMultiClusterService(mcs *networkingv1alpha1.MultiClusterService, clusterName string) bool {
if len(mcs.Spec.ServiceProvisionClusters) == 0 || len(mcs.Spec.ServiceConsumptionClusters) == 0 {
return true
}
clusters := sets.New[string](mcs.Spec.ServiceProvisionClusters...)
clusters.Insert(mcs.Spec.ServiceConsumptionClusters...)
return clusters.Has(clusterName)
}
28 changes: 14 additions & 14 deletions pkg/util/helper/mcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,26 +108,26 @@ func MultiClusterServiceCrossClusterEnabled(mcs *networkingv1alpha1.MultiCluster

func GetProvisionClusters(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
provisionClusters := sets.New[string](mcs.Spec.ServiceProvisionClusters...)
existClusters, err := util.GetClusterSet(client)
if err != nil {
klog.Errorf("Failed to get cluster set, Error: %v", err)
return nil, err
}
if len(provisionClusters) == 0 {
var err error
provisionClusters, err = util.GetClusterSet(client)
if err != nil {
klog.Errorf("Failed to get cluster set, Error: %v", err)
return nil, err
}
return existClusters, nil
}
return provisionClusters, nil
return provisionClusters.Intersection(existClusters), nil
}

func GetConsumptionClustres(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
consumptionClusters := sets.New[string](mcs.Spec.ServiceConsumptionClusters...)
existClusters, err := util.GetClusterSet(client)
if err != nil {
klog.Errorf("Failed to get cluster set, Error: %v", err)
return nil, err
}
if len(consumptionClusters) == 0 {
var err error
consumptionClusters, err = util.GetClusterSet(client)
if err != nil {
klog.Errorf("Failed to get cluster set, Error: %v", err)
return nil, err
}
return existClusters, nil
}
return consumptionClusters, nil
return consumptionClusters.Intersection(existClusters), nil
}

0 comments on commit 0b1681e

Please sign in to comment.