diff --git a/internal/csi-addons/rbd/encryptionkeyrotation.go b/internal/csi-addons/rbd/encryptionkeyrotation.go index 5acb4858aa3..0d7a09d789d 100644 --- a/internal/csi-addons/rbd/encryptionkeyrotation.go +++ b/internal/csi-addons/rbd/encryptionkeyrotation.go @@ -68,7 +68,7 @@ func (ekrs *EncryptionKeyRotationServer) EncryptionKeyRotate( rbdVol, err := mgr.GetVolumeByID(ctx, volID) if err != nil { switch { - case errors.Is(err, rbd.ErrImageNotFound): + case errors.Is(err, util.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume ID %s not found", volID) case errors.Is(err, util.ErrPoolNotFound): log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err) diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index be0864b6061..3925fdbcf13 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -651,7 +651,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, sts, err := mirror.GetGlobalMirroringStatus(ctx) if err != nil { // the image gets recreated after issuing resync - if errors.Is(err, corerbd.ErrImageNotFound) { + if errors.Is(err, util.ErrImageNotFound) { // caller retries till RBD syncs an initial version of the image to // report its status in the resync call. Ideally, this line will not // be executed as the error would get returned due to getMirroringInfo @@ -785,7 +785,7 @@ func getGRPCError(err error) error { } errorStatusMap := map[error]codes.Code{ - corerbd.ErrImageNotFound: codes.NotFound, + util.ErrImageNotFound: codes.NotFound, util.ErrPoolNotFound: codes.NotFound, corerbd.ErrInvalidArgument: codes.InvalidArgument, corerbd.ErrFlattenInProgress: codes.Aborted, @@ -835,7 +835,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, log.ErrorLog(ctx, "failed to get volume with id %q: %v", volumeID, err) switch { - case errors.Is(err, corerbd.ErrImageNotFound): + case errors.Is(err, util.ErrImageNotFound): err = status.Error(codes.NotFound, err.Error()) case errors.Is(err, util.ErrPoolNotFound): err = status.Error(codes.NotFound, err.Error()) @@ -872,7 +872,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, if err != nil { log.ErrorLog(ctx, "failed to get status for mirror %q: %v", mirror, err) - if errors.Is(err, corerbd.ErrImageNotFound) { + if errors.Is(err, util.ErrImageNotFound) { return nil, status.Error(codes.Aborted, err.Error()) } diff --git a/internal/csi-addons/rbd/replication_test.go b/internal/csi-addons/rbd/replication_test.go index ba7212483eb..79984425109 100644 --- a/internal/csi-addons/rbd/replication_test.go +++ b/internal/csi-addons/rbd/replication_test.go @@ -597,8 +597,8 @@ func TestGetGRPCError(t *testing.T) { }, { name: "ErrImageNotFound", - err: corerbd.ErrImageNotFound, - expectedErr: status.Error(codes.NotFound, corerbd.ErrImageNotFound.Error()), + err: util.ErrImageNotFound, + expectedErr: status.Error(codes.NotFound, util.ErrImageNotFound.Error()), }, { name: "ErrPoolNotFound", diff --git a/internal/rbd/clone.go b/internal/rbd/clone.go index 25452921a99..fb99ce0c398 100644 --- a/internal/rbd/clone.go +++ b/internal/rbd/clone.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" + "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" @@ -66,7 +67,7 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume) return true, nil - case errors.Is(err, ErrImageNotFound): + case errors.Is(err, util.ErrImageNotFound): // as the temp clone does not exist,check snapshot exists on parent volume // snapshot name is same as temporary clone image snap.RbdImageName = tempClone.RbdImageName diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index 8283970f563..cb7a497b98f 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -575,7 +575,7 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error { snaps, children, err := rbdVol.listSnapAndChildren() if err != nil { - if errors.Is(err, ErrImageNotFound) { + if errors.Is(err, util.ErrImageNotFound) { return status.Error(codes.InvalidArgument, err.Error()) } @@ -831,7 +831,7 @@ func checkContentSource( rbdvol, err := GenVolFromVolID(ctx, volID, cr, req.GetSecrets()) if err != nil { log.ErrorLog(ctx, "failed to get backend image for %s: %v", volID, err) - if !errors.Is(err, ErrImageNotFound) { + if !errors.Is(err, util.ErrImageNotFound) { return nil, nil, status.Error(codes.Internal, err.Error()) } @@ -871,7 +871,7 @@ func (cs *ControllerServer) checkErrAndUndoReserve( return &csi.DeleteVolumeResponse{}, nil } - if errors.Is(err, ErrImageNotFound) { + if errors.Is(err, util.ErrImageNotFound) { notFoundErr := rbdVol.ensureImageCleanup(ctx) if notFoundErr != nil { return nil, status.Errorf(codes.Internal, "failed to cleanup image %q: %v", rbdVol, notFoundErr) @@ -946,7 +946,7 @@ func (cs *ControllerServer) DeleteVolume( return nil, status.Error(codes.InvalidArgument, pErr.Error()) } pErr = deleteMigratedVolume(ctx, pmVolID, cr) - if pErr != nil && !errors.Is(pErr, ErrImageNotFound) { + if pErr != nil && !errors.Is(pErr, util.ErrImageNotFound) { return nil, status.Error(codes.Internal, pErr.Error()) } @@ -1118,7 +1118,7 @@ func (cs *ControllerServer) CreateSnapshot( }() if err != nil { switch { - case errors.Is(err, ErrImageNotFound): + case errors.Is(err, util.ErrImageNotFound): err = status.Errorf(codes.NotFound, "source Volume ID %s not found", req.GetSourceVolumeId()) case errors.Is(err, util.ErrPoolNotFound): log.ErrorLog(ctx, "failed to get backend volume for %s: %v", req.GetSourceVolumeId(), err) @@ -1459,7 +1459,7 @@ func (cs *ControllerServer) DeleteSnapshot( // if the error is ErrImageNotFound, We need to cleanup the image from // trash and remove the metadata in OMAP. - if errors.Is(err, ErrImageNotFound) { + if errors.Is(err, util.ErrImageNotFound) { log.UsefulLog(ctx, "cleaning up leftovers of snapshot %s: %v", snapshotID, err) err = cleanUpImageAndSnapReservation(ctx, rbdSnap, cr) @@ -1562,7 +1562,7 @@ func (cs *ControllerServer) ControllerExpandVolume( rbdVol, err := genVolFromVolIDWithMigration(ctx, volID, cr, req.GetSecrets()) if err != nil { switch { - case errors.Is(err, ErrImageNotFound): + case errors.Is(err, util.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume ID %s not found", volID) case errors.Is(err, util.ErrPoolNotFound): log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err) diff --git a/internal/rbd/errors.go b/internal/rbd/errors.go index 8248dd98ce5..791319316bc 100644 --- a/internal/rbd/errors.go +++ b/internal/rbd/errors.go @@ -19,8 +19,6 @@ package rbd import "errors" var ( - // ErrImageNotFound is returned when image name is not found in the cluster on the given pool and/or namespace. - ErrImageNotFound = errors.New("image not found") // ErrSnapNotFound is returned when snap name passed is not found in the list of snapshots for the // given image. ErrSnapNotFound = errors.New("snapshot not found") diff --git a/internal/rbd/group/util.go b/internal/rbd/group/util.go index 6c4e56d5dd9..8ef8b154779 100644 --- a/internal/rbd/group/util.go +++ b/internal/rbd/group/util.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "strconv" "time" "github.com/ceph/go-ceph/rados" @@ -63,31 +64,27 @@ type commonVolumeGroup struct { journal journal.VolumeGroupJournal } -func (cvg *commonVolumeGroup) initCommonVolumeGroup( - ctx context.Context, +// generateVolumeGroup generates a commonVolumeGroup structure from the volumeGroup identifier. +func generateVolumeGroup( id string, csiDriver string, creds *util.Credentials, -) error { - csiID := util.CSIIdentifier{} - err := csiID.DecomposeCSIID(id) - if err != nil { - return fmt.Errorf("failed to decompose volume group id %q: %w", id, err) - } - + csiID util.CSIIdentifier, +) (*commonVolumeGroup, error) { + cvg := &commonVolumeGroup{} mons, err := util.Mons(util.CsiConfigFile, csiID.ClusterID) if err != nil { - return fmt.Errorf("failed to get MONs for cluster id %q: %w", csiID.ClusterID, err) + return nil, fmt.Errorf("failed to get MONs for cluster id %q: %w", csiID.ClusterID, err) } namespace, err := util.GetRBDRadosNamespace(util.CsiConfigFile, csiID.ClusterID) if err != nil { - return fmt.Errorf("failed to get RADOS namespace for cluster id %q: %w", csiID.ClusterID, err) + return nil, fmt.Errorf("failed to get RADOS namespace for cluster id %q: %w", csiID.ClusterID, err) } pool, err := util.GetPoolName(mons, creds, csiID.LocationID) if err != nil { - return fmt.Errorf("failed to get pool for volume group id %q: %w", id, err) + return nil, fmt.Errorf("failed to get pool for volume group id %q: %w", id, err) } cvg.csiDriver = csiDriver @@ -99,6 +96,106 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup( cvg.pool = pool cvg.namespace = namespace + return cvg, nil +} + +// generateVolumeGroupFromMapping checks the clusterID and poolID mapping and +// generates commonVolumeGroup structure for the mapped clusterID and poolID. +func generateVolumeGroupFromMapping( + ctx context.Context, + id string, + csiDriver string, + creds *util.Credentials, + csiID util.CSIIdentifier, + mapping *[]util.ClusterMappingInfo, +) (*commonVolumeGroup, error) { + cvg := &commonVolumeGroup{} + mcsiID := csiID + existingClusterID := csiID.ClusterID + existingPoolID := strconv.FormatInt(csiID.LocationID, 10) + + for _, cm := range *mapping { + for key, val := range cm.ClusterIDMapping { + mappedClusterID := util.GetMappedID(key, val, csiID.ClusterID) + if mappedClusterID == "" { + continue + } + + log.DebugLog(ctx, + "found new clusterID mapping %s for existing clusterID %s", mappedClusterID, existingClusterID) + + // Add mapped clusterID to Identifier + mcsiID.ClusterID = mappedClusterID + for _, pools := range cm.RBDpoolIDMappingInfo { + for key, val := range pools { + mappedPoolID := util.GetMappedID(key, val, existingPoolID) + if mappedPoolID == "" { + continue + } + log.DebugLog(ctx, + "found new poolID mapping %s for existing poolID %s", mappedPoolID, existingPoolID) + + mPID, err := strconv.ParseInt(mappedPoolID, 10, 64) + if err != nil { + return cvg, err + } + mcsiID.LocationID = mPID + cvg, err = generateVolumeGroup(id, csiDriver, creds, mcsiID) + if err != nil && !util.ShouldRetryVolumeGeneration(err) { + return cvg, err + } + // If the pool is found, return the volume group + if cvg != nil { + return cvg, nil + } + } + } + } + } + + return nil, util.ErrPoolNotFound +} + +func (cvg *commonVolumeGroup) initCommonVolumeGroup( + ctx context.Context, + id string, + csiDriver string, + creds *util.Credentials, +) error { + csiID := util.CSIIdentifier{} + + err := csiID.DecomposeCSIID(id) + if err != nil { + return fmt.Errorf("failed to decompose volume group id %q: %w", id, err) + } + + vg, err := generateVolumeGroup(id, csiDriver, creds, csiID) + if err != nil && !util.ShouldRetryVolumeGeneration(err) { + return err + } + + if err != nil && util.ShouldRetryVolumeGeneration(err) { + mapping, err := util.GetClusterMappingInfo(csiID.ClusterID) + if err != nil { + return err + } + if mapping != nil { + vg, err = generateVolumeGroupFromMapping(ctx, id, csiDriver, creds, csiID, mapping) + if err != nil { + return err + } + } + } + + cvg.csiDriver = vg.csiDriver + cvg.credentials = vg.credentials + cvg.id = vg.id + cvg.clusterID = vg.clusterID + cvg.objectUUID = vg.objectUUID + cvg.monitors = vg.monitors + cvg.pool = vg.pool + cvg.namespace = vg.namespace + log.DebugLog(ctx, "object for volume group %q has been initialized", cvg.id) return nil diff --git a/internal/rbd/manager.go b/internal/rbd/manager.go index 61fcfdcaadf..f5b4204b259 100644 --- a/internal/rbd/manager.go +++ b/internal/rbd/manager.go @@ -174,7 +174,7 @@ func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volu volume, err := GenVolFromVolID(ctx, id, creds, mgr.secrets) if err != nil { switch { - case errors.Is(err, ErrImageNotFound): + case errors.Is(err, util.ErrImageNotFound): err = fmt.Errorf("volume %s not found: %w", id, err) return nil, err @@ -199,7 +199,7 @@ func (mgr *rbdManager) GetSnapshotByID(ctx context.Context, id string) (types.Sn snapshot, err := genSnapFromSnapID(ctx, id, creds, mgr.secrets) if err != nil { switch { - case errors.Is(err, ErrImageNotFound): + case errors.Is(err, util.ErrImageNotFound): err = fmt.Errorf("volume %s not found: %w", id, err) return nil, err @@ -467,7 +467,7 @@ func (mgr *rbdManager) CreateVolumeGroupSnapshot( return vgs, nil } - } else if err != nil && !errors.Is(ErrImageNotFound, err) { + } else if err != nil && !errors.Is(err, util.ErrImageNotFound) { // ErrImageNotFound can be returned if the VolumeGroupSnapshot // could not be found. It is expected that it does not exist // yet, in which case it will be created below. diff --git a/internal/rbd/rbd_journal.go b/internal/rbd/rbd_journal.go index 67e13daba9e..3d9b47fff66 100644 --- a/internal/rbd/rbd_journal.go +++ b/internal/rbd/rbd_journal.go @@ -172,7 +172,7 @@ func checkSnapCloneExists( // Fetch on-disk image attributes err = vol.getImageInfo() if err != nil { - if errors.Is(err, ErrImageNotFound) { + if errors.Is(err, util.ErrImageNotFound) { err = parentVol.deleteSnapshot(ctx, rbdSnap) if err != nil { if !errors.Is(err, ErrSnapNotFound) { @@ -298,7 +298,7 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er // Fetch on-disk image attributes and compare against request err = rv.getImageInfo() if err != nil { - if errors.Is(err, ErrImageNotFound) { + if errors.Is(err, util.ErrImageNotFound) { // Need to check cloned info here not on createvolume, if parentVol != nil { found, cErr := rv.checkCloneImage(ctx, parentVol) diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index 274981bfe86..d6da2aad364 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -524,7 +524,7 @@ func (ri *rbdImage) open() (*librbd.Image, error) { image, err := librbd.OpenImage(ri.ioctx, ri.RbdImageName, librbd.NoSnapshot) if err != nil { if errors.Is(err, librbd.ErrNotFound) { - err = fmt.Errorf("Failed as %w (internal %w)", ErrImageNotFound, err) + err = fmt.Errorf("Failed as %w (internal %w)", util.ErrImageNotFound, err) } return nil, err @@ -542,7 +542,7 @@ func (ri *rbdImage) open() (*librbd.Image, error) { func (ri *rbdImage) isInUse() (bool, error) { image, err := ri.open() if err != nil { - if errors.Is(err, ErrImageNotFound) || errors.Is(err, util.ErrPoolNotFound) { + if errors.Is(err, util.ErrImageNotFound) || errors.Is(err, util.ErrPoolNotFound) { return false, err } // any error should assume something else is using the image @@ -681,7 +681,7 @@ func (ri *rbdImage) Delete(ctx context.Context) error { err = rbdImage.Trash(0) if err != nil { if errors.Is(err, librbd.ErrNotFound) { - return fmt.Errorf("Failed as %w (internal %w)", ErrImageNotFound, err) + return fmt.Errorf("Failed as %w (internal %w)", util.ErrImageNotFound, err) } log.ErrorLog(ctx, "failed to delete rbd image: %s, error: %v", ri, err) @@ -731,7 +731,7 @@ func (rv *rbdVolume) DeleteTempImage(ctx context.Context) error { tempClone := rv.generateTempClone() err := tempClone.Delete(ctx) if err != nil { - if errors.Is(err, ErrImageNotFound) { + if errors.Is(err, util.ErrImageNotFound) { return tempClone.ensureImageCleanup(ctx) } else { // return error if it is not ErrImageNotFound @@ -770,7 +770,7 @@ func (ri *rbdImage) getCloneDepth(ctx context.Context) (uint, error) { // if the parent image is moved to trash the name will be present // in rbd image info but the image will be in trash, in that case // return the found depth - if errors.Is(err, ErrImageNotFound) { + if errors.Is(err, util.ErrImageNotFound) { return depth, nil } log.ErrorLog(ctx, "failed to check depth on image %s: %s", &vol, err) @@ -956,7 +956,7 @@ func (ri *rbdImage) checkImageChainHasFeature(ctx context.Context, feature uint6 // is in the trash, when we try to open the parent image to get its // information it fails because it is already in trash. We should // treat error as nil if the parent is not found. - if errors.Is(err, ErrImageNotFound) { + if errors.Is(err, util.ErrImageNotFound) { return false, nil } log.ErrorLog(ctx, "failed to get image info for %s: %s", rbdImg.String(), err) @@ -1214,7 +1214,7 @@ func GenVolFromVolID( } vol, err = generateVolumeFromVolumeID(ctx, volumeID, vi, cr, secrets) - if !shouldRetryVolumeGeneration(err) { + if !util.ShouldRetryVolumeGeneration(err) { return vol, err } @@ -1225,7 +1225,7 @@ func GenVolFromVolID( } if mapping != nil { rbdVol, vErr := generateVolumeFromMapping(ctx, mapping, volumeID, vi, cr, secrets) - if !shouldRetryVolumeGeneration(vErr) { + if !util.ShouldRetryVolumeGeneration(vErr) { return rbdVol, vErr } } @@ -1278,7 +1278,7 @@ func generateVolumeFromMapping( // Add mapping poolID to Identifier nvi.LocationID = pID vol, err = generateVolumeFromVolumeID(ctx, volumeID, nvi, cr, secrets) - if !shouldRetryVolumeGeneration(err) { + if !util.ShouldRetryVolumeGeneration(err) { return vol, err } } @@ -1289,33 +1289,6 @@ func generateVolumeFromMapping( return vol, util.ErrPoolNotFound } -// shouldRetryVolumeGeneration determines whether the process of finding or generating -// volumes should continue based on the type of error encountered. -// -// It checks if the given error matches any of the following known errors: -// - util.ErrKeyNotFound: The key required to locate the volume is missing in Rados omap. -// - util.ErrPoolNotFound: The rbd pool where the volume/omap is expected doesn't exist. -// - ErrImageNotFound: The image doesn't exist in the rbd pool. -// - rados.ErrPermissionDenied: Permissions to access the pool is denied. -// -// If any of these errors are encountered, the function returns `true`, indicating -// that the volume search should continue because of known error. Otherwise, it -// returns `false`, meaning the search should stop. -// -// This helper function is used in scenarios where multiple attempts may be made -// to retrieve or generate volume information, and we want to gracefully handle -// specific failure cases while retrying for others. -func shouldRetryVolumeGeneration(err error) bool { - if err == nil { - return false // No error, do not retry - } - // Continue searching for specific known errors - return (errors.Is(err, util.ErrKeyNotFound) || - errors.Is(err, util.ErrPoolNotFound) || - errors.Is(err, ErrImageNotFound) || - errors.Is(err, rados.ErrPermissionDenied)) -} - func genVolFromVolumeOptions( ctx context.Context, volOptions map[string]string, diff --git a/internal/rbd/rbd_util_test.go b/internal/rbd/rbd_util_test.go index 5d14ed844a7..905e977079c 100644 --- a/internal/rbd/rbd_util_test.go +++ b/internal/rbd/rbd_util_test.go @@ -23,11 +23,8 @@ import ( "strings" "testing" - "github.com/ceph/go-ceph/rados" librbd "github.com/ceph/go-ceph/rbd" "github.com/stretchr/testify/require" - - "github.com/ceph/ceph-csi/internal/util" ) func TestHasSnapshotFeature(t *testing.T) { @@ -390,54 +387,3 @@ func Test_checkValidImageFeatures(t *testing.T) { }) } } - -func Test_shouldRetryVolumeGeneration(t *testing.T) { - t.Parallel() - type args struct { - err error - } - tests := []struct { - name string - args args - want bool - }{ - { - name: "No error (stop searching)", - args: args{err: nil}, - want: false, // No error, stop searching - }, - { - name: "ErrKeyNotFound (continue searching)", - args: args{err: util.ErrKeyNotFound}, - want: true, // Known error, continue searching - }, - { - name: "ErrPoolNotFound (continue searching)", - args: args{err: util.ErrPoolNotFound}, - want: true, // Known error, continue searching - }, - { - name: "ErrImageNotFound (continue searching)", - args: args{err: ErrImageNotFound}, - want: true, // Known error, continue searching - }, - { - name: "ErrPermissionDenied (continue searching)", - args: args{err: rados.ErrPermissionDenied}, - want: true, // Known error, continue searching - }, - { - name: "Different error (stop searching)", - args: args{err: errors.New("unknown error")}, - want: false, // Unknown error, stop searching - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - if got := shouldRetryVolumeGeneration(tt.args.err); got != tt.want { - t.Errorf("shouldRetryVolumeGeneration() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/internal/rbd/snapshot.go b/internal/rbd/snapshot.go index ff8887590c7..d47d3d31c5f 100644 --- a/internal/rbd/snapshot.go +++ b/internal/rbd/snapshot.go @@ -82,7 +82,7 @@ func cleanUpSnapshot( ) error { err := parentVol.deleteSnapshot(ctx, rbdSnap) if err != nil { - if !errors.Is(err, ErrImageNotFound) && !errors.Is(err, ErrSnapNotFound) { + if !errors.Is(err, util.ErrImageNotFound) && !errors.Is(err, ErrSnapNotFound) { log.ErrorLog(ctx, "failed to delete snapshot %q: %v", rbdSnap, err) return err @@ -92,7 +92,7 @@ func cleanUpSnapshot( if rbdVol != nil { err := rbdVol.Delete(ctx) if err != nil { - if !errors.Is(err, ErrImageNotFound) { + if !errors.Is(err, util.ErrImageNotFound) { log.ErrorLog(ctx, "failed to delete rbd image %q with error: %v", rbdVol, err) return err diff --git a/internal/util/errors.go b/internal/util/errors.go index 118ed15e3c7..11a3f2964ec 100644 --- a/internal/util/errors.go +++ b/internal/util/errors.go @@ -18,9 +18,13 @@ package util import ( "errors" + + "github.com/ceph/go-ceph/rados" ) var ( + // ErrImageNotFound is returned when image name is not found in the cluster on the given pool and/or namespace. + ErrImageNotFound = errors.New("image not found") // ErrKeyNotFound is returned when requested key in omap is not found. ErrKeyNotFound = errors.New("key not found") // ErrObjectExists is returned when named omap is already present in rados. @@ -37,3 +41,30 @@ var ( // ErrMissingConfigForMonitor is returned when clusterID is not found for the mon. ErrMissingConfigForMonitor = errors.New("missing configuration of cluster ID for monitor") ) + +// ShouldRetryVolumeGeneration determines whether the process of finding or generating +// volumes should continue based on the type of error encountered. +// +// It checks if the given error matches any of the following known errors: +// - util.ErrKeyNotFound: The key required to locate the volume is missing in Rados omap. +// - util.ErrPoolNotFound: The rbd pool where the volume/omap is expected doesn't exist. +// - ErrImageNotFound: The image doesn't exist in the rbd pool. +// - rados.ErrPermissionDenied: Permissions to access the pool is denied. +// +// If any of these errors are encountered, the function returns `true`, indicating +// that the volume search should continue because of known error. Otherwise, it +// returns `false`, meaning the search should stop. +// +// This helper function is used in scenarios where multiple attempts may be made +// to retrieve or generate volume information, and we want to gracefully handle +// specific failure cases while retrying for others. +func ShouldRetryVolumeGeneration(err error) bool { + if err == nil { + return false // No error, do not retry + } + // Continue searching for specific known errors + return (errors.Is(err, ErrKeyNotFound) || + errors.Is(err, ErrPoolNotFound) || + errors.Is(err, ErrImageNotFound) || + errors.Is(err, rados.ErrPermissionDenied)) +} diff --git a/internal/util/errors_test.go b/internal/util/errors_test.go new file mode 100644 index 00000000000..cdb10675813 --- /dev/null +++ b/internal/util/errors_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2025 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "errors" + "testing" + + "github.com/ceph/go-ceph/rados" +) + +func Test_shouldRetryVolumeGeneration(t *testing.T) { + t.Parallel() + type args struct { + err error + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "No error (stop searching)", + args: args{err: nil}, + want: false, // No error, stop searching + }, + { + name: "ErrKeyNotFound (continue searching)", + args: args{err: ErrKeyNotFound}, + want: true, // Known error, continue searching + }, + { + name: "ErrPoolNotFound (continue searching)", + args: args{err: ErrPoolNotFound}, + want: true, // Known error, continue searching + }, + { + name: "ErrImageNotFound (continue searching)", + args: args{err: ErrImageNotFound}, + want: true, // Known error, continue searching + }, + { + name: "ErrPermissionDenied (continue searching)", + args: args{err: rados.ErrPermissionDenied}, + want: true, // Known error, continue searching + }, + { + name: "Different error (stop searching)", + args: args{err: errors.New("unknown error")}, + want: false, // Unknown error, stop searching + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + if got := ShouldRetryVolumeGeneration(tt.args.err); got != tt.want { + t.Errorf("ShouldRetryVolumeGeneration() = %v, want %v", got, tt.want) + } + }) + } +}