Skip to content

Commit

Permalink
improve replace: add finalizer for origin pod, improve list new pod, …
Browse files Browse the repository at this point in the history
…add some events (#239)

* finish-lifecycle-if-job-finished

* add finalizer for replace origin pod

* fix golint and e2e

* fix golint

* fix golint

* fix golint

* remove finalizer if oj is deleted or failed

* fix remove finalizer

* try to pass codecov

* remove redundant code

* revert error handling
  • Loading branch information
ColdsteelRail authored Aug 5, 2024
1 parent d0e6766 commit ccae59b
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 33 deletions.
6 changes: 2 additions & 4 deletions pkg/controllers/operationjob/operationjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,8 @@ func (r *ReconcileOperationJob) Reconcile(ctx context.Context, req reconcile.Req
return reconcile.Result{}, err
}

if !ojutils.IsJobFinished(instance) {
if err := r.doReconcile(ctx, instance, logger); err != nil {
return reconcile.Result{}, err
}
if err := r.doReconcile(ctx, instance, logger); err != nil {
return reconcile.Result{}, err
}

jobDeleted, requeueAfter, err := r.ensureActiveDeadlineAndTTL(ctx, instance, logger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ var _ = Describe("operationjob controller", func() {

Expect(c.Create(ctx, oj)).Should(BeNil())

assertJobProgressSucceeded(oj, time.Second*5)
assertJobProgressFailed(oj, time.Second*5)
})

It("[replace] parallel", func() {
Expand Down
3 changes: 1 addition & 2 deletions pkg/controllers/operationjob/operationjob_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "kusionstack.io/operating/apis/apps/v1alpha1"
"kusionstack.io/operating/pkg/controllers/collaset/podcontrol"
. "kusionstack.io/operating/pkg/controllers/operationjob/opscore"
"kusionstack.io/operating/pkg/controllers/operationjob/replace"
ojutils "kusionstack.io/operating/pkg/controllers/operationjob/utils"
Expand All @@ -39,7 +38,7 @@ import (

// RegisterOperationJobActions register actions for operationJob
func RegisterOperationJobActions(c client.Client, scheme *runtime.Scheme) {
RegisterAction(appsv1alpha1.OpsActionReplace, &replace.PodReplaceHandler{PodControl: podcontrol.NewRealPodControl(c, scheme)}, false)
RegisterAction(appsv1alpha1.OpsActionReplace, &replace.PodReplaceHandler{}, false)
}

// getActionHandler get actions registered for operationJob
Expand Down
86 changes: 62 additions & 24 deletions pkg/controllers/operationjob/replace/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

appsv1alpha1 "kusionstack.io/operating/apis/apps/v1alpha1"
"kusionstack.io/operating/pkg/controllers/collaset/podcontrol"
. "kusionstack.io/operating/pkg/controllers/operationjob/opscore"
controllerutils "kusionstack.io/operating/pkg/controllers/utils"
)

// OperationJobReplacePodFinalizer is the finalizer name the replace handler
// adds to an origin pod before triggering its replacement.
const OperationJobReplacePodFinalizer = "finalizer.operationjob.kusionstack.io/replace-protected"

// Compile-time assertion that *PodReplaceHandler satisfies ActionHandler.
var _ ActionHandler = (*PodReplaceHandler)(nil)

type PodReplaceHandler struct {
PodControl podcontrol.Interface
}
type PodReplaceHandler struct{}

func (p *PodReplaceHandler) Init(c client.Client, controller controller.Controller, _ *runtime.Scheme, _ cache.Cache) error {
// Watch for changes to replace new pods
Expand All @@ -54,6 +56,10 @@ func (p *PodReplaceHandler) Init(c client.Client, controller controller.Controll
}

func (p *PodReplaceHandler) OperateTarget(ctx context.Context, logger logr.Logger, recorder record.EventRecorder, c client.Client, candidate *OpsCandidate, operationJob *appsv1alpha1.OperationJob) error {
if candidate.Pod == nil {
return nil
}

// parse replace information from origin pod
_, replaceIndicated := candidate.Pod.Labels[appsv1alpha1.PodReplaceIndicationLabelKey]
_, replaceByReplaceUpdate := candidate.Pod.Labels[appsv1alpha1.PodReplaceByReplaceUpdateLabelKey]
Expand All @@ -63,9 +69,14 @@ func (p *PodReplaceHandler) OperateTarget(ctx context.Context, logger logr.Logge
replaceTriggered := replaceIndicated || replaceByReplaceUpdate || replaceNewPodExists
if !replaceTriggered {
patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%v"}}}`, appsv1alpha1.PodReplaceIndicationLabelKey, true)))
// add finalizer on origin pod before trigger replace
if err := controllerutils.AddFinalizer(ctx, c, candidate.Pod, OperationJobReplacePodFinalizer); err != nil {
return fmt.Errorf("fail to add %s finalizer to origin pod %s/%s : %s", OperationJobReplacePodFinalizer, candidate.Pod.Namespace, candidate.Pod.Name, err.Error())
}
if err := c.Patch(ctx, candidate.Pod, patch); err != nil {
return fmt.Errorf("fail to label origin pod %s/%s with replace indicate label by replaceUpdate: %s", candidate.Pod.Namespace, candidate.Pod.Name, err)
}
recorder.Eventf(operationJob, corev1.EventTypeNormal, "ReplaceOriginPod", fmt.Sprintf("Succeeded to trigger originPod %s/%s to replace", operationJob.Namespace, candidate.Pod.Name))
}

return nil
Expand All @@ -78,32 +89,58 @@ func (p *PodReplaceHandler) GetOpsProgress(
progress = candidate.OpsStatus.Progress
reason = candidate.OpsStatus.Reason
message = candidate.OpsStatus.Message
// try to find replaceNewPod
if candidate.Pod != nil && candidate.CollaSet != nil {

if candidate.Pod != nil {
// mark ops status as processing if origin pod exists
progress = appsv1alpha1.OperationProgressProcessing
// try to find replaceNewPod
newPodId, exist := candidate.Pod.Labels[appsv1alpha1.PodReplacePairNewId]
if exist {
var filteredPods []*corev1.Pod
filteredPods, err = p.PodControl.GetFilteredPods(candidate.CollaSet.Spec.Selector, candidate.CollaSet)
if err != nil {
newPods := corev1.PodList{}
if err = c.List(ctx, &newPods, client.InNamespace(operationJob.Namespace), client.MatchingLabels{
appsv1alpha1.PodReplacePairOriginName: candidate.Pod.Name,
}); err != nil {
return
}
for _, newPod := range filteredPods {
if newPodId == newPod.Labels[appsv1alpha1.PodInstanceIDLabelKey] {
reason = appsv1alpha1.ReasonReplacedByNewPod
message = newPod.Name

for i := range newPods.Items {
newPod := newPods.Items[i]
// do not consider this pod as newPod if the pair info doesn't match
if newPodId != newPod.Labels[appsv1alpha1.PodInstanceIDLabelKey] {
continue
}

// update ops status if newPod exists
reason = appsv1alpha1.ReasonReplacedByNewPod
message = newPod.Name
recorder.Eventf(operationJob, corev1.EventTypeNormal, "ReplaceNewPod", fmt.Sprintf("Succeeded to create newPod %s/%s for originPod %s/%s", operationJob.Namespace, newPod.Name, operationJob.Namespace, candidate.Pod.Name))
if _, serviceAvailable := newPod.Labels[appsv1alpha1.PodServiceAvailableLabel]; serviceAvailable {
recorder.Eventf(operationJob, corev1.EventTypeNormal, "ReplaceNewPod", fmt.Sprintf("newPod %s/%s is serviceAvailable, ready to delete originPod %s", operationJob.Namespace, newPod.Name, candidate.Pod.Name))
}

// remove replace-protection finalizer from origin pod
if candidate.Pod.DeletionTimestamp != nil {
if removeErr := controllerutils.RemoveFinalizer(ctx, c, candidate.Pod, OperationJobReplacePodFinalizer); removeErr != nil {
err = fmt.Errorf("fail to add %s finalizer to origin pod %s/%s : %s", OperationJobReplacePodFinalizer, candidate.Pod.Namespace, candidate.Pod.Name, removeErr.Error())
}
}
return
}
}
}

// origin pod has been deleted and no longer exists, mark as succeeded
if candidate.Pod == nil {
progress = appsv1alpha1.OperationProgressSucceeded
if candidate.OpsStatus.Reason != appsv1alpha1.ReasonReplacedByNewPod {
} else {
if candidate.OpsStatus.Reason == appsv1alpha1.ReasonReplacedByNewPod {
newPod := &corev1.Pod{}
if getErr := c.Get(ctx, types.NamespacedName{Namespace: operationJob.Namespace, Name: message}, newPod); getErr != nil {
err = fmt.Errorf("fail to find replace newPod %s/%s : %s", operationJob.Namespace, message, getErr.Error())
return
}
// mark ops status as succeeded if origin pod is replaced
progress = appsv1alpha1.OperationProgressSucceeded
} else {
// mark ops status as failed if origin pod not found
progress = appsv1alpha1.OperationProgressFailed
reason = appsv1alpha1.ReasonPodNotFound
}
} else {
progress = appsv1alpha1.OperationProgressProcessing
}
return
}
Expand All @@ -128,8 +165,9 @@ func (p *PodReplaceHandler) ReleaseTarget(ctx context.Context, logger logr.Logge
return err
}

if err := p.PodControl.PatchPod(candidate.Pod, client.RawPatch(types.JSONPatchType, patchBytes)); err != nil {
return fmt.Errorf("failed to remove to-replace label %s/%s: %s", candidate.Pod.Namespace, candidate.Pod.Name, err)
if err := controllerutils.RemoveFinalizer(ctx, c, candidate.Pod, OperationJobReplacePodFinalizer); err != nil {
return fmt.Errorf("fail to add %s finalizer to origin pod %s/%s : %s", OperationJobReplacePodFinalizer, candidate.Pod.Namespace, candidate.Pod.Name, err.Error())
}
return nil

return c.Patch(ctx, candidate.Pod, client.RawPatch(types.JSONPatchType, patchBytes))
}
4 changes: 2 additions & 2 deletions test/e2e/apps/operationjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ var _ = SIGDescribe("OperationJob", func() {
Expect(ojTester.CreateOperationJob(oj)).NotTo(HaveOccurred())

By("Wait for replace OperationJob Succeeded")
Eventually(func() error { return ojTester.ExpectOperationJobProgress(oj, appsv1alpha1.OperationProgressSucceeded) }, 30*time.Second, 3*time.Second).ShouldNot(HaveOccurred())
Eventually(func() error { return ojTester.ExpectOperationJobProgress(oj, appsv1alpha1.OperationProgressFailed) }, 30*time.Second, 3*time.Second).ShouldNot(HaveOccurred())

By("Check reason for Succeeded")
By("Check reason for Failed")
Expect(oj.Status.TargetDetails[0].Reason).To(Equal(appsv1alpha1.ReasonPodNotFound))
})

Expand Down

0 comments on commit ccae59b

Please sign in to comment.