Skip to content

Commit 3fb2c6a

Browse files
committed
RB suspension: sheduler and controller change
Signed-off-by: Monokaix <[email protected]>
1 parent 9718d08 commit 3fb2c6a

11 files changed

+562
-0
lines changed

pkg/apis/work/v1alpha2/binding_types.go

+3
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,9 @@ const (
385385
// FullyApplied represents the condition that the resource referencing by ResourceBinding or ClusterResourceBinding
386386
// has been applied to all scheduled clusters.
387387
FullyApplied string = "FullyApplied"
388+
389+
// Suspended represents the condition that the ResourceBinding or ClusterResourceBinding is suspended to schedule.
390+
Suspended string = "Suspended"
388391
)
389392

390393
// These are reasons for a binding's transition to a Scheduled condition.

pkg/controllers/binding/binding_controller.go

+5
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ func (c *ResourceBindingController) Reconcile(ctx context.Context, req controlle
8989
return c.removeFinalizer(ctx, binding)
9090
}
9191

92+
if err := updateBindingDispatchingConditionIfNeeded(ctx, c.Client, c.EventRecorder, binding, apiextensionsv1.NamespaceScoped); err != nil {
93+
klog.ErrorS(err, "Failed to update binding condition.", "name", klog.KObj(binding), "type", workv1alpha2.Suspended)
94+
return controllerruntime.Result{}, err
95+
}
96+
9297
return c.syncBinding(ctx, binding)
9398
}
9499

pkg/controllers/binding/binding_controller_test.go

+111
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,20 @@ import (
2222
"reflect"
2323
"testing"
2424

25+
"github.com/stretchr/testify/assert"
2526
appsv1 "k8s.io/api/apps/v1"
2627
corev1 "k8s.io/api/core/v1"
28+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
2729
"k8s.io/apimachinery/pkg/api/meta"
2830
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2931
"k8s.io/apimachinery/pkg/runtime"
32+
"k8s.io/apimachinery/pkg/runtime/schema"
3033
"k8s.io/apimachinery/pkg/types"
34+
"k8s.io/apimachinery/pkg/util/uuid"
3135
fakedynamic "k8s.io/client-go/dynamic/fake"
3236
"k8s.io/client-go/kubernetes/scheme"
3337
"k8s.io/client-go/tools/record"
38+
"k8s.io/utils/ptr"
3439
controllerruntime "sigs.k8s.io/controller-runtime"
3540
"sigs.k8s.io/controller-runtime/pkg/client"
3641
"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -39,6 +44,7 @@ import (
3944
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
4045
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
4146
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
47+
"github.com/karmada-io/karmada/pkg/events"
4248
testing2 "github.com/karmada-io/karmada/pkg/search/proxy/testing"
4349
"github.com/karmada-io/karmada/pkg/util"
4450
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
@@ -445,3 +451,108 @@ func TestResourceBindingController_removeFinalizer(t *testing.T) {
445451
})
446452
}
447453
}
454+
455+
func TestUpdateBindingDispatchingConditionIfNeeded(t *testing.T) {
456+
tests := []struct {
457+
name string
458+
binding *workv1alpha2.ResourceBinding
459+
expectedCondition metav1.Condition
460+
expectedEventCount int
461+
expectEventMessage string
462+
}{
463+
{
464+
name: "Binding scheduling is suspended",
465+
binding: newRb(true, metav1.Condition{}),
466+
expectedCondition: metav1.Condition{
467+
Type: workv1alpha2.Suspended,
468+
Status: metav1.ConditionTrue,
469+
},
470+
expectedEventCount: 1,
471+
expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonBindingScheduling, SuspendedSchedulingConditionMessage),
472+
},
473+
{
474+
name: "Binding scheduling is not suspended",
475+
binding: newRb(false, metav1.Condition{
476+
Type: workv1alpha2.Suspended,
477+
Status: metav1.ConditionTrue,
478+
Reason: SuspendedSchedulingConditionReason,
479+
Message: SuspendedSchedulingConditionMessage,
480+
}),
481+
expectedCondition: metav1.Condition{
482+
Type: workv1alpha2.Suspended,
483+
Status: metav1.ConditionFalse,
484+
},
485+
expectedEventCount: 1,
486+
expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonBindingScheduling, SchedulingConditionMessage),
487+
},
488+
{
489+
name: "Condition already matches, no update needed",
490+
binding: newRb(true, metav1.Condition{
491+
Type: workv1alpha2.Suspended,
492+
Status: metav1.ConditionTrue,
493+
Reason: SuspendedSchedulingConditionReason,
494+
Message: SuspendedSchedulingConditionMessage,
495+
}),
496+
expectedCondition: metav1.Condition{
497+
Type: workv1alpha2.Suspended,
498+
Status: metav1.ConditionTrue,
499+
},
500+
},
501+
}
502+
503+
for _, tt := range tests {
504+
t.Run(tt.name, func(t *testing.T) {
505+
eventRecorder := record.NewFakeRecorder(1)
506+
c := newResourceBindingController(tt.binding, eventRecorder)
507+
508+
updatedBinding := &workv1alpha2.ResourceBinding{}
509+
assert.NoError(t, c.Get(context.Background(), types.NamespacedName{Name: tt.binding.Name, Namespace: tt.binding.Namespace}, updatedBinding))
510+
511+
err := updateBindingDispatchingConditionIfNeeded(context.Background(), c.Client, c.EventRecorder, tt.binding, apiextensionsv1.NamespaceScoped)
512+
if err != nil {
513+
t.Errorf("updateBindingDispatchingConditionIfNeeded() returned an error: %v", err)
514+
}
515+
516+
assert.NoError(t, c.Get(context.Background(), types.NamespacedName{Name: tt.binding.Name, Namespace: tt.binding.Namespace}, updatedBinding))
517+
assert.True(t, meta.IsStatusConditionPresentAndEqual(tt.binding.Status.Conditions, tt.expectedCondition.Type, tt.expectedCondition.Status))
518+
assert.Equal(t, tt.expectedEventCount, len(eventRecorder.Events))
519+
if tt.expectEventMessage != "" {
520+
e := <-eventRecorder.Events
521+
assert.Equal(t, tt.expectEventMessage, e)
522+
}
523+
})
524+
}
525+
}
526+
527+
func newResourceBindingController(binding *workv1alpha2.ResourceBinding, eventRecord record.EventRecorder) ResourceBindingController {
528+
restMapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
529+
fakeClient := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(binding).WithStatusSubresource(binding).WithRESTMapper(restMapper).Build()
530+
return ResourceBindingController{
531+
Client: fakeClient,
532+
EventRecorder: eventRecord,
533+
}
534+
}
535+
536+
func newRb(suspended bool, condition metav1.Condition) *workv1alpha2.ResourceBinding {
537+
return &workv1alpha2.ResourceBinding{
538+
TypeMeta: metav1.TypeMeta{
539+
Kind: workv1alpha2.ResourceKindResourceBinding,
540+
APIVersion: workv1alpha2.GroupVersion.Version,
541+
},
542+
ObjectMeta: metav1.ObjectMeta{
543+
Name: "test-rb",
544+
Namespace: "default",
545+
UID: uuid.NewUUID(),
546+
},
547+
Spec: workv1alpha2.ResourceBindingSpec{
548+
Suspension: &policyv1alpha1.Suspension{
549+
Scheduling: ptr.To(suspended),
550+
},
551+
},
552+
Status: workv1alpha2.ResourceBindingStatus{
553+
Conditions: []metav1.Condition{
554+
condition,
555+
},
556+
},
557+
}
558+
}

pkg/controllers/binding/cluster_resource_binding_controller.go

+5
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ func (c *ClusterResourceBindingController) Reconcile(ctx context.Context, req co
8989
return c.removeFinalizer(ctx, clusterResourceBinding)
9090
}
9191

92+
if err := updateBindingDispatchingConditionIfNeeded(ctx, c.Client, c.EventRecorder, clusterResourceBinding, apiextensionsv1.ClusterScoped); err != nil {
93+
klog.ErrorS(err, "Failed to update binding condition.", "name", klog.KObj(clusterResourceBinding), "type", workv1alpha2.Suspended)
94+
return controllerruntime.Result{}, err
95+
}
96+
9297
return c.syncBinding(ctx, clusterResourceBinding)
9398
}
9499

pkg/controllers/binding/cluster_resource_binding_controller_test.go

+111
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,20 @@ import (
2222
"reflect"
2323
"testing"
2424

25+
"github.com/stretchr/testify/assert"
2526
appsv1 "k8s.io/api/apps/v1"
2627
corev1 "k8s.io/api/core/v1"
28+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
2729
"k8s.io/apimachinery/pkg/api/meta"
2830
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2931
"k8s.io/apimachinery/pkg/runtime"
32+
"k8s.io/apimachinery/pkg/runtime/schema"
3033
"k8s.io/apimachinery/pkg/types"
34+
"k8s.io/apimachinery/pkg/util/uuid"
3135
fakedynamic "k8s.io/client-go/dynamic/fake"
3236
"k8s.io/client-go/kubernetes/scheme"
3337
"k8s.io/client-go/tools/record"
38+
"k8s.io/utils/ptr"
3439
controllerruntime "sigs.k8s.io/controller-runtime"
3540
"sigs.k8s.io/controller-runtime/pkg/client"
3641
"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -39,6 +44,7 @@ import (
3944
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
4045
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
4146
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
47+
"github.com/karmada-io/karmada/pkg/events"
4248
testing2 "github.com/karmada-io/karmada/pkg/search/proxy/testing"
4349
"github.com/karmada-io/karmada/pkg/util"
4450
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
@@ -439,3 +445,108 @@ func TestClusterResourceBindingController_newOverridePolicyFunc(t *testing.T) {
439445
})
440446
}
441447
}
448+
449+
func TestUpdateClusterBindingDispatchingConditionIfNeeded(t *testing.T) {
450+
tests := []struct {
451+
name string
452+
binding *workv1alpha2.ClusterResourceBinding
453+
expectedCondition metav1.Condition
454+
expectedEventCount int
455+
expectEventMessage string
456+
}{
457+
{
458+
name: "Binding scheduling is suspended",
459+
binding: newCrb(true, metav1.Condition{}),
460+
expectedCondition: metav1.Condition{
461+
Type: workv1alpha2.Suspended,
462+
Status: metav1.ConditionTrue,
463+
},
464+
expectedEventCount: 1,
465+
expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonBindingScheduling, SuspendedSchedulingConditionMessage),
466+
},
467+
{
468+
name: "Binding scheduling is not suspended",
469+
binding: newCrb(false, metav1.Condition{
470+
Type: workv1alpha2.Suspended,
471+
Status: metav1.ConditionTrue,
472+
Reason: SuspendedSchedulingConditionReason,
473+
Message: SuspendedSchedulingConditionMessage,
474+
}),
475+
expectedCondition: metav1.Condition{
476+
Type: workv1alpha2.Suspended,
477+
Status: metav1.ConditionFalse,
478+
},
479+
expectedEventCount: 1,
480+
expectEventMessage: fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, events.EventReasonBindingScheduling, SchedulingConditionMessage),
481+
},
482+
{
483+
name: "Condition already matches, no update needed",
484+
binding: newCrb(true, metav1.Condition{
485+
Type: workv1alpha2.Suspended,
486+
Status: metav1.ConditionTrue,
487+
Reason: SuspendedSchedulingConditionReason,
488+
Message: SuspendedSchedulingConditionMessage,
489+
}),
490+
expectedCondition: metav1.Condition{
491+
Type: workv1alpha2.Suspended,
492+
Status: metav1.ConditionTrue,
493+
},
494+
},
495+
}
496+
497+
for _, tt := range tests {
498+
t.Run(tt.name, func(t *testing.T) {
499+
eventRecorder := record.NewFakeRecorder(1)
500+
c := newClusterResourceBindingController(tt.binding, eventRecorder)
501+
502+
updatedBinding := &workv1alpha2.ClusterResourceBinding{}
503+
assert.NoError(t, c.Get(context.Background(), types.NamespacedName{Name: tt.binding.Name, Namespace: tt.binding.Namespace}, updatedBinding))
504+
505+
err := updateBindingDispatchingConditionIfNeeded(context.Background(), c.Client, c.EventRecorder, tt.binding, apiextensionsv1.ClusterScoped)
506+
if err != nil {
507+
t.Errorf("updateBindingDispatchingConditionIfNeeded() returned an error: %v", err)
508+
}
509+
510+
assert.NoError(t, c.Get(context.Background(), types.NamespacedName{Name: tt.binding.Name, Namespace: tt.binding.Namespace}, updatedBinding))
511+
assert.True(t, meta.IsStatusConditionPresentAndEqual(tt.binding.Status.Conditions, tt.expectedCondition.Type, tt.expectedCondition.Status))
512+
assert.Equal(t, tt.expectedEventCount, len(eventRecorder.Events))
513+
if tt.expectEventMessage != "" {
514+
e := <-eventRecorder.Events
515+
assert.Equal(t, tt.expectEventMessage, e)
516+
}
517+
})
518+
}
519+
}
520+
521+
func newClusterResourceBindingController(binding *workv1alpha2.ClusterResourceBinding, eventRecord record.EventRecorder) ClusterResourceBindingController {
522+
restMapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
523+
fakeClient := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(binding).WithStatusSubresource(binding).WithRESTMapper(restMapper).Build()
524+
return ClusterResourceBindingController{
525+
Client: fakeClient,
526+
EventRecorder: eventRecord,
527+
}
528+
}
529+
530+
func newCrb(suspended bool, condition metav1.Condition) *workv1alpha2.ClusterResourceBinding {
531+
return &workv1alpha2.ClusterResourceBinding{
532+
TypeMeta: metav1.TypeMeta{
533+
Kind: workv1alpha2.ResourceKindResourceBinding,
534+
APIVersion: workv1alpha2.GroupVersion.Version,
535+
},
536+
ObjectMeta: metav1.ObjectMeta{
537+
Name: "test-rb",
538+
Namespace: "default",
539+
UID: uuid.NewUUID(),
540+
},
541+
Spec: workv1alpha2.ResourceBindingSpec{
542+
Suspension: &policyv1alpha1.Suspension{
543+
Scheduling: ptr.To(suspended),
544+
},
545+
},
546+
Status: workv1alpha2.ResourceBindingStatus{
547+
Conditions: []metav1.Condition{
548+
condition,
549+
},
550+
},
551+
}
552+
}

0 commit comments

Comments
 (0)