Commit af610cc

enhance ensureWork to continue processing clusters after individual failures

Signed-off-by: chaosi-zju <[email protected]>

1 parent: ce41488

File tree

1 file changed: +33 -20 lines changed
Diff for: pkg/controllers/binding/common.go (+33 -20)
@@ -23,6 +23,7 @@ import (
     apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/util/errors"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/klog/v2"
     "k8s.io/utils/ptr"
@@ -45,19 +46,8 @@ func ensureWork(
     ctx context.Context, c client.Client, resourceInterpreter resourceinterpreter.ResourceInterpreter, workload *unstructured.Unstructured,
     overrideManager overridemanager.OverrideManager, binding metav1.Object, scope apiextensionsv1.ResourceScope,
 ) error {
-    var targetClusters []workv1alpha2.TargetCluster
-    var bindingSpec workv1alpha2.ResourceBindingSpec
-    switch scope {
-    case apiextensionsv1.NamespaceScoped:
-        bindingObj := binding.(*workv1alpha2.ResourceBinding)
-        bindingSpec = bindingObj.Spec
-    case apiextensionsv1.ClusterScoped:
-        bindingObj := binding.(*workv1alpha2.ClusterResourceBinding)
-        bindingSpec = bindingObj.Spec
-    }
-
-    targetClusters = bindingSpec.Clusters
-    targetClusters = mergeTargetClusters(targetClusters, bindingSpec.RequiredBy)
+    bindingSpec := getBindingSpec(binding, scope)
+    targetClusters := mergeTargetClusters(bindingSpec.Clusters, bindingSpec.RequiredBy)

     var jobCompletions []workv1alpha2.TargetCluster
     var err error
@@ -68,6 +58,7 @@ func ensureWork(
         }
     }

+    var errs []error
     for i := range targetClusters {
         targetCluster := targetClusters[i]
         clonedWorkload := workload.DeepCopy()
@@ -82,7 +73,8 @@ func ensureWork(
                 if err != nil {
                     klog.Errorf("Failed to revise replica for %s/%s/%s in cluster %s, err is: %v",
                         workload.GetKind(), workload.GetNamespace(), workload.GetName(), targetCluster.Name, err)
-                    return err
+                    errs = append(errs, err)
+                    continue
                 }
             }

@@ -94,25 +86,29 @@ func ensureWork(
                 if err = helper.ApplyReplica(clonedWorkload, int64(jobCompletions[i].Replicas), util.CompletionsField); err != nil {
                     klog.Errorf("Failed to apply Completions for %s/%s/%s in cluster %s, err is: %v",
                         clonedWorkload.GetKind(), clonedWorkload.GetNamespace(), clonedWorkload.GetName(), targetCluster.Name, err)
-                    return err
+                    errs = append(errs, err)
+                    continue
                 }
             }
         }

         // We should call ApplyOverridePolicies last, as override rules have the highest priority
         cops, ops, err := overrideManager.ApplyOverridePolicies(clonedWorkload, targetCluster.Name)
         if err != nil {
-            klog.Errorf("Failed to apply overrides for %s/%s/%s, err is: %v", clonedWorkload.GetKind(), clonedWorkload.GetNamespace(), clonedWorkload.GetName(), err)
-            return err
+            klog.Errorf("Failed to apply overrides for %s/%s/%s in cluster %s, err is: %v",
+                clonedWorkload.GetKind(), clonedWorkload.GetNamespace(), clonedWorkload.GetName(), targetCluster.Name, err)
+            errs = append(errs, err)
+            continue
         }
         workLabel := mergeLabel(clonedWorkload, binding, scope)

         annotations := mergeAnnotations(clonedWorkload, binding, scope)
         annotations = mergeConflictResolution(clonedWorkload, bindingSpec.ConflictResolution, annotations)
         annotations, err = RecordAppliedOverrides(cops, ops, annotations)
         if err != nil {
-            klog.Errorf("Failed to record appliedOverrides, Error: %v", err)
-            return err
+            klog.Errorf("Failed to record appliedOverrides in cluster %s, Error: %v", targetCluster.Name, err)
+            errs = append(errs, err)
+            continue
         }

         if features.FeatureGate.Enabled(features.StatefulFailoverInjection) {
@@ -137,12 +133,29 @@ func ensureWork(
             ctrlutil.WithSuspendDispatching(shouldSuspendDispatching(bindingSpec.Suspension, targetCluster)),
             ctrlutil.WithPreserveResourcesOnDeletion(ptr.Deref(bindingSpec.PreserveResourcesOnDeletion, false)),
         ); err != nil {
-            return err
+            errs = append(errs, err)
+            continue
         }
     }
+    if len(errs) > 0 {
+        return errors.NewAggregate(errs)
+    }
     return nil
 }

+func getBindingSpec(binding metav1.Object, scope apiextensionsv1.ResourceScope) workv1alpha2.ResourceBindingSpec {
+    var bindingSpec workv1alpha2.ResourceBindingSpec
+    switch scope {
+    case apiextensionsv1.NamespaceScoped:
+        bindingObj := binding.(*workv1alpha2.ResourceBinding)
+        bindingSpec = bindingObj.Spec
+    case apiextensionsv1.ClusterScoped:
+        bindingObj := binding.(*workv1alpha2.ClusterResourceBinding)
+        bindingSpec = bindingObj.Spec
+    }
+    return bindingSpec
+}
+
 // injectReservedLabelState injects the reservedLabelState in to the failover to cluster.
 // We have the following restrictions on whether to perform injection operations:
 // 1. Only the scenario where an application is deployed in one cluster and migrated to
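
The heart of the change is the error-handling pattern in ensureWork: instead of returning on the first per-cluster failure, each error is recorded, the loop moves on to the next cluster with continue, and the collected errors are returned once at the end via errors.NewAggregate from k8s.io/apimachinery/pkg/util/errors. A minimal, self-contained sketch of that pattern follows; the cluster names and the syncCluster helper are hypothetical stand-ins, not part of the Karmada code.

package main

import (
    "fmt"

    utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// syncCluster stands in for the per-cluster work ensureWork performs
// (revising replicas, applying overrides, creating the Work object, ...).
// Its name and behavior are illustrative only.
func syncCluster(name string) error {
    if name == "member2" {
        return fmt.Errorf("failed to sync cluster %s", name)
    }
    return nil
}

func ensureAll(clusters []string) error {
    var errs []error
    for _, cluster := range clusters {
        if err := syncCluster(cluster); err != nil {
            // Record the failure and continue, so one bad cluster
            // no longer blocks the remaining clusters in this pass.
            errs = append(errs, err)
            continue
        }
    }
    if len(errs) > 0 {
        // NewAggregate folds the collected errors into a single error value
        // (it returns nil for an empty slice).
        return utilerrors.NewAggregate(errs)
    }
    return nil
}

func main() {
    // member1 and member3 are still processed even though member2 fails;
    // the caller sees one aggregated error and can requeue and retry as usual.
    if err := ensureAll([]string{"member1", "member2", "member3"}); err != nil {
        fmt.Println(err)
    }
}

The design trade-off mirrors the commit message: the reconcile still reports failure (via the aggregated error), but every reachable cluster gets its Work created or updated in the same pass instead of waiting behind a single failing cluster.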
