Skip to content

Commit 8b1d3a9

Browse files
authored
refactor podUpdater interface (#166)
* 1. refactor podUpdater interface. 2. fix bug: replace pod by to-replace label, not check new pod status when pod upgrade policy is not replace update * remove some comments * refactor filter and lifecycle func by cr * fix unittests error * correct wrong rbac configuration for collaset_controller * format comments * change diffPod method receiver to pointer, and return false in replaceUpdate GetPodUpdateFinishStatus when replace new pair pod is not exist
1 parent 0ab85ee commit 8b1d3a9

File tree

4 files changed

+351
-216
lines changed

4 files changed

+351
-216
lines changed

pkg/controllers/collaset/collaset_controller_test.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@ var _ = Describe("collaset controller", func() {
618618
}
619619
})
620620

621-
It("replace update reconcile by partition", func() {
621+
It("[replace update] reconcile by partition", func() {
622622
testcase := "test-replace-update-by-partition"
623623
Expect(createNamespace(c, testcase)).Should(BeNil())
624624

@@ -753,7 +753,7 @@ var _ = Describe("collaset controller", func() {
753753
Expect(inDeleteReplicas).Should(BeEquivalentTo(4))
754754
})
755755

756-
It("replace update reconcile by label", func() {
756+
It("[replace update] reconcile by label", func() {
757757
testcase := "test-replace-update-by-label"
758758
Expect(createNamespace(c, testcase)).Should(BeNil())
759759

@@ -989,7 +989,7 @@ var _ = Describe("collaset controller", func() {
989989

990990
// double check updated pod replicas
991991
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
992-
var replacePairNewId, newPodInstanceId string
992+
var replacePairNewId, newPodInstanceId, newCreatePodName string
993993
for _, pod := range podList.Items {
994994
Expect(pod.Labels).ShouldNot(BeNil())
995995
// replace by current revision
@@ -999,13 +999,40 @@ var _ = Describe("collaset controller", func() {
999999
replacePairNewId = pod.Labels[appsv1alpha1.PodReplacePairNewId]
10001000
Expect(replacePairNewId).ShouldNot(BeNil())
10011001
} else {
1002+
newCreatePodName = pod.Name
10021003
newPodInstanceId = pod.Labels[appsv1alpha1.PodInstanceIDLabelKey]
10031004
Expect(pod.Labels[appsv1alpha1.PodReplacePairOriginName]).Should(BeEquivalentTo(replacePod.Name))
10041005
}
10051006
}
10061007
Expect(replacePairNewId).ShouldNot(BeEquivalentTo(""))
10071008
Expect(newPodInstanceId).ShouldNot(BeEquivalentTo(""))
10081009
Expect(newPodInstanceId).Should(BeEquivalentTo(replacePairNewId))
1010+
Expect(newCreatePodName).ShouldNot(BeEquivalentTo(""))
1011+
1012+
Expect(updatePodWithRetry(c, replacePod.Namespace, newCreatePodName, func(pod *corev1.Pod) bool {
1013+
if pod.Labels == nil {
1014+
pod.Labels = map[string]string{}
1015+
}
1016+
pod.Labels[appsv1alpha1.PodServiceAvailableLabel] = "true"
1017+
return true
1018+
})).Should(BeNil())
1019+
1020+
Eventually(func() error {
1021+
// check updated pod replicas by CollaSet status
1022+
return expectedStatusReplicas(c, cs, 0, 0, 1, 2, 0, 0, 0, 0)
1023+
}, 30*time.Second, 1*time.Second).Should(BeNil())
1024+
1025+
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
1026+
for _, pod := range podList.Items {
1027+
Expect(pod.Labels).ShouldNot(BeNil())
1028+
// replace by current revision
1029+
Expect(pod.Labels[appsv1.ControllerRevisionHashLabelKey]).Should(BeEquivalentTo(cs.Status.CurrentRevision))
1030+
Expect(pod.Spec.Containers[0].Image).Should(BeEquivalentTo("nginx:v1"))
1031+
if pod.Name == replacePod.Name {
1032+
_, exist := pod.Labels[appsv1alpha1.PodDeletionIndicationLabelKey]
1033+
Expect(exist).Should(BeTrue())
1034+
}
1035+
}
10091036
})
10101037

10111038
It("replace update change to inplaceUpdate", func() {

pkg/controllers/collaset/synccontrol/sync_control.go

Lines changed: 42 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -365,8 +365,14 @@ func dealReplacePods(pods []*corev1.Pod, instance *appsv1alpha1.CollaSet) (needR
365365

366366
// pod is replace new created pod, skip replace
367367
if originPodName, exist := pod.Labels[appsv1alpha1.PodReplacePairOriginName]; exist {
368-
if _, exist := podNameMap[originPodName]; !exist {
368+
// replace pair origin pod is not exist, clean label.
369+
if originPod, exist := podNameMap[originPodName]; !exist {
369370
needCleanLabels = append(needCleanLabels, appsv1alpha1.PodReplacePairOriginName)
371+
} else if !replaceByUpdate {
372+
// not replace update, delete origin pod when new created pod is service available
373+
if _, serviceAvailable := pod.Labels[appsv1alpha1.PodServiceAvailableLabel]; serviceAvailable {
374+
needDeletePods = append(needDeletePods, originPod)
375+
}
370376
}
371377
}
372378

@@ -690,143 +696,57 @@ func (r *RealSyncControl) Update(
690696
logger := r.logger.WithValues("collaset", commonutils.ObjectKeyString(cls))
691697
var recordedRequeueAfter *time.Duration
692698
// 1. scan and analysis pods update info
693-
podUpdateInfos, err := attachPodUpdateInfo(ctx, podWrappers, resources)
699+
podUpdateInfos, err := attachPodUpdateInfo(ctx, cls, podWrappers, resources)
694700
if err != nil {
695701
return false, nil, fmt.Errorf("fail to attach pod update info, %v", err)
696702
}
697-
for _, podInfo := range podUpdateInfos {
698-
// if template is updated, update pod by recreate
699-
podInfo.PvcTmpHashChanged, err = pvccontrol.IsPodPvcTmpChanged(cls, podInfo.PodWrapper.Pod, resources.ExistingPvcs)
700-
if err != nil {
701-
return false, nil, fmt.Errorf("fail to check pvc template changed, %v", err)
702-
}
703-
}
704703

705704
// 2. decide Pod update candidates
706705
podToUpdate := decidePodToUpdate(cls, podUpdateInfos)
707706
podCh := make(chan *PodUpdateInfo, len(podToUpdate))
708-
updater := newPodUpdater(ctx, r.client, cls)
707+
updater := newPodUpdater(ctx, r.client, cls, r.podControl, r.recorder)
709708
updating := false
710-
analysedPod := sets.NewString()
711709

712-
if cls.Spec.UpdateStrategy.PodUpdatePolicy != appsv1alpha1.CollaSetReplacePodUpdateStrategyType {
713-
// 3. prepare Pods to begin PodOpsLifecycle
714-
for i, podInfo := range podToUpdate {
715-
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged {
716-
continue
717-
}
718-
719-
if podopslifecycle.IsDuringOps(collasetutils.UpdateOpsLifecycleAdapter, podInfo) {
720-
continue
721-
}
722-
podCh <- podToUpdate[i]
710+
// 3. filter already updated revision,
711+
for i, podInfo := range podToUpdate {
712+
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged && !podInfo.PvcTmpHashChanged {
713+
continue
723714
}
724715

725-
// 4. begin podOpsLifecycle parallel
726-
727-
succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error {
728-
podInfo := <-podCh
729-
// fulfill Pod update information
730-
if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil {
731-
return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err)
732-
}
733-
analysedPod.Insert(podInfo.Name)
734-
logger.V(1).Info("try to begin PodOpsLifecycle for updating Pod of CollaSet", "pod", commonutils.ObjectKeyString(podInfo.Pod))
735-
if updated, err := podopslifecycle.Begin(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod, func(obj client.Object) (bool, error) {
736-
if !podInfo.OnlyMetadataChanged && !podInfo.InPlaceUpdateSupport {
737-
return podopslifecycle.WhenBeginDelete(obj)
738-
}
739-
return false, nil
740-
}); err != nil {
741-
return fmt.Errorf("fail to begin PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
742-
} else if updated {
743-
// add an expectation for this pod update, before next reconciling
744-
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
745-
return err
746-
}
747-
}
748-
return nil
749-
})
750-
751-
updating = updating || succCount > 0
752-
if err != nil {
753-
collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, err, "UpdateFailed", err.Error())
754-
return updating, nil, err
755-
} else {
756-
collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, nil, "Updated", "")
716+
// 3.1 fulfillPodUpdateInfo to all not updatedRevision pod
717+
if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil {
718+
logger.Error(err, fmt.Sprintf("fail to analyse pod %s/%s in-place update support", podInfo.Namespace, podInfo.Name))
719+
continue
757720
}
758721

759-
needUpdateContext := false
760-
for i := range podToUpdate {
761-
podInfo := podToUpdate[i]
762-
requeueAfter, allowed := podopslifecycle.AllowOps(collasetutils.UpdateOpsLifecycleAdapter, realValue(cls.Spec.UpdateStrategy.OperationDelaySeconds), podInfo.Pod)
763-
if !allowed {
764-
r.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "Pod %s is not allowed to update", commonutils.ObjectKeyString(podInfo.Pod))
765-
continue
766-
}
767-
if requeueAfter != nil {
768-
r.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "delay Pod update for %d seconds", requeueAfter.Seconds())
769-
if recordedRequeueAfter == nil || *requeueAfter < *recordedRequeueAfter {
770-
recordedRequeueAfter = requeueAfter
771-
}
772-
continue
773-
}
774-
775-
if !ownedIDs[podInfo.ID].Contains(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name) {
776-
needUpdateContext = true
777-
ownedIDs[podInfo.ID].Put(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name)
778-
}
779-
if podInfo.PodDecorationChanged {
780-
decorationStr := utilspoddecoration.GetDecorationInfoString(podInfo.UpdatedPodDecorations)
781-
if val, ok := ownedIDs[podInfo.ID].Get(podcontext.PodDecorationRevisionKey); !ok || val != decorationStr {
782-
needUpdateContext = true
783-
ownedIDs[podInfo.ID].Put(podcontext.PodDecorationRevisionKey, decorationStr)
784-
}
785-
}
786-
787-
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged {
788-
continue
789-
}
790-
// if Pod has not been updated, update it.
791-
podCh <- podToUpdate[i]
722+
if podopslifecycle.IsDuringOps(collasetutils.UpdateOpsLifecycleAdapter, podInfo) {
723+
continue
792724
}
793725

794-
// 5. mark Pod to use updated revision before updating it.
795-
if needUpdateContext {
796-
logger.V(1).Info("try to update ResourceContext for CollaSet")
797-
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
798-
return podcontext.UpdateToPodContext(r.client, cls, ownedIDs)
799-
})
726+
podCh <- podToUpdate[i]
727+
}
800728

801-
if err != nil {
802-
collasetutils.AddOrUpdateCondition(resources.NewStatus,
803-
appsv1alpha1.CollaSetScale, err, "UpdateFailed",
804-
fmt.Sprintf("fail to update Context for updating: %s", err))
805-
return updating, recordedRequeueAfter, err
806-
} else {
807-
collasetutils.AddOrUpdateCondition(resources.NewStatus,
808-
appsv1alpha1.CollaSetScale, nil, "UpdateFailed", "")
809-
}
810-
}
729+
// 4. begin pod update lifecycle
730+
updating, err = updater.BeginUpdatePod(resources, podCh)
731+
if err != nil {
732+
return updating, recordedRequeueAfter, err
733+
}
734+
735+
// 5. filter pods not allow to ops now, such as OperationDelaySeconds strategy
736+
recordedRequeueAfter, err = updater.FilterAllowOpsPods(podToUpdate, ownedIDs, resources, podCh)
737+
if err != nil {
738+
collasetutils.AddOrUpdateCondition(resources.NewStatus,
739+
appsv1alpha1.CollaSetScale, err, "UpdateFailed",
740+
fmt.Sprintf("fail to update Context for updating: %s", err))
741+
return updating, recordedRequeueAfter, err
811742
} else {
812-
for i, podInfo := range podToUpdate {
813-
// The pod is in a "replaceUpdate" state, always requires further update processing.
814-
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged && !podInfo.PvcTmpHashChanged {
815-
continue
816-
}
817-
podCh <- podToUpdate[i]
818-
}
743+
collasetutils.AddOrUpdateCondition(resources.NewStatus,
744+
appsv1alpha1.CollaSetScale, nil, "Updated", "")
819745
}
820746

821747
// 6. update Pod
822748
succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(_ int, _ error) error {
823749
podInfo := <-podCh
824-
if !analysedPod.Has(podInfo.Name) {
825-
if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil {
826-
return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err)
827-
}
828-
}
829-
830750
logger.V(1).Info("before pod update operation",
831751
"pod", commonutils.ObjectKeyString(podInfo.Pod),
832752
"revision.from", podInfo.CurrentRevision.Name,
@@ -835,41 +755,8 @@ func (r *RealSyncControl) Update(
835755
"onlyMetadataChanged", podInfo.OnlyMetadataChanged,
836756
)
837757

838-
if (podInfo.OnlyMetadataChanged || podInfo.InPlaceUpdateSupport) && !podInfo.PvcTmpHashChanged {
839-
// 6.1 if pod template changes only include metadata or support in-place update, just apply these changes to pod directly
840-
if err = r.podControl.UpdatePod(podInfo.UpdatedPod); err != nil {
841-
return fmt.Errorf("fail to update Pod %s/%s when updating by in-place: %s", podInfo.Namespace, podInfo.Name, err)
842-
} else {
843-
podInfo.Pod = podInfo.UpdatedPod
844-
r.recorder.Eventf(podInfo.Pod,
845-
corev1.EventTypeNormal,
846-
"UpdatePod",
847-
"succeed to update Pod %s/%s to from revision %s to revision %s by in-place",
848-
podInfo.Namespace, podInfo.Name,
849-
podInfo.CurrentRevision.Name,
850-
resources.UpdatedRevision.Name)
851-
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.UpdatedPod.ResourceVersion); err != nil {
852-
return err
853-
}
854-
}
855-
} else if cls.Spec.UpdateStrategy.PodUpdatePolicy == appsv1alpha1.CollaSetReplacePodUpdateStrategyType {
856-
return nil
857-
} else {
858-
// 6.2 if pod has changes not in-place supported, recreate it
859-
if err = r.podControl.DeletePod(podInfo.Pod); err != nil {
860-
return fmt.Errorf("fail to delete Pod %s/%s when updating by recreate: %s", podInfo.Namespace, podInfo.Name, err)
861-
} else {
862-
r.recorder.Eventf(podInfo.Pod,
863-
corev1.EventTypeNormal,
864-
"UpdatePod",
865-
"succeed to update Pod %s/%s to from revision %s to revision %s by recreate",
866-
podInfo.Namespace,
867-
podInfo.Name,
868-
podInfo.CurrentRevision.Name, resources.UpdatedRevision.Name)
869-
if err := collasetutils.ActiveExpectations.ExpectDelete(cls, expectations.Pod, podInfo.Name); err != nil {
870-
return err
871-
}
872-
}
758+
if err = updater.UpgradePod(podInfo); err != nil {
759+
return err
873760
}
874761

875762
return nil
@@ -883,7 +770,7 @@ func (r *RealSyncControl) Update(
883770
collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, nil, "Updated", "")
884771
}
885772

886-
// try to finish all Pods'PodOpsLifecycle if its update is finished.
773+
// 7. try to finish all Pods'PodOpsLifecycle if its update is finished.
887774
succCount, err = controllerutils.SlowStartBatch(len(podUpdateInfos), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error {
888775
podInfo := podUpdateInfos[i]
889776

@@ -898,29 +785,9 @@ func (r *RealSyncControl) Update(
898785
}
899786

900787
if finished {
901-
if podInfo.isInReplacing {
902-
replacePairNewPodInfo := podInfo.replacePairNewPodInfo
903-
if replacePairNewPodInfo != nil {
904-
if _, exist := replacePairNewPodInfo.Labels[appsv1alpha1.PodDeletionIndicationLabelKey]; !exist {
905-
patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%d"}}}`, appsv1alpha1.PodDeletionIndicationLabelKey, time.Now().UnixNano())))
906-
if err = r.podControl.PatchPod(podInfo.Pod, patch); err != nil {
907-
return fmt.Errorf("failed to delete replace pair origin pod %s/%s %s", podInfo.Namespace, podInfo.replacePairNewPodInfo.Name, err)
908-
}
909-
}
910-
}
911-
} else {
912-
logger.V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod))
913-
if updated, err := podopslifecycle.Finish(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil {
914-
return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
915-
} else if updated {
916-
// add an expectation for this pod update, before next reconciling
917-
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
918-
return err
919-
}
920-
r.recorder.Eventf(podInfo.Pod,
921-
corev1.EventTypeNormal,
922-
"UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name)
923-
}
788+
err := updater.FinishUpdatePod(podInfo)
789+
if err != nil {
790+
return err
924791
}
925792
} else {
926793
r.recorder.Eventf(podInfo.Pod,

0 commit comments

Comments
 (0)