Skip to content

Commit

Permalink
vrg: Add Kube object restore condition
Browse files Browse the repository at this point in the history
Earlier, we used to restore kube objects along with PV and PVCs.

However, with the introduction of hooks this is not correct. The hooks
depend on the successful restoration of the PV and PVCs to succeed.
There is a strict dependency in the order of operations and both cannot
be performed in the same function call.

We introduce a new condition to keep track of the restore status for
Kube Objects. Once the pv and pvcs are restored then we proceed to the
kube objects restoration.

Co-Authored-by: Annaraya Narasagond <[email protected]>
Signed-off-by: Raghavendra Talur <[email protected]>
  • Loading branch information
2 people authored and BenamarMk committed Jan 31, 2025
1 parent 122bdb8 commit f1926ba
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 11 deletions.
35 changes: 35 additions & 0 deletions internal/controller/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const (
// after this condition is true.
VRGConditionTypeClusterDataReady = "ClusterDataReady"

// Kube objects are ready. This condition is used to indicate that all the
// Kube objects required for the app to be active in the cluster are ready.
VRGConditionTypeKubeObjectsReady = "KubeObjectsReady"

// PV cluster data is protected. This condition indicates whether an app,
// which is active in a cluster, has all its PV related cluster data
// protected from a disaster by uploading it to the required S3 store(s).
Expand Down Expand Up @@ -58,6 +62,7 @@ const (
VRGConditionReasonDataProtected = "DataProtected"
VRGConditionReasonProgressing = "Progressing"
VRGConditionReasonClusterDataRestored = "Restored"
VRGConditionReasonKubeObjectsRestored = "KubeObjectsRestored"
VRGConditionReasonError = "Error"
VRGConditionReasonErrorUnknown = "UnknownError"
VRGConditionReasonUploading = "Uploading"
Expand Down Expand Up @@ -116,6 +121,14 @@ func setVRGInitialCondition(conditions *[]metav1.Condition, observedGeneration i
LastTransitionTime: time,
Message: message,
})
setStatusConditionIfNotFound(conditions, metav1.Condition{
Type: VRGConditionTypeKubeObjectsReady,
Reason: VRGConditionReasonInitializing,
ObservedGeneration: observedGeneration,
Status: metav1.ConditionUnknown,
LastTransitionTime: time,
Message: message,
})
}

// sets conditions when VRG as Secondary is replicating the data with Primary.
Expand Down Expand Up @@ -371,6 +384,28 @@ func newVRGClusterDataUnprotectedCondition(observedGeneration int64, reason, mes
}
}

// sets conditions when Kube objects are restored
func setVRGKubeObjectsReadyCondition(conditions *[]metav1.Condition, observedGeneration int64, message string) {
setStatusCondition(conditions, metav1.Condition{
Type: VRGConditionTypeKubeObjectsReady,
Reason: VRGConditionReasonKubeObjectsRestored,
ObservedGeneration: observedGeneration,
Status: metav1.ConditionTrue,
Message: message,
})
}

// sets conditions when Kube objects failed to restore
func setVRGKubeObjectsErrorCondition(conditions *[]metav1.Condition, observedGeneration int64, message string) {
setStatusCondition(conditions, metav1.Condition{
Type: VRGConditionTypeKubeObjectsReady,
Reason: VRGConditionReasonError,
ObservedGeneration: observedGeneration,
Status: metav1.ConditionFalse,
Message: message,
})
}

func setStatusConditionIfNotFound(existingConditions *[]metav1.Condition, newCondition metav1.Condition) {
if existingConditions == nil {
existingConditions = &[]metav1.Condition{}
Expand Down
44 changes: 41 additions & 3 deletions internal/controller/volumereplicationgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,9 +641,11 @@ func (v *VRGInstance) clusterDataRestore(result *ctrl.Result) (int, error) {
}

// Only after both succeed, we mark ClusterDataReady as true
msg := "Restored PVs and PVCs"
var msg string
if numRestoredForVS+numRestoredForVR == 0 {
msg = "Nothing to restore"
} else {
msg = fmt.Sprintf("Restored %d volsync PVs/PVCs and %d volrep PVs/PVCs", numRestoredForVS, numRestoredForVR)
}

setVRGClusterDataReadyCondition(&v.instance.Status.Conditions, v.instance.Generation, msg)
Expand Down Expand Up @@ -1183,6 +1185,22 @@ func (v *VRGInstance) processAsPrimary() ctrl.Result {

v.reconcileAsPrimary()

if v.shouldRestoreKubeObjects() {
err := v.kubeObjectsRecover(&v.result)
if err != nil {
v.log.Info("Kube objects restore failed", "error", err)
v.errorConditionLogAndSet(err, "Failed to restore kube objects", setVRGKubeObjectsErrorCondition)

return v.updateVRGStatus(v.result)
}

v.log.Info("Kube objects restored")
setVRGKubeObjectsReadyCondition(&v.instance.Status.Conditions, v.instance.Generation, "Kube objects restored")
}

v.kubeObjectsProtectPrimary(&v.result)
v.vrgObjectProtect(&v.result)

// If requeue is false, then VRG was successfully processed as primary.
// Hence the event to be generated is Success of type normal.
// Expectation is that, if something failed and requeue is true, then
Expand Down Expand Up @@ -1224,6 +1242,28 @@ func (v *VRGInstance) shouldRestoreClusterData() bool {
return true
}

func (v *VRGInstance) shouldRestoreKubeObjects() bool {
KubeObjectsRestored := findCondition(v.instance.Status.Conditions, VRGConditionTypeKubeObjectsReady)
if KubeObjectsRestored != nil {
v.log.Info("KubeObjectsReady condition",
"status", KubeObjectsRestored.Status,
"reason", KubeObjectsRestored.Reason,
"message", KubeObjectsRestored.Message,
"observedGeneration", KubeObjectsRestored.ObservedGeneration,
"generation", v.instance.Generation,
)

if KubeObjectsRestored.Status == metav1.ConditionTrue &&
KubeObjectsRestored.ObservedGeneration == v.instance.Generation {
v.log.Info("VRG's KubeObjectsReady condition found. All kube objects must have already been restored")

return false
}
}

return true
}

func (v *VRGInstance) reconcileAsPrimary() {
var finalSyncPrepared struct {
volSync bool
Expand All @@ -1232,8 +1272,6 @@ func (v *VRGInstance) reconcileAsPrimary() {
vrg := v.instance
v.result.Requeue = v.reconcileVolSyncAsPrimary(&finalSyncPrepared.volSync)
v.reconcileVolRepsAsPrimary()
v.kubeObjectsProtectPrimary(&v.result)
v.vrgObjectProtect(&v.result)

if vrg.Spec.PrepareForFinalSync {
vrg.Status.PrepareForFinalSyncComplete = finalSyncPrepared.volSync
Expand Down
58 changes: 51 additions & 7 deletions internal/controller/vrg_kubeobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,16 +511,20 @@ func (v *VRGInstance) getVRGFromS3Profile(s3ProfileName string) (*ramen.VolumeRe
return vrg, nil
}

func (v *VRGInstance) kubeObjectsRecover(result *ctrl.Result, s3ProfileName string) error {
if v.kubeObjectProtectionDisabled("recovery") {
return nil
func (v *VRGInstance) skipIfS3ProfileIsForTest() bool {
for _, s3ProfileName := range v.instance.Spec.S3Profiles {
if s3ProfileName == NoS3StoreAvailable {
v.log.Info("NoS3 available to fetch")

return true
}
}

if v.instance.Spec.Action == "" {
v.log.Info("Skipping kube objects restore in fresh deployment case")
return false
}

return nil
}
func (v *VRGInstance) kubeObjectsRecoverFromS3(result *ctrl.Result, accessor s3StoreAccessor) error {
s3ProfileName := accessor.S3ProfileName

sourceVrg, err := v.getVRGFromS3Profile(s3ProfileName)
if err != nil {
Expand All @@ -538,6 +542,46 @@ func (v *VRGInstance) kubeObjectsRecover(result *ctrl.Result, s3ProfileName stri
return v.kubeObjectsRecoveryStartOrResume(result, s3ProfileName, captureToRecoverFromIdentifier, log)
}

func (v *VRGInstance) kubeObjectsRecover(result *ctrl.Result) error {
if v.kubeObjectProtectionDisabled("recovery") {
return nil
}

if v.instance.Spec.Action == "" {
v.log.Info("Skipping kube objects restore in fresh deployment case")

return nil
}

if len(v.s3StoreAccessors) == 0 {
v.log.Info("No S3 profiles configured")

result.Requeue = true

return fmt.Errorf("no S3Profiles configured")
}

if v.skipIfS3ProfileIsForTest() {
return nil
}

for _, s3StoreAccessor := range v.s3StoreAccessors {
if err := v.kubeObjectsRecoverFromS3(result, s3StoreAccessor); err != nil {
v.log.Info("Kube objects restore error", "profile", s3StoreAccessor.S3ProfileName, "error", err)

continue
}

v.log.Info("Kube objects restore complete", "profile", s3StoreAccessor.S3ProfileName)

return nil
}

result.Requeue = true

return fmt.Errorf("kube objects restore error, will retry")
}

func (v *VRGInstance) findS3StoreAccessor(s3ProfileName string) (s3StoreAccessor, error) {
for _, s3StoreAccessor := range v.s3StoreAccessors {
if s3StoreAccessor.S3StoreProfile.S3ProfileName == s3ProfileName {
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/vrg_volrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -2044,7 +2044,7 @@ func (v *VRGInstance) restorePVsAndPVCsFromS3(result *ctrl.Result) (int, error)

v.log.Info(fmt.Sprintf("Restored %d PVs and %d PVCs using profile %s", pvCount, pvcCount, s3ProfileName))

return pvCount + pvcCount, v.kubeObjectsRecover(result, s3ProfileName)
return pvCount + pvcCount, nil
}

if NoS3 {
Expand Down

0 comments on commit f1926ba

Please sign in to comment.