Skip to content

Commit

Permalink
recipe: refactoring essential hook check
Browse files Browse the repository at this point in the history
Signed-off-by: Annaraya Narasagond <[email protected]>
  • Loading branch information
asn1809 committed Feb 3, 2025
1 parent 9cda486 commit c91198f
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 123 deletions.
266 changes: 150 additions & 116 deletions internal/controller/vrg_kubeobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (v *VRGInstance) kubeObjectsCaptureStartOrResumeOrDelay(
v.kubeObjectsCaptureStartOrResume(result,
captureStartConditionally,
captureInProgressStatusUpdate,
number, pathName, capturePathName, namePrefix, veleroNamespaceName, interval, labels,
number, pathName, capturePathName, namePrefix, veleroNamespaceName, interval,
generation,
kubeobjects.RequestsMapKeyedByName(requests),
log,
Expand Down Expand Up @@ -236,91 +236,100 @@ func (v *VRGInstance) kubeObjectsCaptureStartOrResume(
captureNumber int64,
pathName, capturePathName, namePrefix, veleroNamespaceName string,
interval time.Duration,
labels map[string]string,
generation int64,
requests map[string]kubeobjects.Request,
log logr.Logger,
) {
groups := v.recipeElements.CaptureWorkflow
captureSteps := v.recipeElements.CaptureWorkflow

annotations := map[string]string{vrgGenerationKey: strconv.FormatInt(generation, vrgGenerationNumberBase)}
labels := util.OwnerLabels(v.instance)

allEssentialStepsFailed, err := v.executeCaptureSteps(result, pathName, capturePathName, namePrefix,
veleroNamespaceName, captureInProgressStatusUpdate, annotations, requests, log)
if err != nil {
return
}

if allEssentialStepsFailed {
v.kubeObjectsCaptureFailed("KubeObjectsCaptureError", "Kube objects capture failed")

return
}

firstRequest := getFirstRequest(captureSteps, requests, namePrefix, v.s3StoreAccessors[0].S3ProfileName)
v.kubeObjectsCaptureComplete(
result,
captureStartConditionally,
captureNumber,
veleroNamespaceName,
interval,
labels,
firstRequest.StartTime(),
firstRequest.Object().GetAnnotations(),
)
}

//nolint:gocognit
func (v *VRGInstance) executeCaptureSteps(result *ctrl.Result, pathName, capturePathName, namePrefix,
veleroNamespaceName string, captureInProgressStatusUpdate captureInProgressStatusUpdate,
annotations map[string]string, requests map[string]kubeobjects.Request, log logr.Logger,
) (bool, error) {
captureSteps := v.recipeElements.CaptureWorkflow
failOn := v.recipeElements.CaptureFailOn
allEssentialStepsFailed := true
requestsProcessedCount := 0
requestsCompletedCount := 0
annotations := map[string]string{vrgGenerationKey: strconv.FormatInt(generation, vrgGenerationNumberBase)}
//executionResults := make(map[string]bool)
var workflowResult = true
labels := util.OwnerLabels(v.instance)

for groupNumber, captureGroup := range captureSteps {
var err error

var loopCount int

for groupNumber, captureGroup := range groups {
cg := captureGroup
log1 := log.WithValues("group", groupNumber, "name", cg.Name)
isEssentialStep := cg.GroupEssential != nil && *cg.GroupEssential

if cg.IsHook {
if err := v.executeHook(cg.Hook, log1); err != nil {

if failOn == anyError {
v.kubeObjectsCaptureStatusFalse("KubeObjectsHookExecutionError", err.Error())
break
} else if failOn == essentialError {
if cg.Hook.Essential != nil && *cg.Hook.Essential {
v.kubeObjectsCaptureStatusFalse("KubeObjectsHookExecutionError", err.Error())
break
}
} else if failOn == fullError {
if cg.Hook.Essential != nil && *cg.Hook.Essential {
workflowResult = false
}
}
}
} else {
requestsCompletedCount += v.kubeObjectsGroupCapture(
err = v.executeHook(cg.Hook, log1)
}

if !cg.IsHook {
loopCount, err = v.kubeObjectsGroupCapture(
result, cg, pathName, capturePathName, namePrefix, veleroNamespaceName,
captureInProgressStatusUpdate,
labels, annotations, requests, log,
)
// result.Requeue true could be used to determine if error has occured or not
if result.Requeue {
if failOn == anyError {
// mark as backup failed
v.kubeObjectsCaptureStatusFalse("KubeObjectsCaptureError", fmt.Errorf(
"kube objects group capture failed").Error())
break
} else if failOn == essentialError {
if cg.Spec.KubeResourcesSpec.GroupEssential != nil && *cg.Spec.KubeResourcesSpec.GroupEssential {
v.kubeObjectsCaptureStatusFalse("KubeObjectsCaptureError", fmt.Errorf(
"kube objects group capture failed").Error())
break
}
} else if failOn == fullError {
if cg.Spec.KubeResourcesSpec.GroupEssential != nil && *cg.Spec.KubeResourcesSpec.GroupEssential {
workflowResult = false
}
}
}
requestsCompletedCount += loopCount
}

requestsProcessedCount += len(v.s3StoreAccessors)
if requestsCompletedCount < requestsProcessedCount {
log1.Info("Kube objects group capturing", "complete", requestsCompletedCount, "total", requestsProcessedCount)
if err != nil {
if shouldStopExecution(failOn, isEssentialStep) {
v.kubeObjectsCaptureFailed("KubeObjectsWorkflowError", err.Error())

return
return false, err
}

allEssentialStepsFailed = allEssentialStepsFailed && isEssentialStep

continue
}
}

if !workflowResult {
v.kubeObjectsCaptureFailed("KubeObjectsCaptureError", "Kube objects capture failed")
return
if isEssentialStep {
// shows that at least one essential step has succeeded
allEssentialStepsFailed = false
}

requestsProcessedCount += len(v.s3StoreAccessors)
if requestsCompletedCount < requestsProcessedCount {
log1.Info("Kube objects group capturing", "complete", requestsCompletedCount, "total", requestsProcessedCount)

return allEssentialStepsFailed, fmt.Errorf("kube objects group capturing incomplete")
}
}

firstRequest := getFirstRequest(groups, requests, namePrefix, v.s3StoreAccessors[0].S3ProfileName)
v.kubeObjectsCaptureComplete(
result,
captureStartConditionally,
captureNumber,
veleroNamespaceName,
interval,
labels,
firstRequest.StartTime(),
firstRequest.Object().GetAnnotations(),
)
return allEssentialStepsFailed, nil
}

func (v *VRGInstance) executeHook(hook kubeobjects.HookSpec, log1 logr.Logger) error {
Expand Down Expand Up @@ -365,7 +374,7 @@ func (v *VRGInstance) kubeObjectsGroupCapture(
captureInProgressStatusUpdate captureInProgressStatusUpdate,
labels, annotations map[string]string, requests map[string]kubeobjects.Request,
log logr.Logger,
) (requestsCompletedCount int) {
) (requestsCompletedCount int, reqErr error) {
for _, s3StoreAccessor := range v.s3StoreAccessors {
requestName := kubeObjectsCaptureName(namePrefix, captureGroup.Name, s3StoreAccessor.S3ProfileName)
log1 := log.WithValues("profile", s3StoreAccessor.S3ProfileName)
Expand All @@ -382,10 +391,12 @@ func (v *VRGInstance) kubeObjectsGroupCapture(
log1.Error(err, "Kube objects group capture request submit error")

result.Requeue = true
reqErr = fmt.Errorf("kube objects group capture error: %v", err)

continue
}

captureInProgressStatusUpdate()
log1.Info("Kube objects group capture request submitted")
} else {
err := request.Status(v.log)
Expand All @@ -409,14 +420,13 @@ func (v *VRGInstance) kubeObjectsGroupCapture(
v.kubeObjectsCaptureStatusFalse("KubeObjectsCaptureError", err.Error())

result.Requeue = true
reqErr = fmt.Errorf("kube objects group capture error: %v", err)

return
continue
}

captureInProgressStatusUpdate()
}

return requestsCompletedCount
return requestsCompletedCount, reqErr
}

func (v *VRGInstance) kubeObjectsCaptureAndCaptureRequestDelete(
Expand Down Expand Up @@ -717,8 +727,6 @@ func (v *VRGInstance) kubeObjectsRecoveryStartOrResume(
captureToRecoverFromIdentifier *ramen.KubeObjectsCaptureIdentifier,
log logr.Logger,
) error {
labels := util.OwnerLabels(v.instance)

captureRequests, err := v.getCaptureRequests()
if err != nil {
return err
Expand All @@ -729,64 +737,91 @@ func (v *VRGInstance) kubeObjectsRecoveryStartOrResume(
return err
}

groups := v.recipeElements.RecoverWorkflow
requests := make([]kubeobjects.Request, len(groups))
failOn := v.recipeElements.RestoreFailOn
workflowResult := true
steps := v.recipeElements.RecoverWorkflow
requests := make([]kubeobjects.Request, len(steps))
labels := util.OwnerLabels(v.instance)

s3StoreAccessor, err := v.findS3StoreAccessor(s3ProfileName)
if err != nil {
return fmt.Errorf("kube objects recovery couldn't build s3StoreAccessor: %v", err)
}

for groupNumber, recoverGroup := range groups {
allEssentialStepsFailed, err := v.executeRecoverSteps(result, s3StoreAccessor, captureToRecoverFromIdentifier,
captureRequests, recoverRequests, requests, log)
if err != nil {
return err
}

if allEssentialStepsFailed {
return fmt.Errorf("workflow execution failed during restore")
}

startTime := getRequestsStartTime(requests)
duration := time.Since(startTime.Time)
log.Info("Kube objects recovered", "groups", len(steps), "start", startTime, "duration", duration)

return v.kubeObjectsRecoverRequestsDelete(result, v.veleroNamespaceName(), labels)
}

func (v *VRGInstance) executeRecoverSteps(result *ctrl.Result, s3StoreAccessor s3StoreAccessor,
captureToRecoverFromIdentifier *ramen.KubeObjectsCaptureIdentifier, captureRequests,
recoverRequests map[string]kubeobjects.Request, requests []kubeobjects.Request, log logr.Logger,
) (bool, error) {
failOn := v.recipeElements.RestoreFailOn
allEssentialStepsFailed := false
labels := util.OwnerLabels(v.instance)

recoverSteps := v.recipeElements.RecoverWorkflow
for groupNumber, recoverGroup := range recoverSteps {
var err error

rg := recoverGroup
log1 := log.WithValues("group", groupNumber, "name", rg.BackupName)
isEssentialStep := rg.GroupEssential != nil && *rg.GroupEssential

if rg.IsHook {
if err := v.executeHook(rg.Hook, log1); err != nil {
if failOn == anyError {
return fmt.Errorf("check hook execution failed during restore %s: %v", rg.Hook.Name, err)
} else if failOn == essentialError {
if rg.Hook.Essential != nil && *rg.Hook.Essential {
return fmt.Errorf("check hook execution failed during restore %s: %v", rg.Hook.Name, err)
}
} else if failOn == fullError {
if rg.Hook.Essential != nil && *rg.Hook.Essential {
workflowResult = false
}
}
err = v.executeHook(rg.Hook, log1)
}

}
} else {
if err := v.executeRecoverGroup(result, s3StoreAccessor,
if !rg.IsHook {
err = v.executeRecoverGroup(result, s3StoreAccessor,
captureToRecoverFromIdentifier, captureRequests,
recoverRequests, labels, groupNumber, rg,
requests, log1); err != nil {
if failOn == anyError {
return err
} else if failOn == essentialError {
if rg.GroupEssential != nil && *rg.GroupEssential {
return err
}
} else if failOn == fullError {
if rg.GroupEssential != nil && *rg.GroupEssential {
workflowResult = false
}
}
requests, log1)
}

if err != nil {
if shouldStopExecution(failOn, isEssentialStep) {
return false, err
}

allEssentialStepsFailed = allEssentialStepsFailed && isEssentialStep

continue
}
}

if !workflowResult {
return fmt.Errorf("workflow execution failed during restore")
if isEssentialStep {
// shows that at least one essential step has succeeded
allEssentialStepsFailed = false
}
}

startTime := getRequestsStartTime(requests)
duration := time.Since(startTime.Time)
log.Info("Kube objects recovered", "groups", len(groups), "start", startTime, "duration", duration)
return allEssentialStepsFailed, nil
}

return v.kubeObjectsRecoverRequestsDelete(result, v.veleroNamespaceName(), labels)
// function considers failOn and essential parameters and returns
// stopExecution: should further execution be stopped
func shouldStopExecution(failOn string, essential bool) bool {
switch failOn {
case WorkflowAnyError:
return true
case WorkflowEssentialError:
return essential
case WorkflowFullError:
return false
}

return false
}

func (v *VRGInstance) executeRecoverGroup(result *ctrl.Result, s3StoreAccessor s3StoreAccessor,
Expand Down Expand Up @@ -907,14 +942,14 @@ func kubeObjectsRequestsWatch(
return b
}

func getCaptureGroups(recipe Recipe.Recipe) ([]kubeobjects.CaptureSpec, error) {
func getCaptureGroups(recipe Recipe.Recipe) ([]kubeobjects.CaptureSpec, string, error) {
workflow, err := getBackupWorkflow(recipe)
if err != nil {
return nil, err
return nil, "", err
}

if err := validateWorkflow(workflow); err != nil {
return nil, err
return nil, "", err
}

resources := make([]kubeobjects.CaptureSpec, len(workflow.Sequence))
Expand All @@ -939,14 +974,14 @@ func getCaptureGroups(recipe Recipe.Recipe) ([]kubeobjects.CaptureSpec, error) {
return resources, workflow.FailOn, nil
}

func getRecoverGroups(recipe Recipe.Recipe) ([]kubeobjects.RecoverSpec, error) {
func getRecoverGroups(recipe Recipe.Recipe) ([]kubeobjects.RecoverSpec, string, error) {
workflow, err := getRestoreWorkflow(recipe)
if err != nil {
return nil, err
return nil, "", err
}

if err := validateWorkflow(workflow); err != nil {
return nil, err
return nil, "", err
}

resources := make([]kubeobjects.RecoverSpec, len(workflow.Sequence))
Expand Down Expand Up @@ -1155,7 +1190,6 @@ func getChkHookSpec(hook *Recipe.Hook, suffix string) kubeobjects.HookSpec {
func getOpHookSpec(hook *Recipe.Hook, suffix string) kubeobjects.HookSpec {
for _, op := range hook.Ops {
if op.Name == suffix {

return kubeobjects.HookSpec{
Name: hook.Name,
Namespace: hook.Namespace,
Expand Down
Loading

0 comments on commit c91198f

Please sign in to comment.