Skip to content
2 changes: 2 additions & 0 deletions internal/controller/postgrescluster/pgbackrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2491,6 +2491,8 @@ func (r *Reconciler) reconcileManualBackup(ctx context.Context,
})
backupJob.ObjectMeta.Labels = labels
backupJob.ObjectMeta.Annotations = annotations
// K8SPG-703
backupJob.Finalizers = []string{pNaming.FinalizerKeepJob}

// K8SPG-613
initImage, err := k8s.InitImage(ctx, r.Client, postgresCluster, &postgresCluster.Spec.Backups.PGBackRest)
Expand Down
40 changes: 34 additions & 6 deletions percona/controller/pgbackup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pgbackup
import (
"context"
"path"
"slices"
"strings"
"time"

Expand Down Expand Up @@ -131,7 +132,7 @@ func (r *PGBackupReconciler) Reconcile(ctx context.Context, request reconcile.Re
}

// start backup only if backup job doesn't exist
_, err := findBackupJob(ctx, r.Client, pgCluster, pgBackup)
_, err := findBackupJob(ctx, r.Client, pgBackup)
if err != nil {
if !errors.Is(err, ErrBackupJobNotFound) {
return reconcile.Result{}, errors.Wrap(err, "find backup job")
Expand Down Expand Up @@ -184,7 +185,7 @@ func (r *PGBackupReconciler) Reconcile(ctx context.Context, request reconcile.Re
return reconcile.Result{}, errors.Errorf("PostgresCluster %s is not found", pgBackup.Spec.PGCluster)
}

job, err := findBackupJob(ctx, r.Client, pgCluster, pgBackup)
job, err := findBackupJob(ctx, r.Client, pgBackup)
if err != nil {
if errors.Is(err, ErrBackupJobNotFound) {
log.Info("Waiting for backup to start")
Expand Down Expand Up @@ -231,6 +232,15 @@ func (r *PGBackupReconciler) Reconcile(ctx context.Context, request reconcile.Re
job := &batchv1.Job{}
err := r.Client.Get(ctx, types.NamespacedName{Name: pgBackup.Status.JobName, Namespace: pgBackup.Namespace}, job)
if err != nil {
// If something has deleted the job even with the finalizer, we should fail the backup.
if k8serrors.IsNotFound(err) {
if err := updateStatus(ctx, r.Client, pgBackup, func(bcp *v2.PerconaPGBackup) {
bcp.Status.State = v2.BackupFailed
}); err != nil {
return reconcile.Result{}, errors.Wrap(err, "update PGBackup status")
}
return reconcile.Result{}, nil
}
return reconcile.Result{}, errors.Wrap(err, "get backup job")
}

Expand Down Expand Up @@ -265,6 +275,21 @@ func (r *PGBackupReconciler) Reconcile(ctx context.Context, request reconcile.Re

return reconcile.Result{}, nil
case v2.BackupSucceeded:
job, err := findBackupJob(ctx, r.Client, pgBackup)
if err == nil && slices.Contains(job.Finalizers, pNaming.FinalizerKeepJob) {
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
j := new(batchv1.Job)
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(job), j); err != nil {
return errors.Wrap(err, "get job")
}
j.Finalizers = slices.DeleteFunc(j.Finalizers, func(s string) bool { return s == pNaming.FinalizerKeepJob })

return r.Client.Update(ctx, j)
}); err != nil {
return reconcile.Result{}, errors.Wrap(err, "update PGBackup status")
}
}

if pgCluster == nil {
return reconcile.Result{}, nil
}
Expand Down Expand Up @@ -636,8 +661,11 @@ func startBackup(ctx context.Context, c client.Client, pb *v2.PerconaPGBackup) e
return nil
}

func findBackupJob(ctx context.Context, c client.Client, pg *v2.PerconaPGCluster, pb *v2.PerconaPGBackup) (*batchv1.Job, error) {
if jobName := pb.GetAnnotations()[pNaming.AnnotationPGBackrestBackupJobName]; jobName != "" {
func findBackupJob(ctx context.Context, c client.Client, pb *v2.PerconaPGBackup) (*batchv1.Job, error) {
if jobName := pb.GetAnnotations()[pNaming.AnnotationPGBackrestBackupJobName]; jobName != "" || pb.Status.JobName != "" {
if jobName == "" {
jobName = pb.Status.JobName
}
job := new(batchv1.Job)
err := c.Get(ctx, types.NamespacedName{Name: jobName, Namespace: pb.Namespace}, job)
if err != nil {
Expand All @@ -648,9 +676,9 @@ func findBackupJob(ctx context.Context, c client.Client, pg *v2.PerconaPGCluster

jobList := &batchv1.JobList{}
err := c.List(ctx, jobList,
client.InNamespace(pg.Namespace),
client.InNamespace(pb.Namespace),
client.MatchingLabelsSelector{
Selector: naming.PGBackRestBackupJobSelector(pg.Name, pb.Spec.RepoName, naming.BackupManual),
Selector: naming.PGBackRestBackupJobSelector(pb.Spec.PGCluster, pb.Spec.RepoName, naming.BackupManual),
})
if err != nil {
return nil, errors.Wrap(err, "get backup jobs")
Expand Down
4 changes: 3 additions & 1 deletion percona/controller/pgbackup/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ func buildFakeClient(ctx context.Context, cr *v2.PerconaPGCluster, objs ...clien
objs = append(objs, dcs)

cl := new(fakeClient)
cl.Client = fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).WithStatusSubresource(objs...).Build()
cl.Client = fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).WithStatusSubresource(objs...).
WithIndex(new(v2.PerconaPGBackup), v2.IndexFieldPGCluster, v2.PGClusterIndexerFunc).
Build()

return cl, nil
}
Expand Down
17 changes: 11 additions & 6 deletions percona/controller/pgcluster/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/pkg/errors"
batchv1 "k8s.io/api/batch/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
Expand Down Expand Up @@ -87,14 +88,18 @@ func (r *PGClusterReconciler) cleanupOutdatedBackups(ctx context.Context, cr *v2
// After the pg-backup is deleted, the job is not deleted immediately.
// We need to set the DeletionTimestamp for a job so that `reconcileBackupJob` doesn't create a new pg-backup before the job deletion.
job := new(batchv1.Job)
if err := r.Client.Get(ctx, types.NamespacedName{Name: pgBackup.Status.JobName, Namespace: pgBackup.Namespace}, job); err != nil {
err := r.Client.Get(ctx, types.NamespacedName{Name: pgBackup.Status.JobName, Namespace: pgBackup.Namespace}, job)
if client.IgnoreNotFound(err) != nil {
return errors.Wrap(err, "get backup job")
}
prop := metav1.DeletePropagationForeground
if err := r.Client.Delete(ctx, job, &client.DeleteOptions{
PropagationPolicy: &prop,
}); err != nil {
return errors.Wrapf(err, "delete job %s/%s", job.Name, job.Namespace)
// The job may be deleted earlier due to ttlSecondsAfterFinished
if !k8serrors.IsNotFound(err) {
prop := metav1.DeletePropagationForeground
if err := r.Client.Delete(ctx, job, &client.DeleteOptions{
PropagationPolicy: &prop,
}); err != nil {
return errors.Wrapf(err, "delete job %s/%s", job.Name, job.Namespace)
}
}
if err := r.Client.Delete(ctx, &pgBackup); err != nil {
return errors.Wrapf(err, "delete backup %s/%s", pgBackup.Name, pgBackup.Namespace)
Expand Down
4 changes: 2 additions & 2 deletions percona/controller/pgcluster/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ var _ = Describe("Watching secrets", Ordered, func() {
})

It("should reconcile 1 time", func() {
Eventually(func() int { return getReconcileCount(crunchyR) }, time.Second*15, time.Millisecond*250).
Eventually(func() int { return getReconcileCount(crunchyR) }, time.Second*20, time.Millisecond*250).
Should(Equal(reconcileCount + 1))
})

Expand All @@ -692,7 +692,7 @@ var _ = Describe("Watching secrets", Ordered, func() {
})

It("should reconcile 2 times", func() {
Eventually(func() int { return getReconcileCount(crunchyR) }, time.Second*15, time.Millisecond*250).
Eventually(func() int { return getReconcileCount(crunchyR) }, time.Second*20, time.Millisecond*250).
Should(Equal(reconcileCount + 2))
})
})
Expand Down
4 changes: 3 additions & 1 deletion percona/controller/pgcluster/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ func buildFakeClient(ctx context.Context, cr *v2.PerconaPGCluster, objs ...clien
objs = append(objs, dcs)

cl := new(fakeClient)
cl.Client = fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).WithStatusSubresource(objs...).Build()
cl.Client = fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).WithStatusSubresource(objs...).
WithIndex(new(v2.PerconaPGBackup), v2.IndexFieldPGCluster, v2.PGClusterIndexerFunc).
Build()

return cl, nil
}
4 changes: 3 additions & 1 deletion percona/k8s/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ func buildFakeClient(ctx context.Context, cr *v2.PerconaPGCluster, objs ...clien
objs = append(objs, dcs)

cl := new(fakeClient)
cl.Client = fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).WithStatusSubresource(objs...).Build()
cl.Client = fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).WithStatusSubresource(objs...).
WithIndex(new(v2.PerconaPGBackup), v2.IndexFieldPGCluster, v2.PGClusterIndexerFunc).
Build()

return cl, nil
}
Expand Down
5 changes: 5 additions & 0 deletions percona/naming/finalizers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,8 @@ const (
const (
FinalizerDeleteBackup = PrefixPerconaInternal + "delete-backup" //nolint:gosec
)

// PerconaPGBackup job finalizers
const (
FinalizerKeepJob = PrefixPerconaInternal + "keep-job" //nolint:gosec
)
Loading