Skip to content

Commit 808b10e

Browse files
committed
adjust the Priority of General and Accurate Estimator of Karmada Scheduler
Signed-off-by: chaosi-zju <[email protected]>
1 parent 697170b commit 808b10e

File tree

14 files changed

+580
-39
lines changed

14 files changed

+580
-39
lines changed

pkg/descheduler/core/helper.go

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,26 +62,61 @@ func NewSchedulingResultHelper(binding *workv1alpha2.ResourceBinding) *Schedulin
6262
func (h *SchedulingResultHelper) FillUnschedulableReplicas(unschedulableThreshold time.Duration) {
6363
reference := &h.Spec.Resource
6464
undesiredClusters, undesiredClusterNames := h.GetUndesiredClusters()
65+
// clusterIndexToEstimatorPriority key refers to index of cluster slice,
66+
// value refers to the EstimatorPriority of who gave its estimated result.
67+
clusterIndexToEstimatorPriority := make(map[int]estimatorclient.EstimatorPriority)
68+
6569
// Set the boundary.
6670
for i := range undesiredClusters {
6771
undesiredClusters[i].Unschedulable = math.MaxInt32
6872
}
69-
// Get the minimum value of MaxAvailableReplicas in terms of all estimators.
73+
74+
// Get all replicaEstimators, which are stored in TreeMap.
7075
estimators := estimatorclient.GetUnschedulableReplicaEstimators()
7176
ctx := context.WithValue(context.TODO(), util.ContextKeyObject,
7277
fmt.Sprintf("kind=%s, name=%s/%s", reference.Kind, reference.Namespace, reference.Name))
73-
for _, estimator := range estimators {
74-
res, err := estimator.GetUnschedulableReplicas(ctx, undesiredClusterNames, reference, unschedulableThreshold)
75-
if err != nil {
76-
klog.Errorf("Max cluster unschedulable replicas error: %v", err)
77-
continue
78+
79+
// List all unschedulableReplicaEstimators in ascending order of priority. The estimators are grouped with different
80+
// priorities, e.g: [{priority:10, estimators:[es1, es3]}, {priority:20, estimators:[es2, es4]}, ...]
81+
estimatorGroups := estimators.Values()
82+
83+
// Iterate the estimator groups in descending order of priority
84+
for i := len(estimatorGroups) - 1; i >= 0; i-- {
85+
// if higher-priority estimators have already produced a result for every member cluster, do not call lower-priority estimators.
86+
if len(clusterIndexToEstimatorPriority) == len(undesiredClusterNames) {
87+
break
7888
}
79-
for i := range res {
80-
if res[i].Replicas == estimatorclient.UnauthenticReplica {
89+
// get a group of estimators corresponding to that specific priority.
90+
estimatorsWithSamePriority := estimatorGroups[i].(map[string]estimatorclient.UnschedulableReplicaEstimator)
91+
// iterate through these estimators with the same priority.
92+
for _, estimator := range estimatorsWithSamePriority {
93+
res, err := estimator.GetUnschedulableReplicas(ctx, undesiredClusterNames, reference, unschedulableThreshold)
94+
if err != nil {
95+
klog.Errorf("Max cluster unschedulable replicas error: %v", err)
8196
continue
8297
}
83-
if undesiredClusters[i].ClusterName == res[i].Name && undesiredClusters[i].Unschedulable > res[i].Replicas {
84-
undesiredClusters[i].Unschedulable = res[i].Replicas
98+
for i := range res {
99+
// the estimation for this cluster failed, ignore the corresponding result
100+
if res[i].Replicas == estimatorclient.UnauthenticReplica {
101+
continue
102+
}
103+
// the cluster name does not match (which should hardly ever happen), ignore the result
104+
if res[i].Name != undesiredClusters[i].ClusterName {
105+
klog.Errorf("unexpected cluster name in the result of estimator with %d priority, "+
106+
"expected: %s, got: %s", estimator.Priority(), undesiredClusters[i].ClusterName, res[i].Name)
107+
continue
108+
}
109+
// the result of this cluster has already been estimated by higher-priority estimator,
110+
// ignore the corresponding result by this estimator
111+
if priority, ok := clusterIndexToEstimatorPriority[i]; ok && estimator.Priority() < priority {
112+
continue
113+
}
114+
// if multiple estimators are called, choose the minimum value of each estimated result,
115+
// record the priority of result provider.
116+
if res[i].Replicas < undesiredClusters[i].Unschedulable {
117+
undesiredClusters[i].Unschedulable = res[i].Replicas
118+
clusterIndexToEstimatorPriority[i] = estimator.Priority()
119+
}
85120
}
86121
}
87122
}

pkg/descheduler/descheduler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ func NewDescheduler(karmadaClient karmadaclientset.Interface, kubeClient kuberne
100100
ReconcileFunc: desched.reconcileEstimatorConnection,
101101
}
102102
desched.schedulerEstimatorWorker = util.NewAsyncWorker(schedulerEstimatorWorkerOptions)
103-
schedulerEstimator := estimatorclient.NewSchedulerEstimator(desched.schedulerEstimatorCache, opts.SchedulerEstimatorTimeout.Duration)
103+
schedulerEstimator := estimatorclient.NewSchedulerEstimator(desched.schedulerEstimatorCache,
104+
opts.SchedulerEstimatorTimeout.Duration, estimatorclient.Accurate)
104105
estimatorclient.RegisterSchedulerEstimator(schedulerEstimator)
105106
deschedulerWorkerOptions := util.Options{
106107
Name: "descheduler",

pkg/descheduler/descheduler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ func TestDescheduler_worker(t *testing.T) {
485485
unschedulableThreshold: 5 * time.Minute,
486486
eventRecorder: record.NewFakeRecorder(1024),
487487
}
488-
schedulerEstimator := estimatorclient.NewSchedulerEstimator(desched.schedulerEstimatorCache, 5*time.Second)
488+
schedulerEstimator := estimatorclient.NewSchedulerEstimator(desched.schedulerEstimatorCache, 5*time.Second, estimatorclient.Accurate)
489489
estimatorclient.RegisterSchedulerEstimator(schedulerEstimator)
490490

491491
for _, c := range tt.args.unschedulable {

pkg/estimator/client/accurate.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,23 +32,25 @@ import (
3232

3333
// RegisterSchedulerEstimator will register a SchedulerEstimator.
3434
func RegisterSchedulerEstimator(se *SchedulerEstimator) {
35-
replicaEstimators["scheduler-estimator"] = se
36-
unschedulableReplicaEstimators["scheduler-estimator"] = se
35+
registerReplicaEstimator("scheduler-estimator", se)
36+
registerUnschedulableReplicaEstimator("scheduler-estimator", se)
3737
}
3838

3939
type getClusterReplicasFunc func(ctx context.Context, cluster string) (int32, error)
4040

4141
// SchedulerEstimator is an estimator that calls karmada-scheduler-estimator for estimation.
4242
type SchedulerEstimator struct {
43-
cache *SchedulerEstimatorCache
44-
timeout time.Duration
43+
cache *SchedulerEstimatorCache
44+
timeout time.Duration
45+
priority EstimatorPriority
4546
}
4647

4748
// NewSchedulerEstimator builds a new SchedulerEstimator.
48-
func NewSchedulerEstimator(cache *SchedulerEstimatorCache, timeout time.Duration) *SchedulerEstimator {
49+
func NewSchedulerEstimator(cache *SchedulerEstimatorCache, timeout time.Duration, priority EstimatorPriority) *SchedulerEstimator {
4950
return &SchedulerEstimator{
50-
cache: cache,
51-
timeout: timeout,
51+
cache: cache,
52+
timeout: timeout,
53+
priority: priority,
5254
}
5355
}
5456

@@ -67,6 +69,11 @@ func (se *SchedulerEstimator) MaxAvailableReplicas(
6769
})
6870
}
6971

72+
// Priority provides the priority of this estimator.
73+
func (se *SchedulerEstimator) Priority() EstimatorPriority {
74+
return se.priority
75+
}
76+
7077
// GetUnschedulableReplicas gets the unschedulable replicas which belong to a specified workload by calling karmada-scheduler-estimator.
7178
func (se *SchedulerEstimator) GetUnschedulableReplicas(
7279
parentCtx context.Context,

pkg/estimator/client/general.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,17 @@ import (
3232

3333
// GeneralEstimator is the default replica estimator.
3434
func init() {
35-
replicaEstimators["general-estimator"] = NewGeneralEstimator()
35+
registerReplicaEstimator("general-estimator", NewGeneralEstimator(General))
3636
}
3737

3838
// GeneralEstimator is a normal estimator in terms of cluster ResourceSummary.
39-
type GeneralEstimator struct{}
39+
type GeneralEstimator struct {
40+
priority EstimatorPriority
41+
}
4042

4143
// NewGeneralEstimator builds a new GeneralEstimator.
42-
func NewGeneralEstimator() *GeneralEstimator {
43-
return &GeneralEstimator{}
44+
func NewGeneralEstimator(priority EstimatorPriority) *GeneralEstimator {
45+
return &GeneralEstimator{priority: priority}
4446
}
4547

4648
// MaxAvailableReplicas estimates the maximum replicas that can be applied to the target cluster by cluster ResourceSummary.
@@ -53,6 +55,11 @@ func (ge *GeneralEstimator) MaxAvailableReplicas(_ context.Context, clusters []*
5355
return availableTargetClusters, nil
5456
}
5557

58+
// Priority provides the priority of this estimator.
59+
func (ge *GeneralEstimator) Priority() EstimatorPriority {
60+
return ge.priority
61+
}
62+
5663
func (ge *GeneralEstimator) maxAvailableReplicas(cluster *clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) int32 {
5764
resourceSummary := cluster.Status.ResourceSummary
5865
if resourceSummary == nil {

pkg/estimator/client/interface.go

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"context"
2121
"time"
2222

23+
"github.com/emirpasic/gods/maps/treemap"
24+
2325
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
2426
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
2527
)
@@ -30,26 +32,69 @@ import (
3032
const UnauthenticReplica = -1
3133

3234
var (
33-
replicaEstimators = map[string]ReplicaEstimator{}
34-
unschedulableReplicaEstimators = map[string]UnschedulableReplicaEstimator{}
35+
// replicaEstimators are organized in a TreeMap, where the key is of type EstimatorPriority,
36+
// indicating the estimator's priority level, the value is a map with string keys and ReplicaEstimator values,
37+
// grouping estimators by their respective priorities.
38+
replicaEstimators = treemap.NewWithIntComparator()
39+
40+
// unschedulableReplicaEstimators are organized in a TreeMap, where the key is of type EstimatorPriority,
41+
// indicating the estimator's priority level, the value is a map with string keys and UnschedulableReplicaEstimator values,
42+
// grouping estimators by their respective priorities.
43+
unschedulableReplicaEstimators = treemap.NewWithIntComparator()
3544
)
3645

46+
// registerReplicaEstimator add a estimator to replicaEstimators
47+
func registerReplicaEstimator(estimatorName string, estimator ReplicaEstimator) {
48+
if val, ok := replicaEstimators.Get(estimator.Priority()); !ok {
49+
replicaEstimators.Put(estimator.Priority(), map[string]ReplicaEstimator{estimatorName: estimator})
50+
} else {
51+
estimatorsWithSamePriority := val.(map[string]ReplicaEstimator)
52+
estimatorsWithSamePriority[estimatorName] = estimator
53+
}
54+
}
55+
56+
// registerUnschedulableReplicaEstimator add a estimator to unschedulableReplicaEstimators
57+
func registerUnschedulableReplicaEstimator(estimatorName string, estimator UnschedulableReplicaEstimator) {
58+
if val, ok := unschedulableReplicaEstimators.Get(estimator.Priority()); !ok {
59+
unschedulableReplicaEstimators.Put(estimator.Priority(), map[string]UnschedulableReplicaEstimator{estimatorName: estimator})
60+
} else {
61+
estimatorsWithSamePriority := val.(map[string]UnschedulableReplicaEstimator)
62+
estimatorsWithSamePriority[estimatorName] = estimator
63+
}
64+
}
65+
3766
// ReplicaEstimator is an estimator which estimates the maximum replicas that can be applied to the target cluster.
3867
type ReplicaEstimator interface {
3968
MaxAvailableReplicas(ctx context.Context, clusters []*clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) ([]workv1alpha2.TargetCluster, error)
69+
Priority() EstimatorPriority
4070
}
4171

4272
// UnschedulableReplicaEstimator is an estimator which estimates the unschedulable replicas which belong to a specified workload.
4373
type UnschedulableReplicaEstimator interface {
4474
GetUnschedulableReplicas(ctx context.Context, clusters []string, reference *workv1alpha2.ObjectReference, unschedulableThreshold time.Duration) ([]workv1alpha2.TargetCluster, error)
75+
Priority() EstimatorPriority
4576
}
4677

4778
// GetReplicaEstimators returns all replica estimators.
48-
func GetReplicaEstimators() map[string]ReplicaEstimator {
79+
func GetReplicaEstimators() *treemap.Map {
4980
return replicaEstimators
5081
}
5182

5283
// GetUnschedulableReplicaEstimators returns all unschedulable replica estimators.
53-
func GetUnschedulableReplicaEstimators() map[string]UnschedulableReplicaEstimator {
84+
func GetUnschedulableReplicaEstimators() *treemap.Map {
5485
return unschedulableReplicaEstimators
5586
}
87+
88+
// EstimatorPriority is the priority of an estimator.
89+
// 1. If two estimators are of the same priority, call both and choose the minimum value of each estimated result.
90+
// 2. If higher-priority estimators have already produced a result for every member cluster, lower-priority estimators are not called.
91+
// 3. If higher-priority estimators haven't given the result for certain member clusters, lower-priority estimator will
92+
// continue to estimate for those clusters that haven't got a result yet.
93+
type EstimatorPriority int
94+
95+
const (
96+
// General general priority, e.g: ResourceModel
97+
General EstimatorPriority = 10
98+
// Accurate accurate priority, e.g: SchedulerEstimator
99+
Accurate EstimatorPriority = 20
100+
)

pkg/scheduler/core/util.go

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,11 @@ func getDefaultWeightPreference(clusters []*clusterv1alpha1.Cluster) *policyv1al
5252
}
5353

5454
func calAvailableReplicas(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha2.ResourceBindingSpec) []workv1alpha2.TargetCluster {
55+
// availableTargetClusters stores the estimated replicas for each cluster
5556
availableTargetClusters := make([]workv1alpha2.TargetCluster, len(clusters))
57+
// clusterIndexToEstimatorPriority key refers to index of cluster slice,
58+
// value refers to the EstimatorPriority of who gave its estimated result.
59+
clusterIndexToEstimatorPriority := make(map[int]estimatorclient.EstimatorPriority)
5660

5761
// Set the boundary.
5862
for i := range availableTargetClusters {
@@ -68,22 +72,52 @@ func calAvailableReplicas(clusters []*clusterv1alpha1.Cluster, spec *workv1alpha
6872
return availableTargetClusters
6973
}
7074

71-
// Get the minimum value of MaxAvailableReplicas in terms of all estimators.
72-
estimators := estimatorclient.GetReplicaEstimators()
75+
// Get all replicaEstimators, which are stored in TreeMap.
76+
replicaEstimators := estimatorclient.GetReplicaEstimators()
7377
ctx := context.WithValue(context.TODO(), util.ContextKeyObject,
7478
fmt.Sprintf("kind=%s, name=%s/%s", spec.Resource.Kind, spec.Resource.Namespace, spec.Resource.Name))
75-
for _, estimator := range estimators {
76-
res, err := estimator.MaxAvailableReplicas(ctx, clusters, spec.ReplicaRequirements)
77-
if err != nil {
78-
klog.Errorf("Max cluster available replicas error: %v", err)
79-
continue
79+
80+
// List all replicaEstimators in ascending order of priority. The estimators are grouped with different priorities,
81+
// e.g: [{priority:10, estimators:[es1, es3]}, {priority:20, estimators:[es2, es4]}, ...]
82+
estimatorGroups := replicaEstimators.Values()
83+
84+
// Iterate the estimator groups in descending order of priority
85+
for i := len(estimatorGroups) - 1; i >= 0; i-- {
86+
// if higher-priority estimators have already produced a result for every member cluster, do not call lower-priority estimators.
87+
if len(clusterIndexToEstimatorPriority) == len(clusters) {
88+
break
8089
}
81-
for i := range res {
82-
if res[i].Replicas == estimatorclient.UnauthenticReplica {
90+
// get a group of estimators corresponding to that specific priority.
91+
estimatorsWithSamePriority := estimatorGroups[i].(map[string]estimatorclient.ReplicaEstimator)
92+
// iterate through these estimators with the same priority.
93+
for _, estimator := range estimatorsWithSamePriority {
94+
res, err := estimator.MaxAvailableReplicas(ctx, clusters, spec.ReplicaRequirements)
95+
if err != nil {
96+
klog.Errorf("Max cluster available replicas error: %v", err)
8397
continue
8498
}
85-
if availableTargetClusters[i].Name == res[i].Name && availableTargetClusters[i].Replicas > res[i].Replicas {
86-
availableTargetClusters[i].Replicas = res[i].Replicas
99+
for i := range res {
100+
// the estimation for this cluster failed, ignore the corresponding result
101+
if res[i].Replicas == estimatorclient.UnauthenticReplica {
102+
continue
103+
}
104+
// the cluster name does not match (which should hardly ever happen), ignore the result
105+
if res[i].Name != availableTargetClusters[i].Name {
106+
klog.Errorf("unexpected cluster name in the result of estimator with %d priority, "+
107+
"expected: %s, got: %s", estimator.Priority(), availableTargetClusters[i].Name, res[i].Name)
108+
continue
109+
}
110+
// the result of this cluster has already been estimated by higher-priority estimator,
111+
// ignore the corresponding result by this estimator
112+
if priority, ok := clusterIndexToEstimatorPriority[i]; ok && estimator.Priority() < priority {
113+
continue
114+
}
115+
// if multiple estimators are called, choose the minimum value of each estimated result,
116+
// record the priority of result provider.
117+
if res[i].Replicas < availableTargetClusters[i].Replicas {
118+
availableTargetClusters[i].Replicas = res[i].Replicas
119+
clusterIndexToEstimatorPriority[i] = estimator.Priority()
120+
}
87121
}
88122
}
89123
}

pkg/scheduler/scheduler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,8 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
262262
ReconcileFunc: sched.reconcileEstimatorConnection,
263263
}
264264
sched.schedulerEstimatorWorker = util.NewAsyncWorker(schedulerEstimatorWorkerOptions)
265-
schedulerEstimator := estimatorclient.NewSchedulerEstimator(sched.schedulerEstimatorCache, options.schedulerEstimatorTimeout.Duration)
265+
schedulerEstimator := estimatorclient.NewSchedulerEstimator(sched.schedulerEstimatorCache,
266+
options.schedulerEstimatorTimeout.Duration, estimatorclient.Accurate)
266267
estimatorclient.RegisterSchedulerEstimator(schedulerEstimator)
267268
}
268269
sched.enableEmptyWorkloadPropagation = options.enableEmptyWorkloadPropagation

vendor/github.com/emirpasic/gods/maps/maps.go

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)