diff --git a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go index 41d2457e84..a7e7abcc90 100644 --- a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go +++ b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go @@ -188,13 +188,8 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // From this point onwards we have to take into account if the poller // task was executed. // If so, we need to return a Result with EnqueueAfter set. - result := reconcile.Result{} - if repoPolled { - result = reconcile.Result{RequeueAfter: getPollingIntervalDuration(gitrepo)} - result.RequeueAfter = addJitter(result.RequeueAfter) - } - res, err := r.manageGitJob(ctx, logger, gitrepo, oldCommit, repoPolled, result) + res, err := r.manageGitJob(ctx, logger, gitrepo, oldCommit, repoPolled) if err != nil || res.Requeue { return res, err } @@ -205,10 +200,10 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr if err != nil { logger.Error(err, "Reconcile failed final update to git repo status", "status", gitrepo.Status) - return result, err + return r.result(gitrepo), err } - return result, nil + return r.result(gitrepo), nil } // addJitter to the requeue time to avoid thundering herd @@ -222,7 +217,7 @@ func addJitter(d time.Duration) time.Duration { } // manageGitJob is responsible for creating, updating and deleting the GitJob and setting the GitRepo's status accordingly -func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, gitrepo *v1alpha1.GitRepo, oldCommit string, repoPolled bool, oldResult reconcile.Result) (reconcile.Result, error) { +func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, gitrepo *v1alpha1.GitRepo, oldCommit string, repoPolled bool) (reconcile.Result, error) { name := types.NamespacedName{Namespace: gitrepo.Namespace, Name: gitrepo.Name} var job 
batchv1.Job err := r.Get(ctx, types.NamespacedName{ @@ -232,7 +227,7 @@ func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, if err != nil && !errors.IsNotFound(err) { err = fmt.Errorf("error retrieving git job: %w", err) r.Recorder.Event(gitrepo, fleetevent.Warning, "FailedToGetGitJob", err.Error()) - return oldResult, err + return r.result(gitrepo), err } if errors.IsNotFound(err) { @@ -255,16 +250,16 @@ func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, r.updateGenerationValuesIfNeeded(gitrepo) if err := r.validateExternalSecretExist(ctx, gitrepo); err != nil { r.Recorder.Event(gitrepo, fleetevent.Warning, "FailedValidatingSecret", err.Error()) - return oldResult, updateErrorStatus(ctx, r.Client, name, gitrepo.Status, err) + return r.result(gitrepo), updateErrorStatus(ctx, r.Client, name, gitrepo.Status, err) } if err := r.createJobAndResources(ctx, gitrepo, logger); err != nil { - return oldResult, err + return r.result(gitrepo), err } } } else if gitrepo.Status.Commit != "" && gitrepo.Status.Commit == oldCommit { err, recreateGitJob := r.deleteJobIfNeeded(ctx, gitrepo, &job) if err != nil { - return oldResult, fmt.Errorf("error deleting git job: %w", err) + return r.result(gitrepo), fmt.Errorf("error deleting git job: %w", err) } // job was deleted and we need to recreate it // Requeue so the reconciler creates the job again @@ -276,7 +271,7 @@ func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, gitrepo.Status.ObservedGeneration = gitrepo.Generation if err = setStatusFromGitjob(ctx, r.Client, gitrepo, &job); err != nil { - return oldResult, updateErrorStatus(ctx, r.Client, name, gitrepo.Status, err) + return r.result(gitrepo), updateErrorStatus(ctx, r.Client, name, gitrepo.Status, err) } return reconcile.Result{}, nil @@ -1157,6 +1152,37 @@ func getPollingIntervalDuration(gitrepo *v1alpha1.GitRepo) time.Duration { return gitrepo.Spec.PollingInterval.Duration } +func (r 
*GitJobReconciler) result(gitrepo *v1alpha1.GitRepo) reconcile.Result { + // result builds the reconcile.Result that keeps the polling cycle alive: unless polling + // is disabled, it requeues the object for the time remaining until the next poll. + // This is done to ensure the polling cycle is never broken due to race conditions + // between regular events and RequeueAfter events. + // Requeuing more events when there is already an event in the queue is not a problem + // because controller-runtime ignores events with a later timestamp. + // For example, if we have an event in the queue that should be executed at time X + // and we try to enqueue another event that should be executed at time X+10, it will be + // dropped. + // If we try to enqueue an event at time X-10, it will replace the one in the queue. + // The queue will always keep the event that should be triggered earliest. + if gitrepo.Spec.DisablePolling { + return reconcile.Result{} + } + + // Schedule the next reconciliation from the time elapsed since the last polling + // so that it lines up with the configured polling interval. + // A fixed value could drift because of out-of-schedule reconciliations. + requeueAfter := getPollingIntervalDuration(gitrepo) - r.Clock.Since(gitrepo.Status.LastPollingTime.Time) + if requeueAfter <= 0 { + // Guard for when the poll is already due (the calculation above is 0 or less). + // In those cases controller-runtime does not call AddAfter for this object and + // the RequeueAfter cycle is lost. + // To ensure that this cycle is not broken, we force the object to be requeued. + return reconcile.Result{Requeue: true} + } + requeueAfter = addJitter(requeueAfter) + return reconcile.Result{RequeueAfter: requeueAfter} +} + func webhookCommitChangedPredicate() predicate.Predicate { return predicate.Funcs{ UpdateFunc: func(e event.UpdateEvent) bool {