diff --git a/pkg/controllers/multiclusterservice/mcs_controller.go b/pkg/controllers/multiclusterservice/mcs_controller.go index 200d625ca120..5173bfd5be83 100644 --- a/pkg/controllers/multiclusterservice/mcs_controller.go +++ b/pkg/controllers/multiclusterservice/mcs_controller.go @@ -95,6 +95,7 @@ func (c *MCSController) Reconcile(ctx context.Context, req controllerruntime.Req return } _ = c.updateMCSStatus(mcs, metav1.ConditionTrue, "ServiceAppliedSucceed", "Service is propagated to target clusters.") + c.EventRecorder.Eventf(mcs, corev1.EventTypeNormal, events.EventReasonSyncWorkSucceed, "Service is propagated to target clusters.") }() if err = c.handleMCSCreateOrUpdate(ctx, mcs.DeepCopy()); err != nil { @@ -191,13 +192,13 @@ func (c *MCSController) deleteMultiClusterServiceWork(mcs *networkingv1alpha1.Mu klog.Errorf("Failed to get member cluster name for work %s/%s:%v", work.Namespace, work.Name, work) continue } - if !deleteAll && provisionClusters.Has(clusterName) { + + if !deleteAll && provisionClusters.Has(clusterName) && c.IsClusterReady(clusterName) { continue } if err = c.Client.Delete(context.TODO(), work.DeepCopy()); err != nil && !apierrors.IsNotFound(err) { - klog.Errorf("Error while updating work(%s/%s) deletion timestamp: %s", - work.Namespace, work.Name, err) + klog.Errorf("Error while deleting work(%s/%s): %v", work.Namespace, work.Name, err) return err } } @@ -275,13 +276,16 @@ func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networ } func (c *MCSController) ensureMultiClusterServiceWork(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error { - provisionCluster, err := helper.GetProvisionClusters(c.Client, mcs) + provisionClusters, err := helper.GetProvisionClusters(c.Client, mcs) if err != nil { klog.Errorf("Failed to get provision clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err) return err } - for clusterName := range provisionCluster { + for clusterName := range provisionClusters { + if !c.IsClusterReady(clusterName) { + continue + } workMeta := metav1.ObjectMeta{ Name: names.GenerateMCSWorkName(mcs.Kind, mcs.Name, mcs.Namespace, clusterName), Namespace: names.GenerateExecutionSpaceName(clusterName), @@ -392,6 +396,16 @@ func (c *MCSController) updateMCSStatus(mcs *networkingv1alpha1.MultiClusterServ }) } +func (c *MCSController) IsClusterReady(clusterName string) bool { + cluster := &clusterv1alpha1.Cluster{} + if err := c.Client.Get(context.TODO(), types.NamespacedName{Name: clusterName}, cluster); err != nil { + klog.ErrorS(err, "failed to get cluster object", "Name", clusterName) + return false + } + + return util.IsClusterReady(&cluster.Status) +} + // SetupWithManager creates a controller and register to controller manager. func (c *MCSController) SetupWithManager(mgr controllerruntime.Manager) error { mcsPredicateFunc := predicate.Funcs{ @@ -460,6 +474,47 @@ func (c *MCSController) SetupWithManager(mgr controllerruntime.Manager) error { return controllerruntime.NewControllerManagedBy(mgr). For(&networkingv1alpha1.MultiClusterService{}, builder.WithPredicates(mcsPredicateFunc)). Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(svcMapFunc), builder.WithPredicates(svcPredicateFunc)). + Watches(&clusterv1alpha1.Cluster{}, handler.EnqueueRequestsFromMapFunc(c.clusterMapFunc())). WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions)}). Complete(c) } + +func (c *MCSController) clusterMapFunc() handler.MapFunc { + return func(ctx context.Context, a client.Object) []reconcile.Request { + var clusterName string + switch t := a.(type) { + case *clusterv1alpha1.Cluster: + clusterName = t.Name + default: + return nil + } + + klog.V(4).Infof("Begin to sync mcs with cluster %s.", clusterName) + mcsList := &networkingv1alpha1.MultiClusterServiceList{} + if err := c.Client.List(context.TODO(), mcsList, &client.ListOptions{}); err != nil { + klog.Errorf("Failed to list MultiClusterService, error: %v", err) + return nil + } + + var requests []reconcile.Request + for index := range mcsList.Items { + if !needSyncMultiClusterService(&mcsList.Items[index], clusterName) { + continue + } + + requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: mcsList.Items[index].Namespace, + Name: mcsList.Items[index].Name}}) + } + + return requests + } +} + +func needSyncMultiClusterService(mcs *networkingv1alpha1.MultiClusterService, clusterName string) bool { + if len(mcs.Spec.ServiceProvisionClusters) == 0 || len(mcs.Spec.ServiceConsumptionClusters) == 0 { + return true + } + clusters := sets.New[string](mcs.Spec.ServiceProvisionClusters...) + clusters.Insert(mcs.Spec.ServiceConsumptionClusters...) + return clusters.Has(clusterName) +} diff --git a/pkg/util/helper/mcs.go b/pkg/util/helper/mcs.go index c20e293e7354..be112ec3c268 100644 --- a/pkg/util/helper/mcs.go +++ b/pkg/util/helper/mcs.go @@ -108,26 +108,26 @@ func MultiClusterServiceCrossClusterEnabled(mcs *networkingv1alpha1.MultiCluster func GetProvisionClusters(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) { provisionClusters := sets.New[string](mcs.Spec.ServiceProvisionClusters...) + existClusters, err := util.GetClusterSet(client) + if err != nil { + klog.Errorf("Failed to get cluster set, Error: %v", err) + return nil, err + } if len(provisionClusters) == 0 { - var err error - provisionClusters, err = util.GetClusterSet(client) - if err != nil { - klog.Errorf("Failed to get cluster set, Error: %v", err) - return nil, err - } + return existClusters, nil } - return provisionClusters, nil + return provisionClusters.Intersection(existClusters), nil } func GetConsumptionClustres(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) { consumptionClusters := sets.New[string](mcs.Spec.ServiceConsumptionClusters...) + existClusters, err := util.GetClusterSet(client) + if err != nil { + klog.Errorf("Failed to get cluster set, Error: %v", err) + return nil, err + } if len(consumptionClusters) == 0 { - var err error - consumptionClusters, err = util.GetClusterSet(client) - if err != nil { - klog.Errorf("Failed to get cluster set, Error: %v", err) - return nil, err - } + return existClusters, nil } - return consumptionClusters, nil + return consumptionClusters.Intersection(existClusters), nil }