
Expose assignReplicas and selectClusters function in scheduler
Signed-off-by: chaosi-zju <[email protected]>
chaosi-zju committed Nov 22, 2023
1 parent bf1098b commit 89a4ed1
Showing 3 changed files with 153 additions and 37 deletions.
57 changes: 57 additions & 0 deletions pkg/scheduler/core/common.go
@@ -0,0 +1,57 @@
package core

import (
"fmt"
"time"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/scheduler/core/spreadconstraint"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
)

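// SelectClusters selects the target clusters for the given placement and resource binding spec:
// it groups the scored clusters by spread constraints and picks the best-fitting group.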
func SelectClusters(clustersScore framework.ClusterScoreList,
	placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec) ([]*clusterv1alpha1.Cluster, error) {
	startTime := time.Now()
	defer metrics.ScheduleStep(metrics.ScheduleStepSelect, startTime)

	groupClustersInfo := spreadconstraint.GroupClustersWithScore(clustersScore, placement, spec, calAvailableReplicas)
	return spreadconstraint.SelectBestClusters(placement, groupClustersInfo, spec.Replicas)
}

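// AssignReplicas assigns replicas to the selected clusters according to the placement's
// replica scheduling strategy. For resources without replicas (spec.Replicas == 0),
// all clusters are returned without replica counts.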
func AssignReplicas(
	clusters []*clusterv1alpha1.Cluster,
	placement *policyv1alpha1.Placement,
	object *workv1alpha2.ResourceBindingSpec,
) ([]workv1alpha2.TargetCluster, error) {
	startTime := time.Now()
	defer metrics.ScheduleStep(metrics.ScheduleStepAssignReplicas, startTime)

	if len(clusters) == 0 {
		return nil, fmt.Errorf("no clusters available to schedule")
	}

	if object.Replicas > 0 {
		state := newAssignState(clusters, placement, object)
		assignFunc, ok := assignFuncMap[state.strategyType]
		if !ok {
			// should never happen at present
			return nil, fmt.Errorf("unsupported replica scheduling strategy, replicaSchedulingType: %s, replicaDivisionPreference: %s, "+
				"please try another scheduling strategy", placement.ReplicaSchedulingType(), placement.ReplicaScheduling.ReplicaDivisionPreference)
		}
		assignResults, err := assignFunc(state)
		if err != nil {
			return nil, err
		}
		return removeZeroReplicasCluster(assignResults), nil
	}

	// If not workload, assign all clusters without considering replicas.
	targetClusters := make([]workv1alpha2.TargetCluster, len(clusters))
	for i, cluster := range clusters {
		targetClusters[i] = workv1alpha2.TargetCluster{Name: cluster.Name}
	}
	return targetClusters, nil
}
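With these two helpers exported, callers outside genericScheduler can drive the same select-then-assign flow directly. A minimal sketch, assuming the caller already holds a scored cluster list (the package name example and the function scheduleOnce are placeholders for illustration, not part of this commit):

package example

import (
	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
	"github.com/karmada-io/karmada/pkg/scheduler/core"
	"github.com/karmada-io/karmada/pkg/scheduler/framework"
)

// scheduleOnce chains the two exported steps the same way genericScheduler
// does after this change: pick clusters first, then split replicas.
func scheduleOnce(scores framework.ClusterScoreList, placement *policyv1alpha1.Placement,
	spec *workv1alpha2.ResourceBindingSpec) ([]workv1alpha2.TargetCluster, error) {
	clusters, err := core.SelectClusters(scores, placement, spec)
	if err != nil {
		return nil, err
	}
	return core.AssignReplicas(clusters, placement, spec)
}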
41 changes: 4 additions & 37 deletions pkg/scheduler/core/generic_scheduler.go
@@ -11,7 +11,6 @@ import (
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
 	"github.com/karmada-io/karmada/pkg/scheduler/cache"
-	"github.com/karmada-io/karmada/pkg/scheduler/core/spreadconstraint"
 	"github.com/karmada-io/karmada/pkg/scheduler/framework"
 	"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
 	"github.com/karmada-io/karmada/pkg/scheduler/metrics"
@@ -159,44 +158,12 @@ func (g *genericScheduler) prioritizeClusters(
 
 func (g *genericScheduler) selectClusters(clustersScore framework.ClusterScoreList,
 	placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec) ([]*clusterv1alpha1.Cluster, error) {
-	startTime := time.Now()
-	defer metrics.ScheduleStep(metrics.ScheduleStepSelect, startTime)
 
-	groupClustersInfo := spreadconstraint.GroupClustersWithScore(clustersScore, placement, spec, calAvailableReplicas)
-	return spreadconstraint.SelectBestClusters(placement, groupClustersInfo, spec.Replicas)
+	return SelectClusters(clustersScore, placement, spec)
 }
 
-func (g *genericScheduler) assignReplicas(
-	clusters []*clusterv1alpha1.Cluster,
-	placement *policyv1alpha1.Placement,
-	object *workv1alpha2.ResourceBindingSpec,
-) ([]workv1alpha2.TargetCluster, error) {
-	startTime := time.Now()
-	defer metrics.ScheduleStep(metrics.ScheduleStepAssignReplicas, startTime)
+func (g *genericScheduler) assignReplicas(clusters []*clusterv1alpha1.Cluster, placement *policyv1alpha1.Placement,
+	object *workv1alpha2.ResourceBindingSpec) ([]workv1alpha2.TargetCluster, error) {
 
-	if len(clusters) == 0 {
-		return nil, fmt.Errorf("no clusters available to schedule")
-	}
-
-	if object.Replicas > 0 {
-		state := newAssignState(clusters, placement, object)
-		assignFunc, ok := assignFuncMap[state.strategyType]
-		if !ok {
-			// should never happen at present
-			return nil, fmt.Errorf("unsupported replica scheduling strategy, replicaSchedulingType: %s, replicaDivisionPreference: %s, "+
-				"please try another scheduling strategy", placement.ReplicaSchedulingType(), placement.ReplicaScheduling.ReplicaDivisionPreference)
-		}
-		assignResults, err := assignFunc(state)
-		if err != nil {
-			return nil, err
-		}
-		return removeZeroReplicasCluster(assignResults), nil
-	}
-
-	// If not workload, assign all clusters without considering replicas.
-	targetClusters := make([]workv1alpha2.TargetCluster, len(clusters))
-	for i, cluster := range clusters {
-		targetClusters[i] = workv1alpha2.TargetCluster{Name: cluster.Name}
-	}
-	return targetClusters, nil
+	return AssignReplicas(clusters, placement, object)
 }
92 changes: 92 additions & 0 deletions pkg/scheduler/core/generic_scheduler_test.go
@@ -0,0 +1,92 @@
package core

import (
"testing"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/test/helper"
)

func Test_genericScheduler_AssignReplicas(t *testing.T) {
	tests := []struct {
		name       string
		retryTimes int
		clusters   []*clusterv1alpha1.Cluster
		placement  *policyv1alpha1.Placement
		object     *workv1alpha2.ResourceBindingSpec
		wants      [][]workv1alpha2.TargetCluster
		wantErr    bool
	}{
		{
			name:       "replica 3, static weighted 1:1, retry 10 times",
			retryTimes: 10,
			clusters: []*clusterv1alpha1.Cluster{
				helper.NewCluster(ClusterMember1),
				helper.NewCluster(ClusterMember2),
			},
			object: &workv1alpha2.ResourceBindingSpec{
				Replicas: 3,
			},
			placement: &policyv1alpha1.Placement{
				ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
					ReplicaSchedulingType:     policyv1alpha1.ReplicaSchedulingTypeDivided,
					ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceWeighted,
					WeightPreference: &policyv1alpha1.ClusterPreferences{
						StaticWeightList: []policyv1alpha1.StaticClusterWeight{
							{TargetCluster: policyv1alpha1.ClusterAffinity{ClusterNames: []string{ClusterMember1}}, Weight: 1},
							{TargetCluster: policyv1alpha1.ClusterAffinity{ClusterNames: []string{ClusterMember2}}, Weight: 1},
						},
					},
				},
			},
			wants: [][]workv1alpha2.TargetCluster{
				{
					{Name: ClusterMember1, Replicas: 1},
					{Name: ClusterMember2, Replicas: 2},
				},
				{
					{Name: ClusterMember1, Replicas: 2},
					{Name: ClusterMember2, Replicas: 1},
				},
			},
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			g := &genericScheduler{}
			resultHitCounts := make(map[int]int)

			// Retry several times: each result must be one of the splits in tt.wants,
			// and across the retries every split in tt.wants should be observed.
			for i := 0; i < tt.retryTimes; i++ {
				got, err := g.assignReplicas(tt.clusters, tt.placement, tt.object)
				if (err != nil) != tt.wantErr {
					t.Errorf("AssignReplicas() error = %v, wantErr %v", err, tt.wantErr)
					return
				}
				if tt.wantErr {
					continue
				}
				hitIdx := -1
				for idx, want := range tt.wants {
					if helper.IsScheduleResultEqual(got, want) {
						hitIdx = idx
						break
					}
				}
				if hitIdx >= 0 {
					resultHitCounts[hitIdx]++
				} else {
					t.Errorf("AssignReplicas() got = %v, wants %v", got, tt.wants)
					return
				}
			}

			if len(resultHitCounts) < len(tt.wants) {
t.Errorf("want %d possible result, but got %d possible result", len(tt.wants), len(resultHitCounts))
			}
		})
	}
}
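Why the test accepts two different splits: with 3 replicas and static weights 1:1, each cluster's guaranteed (floored) share is 1 replica, and the single leftover replica can land on either member, so both 2:1 and 1:2 are valid outcomes. A small illustration of that floor-plus-remainder arithmetic (an illustration of the reasoning only, not Karmada's actual divider):

package main

import "fmt"

func main() {
	replicas := int64(3)
	weights := []int64{1, 1} // member1 : member2 = 1 : 1
	sum := weights[0] + weights[1]

	// Guaranteed (floored) share per cluster: 3*1/2 = 1 each.
	guaranteed := []int64{replicas * weights[0] / sum, replicas * weights[1] / sum}
	remainder := replicas - guaranteed[0] - guaranteed[1] // 1 replica left over

	// The leftover replica must go to one of the two clusters, so both
	// {2, 1} and {1, 2} are acceptable results in the test above.
	fmt.Println(guaranteed, remainder) // [1 1] 1
}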
