Skip to content

Commit 2871bb5

Browse files
committed
refactor filter and lifecycle func by cr
1 parent 1e1a072 commit 2871bb5

File tree

2 files changed

+75
-171
lines changed

2 files changed

+75
-171
lines changed

pkg/controllers/collaset/synccontrol/sync_control.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -701,12 +701,13 @@ func (r *RealSyncControl) Update(
701701
return false, nil, fmt.Errorf("fail to attach pod update info, %v", err)
702702
}
703703

704-
// 3. decide Pod update candidates
704+
// 2. decide Pod update candidates
705705
podToUpdate := decidePodToUpdate(cls, podUpdateInfos)
706706
podCh := make(chan *PodUpdateInfo, len(podToUpdate))
707707
updater := newPodUpdater(ctx, r.client, cls, r.podControl, r.recorder)
708708
updating := false
709709

710+
// 3. filter already updated revision,
710711
for i, podInfo := range podToUpdate {
711712
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged && !podInfo.PvcTmpHashChanged {
712713
continue
@@ -721,14 +722,15 @@ func (r *RealSyncControl) Update(
721722
if podopslifecycle.IsDuringOps(collasetutils.UpdateOpsLifecycleAdapter, podInfo) {
722723
continue
723724
}
725+
724726
podCh <- podToUpdate[i]
725727
}
726728

727-
updating, err = updater.BeginUpdate(resources, podCh)
729+
updating, err = updater.BeginUpdatePod(resources, podCh)
728730
if err != nil {
729731
return updating, recordedRequeueAfter, err
730732
}
731-
recordedRequeueAfter, err = updater.FilterAllowOpsPodsAndUpdatePodContext(podToUpdate, ownedIDs, resources, podCh)
733+
recordedRequeueAfter, err = updater.FilterAllowOpsPods(podToUpdate, ownedIDs, resources, podCh)
732734
if err != nil {
733735
collasetutils.AddOrUpdateCondition(resources.NewStatus,
734736
appsv1alpha1.CollaSetScale, err, "UpdateFailed",

pkg/controllers/collaset/synccontrol/update.go

Lines changed: 70 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -268,47 +268,23 @@ func (o orderByDefault) Less(i, j int) bool {
268268
}
269269

270270
type PodUpdater interface {
271-
BeginUpdate(resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (bool, error)
272-
FilterAllowOpsPodsAndUpdatePodContext(podToUpdate []*PodUpdateInfo, ownedIDs map[int]*appsv1alpha1.ContextDetail, resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (*time.Duration, error)
273271
FulfillPodUpdatedInfo(revision *appsv1.ControllerRevision, podUpdateInfo *PodUpdateInfo) error
272+
BeginUpdatePod(resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (bool, error)
273+
FilterAllowOpsPods(podToUpdate []*PodUpdateInfo, ownedIDs map[int]*appsv1alpha1.ContextDetail, resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (*time.Duration, error)
274274
UpgradePod(podInfo *PodUpdateInfo) error
275275
GetPodUpdateFinishStatus(podUpdateInfo *PodUpdateInfo) (bool, string, error)
276276
FinishUpdatePod(podInfo *PodUpdateInfo) error
277277
}
278278

279-
func newPodUpdater(ctx context.Context, client client.Client, cls *appsv1alpha1.CollaSet, podControl podcontrol.Interface, recorder record.EventRecorder) PodUpdater {
280-
switch cls.Spec.UpdateStrategy.PodUpdatePolicy {
281-
case appsv1alpha1.CollaSetRecreatePodUpdateStrategyType:
282-
return &recreatePodUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder}
283-
case appsv1alpha1.CollaSetInPlaceOnlyPodUpdateStrategyType:
284-
// In case of using native K8s, Pod is only allowed to update with container image, so InPlaceOnly policy is
285-
// implemented with InPlaceIfPossible policy as default for compatibility.
286-
return &inPlaceIfPossibleUpdater{collaSet: cls, ctx: ctx, Client: client}
287-
case appsv1alpha1.CollaSetReplacePodUpdateStrategyType:
288-
return &replaceUpdatePodUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder}
289-
default:
290-
return &inPlaceIfPossibleUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder}
291-
}
292-
}
293-
294-
type PodStatus struct {
295-
ContainerStates map[string]*ContainerStatus `json:"containerStates,omitempty"`
296-
}
297-
298-
type ContainerStatus struct {
299-
LatestImage string `json:"latestImage,omitempty"`
300-
LastImageID string `json:"lastImageID,omitempty"`
301-
}
302-
303-
type inPlaceIfPossibleUpdater struct {
279+
type GenericPodUpdater struct {
304280
collaSet *appsv1alpha1.CollaSet
305281
ctx context.Context
306282
podControl podcontrol.Interface
307283
recorder record.EventRecorder
308284
client.Client
309285
}
310286

311-
func (u *inPlaceIfPossibleUpdater) BeginUpdate(resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (bool, error) {
287+
func (u *GenericPodUpdater) BeginUpdatePod(resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (bool, error) {
312288
succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error {
313289
podInfo := <-podCh
314290
u.recorder.Eventf(podInfo.Pod, corev1.EventTypeNormal, "PodUpdateLifecycle", "try to begin PodOpsLifecycle for updating Pod of CollaSet")
@@ -339,7 +315,7 @@ func (u *inPlaceIfPossibleUpdater) BeginUpdate(resources *collasetutils.RelatedR
339315
return updating, nil
340316
}
341317

342-
func (u *inPlaceIfPossibleUpdater) FilterAllowOpsPodsAndUpdatePodContext(podToUpdate []*PodUpdateInfo, ownedIDs map[int]*appsv1alpha1.ContextDetail, resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (*time.Duration, error) {
318+
func (u *GenericPodUpdater) FilterAllowOpsPods(podToUpdate []*PodUpdateInfo, ownedIDs map[int]*appsv1alpha1.ContextDetail, resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (*time.Duration, error) {
343319
var recordedRequeueAfter *time.Duration
344320
needUpdateContext := false
345321
for i := range podToUpdate {
@@ -386,6 +362,56 @@ func (u *inPlaceIfPossibleUpdater) FilterAllowOpsPodsAndUpdatePodContext(podToUp
386362
return recordedRequeueAfter, nil
387363
}
388364

365+
func (u *GenericPodUpdater) FinishUpdatePod(podInfo *PodUpdateInfo) error {
366+
//u.recorder.Eventf().V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod))
367+
if updated, err := podopslifecycle.Finish(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil {
368+
return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
369+
} else if updated {
370+
// add an expectation for this pod update, before next reconciling
371+
if err := collasetutils.ActiveExpectations.ExpectUpdate(u.collaSet, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
372+
return err
373+
}
374+
u.recorder.Eventf(podInfo.Pod,
375+
corev1.EventTypeNormal,
376+
"UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name)
377+
}
378+
return nil
379+
}
380+
381+
func newPodUpdater(ctx context.Context, client client.Client, cls *appsv1alpha1.CollaSet, podControl podcontrol.Interface, recorder record.EventRecorder) PodUpdater {
382+
genericPodUpdater := &GenericPodUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder}
383+
switch cls.Spec.UpdateStrategy.PodUpdatePolicy {
384+
case appsv1alpha1.CollaSetRecreatePodUpdateStrategyType:
385+
return &recreatePodUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder, GenericPodUpdater: *genericPodUpdater}
386+
case appsv1alpha1.CollaSetInPlaceOnlyPodUpdateStrategyType:
387+
// In case of using native K8s, Pod is only allowed to update with container image, so InPlaceOnly policy is
388+
// implemented with InPlaceIfPossible policy as default for compatibility.
389+
return &inPlaceIfPossibleUpdater{collaSet: cls, ctx: ctx, Client: client}
390+
case appsv1alpha1.CollaSetReplacePodUpdateStrategyType:
391+
return &replaceUpdatePodUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder}
392+
default:
393+
return &inPlaceIfPossibleUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder, GenericPodUpdater: *genericPodUpdater}
394+
}
395+
}
396+
397+
type PodStatus struct {
398+
ContainerStates map[string]*ContainerStatus `json:"containerStates,omitempty"`
399+
}
400+
401+
type ContainerStatus struct {
402+
LatestImage string `json:"latestImage,omitempty"`
403+
LastImageID string `json:"lastImageID,omitempty"`
404+
}
405+
406+
type inPlaceIfPossibleUpdater struct {
407+
collaSet *appsv1alpha1.CollaSet
408+
ctx context.Context
409+
podControl podcontrol.Interface
410+
recorder record.EventRecorder
411+
GenericPodUpdater
412+
client.Client
413+
}
414+
389415
func (u *inPlaceIfPossibleUpdater) FulfillPodUpdatedInfo(
390416
updatedRevision *appsv1.ControllerRevision,
391417
podUpdateInfo *PodUpdateInfo) error {
@@ -605,22 +631,6 @@ func (u *inPlaceIfPossibleUpdater) GetPodUpdateFinishStatus(podUpdateInfo *PodUp
605631
return true, "", nil
606632
}
607633

608-
func (u *inPlaceIfPossibleUpdater) FinishUpdatePod(podInfo *PodUpdateInfo) error {
609-
//u.recorder.Eventf().V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod))
610-
if updated, err := podopslifecycle.Finish(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil {
611-
return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
612-
} else if updated {
613-
// add an expectation for this pod update, before next reconciling
614-
if err := collasetutils.ActiveExpectations.ExpectUpdate(u.collaSet, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
615-
return err
616-
}
617-
u.recorder.Eventf(podInfo.Pod,
618-
corev1.EventTypeNormal,
619-
"UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name)
620-
}
621-
return nil
622-
}
623-
624634
// TODO
625635
type inPlaceOnlyPodUpdater struct {
626636
}
@@ -650,87 +660,10 @@ type recreatePodUpdater struct {
650660
ctx context.Context
651661
podControl podcontrol.Interface
652662
recorder record.EventRecorder
663+
GenericPodUpdater
653664
client.Client
654665
}
655666

656-
func (u *recreatePodUpdater) BeginUpdate(resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (bool, error) {
657-
succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error {
658-
podInfo := <-podCh
659-
u.recorder.Eventf(podInfo.Pod, corev1.EventTypeNormal, "PodUpdateLifecycle", "try to begin PodOpsLifecycle for updating Pod of CollaSet")
660-
if updated, err := podopslifecycle.Begin(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod, func(obj client.Object) (bool, error) {
661-
if !podInfo.OnlyMetadataChanged && !podInfo.InPlaceUpdateSupport {
662-
return podopslifecycle.WhenBeginDelete(obj)
663-
}
664-
return false, nil
665-
}); err != nil {
666-
return fmt.Errorf("fail to begin PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
667-
} else if updated {
668-
// add an expectation for this pod update, before next reconciling
669-
if err := collasetutils.ActiveExpectations.ExpectUpdate(u.collaSet, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
670-
return err
671-
}
672-
}
673-
674-
return nil
675-
})
676-
677-
updating := succCount > 0
678-
if err != nil {
679-
collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, err, "UpdateFailed", err.Error())
680-
return updating, err
681-
} else {
682-
collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, nil, "Updated", "")
683-
}
684-
return updating, nil
685-
}
686-
687-
func (u *recreatePodUpdater) FilterAllowOpsPodsAndUpdatePodContext(podToUpdate []*PodUpdateInfo, ownedIDs map[int]*appsv1alpha1.ContextDetail, resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (*time.Duration, error) {
688-
var recordedRequeueAfter *time.Duration
689-
needUpdateContext := false
690-
for i := range podToUpdate {
691-
podInfo := podToUpdate[i]
692-
requeueAfter, allowed := podopslifecycle.AllowOps(collasetutils.UpdateOpsLifecycleAdapter, realValue(u.collaSet.Spec.UpdateStrategy.OperationDelaySeconds), podInfo.Pod)
693-
if !allowed {
694-
u.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "Pod %s is not allowed to update", commonutils.ObjectKeyString(podInfo.Pod))
695-
continue
696-
}
697-
if requeueAfter != nil {
698-
u.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "delay Pod update for %d seconds", requeueAfter.Seconds())
699-
if recordedRequeueAfter == nil || *requeueAfter < *recordedRequeueAfter {
700-
recordedRequeueAfter = requeueAfter
701-
}
702-
continue
703-
}
704-
705-
if !ownedIDs[podInfo.ID].Contains(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name) {
706-
needUpdateContext = true
707-
ownedIDs[podInfo.ID].Put(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name)
708-
}
709-
if podInfo.PodDecorationChanged {
710-
decorationStr := utilspoddecoration.GetDecorationInfoString(podInfo.UpdatedPodDecorations)
711-
if val, ok := ownedIDs[podInfo.ID].Get(podcontext.PodDecorationRevisionKey); !ok || val != decorationStr {
712-
needUpdateContext = true
713-
ownedIDs[podInfo.ID].Put(podcontext.PodDecorationRevisionKey, decorationStr)
714-
}
715-
}
716-
717-
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged {
718-
continue
719-
}
720-
// if Pod has not been updated, update it.
721-
podCh <- podToUpdate[i]
722-
}
723-
// 4. mark Pod to use updated revision before updating it.
724-
if needUpdateContext {
725-
u.recorder.Eventf(u.collaSet, corev1.EventTypeNormal, "UpdateToPodContext", "try to update ResourceContext for CollaSet")
726-
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
727-
return podcontext.UpdateToPodContext(u.Client, u.collaSet, ownedIDs)
728-
})
729-
return recordedRequeueAfter, err
730-
}
731-
return recordedRequeueAfter, nil
732-
}
733-
734667
func (u *recreatePodUpdater) FulfillPodUpdatedInfo(_ *appsv1.ControllerRevision, _ *PodUpdateInfo) error {
735668
return nil
736669
}
@@ -744,21 +677,6 @@ func (u *recreatePodUpdater) GetPodUpdateFinishStatus(podInfo *PodUpdateInfo) (f
744677
return podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged, "", nil
745678
}
746679

747-
func (u *recreatePodUpdater) FinishUpdatePod(podInfo *PodUpdateInfo) error {
748-
if updated, err := podopslifecycle.Finish(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil {
749-
return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
750-
} else if updated {
751-
// add an expectation for this pod update, before next reconciling
752-
if err := collasetutils.ActiveExpectations.ExpectUpdate(u.collaSet, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
753-
return err
754-
}
755-
u.recorder.Eventf(podInfo.Pod,
756-
corev1.EventTypeNormal,
757-
"UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name)
758-
}
759-
return nil
760-
}
761-
762680
type replaceUpdatePodUpdater struct {
763681
collaSet *appsv1alpha1.CollaSet
764682
ctx context.Context
@@ -767,7 +685,7 @@ type replaceUpdatePodUpdater struct {
767685
client.Client
768686
}
769687

770-
func (u *replaceUpdatePodUpdater) BeginUpdate(resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (updating bool, err error) {
688+
func (u *replaceUpdatePodUpdater) BeginUpdatePod(resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (bool, error) {
771689
succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error {
772690
podInfo := <-podCh
773691
if podInfo.replacePairNewPodInfo != nil {
@@ -785,10 +703,10 @@ func (u *replaceUpdatePodUpdater) BeginUpdate(resources *collasetutils.RelatedRe
785703
newPodRevision,
786704
resources.UpdatedRevision.Name)
787705
patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%d"}}}`, appsv1alpha1.PodDeletionIndicationLabelKey, time.Now().UnixNano())))
788-
if err = u.Patch(u.ctx, podInfo.replacePairNewPodInfo.Pod, patch); err != nil {
789-
err = fmt.Errorf("failed to delete replace pair new pod %s/%s %s",
790-
podInfo.replacePairNewPodInfo.Namespace, podInfo.replacePairNewPodInfo.Name, err)
791-
return nil
706+
if patchErr := u.Patch(u.ctx, podInfo.replacePairNewPodInfo.Pod, patch); patchErr != nil {
707+
err := fmt.Errorf("failed to delete replace pair new pod %s/%s %s",
708+
podInfo.replacePairNewPodInfo.Namespace, podInfo.replacePairNewPodInfo.Name, patchErr)
709+
return err
792710
}
793711
}
794712
return nil
@@ -797,34 +715,18 @@ func (u *replaceUpdatePodUpdater) BeginUpdate(resources *collasetutils.RelatedRe
797715
return succCount > 0, err
798716
}
799717

800-
func (u *replaceUpdatePodUpdater) FilterAllowOpsPodsAndUpdatePodContext(_ []*PodUpdateInfo, _ map[int]*appsv1alpha1.ContextDetail, _ *collasetutils.RelatedResources, _ chan *PodUpdateInfo) (requeueAfter *time.Duration, err error) {
801-
return
718+
func (u *replaceUpdatePodUpdater) FilterAllowOpsPods(podToUpdate []*PodUpdateInfo, _ map[int]*appsv1alpha1.ContextDetail, _ *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (requeueAfter *time.Duration, err error) {
719+
for i, podInfo := range podToUpdate {
720+
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged && !podInfo.PvcTmpHashChanged {
721+
continue
722+
}
723+
724+
podCh <- podToUpdate[i]
725+
}
726+
return nil, err
802727
}
803728

804729
func (u *replaceUpdatePodUpdater) FulfillPodUpdatedInfo(_ *appsv1.ControllerRevision, _ *PodUpdateInfo) (err error) {
805-
//// when replaceUpdate, inPlaceUpdateSupport and onlyMetadataChanged always false
806-
//// judge replace pair new pod is updated revision, if not, delete.
807-
//if podUpdateInfo.replacePairNewPodInfo != nil {
808-
// replacePairNewPod := podUpdateInfo.replacePairNewPodInfo.Pod
809-
// newPodRevision, exist := replacePairNewPod.Labels[appsv1.ControllerRevisionHashLabelKey]
810-
// if exist && newPodRevision == updatedRevision.Name {
811-
// return
812-
// }
813-
// u.recorder.Eventf(podUpdateInfo.Pod,
814-
// corev1.EventTypeNormal,
815-
// "ReplaceUpdatePod",
816-
// "label to-delete on new pair pod %s/%s because it is not updated revision, current revision: %s, updated revision: %s",
817-
// replacePairNewPod.Namespace,
818-
// replacePairNewPod.Name,
819-
// newPodRevision,
820-
// updatedRevision.Name)
821-
// patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%d"}}}`, appsv1alpha1.PodDeletionIndicationLabelKey, time.Now().UnixNano())))
822-
// if err = u.Patch(u.ctx, podUpdateInfo.replacePairNewPodInfo.Pod, patch); err != nil {
823-
// err = fmt.Errorf("failed to delete replace pair new pod %s/%s %s",
824-
// podUpdateInfo.replacePairNewPodInfo.Namespace, podUpdateInfo.replacePairNewPodInfo.Name, err)
825-
// return
826-
// }
827-
//}
828730
return
829731
}
830732

0 commit comments

Comments
 (0)