Skip to content

Commit

Permalink
Improve opj handler returned errors (#310)
Browse files Browse the repository at this point in the history
* release and operate return error map

* add some comment

* add some comment

* release failed targets with retry

* concurrent map write
  • Loading branch information
ColdsteelRail authored Feb 6, 2025
1 parent 2fa4811 commit 6b9b766
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 24 deletions.
4 changes: 4 additions & 0 deletions pkg/controllers/operationjob/operationjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ func (r *ReconcileOperationJob) Reconcile(ctx context.Context, req reconcile.Req
return reconcile.Result{}, err
}

if err := r.ensureFailedTargetsReleased(ctx, instance, candidates); err != nil {
return reconcile.Result{}, err
}

err = r.doReconcile(ctx, instance, candidates)
return requeueResult(requeueAfter), err
}
Expand Down
50 changes: 36 additions & 14 deletions pkg/controllers/operationjob/operationjob_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ func (r *ReconcileOperationJob) operateTargets(
if len(candidates) == 0 {
return nil
}
return operator.OperateTargets(ctx, candidates, operationJob)
errMap := operator.OperateTargets(ctx, candidates, operationJob)
return ctrlutils.AggregateErrors(ojutils.ConvertErrMapToList(errMap))
}

func (r *ReconcileOperationJob) getTargetsOpsStatus(
Expand Down Expand Up @@ -244,11 +245,10 @@ func (r *ReconcileOperationJob) getTargetsOpsStatus(
// ensureActiveDeadlineAndTTL calculates the time left until ActiveDeadlineSeconds and TTLSecondsAfterFinished, and marks targets that exceed the active deadline as failed
func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context, operationJob *appsv1alpha1.OperationJob, candidates []*OpsCandidate, logger logr.Logger) (bool, *time.Duration, error) {
if operationJob.Spec.ActiveDeadlineSeconds != nil {
var allowReleaseCandidates []*OpsCandidate
for i := range candidates {
candidate := candidates[i]
// just skip if target operation already finished, or not started
if IsCandidateOpsFinished(candidate) || candidate.OpsStatus.StartTime == nil {
// just skip if target not started
if candidate.OpsStatus.StartTime == nil {
continue
}
leftTime := time.Duration(*operationJob.Spec.ActiveDeadlineSeconds)*time.Second - time.Since(candidate.OpsStatus.StartTime.Time)
Expand All @@ -257,17 +257,10 @@ func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context,
} else {
logger.Info("should end but still processing")
r.Recorder.Eventf(operationJob, corev1.EventTypeNormal, "Timeout", "Try to fail OperationJob for timeout...")
// mark operationjob and targets failed and release targets
// mark target failed if timeout
MarkCandidateFailed(candidate)
allowReleaseCandidates = append(allowReleaseCandidates, candidate)
}
}
if len(allowReleaseCandidates) > 0 {
releaseErr := r.releaseTargets(ctx, operationJob, allowReleaseCandidates, false)
operationJob.Status = r.calculateStatus(operationJob, candidates)
updateErr := r.updateStatus(ctx, operationJob)
return false, nil, controllerutils.AggregateErrors([]error{releaseErr, updateErr})
}
}

if operationJob.Spec.TTLSecondsAfterFinished != nil {
Expand All @@ -286,26 +279,55 @@ func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context,
return false, nil, nil
}

// ensureFailedTargetsReleased picks every candidate whose operation has failed
// but which is not yet marked as released, and passes them to releaseTargets.
//
// When at least one such candidate exists, the job status is recalculated from
// all candidates and persisted, and the release error and the status-update
// error are returned aggregated. When nothing needs releasing it returns nil
// without touching the status.
func (r *ReconcileOperationJob) ensureFailedTargetsReleased(ctx context.Context, operationJob *appsv1alpha1.OperationJob, candidates []*OpsCandidate) error {
	var pending []*OpsCandidate
	for _, candidate := range candidates {
		if !IsCandidateOpsFailed(candidate) || IsCandidateOpsReleased(candidate) {
			continue
		}
		pending = append(pending, candidate)
	}
	if len(pending) == 0 {
		return nil
	}

	// releaseTargets is called with needUpdateStatus=false, so it returns right
	// after releasing; the status is recalculated and written once below.
	releaseErr := r.releaseTargets(ctx, operationJob, pending, false)
	operationJob.Status = r.calculateStatus(operationJob, candidates)
	updateErr := r.updateStatus(ctx, operationJob)
	return controllerutils.AggregateErrors([]error{releaseErr, updateErr})
}

// releaseTargets tries to release the given targets from the operation, e.g. when the operationJob is deleted or its targets have failed
func (r *ReconcileOperationJob) releaseTargets(ctx context.Context, operationJob *appsv1alpha1.OperationJob, candidates []*OpsCandidate, needUpdateStatus bool) error {
actionHandler, enablePodOpsLifecycle, err := r.getActionHandler(operationJob)
if err != nil {
return err
}
releaseErr := actionHandler.ReleaseTargets(ctx, candidates, operationJob)

// start to release targets
releaseErrMap := actionHandler.ReleaseTargets(ctx, candidates, operationJob)
_, _ = controllerutils.SlowStartBatch(len(candidates), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error {
candidate := candidates[i]
// cancel lifecycle if necessary
if enablePodOpsLifecycle {
err = r.cleanCandidateOpsLifecycle(ctx, true, candidate, operationJob)
releaseErr = controllerutils.AggregateErrors([]error{releaseErr, err})
releaseErrMap[candidate.PodName] = controllerutils.AggregateErrors([]error{releaseErrMap[candidate.PodName], err})
}
// mark candidate as failed if not finished
if !IsCandidateOpsFinished(candidate) {
candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressFailed
}
return nil
})

// mark target as released if error not occurred
for _, candidate := range candidates {
if releaseErrMap[candidate.PodName] == nil {
MarkCandidateReleased(candidate)
}
}
releaseErr := ctrlutils.AggregateErrors(ojutils.ConvertErrMapToList(releaseErrMap))

// update candidates status to job status
if !needUpdateStatus {
return releaseErr
}
Expand Down
31 changes: 31 additions & 0 deletions pkg/controllers/operationjob/opscore/candidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type OpsCandidate struct {
OpsStatus *appsv1alpha1.OpsStatus
}

const (
// ExtraInfoReleased is the key stored in OpsStatus.ExtraInfo to record that a
// candidate has been released; the value "true" means released.
ExtraInfoReleased = "Released"
)

func DecideCandidateByPartition(instance *appsv1alpha1.OperationJob, candidates []*OpsCandidate) []*OpsCandidate {
if instance.Spec.Partition == nil {
return candidates
Expand Down Expand Up @@ -81,6 +85,23 @@ func IsCandidateOpsFinished(candidate *OpsCandidate) bool {
candidate.OpsStatus.Progress == appsv1alpha1.OperationProgressSucceeded
}

// IsCandidateOpsReleased reports whether the candidate has been marked as
// released, i.e. its OpsStatus.ExtraInfo holds ExtraInfoReleased == "true".
// A candidate without OpsStatus or without ExtraInfo is not released.
func IsCandidateOpsReleased(candidate *OpsCandidate) bool {
	status := candidate.OpsStatus
	if status == nil {
		return false
	}
	// Indexing a nil map yields the zero value, so a missing ExtraInfo map is
	// covered by the same lookup as a missing key.
	return status.ExtraInfo[ExtraInfoReleased] == "true"
}

// IsCandidateOpsFailed reports whether the candidate's operation progress is
// OperationProgressFailed. A candidate without OpsStatus, or with an empty
// progress, is not considered failed.
func IsCandidateOpsFailed(candidate *OpsCandidate) bool {
	if status := candidate.OpsStatus; status != nil && status.Progress != "" {
		return status.Progress == appsv1alpha1.OperationProgressFailed
	}
	return false
}

func IsCandidateServiceAvailable(candidate *OpsCandidate) bool {
if candidate.Pod == nil || candidate.Pod.Labels == nil {
return false
Expand All @@ -94,3 +115,13 @@ func MarkCandidateFailed(candidate *OpsCandidate) {
candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressFailed
}
}

// MarkCandidateReleased records on the candidate that it has been released by
// setting OpsStatus.ExtraInfo[ExtraInfoReleased] to "true". The OpsStatus and
// its ExtraInfo map are created first when they are missing.
func MarkCandidateReleased(candidate *OpsCandidate) {
	status := candidate.OpsStatus
	if status == nil {
		status = &appsv1alpha1.OpsStatus{}
		candidate.OpsStatus = status
	}
	if status.ExtraInfo == nil {
		status.ExtraInfo = make(map[string]string)
	}
	status.ExtraInfo[ExtraInfoReleased] = "true"
}
8 changes: 4 additions & 4 deletions pkg/controllers/operationjob/opscore/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ type ActionHandler interface {
// Setup sets up action with manager in AddToMgr, i.e., watch, cache...
Setup(controller.Controller, *mixin.ReconcilerMixin) error

// OperateTargets do real operation to targets
OperateTargets(context.Context, []*OpsCandidate, *appsv1alpha1.OperationJob) error
// OperateTargets performs the actual operation on the targets and returns a map from each target name to its error
OperateTargets(context.Context, []*OpsCandidate, *appsv1alpha1.OperationJob) map[string]error

// GetOpsProgress returns target's current opsStatus, e.g., progress, reason, message
GetOpsProgress(context.Context, *OpsCandidate, *appsv1alpha1.OperationJob) (progress ActionProgress, err error)

// ReleaseTargets releases the target from operation when the operationJob is deleted
ReleaseTargets(context.Context, []*OpsCandidate, *appsv1alpha1.OperationJob) error
// ReleaseTargets releases the targets from the operation when they have failed, and returns a map from each target name to its error
ReleaseTargets(context.Context, []*OpsCandidate, *appsv1alpha1.OperationJob) map[string]error
}
22 changes: 16 additions & 6 deletions pkg/controllers/operationjob/replace/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package replace
import (
"context"
"fmt"
"sync"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -62,9 +63,13 @@ func (p *PodReplaceHandler) Setup(controller controller.Controller, reconcileMix
return controller.Watch(&source.Kind{Type: &corev1.Pod{}}, &OriginPodHandler{Client: reconcileMixin.Client})
}

func (p *PodReplaceHandler) OperateTargets(ctx context.Context, candidates []*OpsCandidate, operationJob *appsv1alpha1.OperationJob) error {
_, err := controllerutils.SlowStartBatch(len(candidates), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error {
func (p *PodReplaceHandler) OperateTargets(ctx context.Context, candidates []*OpsCandidate, operationJob *appsv1alpha1.OperationJob) map[string]error {
errMap := &sync.Map{}
_, _ = controllerutils.SlowStartBatch(len(candidates), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) (err error) {
candidate := candidates[i]
defer func() {
errMap.Store(candidate.PodName, err)
}()
if candidate.Pod == nil || candidate.Pod.Labels == nil {
return nil
}
Expand Down Expand Up @@ -99,7 +104,7 @@ func (p *PodReplaceHandler) OperateTargets(ctx context.Context, candidates []*Op
}
return nil
})
return err
return ojutils.ConvertSyncErrMap(errMap)
}

func (p *PodReplaceHandler) GetOpsProgress(ctx context.Context, candidate *OpsCandidate, operationJob *appsv1alpha1.OperationJob) (progress ActionProgress, err error) {
Expand Down Expand Up @@ -173,9 +178,13 @@ func (p *PodReplaceHandler) GetOpsProgress(ctx context.Context, candidate *OpsCa
return
}

func (p *PodReplaceHandler) ReleaseTargets(ctx context.Context, candidates []*OpsCandidate, operationJob *appsv1alpha1.OperationJob) error {
_, err := controllerutils.SlowStartBatch(len(candidates), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error {
func (p *PodReplaceHandler) ReleaseTargets(ctx context.Context, candidates []*OpsCandidate, operationJob *appsv1alpha1.OperationJob) map[string]error {
errMap := &sync.Map{}
_, _ = controllerutils.SlowStartBatch(len(candidates), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) (err error) {
candidate := candidates[i]
defer func() {
errMap.Store(candidate.PodName, err)
}()
if candidate.Pod == nil || candidate.Pod.DeletionTimestamp != nil {
return nil
}
Expand All @@ -196,7 +205,8 @@ func (p *PodReplaceHandler) ReleaseTargets(ctx context.Context, candidates []*Op
ojutils.SetOpsStatusError(candidate, ojutils.ReasonUpdateObjectFailed, retErr.Error())
return retErr
}
candidate.OpsStatus.ExtraInfo[ExtraInfoReleased] = "true"
return nil
})
return err
return ojutils.ConvertSyncErrMap(errMap)
}
22 changes: 22 additions & 0 deletions pkg/controllers/operationjob/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package utils

import (
"context"
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -89,3 +90,24 @@ func UpdatePodWithRetry(ctx context.Context, c client.Client, obj client.Object,
return c.Update(ctx, pod)
})
}

// ConvertErrMapToList flattens the values of errMap into a slice, dropping the
// keys. Nil errors are kept as nil entries, so the result has one element per
// map entry. The order follows Go's map iteration and is therefore not stable.
// A nil or empty map yields a nil slice.
func ConvertErrMapToList(errMap map[string]error) []error {
	var errs []error
	for name := range errMap {
		errs = append(errs, errMap[name])
	}
	return errs
}

// ConvertSyncErrMap copies the entries of a sync.Map into a plain
// map[string]error.
//
// Every key must be a string and every value must be either nil or an error;
// the type assertions are unchecked, so any other key or value type panics.
// A nil value is stored as a nil error under its key, so the result has one
// entry per entry in errMap.
//
// The returned map is never nil, even when errMap is nil or empty, so callers
// can safely write additional entries into it.
func ConvertSyncErrMap(errMap *sync.Map) map[string]error {
	ret := make(map[string]error)
	if errMap == nil {
		// Range on a nil *sync.Map would dereference a nil pointer.
		return ret
	}
	errMap.Range(func(key, value any) bool {
		name := key.(string)
		if value == nil {
			// A nil interface cannot be asserted to error; record it explicitly.
			ret[name] = nil
			return true
		}
		ret[name] = value.(error)
		return true
	})
	return ret
}

0 comments on commit 6b9b766

Please sign in to comment.