diff --git a/apis/config/scheme/scheme_test.go b/apis/config/scheme/scheme_test.go index 8b13e166f..b29fc65c1 100644 --- a/apis/config/scheme/scheme_test.go +++ b/apis/config/scheme/scheme_test.go @@ -29,7 +29,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config/testing/defaults" "sigs.k8s.io/scheduler-plugins/apis/config" - "sigs.k8s.io/scheduler-plugins/apis/config/v1" + v1 "sigs.k8s.io/scheduler-plugins/apis/config/v1" "sigs.k8s.io/scheduler-plugins/apis/config/v1beta2" "sigs.k8s.io/scheduler-plugins/apis/config/v1beta3" "sigs.k8s.io/scheduler-plugins/pkg/coscheduling" @@ -67,6 +67,7 @@ profiles: - name: Coscheduling args: permitWaitingTimeSeconds: 10 + podGroupBackoffSeconds: 0 deniedPGExpirationTimeSeconds: 3 - name: NodeResourcesAllocatable args: @@ -473,6 +474,7 @@ kind: KubeSchedulerConfiguration profiles: - schedulerName: scheduler-plugins pluginConfig: + - name: Coscheduling # Test argument defaulting logic - name: TopologicalSort args: namespaces: @@ -489,6 +491,12 @@ profiles: SchedulerName: "scheduler-plugins", Plugins: defaults.PluginsV1, PluginConfig: []schedconfig.PluginConfig{ + { + Name: coscheduling.Name, + Args: &config.CoschedulingArgs{ + PermitWaitingTimeSeconds: 60, + }, + }, { Name: topologicalsort.Name, Args: &config.TopologicalSortArgs{ @@ -729,6 +737,7 @@ profiles: apiVersion: kubescheduler.config.k8s.io/v1beta2 kind: CoschedulingArgs permitWaitingTimeSeconds: 10 + podGroupBackoffSeconds: 0 name: Coscheduling - args: apiVersion: kubescheduler.config.k8s.io/v1beta2 @@ -782,6 +791,7 @@ profiles: Name: coscheduling.Name, Args: &config.CoschedulingArgs{ PermitWaitingTimeSeconds: 10, + PodGroupBackoffSeconds: 20, }, }, { @@ -886,6 +896,7 @@ profiles: apiVersion: kubescheduler.config.k8s.io/v1beta3 kind: CoschedulingArgs permitWaitingTimeSeconds: 10 + podGroupBackoffSeconds: 20 name: Coscheduling - args: apiVersion: kubescheduler.config.k8s.io/v1beta3 @@ -1071,6 +1082,7 @@ profiles: apiVersion: kubescheduler.config.k8s.io/v1 kind: CoschedulingArgs permitWaitingTimeSeconds: 10 + podGroupBackoffSeconds: 0 name: Coscheduling - args: apiVersion: kubescheduler.config.k8s.io/v1 diff --git a/apis/config/types.go b/apis/config/types.go index df9884d84..959c80fd8 100644 --- a/apis/config/types.go +++ b/apis/config/types.go @@ -30,6 +30,8 @@ type CoschedulingArgs struct { // PermitWaitingTimeSeconds is the waiting timeout in seconds. PermitWaitingTimeSeconds int64 + // PodGroupBackoffSeconds is the backoff time in seconds before a pod group can be scheduled again. + PodGroupBackoffSeconds int64 } // ModeType is a "string" type. diff --git a/apis/config/v1/defaults.go b/apis/config/v1/defaults.go index a876c4ef4..6f1b4df5b 100644 --- a/apis/config/v1/defaults.go +++ b/apis/config/v1/defaults.go @@ -28,6 +28,7 @@ import ( var ( defaultPermitWaitingTimeSeconds int64 = 60 + defaultPodGroupBackoffSeconds int64 = 0 defaultNodeResourcesAllocatableMode = Least @@ -100,6 +101,9 @@ func SetDefaults_CoschedulingArgs(obj *CoschedulingArgs) { if obj.PermitWaitingTimeSeconds == nil { obj.PermitWaitingTimeSeconds = &defaultPermitWaitingTimeSeconds } + if obj.PodGroupBackoffSeconds == nil { + obj.PodGroupBackoffSeconds = &defaultPodGroupBackoffSeconds + } } // SetDefaults_NodeResourcesAllocatableArgs sets the defaults parameters for NodeResourceAllocatable. 
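The hunks above follow the repo's usual two-level API pattern: the internal CoschedulingArgs in apis/config carries PodGroupBackoffSeconds as a plain int64, while the versioned types (below) declare it as *int64 so an unset field can be told apart from an explicit 0 and filled in by SetDefaults_CoschedulingArgs. What follows is a minimal, self-contained sketch of that defaulting behaviour; the field and default names are taken from the diff, but the package wrapper and main function are illustrative only.

package main

import "fmt"

// Versioned args: optional fields are pointers so that "unset" is representable.
type CoschedulingArgs struct {
	PermitWaitingTimeSeconds *int64
	PodGroupBackoffSeconds   *int64
}

var (
	defaultPermitWaitingTimeSeconds int64 = 60
	defaultPodGroupBackoffSeconds   int64 = 0 // backoff is disabled unless configured
)

// SetDefaults_CoschedulingArgs mirrors the defaulting added in v1/defaults.go:
// only nil (unset) fields are filled in, explicit values are left untouched.
func SetDefaults_CoschedulingArgs(obj *CoschedulingArgs) {
	if obj.PermitWaitingTimeSeconds == nil {
		obj.PermitWaitingTimeSeconds = &defaultPermitWaitingTimeSeconds
	}
	if obj.PodGroupBackoffSeconds == nil {
		obj.PodGroupBackoffSeconds = &defaultPodGroupBackoffSeconds
	}
}

func main() {
	ten := int64(10)
	args := &CoschedulingArgs{PermitWaitingTimeSeconds: &ten}
	SetDefaults_CoschedulingArgs(args)
	fmt.Println(*args.PermitWaitingTimeSeconds, *args.PodGroupBackoffSeconds) // 10 0
}

Because the default is 0 and a zero backoff is treated as "feature off", existing configurations keep their current behaviour unless podGroupBackoffSeconds is set explicitly.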
diff --git a/apis/config/v1/defaults_test.go b/apis/config/v1/defaults_test.go index 39318201e..f964eab8d 100644 --- a/apis/config/v1/defaults_test.go +++ b/apis/config/v1/defaults_test.go @@ -40,15 +40,18 @@ func TestSchedulingDefaults(t *testing.T) { config: &CoschedulingArgs{}, expect: &CoschedulingArgs{ PermitWaitingTimeSeconds: pointer.Int64Ptr(60), + PodGroupBackoffSeconds: pointer.Int64Ptr(0), }, }, { name: "set non default CoschedulingArgs", config: &CoschedulingArgs{ PermitWaitingTimeSeconds: pointer.Int64Ptr(60), + PodGroupBackoffSeconds: pointer.Int64Ptr(20), }, expect: &CoschedulingArgs{ PermitWaitingTimeSeconds: pointer.Int64Ptr(60), + PodGroupBackoffSeconds: pointer.Int64Ptr(20), }, }, { diff --git a/apis/config/v1/types.go b/apis/config/v1/types.go index 3f89288fd..a0e92d4bb 100644 --- a/apis/config/v1/types.go +++ b/apis/config/v1/types.go @@ -30,6 +30,8 @@ type CoschedulingArgs struct { // PermitWaitingTimeSeconds is the waiting timeout in seconds. PermitWaitingTimeSeconds *int64 `json:"permitWaitingTimeSeconds,omitempty"` + // PodGroupBackoffSeconds is the backoff time in seconds before a pod group can be scheduled again. + PodGroupBackoffSeconds *int64 `json:"podGroupBackoffSeconds,omitempty"` } // ModeType is a type "string". diff --git a/apis/config/v1/zz_generated.conversion.go b/apis/config/v1/zz_generated.conversion.go index a56c6b4cf..4c8388b74 100644 --- a/apis/config/v1/zz_generated.conversion.go +++ b/apis/config/v1/zz_generated.conversion.go @@ -177,6 +177,9 @@ func autoConvert_v1_CoschedulingArgs_To_config_CoschedulingArgs(in *Coscheduling if err := metav1.Convert_Pointer_int64_To_int64(&in.PermitWaitingTimeSeconds, &out.PermitWaitingTimeSeconds, s); err != nil { return err } + if err := metav1.Convert_Pointer_int64_To_int64(&in.PodGroupBackoffSeconds, &out.PodGroupBackoffSeconds, s); err != nil { + return err + } return nil } @@ -189,6 +192,9 @@ func autoConvert_config_CoschedulingArgs_To_v1_CoschedulingArgs(in *config.Cosch if err := metav1.Convert_int64_To_Pointer_int64(&in.PermitWaitingTimeSeconds, &out.PermitWaitingTimeSeconds, s); err != nil { return err } + if err := metav1.Convert_int64_To_Pointer_int64(&in.PodGroupBackoffSeconds, &out.PodGroupBackoffSeconds, s); err != nil { + return err + } return nil } diff --git a/apis/config/v1/zz_generated.deepcopy.go b/apis/config/v1/zz_generated.deepcopy.go index 0eace5b01..be4875698 100644 --- a/apis/config/v1/zz_generated.deepcopy.go +++ b/apis/config/v1/zz_generated.deepcopy.go @@ -36,6 +36,11 @@ func (in *CoschedulingArgs) DeepCopyInto(out *CoschedulingArgs) { *out = new(int64) **out = **in } + if in.PodGroupBackoffSeconds != nil { + in, out := &in.PodGroupBackoffSeconds, &out.PodGroupBackoffSeconds + *out = new(int64) + **out = **in + } return } diff --git a/apis/config/v1beta2/defaults.go b/apis/config/v1beta2/defaults.go index 37bbf7ee0..74efd1bca 100644 --- a/apis/config/v1beta2/defaults.go +++ b/apis/config/v1beta2/defaults.go @@ -27,6 +27,7 @@ import ( var ( defaultPermitWaitingTimeSeconds int64 = 60 + defaultPodGroupBackoffSeconds int64 = 0 defaultDeniedPGExpirationTimeSeconds int64 = 20 defaultNodeResourcesAllocatableMode = Least @@ -85,6 +86,9 @@ func SetDefaults_CoschedulingArgs(obj *CoschedulingArgs) { if obj.DeniedPGExpirationTimeSeconds == nil { obj.DeniedPGExpirationTimeSeconds = &defaultDeniedPGExpirationTimeSeconds } + if obj.PodGroupBackoffSeconds == nil { + obj.PodGroupBackoffSeconds = &defaultPodGroupBackoffSeconds + } } // SetDefaults_NodeResourcesAllocatableArgs sets the defaults 
parameters for NodeResourceAllocatable. diff --git a/apis/config/v1beta2/defaults_test.go b/apis/config/v1beta2/defaults_test.go index 3d219bcb4..275da94fe 100644 --- a/apis/config/v1beta2/defaults_test.go +++ b/apis/config/v1beta2/defaults_test.go @@ -41,6 +41,7 @@ func TestSchedulingDefaults(t *testing.T) { expect: &CoschedulingArgs{ PermitWaitingTimeSeconds: pointer.Int64Ptr(60), DeniedPGExpirationTimeSeconds: pointer.Int64Ptr(20), + PodGroupBackoffSeconds: pointer.Int64Ptr(0), }, }, { @@ -48,10 +49,12 @@ func TestSchedulingDefaults(t *testing.T) { config: &CoschedulingArgs{ PermitWaitingTimeSeconds: pointer.Int64Ptr(60), DeniedPGExpirationTimeSeconds: pointer.Int64Ptr(10), + PodGroupBackoffSeconds: pointer.Int64Ptr(20), }, expect: &CoschedulingArgs{ PermitWaitingTimeSeconds: pointer.Int64Ptr(60), DeniedPGExpirationTimeSeconds: pointer.Int64Ptr(10), + PodGroupBackoffSeconds: pointer.Int64Ptr(20), }, }, { diff --git a/apis/config/v1beta2/types.go b/apis/config/v1beta2/types.go index b52b35b7a..c469ee66a 100644 --- a/apis/config/v1beta2/types.go +++ b/apis/config/v1beta2/types.go @@ -30,6 +30,8 @@ type CoschedulingArgs struct { // PermitWaitingTimeSeconds is the waiting timeout in seconds. PermitWaitingTimeSeconds *int64 `json:"permitWaitingTimeSeconds,omitempty"` + // PodGroupBackoffSeconds is the backoff time in seconds before a pod group can be scheduled again. + PodGroupBackoffSeconds *int64 `json:"podGroupBackoffSeconds,omitempty"` // DeniedPGExpirationTimeSeconds is the expiration time of the denied podgroup store. DeniedPGExpirationTimeSeconds *int64 `json:"deniedPGExpirationTimeSeconds,omitempty"` } diff --git a/apis/config/v1beta2/zz_generated.conversion.go b/apis/config/v1beta2/zz_generated.conversion.go index 22ca6a31d..cef8bc8ed 100644 --- a/apis/config/v1beta2/zz_generated.conversion.go +++ b/apis/config/v1beta2/zz_generated.conversion.go @@ -137,6 +137,9 @@ func autoConvert_v1beta2_CoschedulingArgs_To_config_CoschedulingArgs(in *Cosched if err := v1.Convert_Pointer_int64_To_int64(&in.PermitWaitingTimeSeconds, &out.PermitWaitingTimeSeconds, s); err != nil { return err } + if err := v1.Convert_Pointer_int64_To_int64(&in.PodGroupBackoffSeconds, &out.PodGroupBackoffSeconds, s); err != nil { + return err + } // WARNING: in.DeniedPGExpirationTimeSeconds requires manual conversion: does not exist in peer-type return nil } @@ -145,6 +148,9 @@ func autoConvert_config_CoschedulingArgs_To_v1beta2_CoschedulingArgs(in *config. 
if err := v1.Convert_int64_To_Pointer_int64(&in.PermitWaitingTimeSeconds, &out.PermitWaitingTimeSeconds, s); err != nil { return err } + if err := v1.Convert_int64_To_Pointer_int64(&in.PodGroupBackoffSeconds, &out.PodGroupBackoffSeconds, s); err != nil { + return err + } return nil } diff --git a/apis/config/v1beta2/zz_generated.deepcopy.go b/apis/config/v1beta2/zz_generated.deepcopy.go index 92c0cfa26..f6618215f 100644 --- a/apis/config/v1beta2/zz_generated.deepcopy.go +++ b/apis/config/v1beta2/zz_generated.deepcopy.go @@ -36,6 +36,11 @@ func (in *CoschedulingArgs) DeepCopyInto(out *CoschedulingArgs) { *out = new(int64) **out = **in } + if in.PodGroupBackoffSeconds != nil { + in, out := &in.PodGroupBackoffSeconds, &out.PodGroupBackoffSeconds + *out = new(int64) + **out = **in + } if in.DeniedPGExpirationTimeSeconds != nil { in, out := &in.DeniedPGExpirationTimeSeconds, &out.DeniedPGExpirationTimeSeconds *out = new(int64) diff --git a/apis/config/v1beta3/defaults.go b/apis/config/v1beta3/defaults.go index 65419a0ae..eed0492ff 100644 --- a/apis/config/v1beta3/defaults.go +++ b/apis/config/v1beta3/defaults.go @@ -28,9 +28,9 @@ import ( ) var ( - defaultPermitWaitingTimeSeconds int64 = 60 - - defaultNodeResourcesAllocatableMode = Least + defaultPermitWaitingTimeSeconds int64 = 60 + defaultPodGroupBackoffSeconds int64 = 0 + defaultNodeResourcesAllocatableMode = Least // defaultResourcesToWeightMap is used to set the default resourceToWeight map for CPU and memory // used by the NodeResourcesAllocatable scoring plugin. @@ -101,6 +101,9 @@ func SetDefaults_CoschedulingArgs(obj *CoschedulingArgs) { if obj.PermitWaitingTimeSeconds == nil { obj.PermitWaitingTimeSeconds = &defaultPermitWaitingTimeSeconds } + if obj.PodGroupBackoffSeconds == nil { + obj.PodGroupBackoffSeconds = &defaultPodGroupBackoffSeconds + } } // SetDefaults_NodeResourcesAllocatableArgs sets the defaults parameters for NodeResourceAllocatable. diff --git a/apis/config/v1beta3/defaults_test.go b/apis/config/v1beta3/defaults_test.go index 82e9566ed..9813b416d 100644 --- a/apis/config/v1beta3/defaults_test.go +++ b/apis/config/v1beta3/defaults_test.go @@ -40,15 +40,18 @@ func TestSchedulingDefaults(t *testing.T) { config: &CoschedulingArgs{}, expect: &CoschedulingArgs{ PermitWaitingTimeSeconds: pointer.Int64Ptr(60), + PodGroupBackoffSeconds: pointer.Int64Ptr(0), }, }, { name: "set non default CoschedulingArgs", config: &CoschedulingArgs{ PermitWaitingTimeSeconds: pointer.Int64Ptr(60), + PodGroupBackoffSeconds: pointer.Int64Ptr(20), }, expect: &CoschedulingArgs{ PermitWaitingTimeSeconds: pointer.Int64Ptr(60), + PodGroupBackoffSeconds: pointer.Int64Ptr(20), }, }, { diff --git a/apis/config/v1beta3/types.go b/apis/config/v1beta3/types.go index d5bc00082..4c3b2fc9f 100644 --- a/apis/config/v1beta3/types.go +++ b/apis/config/v1beta3/types.go @@ -30,6 +30,8 @@ type CoschedulingArgs struct { // PermitWaitingTimeSeconds is the waiting timeout in seconds. PermitWaitingTimeSeconds *int64 `json:"permitWaitingTimeSeconds,omitempty"` + // PodGroupBackoffSeconds is the backoff time in seconds before a pod group can be scheduled again. + PodGroupBackoffSeconds *int64 `json:"podGroupBackoffSeconds,omitempty"` } // ModeType is a type "string". 
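The remaining v1beta3 files below repeat the generated conversion and deep-copy plumbing, and pkg/coscheduling then adds the behaviour itself: the PodGroupManager keeps a second go-cache of pod groups that just failed scheduling, PreFilter rejects pods of a backed-off group while the cache entry is alive, and PostFilter arms the backoff only when at least MinMember pods of the group already exist. A minimal, self-contained sketch of that mechanism follows, using the same github.com/patrickmn/go-cache package the plugin imports; backoffStore, its method names, and the ns1/pg1 key are illustrative stand-ins, not the plugin's API.

package main

import (
	"fmt"
	"time"

	gocache "github.com/patrickmn/go-cache"
)

// backoffStore stands in for the cache field added to PodGroupManager in core.go.
type backoffStore struct {
	backedOffPG *gocache.Cache
}

// backoff mirrors BackoffPodGroup: a zero duration disables the feature,
// otherwise the pod group name is cached until the backoff expires.
func (s *backoffStore) backoff(pgFullName string, d time.Duration) {
	if d == 0 {
		return
	}
	// Add is a no-op if a live entry already exists, so an ongoing
	// backoff window is not extended by further failures.
	s.backedOffPG.Add(pgFullName, nil, d)
}

// inBackoff mirrors the PreFilter check: while the entry is alive,
// every pod of the group is rejected without a full scheduling attempt.
func (s *backoffStore) inBackoff(pgFullName string) bool {
	_, exist := s.backedOffPG.Get(pgFullName)
	return exist
}

func main() {
	s := &backoffStore{backedOffPG: gocache.New(10*time.Second, 10*time.Second)}
	s.backoff("ns1/pg1", 2*time.Second)
	fmt.Println(s.inBackoff("ns1/pg1")) // true: pods of pg1 are skipped for now
	time.Sleep(3 * time.Second)
	fmt.Println(s.inBackoff("ns1/pg1")) // false: the entry expired, pg1 may be retried
}

This is what breaks the tight retry loop the unit and integration tests further down exercise: once a full-sized pod group has failed, its pods stop churning through the scheduler for podGroupBackoffSeconds instead of being retried immediately.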
diff --git a/apis/config/v1beta3/zz_generated.conversion.go b/apis/config/v1beta3/zz_generated.conversion.go index d24f789a5..5ed3b6a64 100644 --- a/apis/config/v1beta3/zz_generated.conversion.go +++ b/apis/config/v1beta3/zz_generated.conversion.go @@ -177,6 +177,9 @@ func autoConvert_v1beta3_CoschedulingArgs_To_config_CoschedulingArgs(in *Cosched if err := v1.Convert_Pointer_int64_To_int64(&in.PermitWaitingTimeSeconds, &out.PermitWaitingTimeSeconds, s); err != nil { return err } + if err := v1.Convert_Pointer_int64_To_int64(&in.PodGroupBackoffSeconds, &out.PodGroupBackoffSeconds, s); err != nil { + return err + } return nil } @@ -189,6 +192,9 @@ func autoConvert_config_CoschedulingArgs_To_v1beta3_CoschedulingArgs(in *config. if err := v1.Convert_int64_To_Pointer_int64(&in.PermitWaitingTimeSeconds, &out.PermitWaitingTimeSeconds, s); err != nil { return err } + if err := v1.Convert_int64_To_Pointer_int64(&in.PodGroupBackoffSeconds, &out.PodGroupBackoffSeconds, s); err != nil { + return err + } return nil } diff --git a/apis/config/v1beta3/zz_generated.deepcopy.go b/apis/config/v1beta3/zz_generated.deepcopy.go index cc2a636aa..377aef4c9 100644 --- a/apis/config/v1beta3/zz_generated.deepcopy.go +++ b/apis/config/v1beta3/zz_generated.deepcopy.go @@ -36,6 +36,11 @@ func (in *CoschedulingArgs) DeepCopyInto(out *CoschedulingArgs) { *out = new(int64) **out = **in } + if in.PodGroupBackoffSeconds != nil { + in, out := &in.PodGroupBackoffSeconds, &out.PodGroupBackoffSeconds + *out = new(int64) + **out = **in + } return } diff --git a/pkg/coscheduling/core/core.go b/pkg/coscheduling/core/core.go index 6b8466f1c..cedddd093 100644 --- a/pkg/coscheduling/core/core.go +++ b/pkg/coscheduling/core/core.go @@ -61,6 +61,7 @@ type Manager interface { DeletePermittedPodGroup(string) CalculateAssignedPods(string, string) int ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) + BackoffPodGroup(string, time.Duration) } // PodGroupManager defines the scheduling operation called @@ -74,6 +75,8 @@ type PodGroupManager struct { scheduleTimeout *time.Duration // permittedPG stores the podgroup name which has passed the pre resource check. permittedPG *gochache.Cache + // backedOffPG stores the podgroup name which failed scheduling recently. + backedOffPG *gochache.Cache // pgLister is podgroup lister pgLister pglister.PodGroupLister // podLister is pod lister @@ -91,10 +94,18 @@ func NewPodGroupManager(pgClient pgclientset.Interface, snapshotSharedLister fra pgLister: pgInformer.Lister(), podLister: podInformer.Lister(), permittedPG: gochache.New(3*time.Second, 3*time.Second), + backedOffPG: gochache.New(10*time.Second, 10*time.Second), } return pgMgr } +func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Duration) { + if backoff == time.Duration(0) { + return + } + pgMgr.backedOffPG.Add(pgName, nil, backoff) +} + // ActivateSiblings stashes the pods belonging to the same PodGroup of the given pod // in the given state, with a reserved key "kubernetes.io/pods-to-activate".
func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) { @@ -143,6 +154,10 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er return nil } + if _, exist := pgMgr.backedOffPG.Get(pgFullName); exist { + return fmt.Errorf("podGroup %v failed recently", pgFullName) + } + pods, err := pgMgr.podLister.Pods(pod.Namespace).List( labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: util.GetPodGroupLabel(pod)}), ) diff --git a/pkg/coscheduling/core/core_test.go b/pkg/coscheduling/core/core_test.go index 47c4a1049..86eeae440 100644 --- a/pkg/coscheduling/core/core_test.go +++ b/pkg/coscheduling/core/core_test.go @@ -139,7 +139,7 @@ func TestPreFilter(t *testing.T) { existingPods, allNodes := testutil.MakeNodesAndPods(map[string]string{"test": "a"}, 60, 30) snapshot := testutil.NewFakeSharedLister(existingPods, allNodes) pgMgr := &PodGroupManager{pgLister: pgLister, permittedPG: newCache(), - snapshotSharedLister: snapshot, podLister: podInformer.Lister(), scheduleTimeout: &scheduleTimeout} + snapshotSharedLister: snapshot, podLister: podInformer.Lister(), scheduleTimeout: &scheduleTimeout, backedOffPG: gochache.New(10*time.Second, 10*time.Second)} informerFactory.Start(ctx.Done()) if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) { t.Fatal("WaitForCacheSync failed") diff --git a/pkg/coscheduling/coscheduling.go b/pkg/coscheduling/coscheduling.go index 042413c80..d84412d35 100644 --- a/pkg/coscheduling/coscheduling.go +++ b/pkg/coscheduling/coscheduling.go @@ -22,6 +22,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" corev1helpers "k8s.io/component-helpers/scheduling/corev1" @@ -30,6 +31,7 @@ import ( "sigs.k8s.io/scheduler-plugins/apis/config" "sigs.k8s.io/scheduler-plugins/apis/scheduling" + "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" "sigs.k8s.io/scheduler-plugins/pkg/coscheduling/core" pgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" pgformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions" @@ -41,6 +43,7 @@ type Coscheduling struct { frameworkHandler framework.Handle pgMgr core.Manager scheduleTimeout *time.Duration + pgBackoff *time.Duration } var _ framework.QueueSortPlugin = &Coscheduling{} @@ -78,6 +81,14 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) pgMgr: pgMgr, scheduleTimeout: &scheduleTimeDuration, } + if args.PodGroupBackoffSeconds < 0 { + err := fmt.Errorf("Parse Arguments Failed") + klog.ErrorS(err, "PodGroupBackoffSeconds cannot be negative") + return nil, err + } else if args.PodGroupBackoffSeconds > 0 { + pgBackoff := time.Duration(args.PodGroupBackoffSeconds) * time.Second + plugin.pgBackoff = &pgBackoff + } pgInformerFactory.Start(ctx.Done()) if !cache.WaitForCacheSync(ctx.Done(), pgInformer.Informer().HasSynced) { err := fmt.Errorf("WaitForCacheSync failed") @@ -166,6 +177,16 @@ func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleSt waitingPod.Reject(cs.Name(), "optimistic rejection in PostFilter") } }) + + if cs.pgBackoff != nil { + pods, err := cs.frameworkHandler.SharedInformerFactory().Core().V1().Pods().Lister().Pods(pod.Namespace).List( + labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: util.GetPodGroupLabel(pod)}), + ) + if err == nil && len(pods) >= int(pg.Spec.MinMember) { + cs.pgMgr.BackoffPodGroup(pgName, *cs.pgBackoff) + } + } + 
cs.pgMgr.DeletePermittedPodGroup(pgName) return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("PodGroup %v gets rejected due to Pod %v is unschedulable even after PostFilter", pgName, pod.Name)) diff --git a/pkg/coscheduling/coscheduling_test.go b/pkg/coscheduling/coscheduling_test.go index 4c317ae1a..848109ece 100644 --- a/pkg/coscheduling/coscheduling_test.go +++ b/pkg/coscheduling/coscheduling_test.go @@ -18,6 +18,8 @@ package coscheduling import ( "context" + "fmt" + "reflect" "testing" "time" @@ -30,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/utils/pointer" _ "sigs.k8s.io/scheduler-plugins/apis/config/scheme" "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" @@ -39,6 +42,99 @@ import ( testutil "sigs.k8s.io/scheduler-plugins/test/util" ) +func TestPodGroupBackoffTime(t *testing.T) { + tests := []struct { + name string + pod1 *v1.Pod + pod2 *v1.Pod + pod3 *v1.Pod + }{ + { + name: "pod in infinite scheduling loop.", + pod1: st.MakePod().Name("pod1").UID("pod1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + pod2: st.MakePod().Name("pod2").UID("pod2").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + pod3: st.MakePod().Name("pod3").UID("pod3").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + }, + } + ctx := context.Background() + cs := fakepgclientset.NewSimpleClientset() + pgInformerFactory := pgformers.NewSharedInformerFactory(cs, 0) + pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() + pgInformerFactory.Start(ctx.Done()) + pg1 := testutil.MakePG("pg1", "ns1", 3, nil, nil) + pgInformer.Informer().GetStore().Add(pg1) + + fakeClient := clientsetfake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(fakeClient, 0) + podInformer := informerFactory.Core().V1().Pods() + informerFactory.Start(ctx.Done()) + existingPods, allNodes := testutil.MakeNodesAndPods(map[string]string{"test": "a"}, 60, 30) + snapshot := testutil.NewFakeSharedLister(existingPods, allNodes) + // Compose a framework handle. 
+ registeredPlugins := []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + } + f, err := st.NewFramework(registeredPlugins, "", ctx.Done(), + frameworkruntime.WithClientSet(fakeClient), + frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithSnapshotSharedLister(snapshot), + ) + if err != nil { + t.Fatal(err) + } + scheduleDuration := 10 * time.Second + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + podInformer.Informer().GetStore().Add(tt.pod1) + podInformer.Informer().GetStore().Add(tt.pod2) + podInformer.Informer().GetStore().Add(tt.pod3) + pgMgr := core.NewPodGroupManager(cs, snapshot, &scheduleDuration, pgInformer, podInformer) + coscheduling := &Coscheduling{pgMgr: pgMgr, frameworkHandler: f, scheduleTimeout: &scheduleDuration, pgBackoff: pointer.Duration(time.Duration(1 * time.Second))} + state := framework.NewCycleState() + state.Write(framework.PodsToActivateKey, framework.NewPodsToActivate()) + code, _ := coscheduling.Permit(context.Background(), state, tt.pod1, "test") + if code.Code() != framework.Wait { + t.Errorf("expected %v, got %v", framework.Wait, code.Code()) + return + } + + podsToActiveObj, err := state.Read(framework.PodsToActivateKey) + if err != nil { + t.Errorf("expected pod2 and pod3 in pods to activate") + return + } + podsToActive, ok := podsToActiveObj.(*framework.PodsToActivate) + if !ok { + t.Errorf("cannot convert type %T to *framework.PodsToActivate", podsToActiveObj) + return + } + + var expectPodsToActivate = map[string]*v1.Pod{ + "ns1/pod2": tt.pod2, "ns1/pod3": tt.pod3, + } + if !reflect.DeepEqual(expectPodsToActivate, podsToActive.Map) { + t.Errorf("expected %v, got %v", expectPodsToActivate, podsToActive.Map) + return + } + + coscheduling.PostFilter(context.Background(), framework.NewCycleState(), tt.pod2, nil) + + _, code = coscheduling.PreFilter(context.Background(), framework.NewCycleState(), tt.pod3) + if code.Code() != framework.UnschedulableAndUnresolvable { + t.Errorf("expected %v, got %v", framework.UnschedulableAndUnresolvable, code.Code()) + return + } + pgFullName, _ := pgMgr.GetPodGroup(tt.pod1) + if code.Reasons()[0] != fmt.Sprintf("podGroup %v failed recently", pgFullName) { + t.Errorf("expected %v, got %v", pgFullName, code.Reasons()[0]) + return + } + }) + } +} + func TestLess(t *testing.T) { now := time.Now() times := make([]time.Time, 0) diff --git a/test/integration/coscheduling_test.go b/test/integration/coscheduling_test.go index f06c5bb2f..6a4c88665 100644 --- a/test/integration/coscheduling_test.go +++ b/test/integration/coscheduling_test.go @@ -377,6 +377,184 @@ func TestCoschedulingPlugin(t *testing.T) { } } +func TestPodgroupBackoff(t *testing.T) { + testCtx := &testContext{} + testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background()) + + cs := kubernetes.NewForConfigOrDie(globalKubeConfig) + extClient := versioned.NewForConfigOrDie(globalKubeConfig) + testCtx.ClientSet = cs + testCtx.KubeConfig = globalKubeConfig + + if err := wait.Poll(100*time.Millisecond, 3*time.Second, func() (done bool, err error) { + groupList, _, err := cs.ServerGroupsAndResources() + if err != nil { + return false, nil + } + for _, group := range groupList { + if group.Name == scheduling.GroupName { + t.Log("The CRD is ready to serve") + return true, nil + } + } + return false, nil + }); err != nil { + t.Fatalf("Timed out waiting for 
CRD to be ready: %v", err) + } + + cfg, err := util.NewDefaultSchedulerComponentConfig() + if err != nil { + t.Fatal(err) + } + cfg.Profiles[0].Plugins.QueueSort = schedapi.PluginSet{ + Enabled: []schedapi.Plugin{{Name: coscheduling.Name}}, + Disabled: []schedapi.Plugin{{Name: "*"}}, + } + cfg.Profiles[0].Plugins.PreFilter.Enabled = append(cfg.Profiles[0].Plugins.PreFilter.Enabled, schedapi.Plugin{Name: coscheduling.Name}) + cfg.Profiles[0].Plugins.PostFilter.Enabled = append(cfg.Profiles[0].Plugins.PostFilter.Enabled, schedapi.Plugin{Name: coscheduling.Name}) + cfg.Profiles[0].Plugins.Permit.Enabled = append(cfg.Profiles[0].Plugins.Permit.Enabled, schedapi.Plugin{Name: coscheduling.Name}) + cfg.Profiles[0].PluginConfig = append(cfg.Profiles[0].PluginConfig, schedapi.PluginConfig{ + Name: coscheduling.Name, + Args: &schedconfig.CoschedulingArgs{ + PermitWaitingTimeSeconds: 3, + PodGroupBackoffSeconds: 1, + }, + }) + + ns := fmt.Sprintf("integration-test-%v", string(uuid.NewUUID())) + createNamespace(t, testCtx, ns) + + testCtx = initTestSchedulerWithOptions( + t, + testCtx, + scheduler.WithProfiles(cfg.Profiles...), + scheduler.WithFrameworkOutOfTreeRegistry(fwkruntime.Registry{coscheduling.Name: coscheduling.New}), + ) + syncInformerFactory(testCtx) + go testCtx.Scheduler.Run(testCtx.Ctx) + t.Log("Init scheduler success") + defer cleanupTest(t, testCtx) + + // Create Nodes. + node1Name := "fake-node-1" + node1 := st.MakeNode().Name(node1Name).Label("node", node1Name).Obj() + node1.Status.Allocatable = v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(6, resource.DecimalSI), + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + } + node1.Status.Capacity = v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(6, resource.DecimalSI), + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + } + node1, err = cs.CoreV1().Nodes().Create(testCtx.Ctx, node1, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create Node %q: %v", node1Name, err) + } + node2Name := "fake-node-2" + node2 := st.MakeNode().Name(node2Name).Label("node", node2Name).Obj() + node2.Status.Allocatable = v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI), + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + } + node2.Status.Capacity = v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI), + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + } + node2, err = cs.CoreV1().Nodes().Create(testCtx.Ctx, node2, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create Node %q: %v", node2Name, err) + } + node3Name := "fake-node-3" + node3 := st.MakeNode().Name(node3Name).Label("node", node3Name).Obj() + node3.Status.Allocatable = v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI), + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + } + node3.Status.Capacity = v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI), + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + } + node3, err = cs.CoreV1().Nodes().Create(testCtx.Ctx, node3, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create Node %q: %v", node3Name, err) + } + node4Name := "fake-node-4" + node4 := st.MakeNode().Name(node4Name).Label("node", node4Name).Obj() + node4.Status.Allocatable = v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI), + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + } 
+ node4.Status.Capacity = v1.ResourceList{ + v1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI), + v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + } + node4, err = cs.CoreV1().Nodes().Create(testCtx.Ctx, node4, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create Node %q: %v", node4Name, err) + } + pause := imageutils.GetPauseImageName() + for _, tt := range []struct { + name string + pods []*v1.Pod + podGroups []*v1alpha1.PodGroup + expectedPods []string + }{ + { + name: "pg1 can not be scheduled and pg2 can be scheduled", + pods: []*v1.Pod{ + WithContainer(st.MakePod().Namespace(ns).Name("t1-p1-1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Priority( + midPriority).Label(v1alpha1.PodGroupLabel, "pg1-1").ZeroTerminationGracePeriod().Obj(), pause), + WithContainer(st.MakePod().Namespace(ns).Name("t1-p1-2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Priority( + midPriority).Label(v1alpha1.PodGroupLabel, "pg1-1").ZeroTerminationGracePeriod().Obj(), pause), + WithContainer(st.MakePod().Namespace(ns).Name("t1-p1-3").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Priority( + midPriority).Label(v1alpha1.PodGroupLabel, "pg1-1").ZeroTerminationGracePeriod().Obj(), pause), + WithContainer(st.MakePod().Namespace(ns).Name("t1-p1-4").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Priority( + midPriority).Label(v1alpha1.PodGroupLabel, "pg1-1").ZeroTerminationGracePeriod().Obj(), pause), + WithContainer(st.MakePod().Namespace(ns).Name("t1-p1-5").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Priority( + midPriority).Label(v1alpha1.PodGroupLabel, "pg1-1").ZeroTerminationGracePeriod().Obj(), pause), + WithContainer(st.MakePod().Namespace(ns).Name("t1-p1-6").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Priority( + midPriority).Label(v1alpha1.PodGroupLabel, "pg1-1").ZeroTerminationGracePeriod().Obj(), pause), + WithContainer(st.MakePod().Namespace(ns).Name("t1-p2-1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Priority( + midPriority).Label(v1alpha1.PodGroupLabel, "pg2-1").ZeroTerminationGracePeriod().Obj(), pause), + }, + podGroups: []*v1alpha1.PodGroup{ + util.MakePG("pg1-1", ns, 6, nil, nil), + util.MakePG("pg2-1", ns, 1, nil, nil), + }, + expectedPods: []string{"t1-p2-1"}, + }, + } { + t.Run(tt.name, func(t *testing.T) { + t.Logf("Start-coscheduling-test %v", tt.name) + defer cleanupPodGroups(testCtx.Ctx, extClient, tt.podGroups) + // create pod group + if err := createPodGroups(testCtx.Ctx, extClient, tt.podGroups); err != nil { + t.Fatal(err) + } + defer cleanupPods(t, testCtx, tt.pods) + // Create Pods, we will expect them to be scheduled in a reversed order. + for i := range tt.pods { + klog.InfoS("Creating pod ", "podName", tt.pods[i].Name) + if _, err := cs.CoreV1().Pods(tt.pods[i].Namespace).Create(testCtx.Ctx, tt.pods[i], metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create Pod %q: %v", tt.pods[i].Name, err) + } + } + err = wait.Poll(1*time.Second, 120*time.Second, func() (bool, error) { + for _, v := range tt.expectedPods { + if !podScheduled(cs, ns, v) { + return false, nil + } + } + return true, nil + }) + if err != nil { + t.Fatalf("%v Waiting expectedPods error: %v", tt.name, err.Error()) + } + t.Logf("Case %v finished", tt.name) + }) + } +} + func WithContainer(pod *v1.Pod, image string) *v1.Pod { pod.Spec.Containers[0].Name = "con0" pod.Spec.Containers[0].Image = image