From 89a4ed1e2c62358517f023e643598f680def2f79 Mon Sep 17 00:00:00 2001
From: chaosi-zju
Date: Tue, 21 Nov 2023 20:27:44 +0800
Subject: [PATCH] Expose assignReplicas and selectClusters function in scheduler

Signed-off-by: chaosi-zju
---
 pkg/scheduler/core/common.go                 | 57 ++++++++++++
 pkg/scheduler/core/generic_scheduler.go      | 41 +--------
 pkg/scheduler/core/generic_scheduler_test.go | 92 ++++++++++++++++++++
 3 files changed, 153 insertions(+), 37 deletions(-)
 create mode 100644 pkg/scheduler/core/common.go
 create mode 100644 pkg/scheduler/core/generic_scheduler_test.go

diff --git a/pkg/scheduler/core/common.go b/pkg/scheduler/core/common.go
new file mode 100644
index 000000000000..1819a6f4681d
--- /dev/null
+++ b/pkg/scheduler/core/common.go
@@ -0,0 +1,57 @@
+package core
+
+import (
+	"fmt"
+	"time"
+
+	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
+	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
+	"github.com/karmada-io/karmada/pkg/scheduler/core/spreadconstraint"
+	"github.com/karmada-io/karmada/pkg/scheduler/framework"
+	"github.com/karmada-io/karmada/pkg/scheduler/metrics"
+)
+
+func SelectClusters(clustersScore framework.ClusterScoreList,
+	placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec) ([]*clusterv1alpha1.Cluster, error) {
+	startTime := time.Now()
+	defer metrics.ScheduleStep(metrics.ScheduleStepSelect, startTime)
+
+	groupClustersInfo := spreadconstraint.GroupClustersWithScore(clustersScore, placement, spec, calAvailableReplicas)
+	return spreadconstraint.SelectBestClusters(placement, groupClustersInfo, spec.Replicas)
+}
+
+func AssignReplicas(
+	clusters []*clusterv1alpha1.Cluster,
+	placement *policyv1alpha1.Placement,
+	object *workv1alpha2.ResourceBindingSpec,
+) ([]workv1alpha2.TargetCluster, error) {
+	startTime := time.Now()
+	defer metrics.ScheduleStep(metrics.ScheduleStepAssignReplicas, startTime)
+
+	if len(clusters) == 0 {
+		return nil, fmt.Errorf("no clusters available to schedule")
+	}
+
+	if object.Replicas > 0 {
+		state := newAssignState(clusters, placement, object)
+		assignFunc, ok := assignFuncMap[state.strategyType]
+		if !ok {
+			// should never happen at present
+			return nil, fmt.Errorf("unsupported replica scheduling strategy, replicaSchedulingType: %s, replicaDivisionPreference: %s, "+
+				"please try another scheduling strategy", placement.ReplicaSchedulingType(), placement.ReplicaScheduling.ReplicaDivisionPreference)
+		}
+		assignResults, err := assignFunc(state)
+		if err != nil {
+			return nil, err
+		}
+		return removeZeroReplicasCluster(assignResults), nil
+	}
+
+	// If not workload, assign all clusters without considering replicas.
+	targetClusters := make([]workv1alpha2.TargetCluster, len(clusters))
+	for i, cluster := range clusters {
+		targetClusters[i] = workv1alpha2.TargetCluster{Name: cluster.Name}
+	}
+	return targetClusters, nil
+}
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index 068f2facf11a..b45242db28e2 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -11,7 +11,6 @@ import (
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
 	"github.com/karmada-io/karmada/pkg/scheduler/cache"
-	"github.com/karmada-io/karmada/pkg/scheduler/core/spreadconstraint"
 	"github.com/karmada-io/karmada/pkg/scheduler/framework"
 	"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
 	"github.com/karmada-io/karmada/pkg/scheduler/metrics"
@@ -159,44 +158,12 @@ func (g *genericScheduler) prioritizeClusters(
 
 func (g *genericScheduler) selectClusters(clustersScore framework.ClusterScoreList,
 	placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec) ([]*clusterv1alpha1.Cluster, error) {
-	startTime := time.Now()
-	defer metrics.ScheduleStep(metrics.ScheduleStepSelect, startTime)
 
-	groupClustersInfo := spreadconstraint.GroupClustersWithScore(clustersScore, placement, spec, calAvailableReplicas)
-	return spreadconstraint.SelectBestClusters(placement, groupClustersInfo, spec.Replicas)
+	return SelectClusters(clustersScore, placement, spec)
 }
 
-func (g *genericScheduler) assignReplicas(
-	clusters []*clusterv1alpha1.Cluster,
-	placement *policyv1alpha1.Placement,
-	object *workv1alpha2.ResourceBindingSpec,
-) ([]workv1alpha2.TargetCluster, error) {
-	startTime := time.Now()
-	defer metrics.ScheduleStep(metrics.ScheduleStepAssignReplicas, startTime)
+func (g *genericScheduler) assignReplicas(clusters []*clusterv1alpha1.Cluster, placement *policyv1alpha1.Placement,
+	object *workv1alpha2.ResourceBindingSpec) ([]workv1alpha2.TargetCluster, error) {
 
-	if len(clusters) == 0 {
-		return nil, fmt.Errorf("no clusters available to schedule")
-	}
-
-	if object.Replicas > 0 {
-		state := newAssignState(clusters, placement, object)
-		assignFunc, ok := assignFuncMap[state.strategyType]
-		if !ok {
-			// should never happen at present
-			return nil, fmt.Errorf("unsupported replica scheduling strategy, replicaSchedulingType: %s, replicaDivisionPreference: %s, "+
-				"please try another scheduling strategy", placement.ReplicaSchedulingType(), placement.ReplicaScheduling.ReplicaDivisionPreference)
-		}
-		assignResults, err := assignFunc(state)
-		if err != nil {
-			return nil, err
-		}
-		return removeZeroReplicasCluster(assignResults), nil
-	}
-
-	// If not workload, assign all clusters without considering replicas.
-	targetClusters := make([]workv1alpha2.TargetCluster, len(clusters))
-	for i, cluster := range clusters {
-		targetClusters[i] = workv1alpha2.TargetCluster{Name: cluster.Name}
-	}
-	return targetClusters, nil
+	return AssignReplicas(clusters, placement, object)
 }
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
new file mode 100644
index 000000000000..3b46223db853
--- /dev/null
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -0,0 +1,92 @@
+package core
+
+import (
+	"testing"
+
+	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
+	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
+	"github.com/karmada-io/karmada/test/helper"
+)
+
+func Test_genericScheduler_AssignReplicas(t *testing.T) {
+	tests := []struct {
+		name       string
+		retryTimes int
+		clusters   []*clusterv1alpha1.Cluster
+		placement  *policyv1alpha1.Placement
+		object     *workv1alpha2.ResourceBindingSpec
+		wants      [][]workv1alpha2.TargetCluster
+		wantErr    bool
+	}{
+		{
+			name:       "replica 3, static weighted 1:1, retry 10 times",
+			retryTimes: 10,
+			clusters: []*clusterv1alpha1.Cluster{
+				helper.NewCluster(ClusterMember1),
+				helper.NewCluster(ClusterMember2),
+			},
+			object: &workv1alpha2.ResourceBindingSpec{
+				Replicas: 3,
+			},
+			placement: &policyv1alpha1.Placement{
+				ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
+					ReplicaSchedulingType:     policyv1alpha1.ReplicaSchedulingTypeDivided,
+					ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceWeighted,
+					WeightPreference: &policyv1alpha1.ClusterPreferences{
+						StaticWeightList: []policyv1alpha1.StaticClusterWeight{
+							{TargetCluster: policyv1alpha1.ClusterAffinity{ClusterNames: []string{ClusterMember1}}, Weight: 1},
+							{TargetCluster: policyv1alpha1.ClusterAffinity{ClusterNames: []string{ClusterMember2}}, Weight: 1},
+						},
+					},
+				},
+			},
+			wants: [][]workv1alpha2.TargetCluster{
+				{
+					{Name: ClusterMember1, Replicas: 1},
+					{Name: ClusterMember2, Replicas: 2},
+				},
+				{
+					{Name: ClusterMember1, Replicas: 2},
+					{Name: ClusterMember2, Replicas: 1},
+				},
+			},
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			g := &genericScheduler{}
+			resultHitCounts := make(map[int]int)
+
+			// Retry several times: every result must be one of tt.wants, and together the results must cover all of tt.wants.
+			for i := 0; i < tt.retryTimes; i++ {
+				got, err := g.assignReplicas(tt.clusters, tt.placement, tt.object)
+				if (err != nil) != tt.wantErr {
+					t.Errorf("AssignReplicas() error = %v, wantErr %v", err, tt.wantErr)
+					return
+				}
+				if tt.wantErr {
+					continue
+				}
+				hitIdx := -1
+				for idx, want := range tt.wants {
+					if helper.IsScheduleResultEqual(got, want) {
+						hitIdx = idx
+						break
+					}
+				}
+				if hitIdx >= 0 {
+					resultHitCounts[hitIdx]++
+				} else {
+					t.Errorf("AssignReplicas() got = %v, wants %v", got, tt.wants)
+					return
+				}
+			}
+
+			if len(resultHitCounts) < len(tt.wants) {
+				t.Errorf("want %d possible results, but got %d", len(tt.wants), len(resultHitCounts))
+			}
+		})
+	}
+}
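Usage note (not part of the patch): with SelectClusters and AssignReplicas exported from pkg/scheduler/core, code outside the genericScheduler type can run the select-then-assign steps directly. The sketch below is a minimal, hypothetical composition of the two functions under the assumption that the caller already holds a scored cluster list, a Placement, and a ResourceBindingSpec; the package name "example" and the wrapper scheduleOnScoredClusters are illustrative only.

package example

import (
	"fmt"

	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
	"github.com/karmada-io/karmada/pkg/scheduler/core"
	"github.com/karmada-io/karmada/pkg/scheduler/framework"
)

// scheduleOnScoredClusters is a hypothetical wrapper showing how the newly exported
// core.SelectClusters and core.AssignReplicas compose for a caller that has already
// scored the candidate clusters.
func scheduleOnScoredClusters(
	scores framework.ClusterScoreList,
	placement *policyv1alpha1.Placement,
	spec *workv1alpha2.ResourceBindingSpec,
) ([]workv1alpha2.TargetCluster, error) {
	// Pick target clusters from the scored candidates according to the placement's
	// spread constraints.
	clusters, err := core.SelectClusters(scores, placement, spec)
	if err != nil {
		return nil, fmt.Errorf("select clusters: %w", err)
	}
	// Divide spec.Replicas across the selected clusters; when the resource carries no
	// replicas, every selected cluster is returned without a replica count.
	return core.AssignReplicas(clusters, placement, spec)
}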