Commit 55a5722
Author: mingzhou.swx (committed)
Commit message: workloadspread support rolling update
Signed-off-by: mingzhou.swx <[email protected]>

1 parent 28c0a72 · commit 55a5722
13 files changed: +1007 -341 lines

Diff for: apis/apps/v1alpha1/workloadspread_types.go (+5)

@@ -130,6 +130,11 @@ type WorkloadSpreadStatus struct {
 	// Contains the status of each subset. Each element in this array represents one subset
 	// +optional
 	SubsetStatuses []WorkloadSpreadSubsetStatus `json:"subsetStatuses,omitempty"`
+
+	// VersionedSubsetStatuses is to solve rolling-update problems, where the creation of new-version pod
+	// may be earlier than deletion of old-version pod. We have to calculate the pod subset distribution for
+	// each version.
+	VersionedSubsetStatuses map[string][]WorkloadSpreadSubsetStatus `json:"versionedSubsetStatuses,omitempty"`
 }
 
 type WorkloadSpreadSubsetConditionType string
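The new map is keyed by workload revision, so during a rolling update the pods of each version are counted against their own copy of the subset statuses instead of being mixed into one list. A minimal sketch of reading it, assuming the kruise appsv1alpha1 types from this commit; the printVersionedStatus helper is illustrative, not part of the commit:

package example

import (
	"fmt"

	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
)

// printVersionedStatus (hypothetical) walks the per-version map: during a
// rollout both the old and the new revision can appear as keys, each with
// its own replica count per subset.
func printVersionedStatus(status *appsv1alpha1.WorkloadSpreadStatus) {
	for version, subsets := range status.VersionedSubsetStatuses {
		for _, ss := range subsets {
			fmt.Printf("version=%s subset=%s replicas=%d missing=%d\n",
				version, ss.Name, ss.Replicas, ss.MissingReplicas)
		}
	}
}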

Diff for: apis/apps/v1alpha1/zz_generated.deepcopy.go (+17)

Generated file; the diff is not rendered by default.
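The generated hunk isn't shown, but for a map-of-slices field like VersionedSubsetStatuses, controller-gen's deep-copy conventionally takes roughly this shape. This is a sketch of the expected pattern, not the literal generated lines; it lives alongside the type definitions in package v1alpha1, with copies of the other fields elided:

package v1alpha1

// Sketch: deep-copy pattern for map[string][]WorkloadSpreadSubsetStatus.
// The real method is in zz_generated.deepcopy.go.
func (in *WorkloadSpreadStatus) DeepCopyInto(out *WorkloadSpreadStatus) {
	*out = *in
	// ... copies for SubsetStatuses and other fields elided ...
	if in.VersionedSubsetStatuses != nil {
		m := make(map[string][]WorkloadSpreadSubsetStatus, len(in.VersionedSubsetStatuses))
		for key, val := range in.VersionedSubsetStatuses {
			var outVal []WorkloadSpreadSubsetStatus
			if val != nil {
				// Each element is deep-copied so the new map shares no memory
				// with the original.
				outVal = make([]WorkloadSpreadSubsetStatus, len(val))
				for i := range val {
					val[i].DeepCopyInto(&outVal[i])
				}
			}
			m[key] = outVal
		}
		out.VersionedSubsetStatuses = m
	}
}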

Diff for: config/crd/bases/apps.kruise.io_workloadspreads.yaml (+90)

@@ -432,6 +432,96 @@ spec:
                 - replicas
                 type: object
               type: array
+            versionedSubsetStatuses:
+              additionalProperties:
+                items:
+                  description: WorkloadSpreadSubsetStatus defines the observed state of subset
+                  properties:
+                    conditions:
+                      description: Conditions is an array of current observed subset conditions.
+                      items:
+                        properties:
+                          lastTransitionTime:
+                            description: Last time the condition transitioned from one status to another.
+                            format: date-time
+                            type: string
+                          message:
+                            description: A human readable message indicating details about the transition.
+                            type: string
+                          reason:
+                            description: The reason for the condition's last transition.
+                            type: string
+                          status:
+                            description: Status of the condition, one of True, False, Unknown.
+                            type: string
+                          type:
+                            description: Type of in place set condition.
+                            type: string
+                        required:
+                        - status
+                        - type
+                        type: object
+                      type: array
+                    creatingPods:
+                      additionalProperties:
+                        format: date-time
+                        type: string
+                      description: CreatingPods contains information about pods whose creation was processed by the webhook handler but not yet been observed by the WorkloadSpread controller. A pod will be in this map from the time when the webhook handler processed the creation request to the time when the pod is seen by controller. The key in the map is the name of the pod and the value is the time when the webhook handler process the creation request. If the real creation didn't happen and a pod is still in this map, it will be removed from the list automatically by WorkloadSpread controller after some time. If everything goes smooth this map should be empty for the most of the time. Large number of entries in the map may indicate problems with pod creations.
+                      type: object
+                    deletingPods:
+                      additionalProperties:
+                        format: date-time
+                        type: string
+                      description: DeletingPods is similar with CreatingPods and it contains information about pod deletion.
+                      type: object
+                    missingReplicas:
+                      description: MissingReplicas is the number of active replicas belong to this subset not be found. MissingReplicas > 0 indicates the subset is still missing MissingReplicas pods to create MissingReplicas = 0 indicates the subset already has enough pods, there is no need to create MissingReplicas = -1 indicates the subset's MaxReplicas not set, then there is no limit for pods number
+                      format: int32
+                      type: integer
+                    name:
+                      description: Name should be unique between all of the subsets under one WorkloadSpread.
+                      type: string
+                    replicas:
+                      description: Replicas is the most recently observed number of active replicas for subset.
+                      format: int32
+                      type: integer
+                  required:
+                  - missingReplicas
+                  - name
+                  - replicas
+                  type: object
+                type: array
+              description: VersionedSubsetStatuses is to solve rolling-update problems, where the creation of new-version pod may be earlier than deletion of old-version pod. We have to calculate the pod subset distribution for each version.
+              type: object
           type: object
       type: object
    served: true
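For intuition about what this schema admits, here is a hypothetical mid-rollout status expressed with the Go types (all subset names and numbers are invented): revision "v2" is scaling up while "v1" drains, and the flat SubsetStatuses keeps the overall view across both versions.

package example

import appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"

// exampleStatus is an invented mid-rollout snapshot, for illustration only.
func exampleStatus() appsv1alpha1.WorkloadSpreadStatus {
	return appsv1alpha1.WorkloadSpreadStatus{
		// Overall distribution across all versions.
		SubsetStatuses: []appsv1alpha1.WorkloadSpreadSubsetStatus{
			{Name: "zone-a", Replicas: 3, MissingReplicas: 0},
			{Name: "zone-b", Replicas: 2, MissingReplicas: 1},
		},
		// Per-revision distribution: both versions coexist mid-rollout.
		VersionedSubsetStatuses: map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{
			"v1": {
				{Name: "zone-a", Replicas: 1, MissingReplicas: 2},
				{Name: "zone-b", Replicas: 1, MissingReplicas: 2},
			},
			"v2": {
				{Name: "zone-a", Replicas: 2, MissingReplicas: 1},
				{Name: "zone-b", Replicas: 1, MissingReplicas: 2},
			},
		},
	}
}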

Diff for: pkg/controller/workloadspread/reschedule_test.go (+1 -1)

@@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
 
-	http://www.apache.org/licenses/LICENSE-2.0
+    http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,

(The only change is the indentation whitespace in front of the license URL.)

Diff for: pkg/controller/workloadspread/update_pod_deletion_cost.go (+57 -4)

@@ -23,6 +23,7 @@ import (
 	"strconv"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/klog/v2"
@@ -33,19 +34,71 @@ import (
 	wsutil "github.com/openkruise/kruise/pkg/util/workloadspread"
 )
 
+const (
+	// RevisionAnnotation is the revision annotation of a deployment's replica sets which records its rollout sequence
+	RevisionAnnotation = "deployment.kubernetes.io/revision"
+)
+
+func (r *ReconcileWorkloadSpread) getWorkloadLatestVersion(ws *appsv1alpha1.WorkloadSpread) (string, error) {
+	targetRef := ws.Spec.TargetReference
+	if targetRef == nil {
+		return "", nil
+	}
+
+	gvk := schema.FromAPIVersionAndKind(targetRef.APIVersion, targetRef.Kind)
+	key := types.NamespacedName{Namespace: ws.Namespace, Name: targetRef.Name}
+
+	object := wsutil.GenerateEmptyWorkloadObject(gvk, key)
+	if err := r.Get(context.TODO(), key, object); err != nil {
+		return "", client.IgnoreNotFound(err)
+	}
+
+	return wsutil.GetWorkloadVersion(r.Client, object)
+}
+
 func (r *ReconcileWorkloadSpread) updateDeletionCost(ws *appsv1alpha1.WorkloadSpread,
-	podMap map[string][]*corev1.Pod,
+	versionedPodMap map[string]map[string][]*corev1.Pod,
 	workloadReplicas int32) error {
 	targetRef := ws.Spec.TargetReference
 	if targetRef == nil || !isEffectiveKindForDeletionCost(targetRef) {
 		return nil
 	}
-	// update Pod's deletion-cost annotation in each subset
-	for idx, subset := range ws.Spec.Subsets {
-		if err := r.syncSubsetPodDeletionCost(ws, &subset, idx, podMap[subset.Name], workloadReplicas); err != nil {
+
+	latestVersion, err := r.getWorkloadLatestVersion(ws)
+	if err != nil {
+		klog.Errorf("Failed to get the latest version for workload in workloadSpread %v, err: %v", klog.KObj(ws), err)
+		return err
+	}
+
+	// To try our best to keep the distribution of workload description during workload rolling:
+	// - to the latest version, we hope to scale down the last subset preferentially;
+	// - to other old versions, we hope to scale down the first subset preferentially;
+	for version, podMap := range versionedPodMap {
+		err = r.updateDeletionCostBySubset(ws, podMap, workloadReplicas, version != latestVersion)
+		if err != nil {
 			return err
 		}
 	}
+	return nil
+}
+
+func (r *ReconcileWorkloadSpread) updateDeletionCostBySubset(ws *appsv1alpha1.WorkloadSpread,
+	podMap map[string][]*corev1.Pod, workloadReplicas int32, reverseOrder bool) error {
+	// update Pod's deletion-cost annotation in each subset
+	if reverseOrder {
+		subsetNum := len(ws.Spec.Subsets)
+		for idx, subset := range ws.Spec.Subsets {
+			if err := r.syncSubsetPodDeletionCost(ws, &subset, subsetNum-idx-1, podMap[subset.Name], workloadReplicas); err != nil {
+				return err
+			}
+		}
+	} else {
+		for idx, subset := range ws.Spec.Subsets {
+			if err := r.syncSubsetPodDeletionCost(ws, &subset, idx, podMap[subset.Name], workloadReplicas); err != nil {
+				return err
+			}
		}
+	}
 	// update the deletion-cost annotation for such pods that do not match any real subsets.
 	// these pods will have the minimum deletion-cost, and will be deleted preferentially.
 	if len(podMap[FakeSubsetName]) > 0 {
Diff for: pkg/controller/workloadspread/workloadspread_controller.go (+78 -15)

@@ -319,20 +319,20 @@ func (r *ReconcileWorkloadSpread) syncWorkloadSpread(ws *appsv1alpha1.WorkloadSp
 		klog.Warningf("WorkloadSpread (%s/%s) has no matched pods, target workload's replicas[%d]", ws.Namespace, ws.Name, workloadReplicas)
 	}
 
-	// group Pods by subset
-	podMap, err := r.groupPod(ws, pods, workloadReplicas)
+	// group Pods by pod-revision and subset
+	versionedPodMap, subsetPodMap, err := r.groupVersionedPods(ws, pods, workloadReplicas)
 	if err != nil {
 		return err
 	}
 
 	// update deletion-cost for each subset
-	err = r.updateDeletionCost(ws, podMap, workloadReplicas)
+	err = r.updateDeletionCost(ws, versionedPodMap, workloadReplicas)
 	if err != nil {
 		return err
 	}
 
 	// calculate status and reschedule
-	status, scheduleFailedPodMap := r.calculateWorkloadSpreadStatus(ws, podMap, workloadReplicas)
+	status, scheduleFailedPodMap := r.calculateWorkloadSpreadStatus(ws, versionedPodMap, subsetPodMap, workloadReplicas)
 	if status == nil {
 		return nil
 	}

@@ -362,8 +362,33 @@ func getInjectWorkloadSpreadFromPod(pod *corev1.Pod) *wsutil.InjectWorkloadSprea
 	return injectWS
 }
 
-// groupPod returns a map, the key is the name of subset and the value represents the Pods of the corresponding subset.
-func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods []*corev1.Pod, replicas int32) (map[string][]*corev1.Pod, error) {
+// groupVersionedPods will group pods by pod version and subset
+func (r *ReconcileWorkloadSpread) groupVersionedPods(ws *appsv1alpha1.WorkloadSpread, allPods []*corev1.Pod, replicas int32) (map[string]map[string][]*corev1.Pod, map[string][]*corev1.Pod, error) {
+	versionedPods := map[string][]*corev1.Pod{}
+	for _, pod := range allPods {
+		version := wsutil.GetPodVersion(pod)
+		versionedPods[version] = append(versionedPods[version], pod)
+	}
+
+	subsetPodMap := map[string][]*corev1.Pod{}
+	versionedPodMap := map[string]map[string][]*corev1.Pod{}
+	// group pods by version
+	for version, pods := range versionedPods {
+		// group pods by subset
+		podMap, err := r.groupPodBySubset(ws, pods, replicas)
+		if err != nil {
+			return nil, nil, err
+		}
+		for subset, ps := range podMap {
+			subsetPodMap[subset] = append(subsetPodMap[subset], ps...)
+		}
+		versionedPodMap[version] = podMap
+	}
+	return versionedPodMap, subsetPodMap, nil
+}
+
+// groupPodBySubset returns a map, the key is the name of subset and the value represents the Pods of the corresponding subset.
+func (r *ReconcileWorkloadSpread) groupPodBySubset(ws *appsv1alpha1.WorkloadSpread, pods []*corev1.Pod, replicas int32) (map[string][]*corev1.Pod, error) {
 	podMap := make(map[string][]*corev1.Pod, len(ws.Spec.Subsets)+1)
 	podMap[FakeSubsetName] = []*corev1.Pod{}
 	subsetMissingReplicas := make(map[string]int)

@@ -507,18 +532,58 @@ func (r *ReconcileWorkloadSpread) patchFavoriteSubsetMetadataToPod(pod *corev1.P
 // 1. current WorkloadSpreadStatus
 // 2. a map, the key is the subsetName, the value is the schedule failed Pods belongs to the subset.
 func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadStatus(ws *appsv1alpha1.WorkloadSpread,
-	podMap map[string][]*corev1.Pod, workloadReplicas int32) (*appsv1alpha1.WorkloadSpreadStatus, map[string][]*corev1.Pod) {
-	// set the generation in the returned status
+	versionedPodMap map[string]map[string][]*corev1.Pod, subsetPodMap map[string][]*corev1.Pod,
+	workloadReplicas int32) (*appsv1alpha1.WorkloadSpreadStatus, map[string][]*corev1.Pod) {
 	status := appsv1alpha1.WorkloadSpreadStatus{}
+	// set the generation in the returned status
 	status.ObservedGeneration = ws.Generation
 	// status.ObservedWorkloadReplicas = workloadReplicas
-	status.SubsetStatuses = make([]appsv1alpha1.WorkloadSpreadSubsetStatus, len(ws.Spec.Subsets))
+	status.VersionedSubsetStatuses = make(map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus, len(versionedPodMap))
+
+	// overall subset statuses
+	var scheduleFailedPodMap map[string][]*corev1.Pod
+	status.SubsetStatuses, scheduleFailedPodMap = r.calculateWorkloadSpreadSubsetStatuses(ws, ws.Status.SubsetStatuses, subsetPodMap, workloadReplicas)
+
+	// versioned subset statuses calculated by observed pods
+	for version, podMap := range versionedPodMap {
+		status.VersionedSubsetStatuses[version], _ = r.calculateWorkloadSpreadSubsetStatuses(ws, ws.Status.VersionedSubsetStatuses[version], podMap, workloadReplicas)
+	}
+
+	// Consider this case:
+	// A Pod has been created and processed by webhook, but the Pod is not cached by controller.
+	// We have to keep the subsetStatus for this version even though there is no Pod belonging to it.
+	for version := range ws.Status.VersionedSubsetStatuses {
+		if _, exist := versionedPodMap[version]; exist {
+			continue
+		}
+		versionSubsetStatues, _ := r.calculateWorkloadSpreadSubsetStatuses(ws, ws.Status.VersionedSubsetStatuses[version], nil, workloadReplicas)
+		if !isEmptySubsetStatuses(versionSubsetStatues) {
+			status.VersionedSubsetStatuses[version] = versionSubsetStatues
+		}
+	}
+
+	return &status, scheduleFailedPodMap
+}
+
+func isEmptySubsetStatuses(statues []appsv1alpha1.WorkloadSpreadSubsetStatus) bool {
+	replicas, creating, deleting := 0, 0, 0
+	for _, subset := range statues {
+		replicas += int(subset.Replicas)
+		creating += len(subset.CreatingPods)
+		deleting += len(subset.DeletingPods)
+	}
+	return replicas+creating+deleting == 0
+}
+
+func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadSubsetStatuses(ws *appsv1alpha1.WorkloadSpread,
+	oldSubsetStatuses []appsv1alpha1.WorkloadSpreadSubsetStatus, podMap map[string][]*corev1.Pod, workloadReplicas int32,
+) ([]appsv1alpha1.WorkloadSpreadSubsetStatus, map[string][]*corev1.Pod) {
+	subsetStatuses := make([]appsv1alpha1.WorkloadSpreadSubsetStatus, len(ws.Spec.Subsets))
 	scheduleFailedPodMap := make(map[string][]*corev1.Pod)
 
 	// Using a map to restore name and old status of subset, because user could adjust the spec's subset sequence
 	// to change priority of subset. We guarantee that operation and use subset name to distinguish which subset
 	// from old status.
-	oldSubsetStatuses := ws.Status.SubsetStatuses
 	oldSubsetStatusMap := make(map[string]*appsv1alpha1.WorkloadSpreadSubsetStatus, len(oldSubsetStatuses))
 	for i := range oldSubsetStatuses {
 		oldSubsetStatusMap[oldSubsetStatuses[i].Name] = &oldSubsetStatuses[i]

@@ -557,10 +622,10 @@ func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadStatus(ws *appsv1alpha1
 			removeWorkloadSpreadSubsetCondition(subsetStatus, appsv1alpha1.SubsetSchedulable)
 		}
 
-		status.SubsetStatuses[i] = *subsetStatus
+		subsetStatuses[i] = *subsetStatus
 	}
 
-	return &status, scheduleFailedPodMap
+	return subsetStatuses, scheduleFailedPodMap
 }

@@ -678,9 +743,7 @@ func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadSubsetStatus(ws *appsv1
 
 func (r *ReconcileWorkloadSpread) UpdateWorkloadSpreadStatus(ws *appsv1alpha1.WorkloadSpread,
 	status *appsv1alpha1.WorkloadSpreadStatus) error {
-	if status.ObservedGeneration == ws.Status.ObservedGeneration &&
-		// status.ObservedWorkloadReplicas == ws.Status.ObservedWorkloadReplicas &&
-		apiequality.Semantic.DeepEqual(status.SubsetStatuses, ws.Status.SubsetStatuses) {
+	if apiequality.Semantic.DeepEqual(status, ws.Status) {
 		return nil
 	}
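The two-level grouping in groupVersionedPods is independent of the controller plumbing; at its core it is one pass that fills a bucket-of-buckets plus a flat per-subset view. A self-contained sketch, where subset membership is a plain field for brevity (the controller instead derives the version via wsutil.GetPodVersion and the subset from the injected workloadspread metadata):

package main

import "fmt"

type pod struct{ name, version, subset string }

// groupVersioned mirrors groupVersionedPods: bucket pods first by revision,
// then by subset, while also accumulating the flat per-subset view used for
// the overall SubsetStatuses.
func groupVersioned(pods []pod) (map[string]map[string][]pod, map[string][]pod) {
	versioned := map[string]map[string][]pod{}
	flat := map[string][]pod{}
	for _, p := range pods {
		if versioned[p.version] == nil {
			versioned[p.version] = map[string][]pod{}
		}
		versioned[p.version][p.subset] = append(versioned[p.version][p.subset], p)
		flat[p.subset] = append(flat[p.subset], p)
	}
	return versioned, flat
}

func main() {
	pods := []pod{{"a-1", "v1", "zone-a"}, {"a-2", "v2", "zone-a"}, {"b-1", "v2", "zone-b"}}
	versioned, flat := groupVersioned(pods)
	fmt.Println(len(versioned["v2"]["zone-a"]), len(flat["zone-a"])) // prints: 1 2
}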
