diff --git a/Documentation/CRDs/Cluster/ceph-cluster-crd.md b/Documentation/CRDs/Cluster/ceph-cluster-crd.md index ae3c2deed90a..206696dafdd0 100755 --- a/Documentation/CRDs/Cluster/ceph-cluster-crd.md +++ b/Documentation/CRDs/Cluster/ceph-cluster-crd.md @@ -38,6 +38,7 @@ Settings can be specified at the global level to apply to the cluster as a whole If this value is empty, each pod will get an ephemeral directory to store their config files that is tied to the lifetime of the pod running on that node. More details can be found in the Kubernetes [empty dir docs](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir). * `skipUpgradeChecks`: if set to true Rook won't perform any upgrade checks on Ceph daemons during an upgrade. Use this at **YOUR OWN RISK**, only if you know what you're doing. To understand Rook's upgrade process of Ceph, read the [upgrade doc](../../Upgrade/rook-upgrade.md#ceph-version-upgrades). * `continueUpgradeAfterChecksEvenIfNotHealthy`: if set to true Rook will continue the OSD daemon upgrade process even if the PGs are not clean, or continue with the MDS upgrade even the file system is not healthy. +* `upgradeOSDRequiresHealthyPGs`: if set to true OSD upgrade process won't start until PGs are healthy. * `dashboard`: Settings for the Ceph dashboard. To view the dashboard in your browser see the [dashboard guide](../../Storage-Configuration/Monitoring/ceph-dashboard.md). * `enabled`: Whether to enable the dashboard to view cluster status * `urlPrefix`: Allows to serve the dashboard under a subpath (useful when you are accessing the dashboard via a reverse proxy) diff --git a/Documentation/CRDs/Shared-Filesystem/ceph-fs-subvolumegroup-crd.md b/Documentation/CRDs/Shared-Filesystem/ceph-fs-subvolumegroup-crd.md index 8f8941c4f49b..4016ee495517 100644 --- a/Documentation/CRDs/Shared-Filesystem/ceph-fs-subvolumegroup-crd.md +++ b/Documentation/CRDs/Shared-Filesystem/ceph-fs-subvolumegroup-crd.md @@ -32,6 +32,10 @@ spec: distributed: 1 # distributed=<0, 1> (disabled=0) # export: # export=<0-256> (disabled=-1) # random: # random=[0.0, 1.0](disabled=0.0) + # Quota size of the subvolume group. + #quota: 10G + # data pool name for the subvolume group layout instead of the default data pool. + #dataPoolName: myfs-replicated ``` ## Settings @@ -48,7 +52,11 @@ If any setting is unspecified, a suitable default will be used automatically. * `filesystemName`: The metadata name of the CephFilesystem CR where the subvolume group will be created. -* `pinning`: To distribute load across MDS ranks in predictable and stable ways. Reference: https://docs.ceph.com/en/latest/cephfs/fs-volumes/#pinning-subvolumes-and-subvolume-groups. +* `quota`: Quota size of the Ceph Filesystem subvolume group. + +* `dataPoolName`: The data pool name for the subvolume group layout instead of the default data pool. + +* `pinning`: To distribute load across MDS ranks in predictable and stable ways. See the Ceph doc for [Pinning subvolume groups](https://docs.ceph.com/en/latest/cephfs/fs-volumes/#pinning-subvolumes-and-subvolume-groups). * `distributed`: Range: <0, 1>, for disabling it set to 0 * `export`: Range: <0-256>, for disabling it set to -1 * `random`: Range: [0.0, 1.0], for disabling it set to 0.0 diff --git a/Documentation/CRDs/specification.md b/Documentation/CRDs/specification.md index b84be542f70d..64197c517dcf 100644 --- a/Documentation/CRDs/specification.md +++ b/Documentation/CRDs/specification.md @@ -945,6 +945,20 @@ The default wait timeout is 10 minutes.

+upgradeOSDRequiresHealthyPGs
+ +bool + + + +(Optional) +

UpgradeOSDRequiresHealthyPGs defines if OSD upgrade requires PGs to be clean. If set to true, the OSD upgrade process won’t start until PGs are healthy. +This configuration will be ignored if skipUpgradeChecks is true. +Default is false.

+ + + + disruptionManagement
@@ -1562,6 +1576,30 @@ reference + + +quota
+ +k8s.io/apimachinery/pkg/api/resource.Quantity + + + +(Optional) +

Quota size of the Ceph Filesystem subvolume group.

+ + + + +dataPoolName
+ +string + + + +(Optional) +

The data pool name for the Ceph Filesystem subvolume group layout, if the default CephFS pool is not desired.

+ +

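+
+For illustration, the quota and dataPoolName fields described above combine into a manifest like the following sketch (the resource name group-a, filesystem myfs, and pool myfs-replicated are placeholders; deploy/examples/subvolumegroup.yaml in this change shows the same fields commented out):
+
+```yaml
+apiVersion: ceph.rook.io/v1
+kind: CephFilesystemSubVolumeGroup
+metadata:
+  name: group-a
+  namespace: rook-ceph
+spec:
+  filesystemName: myfs
+  # optional quota applied to the whole subvolume group
+  quota: 10G
+  # optional data pool used for the subvolume group layout
+  dataPoolName: myfs-replicated
+```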
CephFilesystemSubVolumeGroupSpecPinning @@ -4370,6 +4432,20 @@ The default wait timeout is 10 minutes.

+upgradeOSDRequiresHealthyPGs
+ +bool + + + +(Optional) +

UpgradeOSDRequiresHealthyPGs defines if OSD upgrade requires PGs to be clean. If set to true, the OSD upgrade process won’t start until PGs are healthy. +This configuration will be ignored if skipUpgradeChecks is true. +Default is false.

+ + + + disruptionManagement
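For reference, the new upgradeOSDRequiresHealthyPGs setting sits alongside the existing upgrade options in the CephCluster spec. A minimal excerpt enabling the new gate, assuming the usual rook-ceph cluster name and namespace, could look like:

```yaml
apiVersion: ceph.rook.io/v1
kind: CephCluster
metadata:
  name: rook-ceph
  namespace: rook-ceph
spec:
  skipUpgradeChecks: false
  waitTimeoutForHealthyOSDInMinutes: 10
  # wait for PGs to be clean before each OSD is restarted during an upgrade
  upgradeOSDRequiresHealthyPGs: true
```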
diff --git a/Documentation/Storage-Configuration/ceph-teardown.md b/Documentation/Storage-Configuration/ceph-teardown.md index 3e4e7fd8f9a1..4b8917ecdff4 100644 --- a/Documentation/Storage-Configuration/ceph-teardown.md +++ b/Documentation/Storage-Configuration/ceph-teardown.md @@ -2,6 +2,11 @@ title: Cleanup --- +Rook provides the following cleanup options: + +1. [Uninstall: Clean up the entire cluster and delete all data](#cleaning-up-a-cluster) +1. [Force delete individual resources](#force-delete-resources) + ## Cleaning up a Cluster To tear down the cluster, the following resources need to be cleaned up: @@ -179,3 +184,22 @@ If the operator is not able to remove the finalizers (i.e., the operator is not kubectl -n rook-ceph patch configmap rook-ceph-mon-endpoints --type merge -p '{"metadata":{"finalizers": []}}' kubectl -n rook-ceph patch secrets rook-ceph-mon --type merge -p '{"metadata":{"finalizers": []}}' ``` + +## Force Delete Resources + +To keep your data safe in the cluster, Rook disallows deleting critical cluster resources by default. To override this behavior and force delete a specific custom resource, add the annotation `rook.io/force-deletion="true"` to the resource and then delete it. Rook will start a cleanup job that will delete all the related Ceph resources created by that custom resource. + +For example, run the following commands to clean up the `CephFilesystemSubVolumeGroup` resource named `my-subvolumegroup`: + +```console +kubectl -n rook-ceph annotate cephfilesystemsubvolumegroups.ceph.rook.io my-subvolumegroup rook.io/force-deletion="true" +kubectl -n rook-ceph delete cephfilesystemsubvolumegroups.ceph.rook.io my-subvolumegroup +``` + +Once the cleanup job has completed successfully, Rook will remove the finalizers from the deleted custom resource.
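+
+For example, one way to follow the cleanup is to list the jobs in the operator namespace and confirm that the custom resource is eventually removed (the generated job name will vary):
+
+```console
+kubectl -n rook-ceph get jobs
+kubectl -n rook-ceph get cephfilesystemsubvolumegroups.ceph.rook.io my-subvolumegroup
+```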
+ +This cleanup is supported only for the following custom resources: + +| Custom Resource | Ceph Resources to be cleaned up | +| -------- | ------- | +| CephFilesystemSubVolumeGroup | CSI stored RADOS OMAP details for pvc/volumesnapshots, subvolume snapshots, subvolume clones, subvolumes | diff --git a/build/csv/ceph/ceph.rook.io_cephclusters.yaml b/build/csv/ceph/ceph.rook.io_cephclusters.yaml index 5ae53cab3430..9e69a9015d68 100644 --- a/build/csv/ceph/ceph.rook.io_cephclusters.yaml +++ b/build/csv/ceph/ceph.rook.io_cephclusters.yaml @@ -2951,6 +2951,8 @@ spec: type: object type: array type: object + upgradeOSDRequiresHealthyPGs: + type: boolean waitTimeoutForHealthyOSDInMinutes: format: int64 type: integer diff --git a/build/csv/ceph/ceph.rook.io_cephfilesystemsubvolumegroups.yaml b/build/csv/ceph/ceph.rook.io_cephfilesystemsubvolumegroups.yaml index 3039e73b4e9b..497e168a8d92 100644 --- a/build/csv/ceph/ceph.rook.io_cephfilesystemsubvolumegroups.yaml +++ b/build/csv/ceph/ceph.rook.io_cephfilesystemsubvolumegroups.yaml @@ -33,6 +33,8 @@ spec: type: object spec: properties: + dataPoolName: + type: string filesystemName: type: string x-kubernetes-validations: @@ -67,6 +69,12 @@ spec: || (!has(self.export) && has(self.distributed) && !has(self.random)) || (!has(self.export) && !has(self.distributed) && has(self.random)) || (!has(self.export) && !has(self.distributed) && !has(self.random)) + quota: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true required: - filesystemName type: object diff --git a/cmd/rook/ceph/cleanup.go b/cmd/rook/ceph/cleanup.go index e13e92a3067e..de3bf72838de 100644 --- a/cmd/rook/ceph/cleanup.go +++ b/cmd/rook/ceph/cleanup.go @@ -17,12 +17,14 @@ limitations under the License. 
package ceph import ( + "fmt" "os" "github.com/rook/rook/cmd/rook/rook" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" cleanup "github.com/rook/rook/pkg/daemon/ceph/cleanup" "github.com/rook/rook/pkg/daemon/ceph/client" + opcontroller "github.com/rook/rook/pkg/operator/ceph/controller" "github.com/rook/rook/pkg/operator/k8sutil" "github.com/rook/rook/pkg/util/flags" "github.com/spf13/cobra" @@ -40,22 +42,39 @@ var ( var cleanUpCmd = &cobra.Command{ Use: "clean", - Short: "Starts the cleanup process on the disks after ceph cluster is deleted", + Short: "Starts the cleanup process", +} + +var cleanUpHostCmd = &cobra.Command{ + Use: "host", + Short: "Starts the cleanup process on a host after the ceph cluster is deleted", +} + +var cleanUpSubVolumeGroupCmd = &cobra.Command{ + // the subcommand matches CRD kind of the custom resource to be cleaned up + Use: "CephFilesystemSubVolumeGroup", + Short: "Starts the cleanup process of a CephFilesystemSubVolumeGroup", } func init() { cleanUpCmd.Flags().StringVar(&dataDirHostPath, "data-dir-host-path", "", "dataDirHostPath on the node") cleanUpCmd.Flags().StringVar(&namespaceDir, "namespace-dir", "", "dataDirHostPath on the node") - cleanUpCmd.Flags().StringVar(&monSecret, "mon-secret", "", "monitor secret from the keyring") - cleanUpCmd.Flags().StringVar(&clusterFSID, "cluster-fsid", "", "ceph cluster fsid") - cleanUpCmd.Flags().StringVar(&sanitizeMethod, "sanitize-method", string(cephv1.SanitizeMethodQuick), "sanitize method to use (metadata or data)") - cleanUpCmd.Flags().StringVar(&sanitizeDataSource, "sanitize-data-source", string(cephv1.SanitizeDataSourceZero), "data source to sanitize the disk (zero or random)") - cleanUpCmd.Flags().Int32Var(&sanitizeIteration, "sanitize-iteration", 1, "overwrite N times the disk") - flags.SetFlagsFromEnv(cleanUpCmd.Flags(), rook.RookEnvVarPrefix) - cleanUpCmd.RunE = startCleanUp + cleanUpHostCmd.Flags().StringVar(&monSecret, "mon-secret", "", "monitor secret from the keyring") + cleanUpHostCmd.Flags().StringVar(&clusterFSID, "cluster-fsid", "", "ceph cluster fsid") + cleanUpHostCmd.Flags().StringVar(&sanitizeMethod, "sanitize-method", string(cephv1.SanitizeMethodQuick), "sanitize method to use (metadata or data)") + cleanUpHostCmd.Flags().StringVar(&sanitizeDataSource, "sanitize-data-source", string(cephv1.SanitizeDataSourceZero), "data source to sanitize the disk (zero or random)") + cleanUpHostCmd.Flags().Int32Var(&sanitizeIteration, "sanitize-iteration", 1, "overwrite N times the disk") + flags.SetFlagsFromEnv(cleanUpHostCmd.Flags(), rook.RookEnvVarPrefix) + + flags.SetFlagsFromEnv(cleanUpSubVolumeGroupCmd.Flags(), rook.RookEnvVarPrefix) + + cleanUpCmd.AddCommand(cleanUpHostCmd, cleanUpSubVolumeGroupCmd) + + cleanUpHostCmd.RunE = startHostCleanUp + cleanUpSubVolumeGroupCmd.RunE = startSubVolumeGroupCleanUp } -func startCleanUp(cmd *cobra.Command, args []string) error { +func startHostCleanUp(cmd *cobra.Command, args []string) error { rook.SetLogLevel() rook.LogStartupInfo(cleanUpCmd.Flags()) @@ -87,3 +106,37 @@ func startCleanUp(cmd *cobra.Command, args []string) error { return nil } + +func startSubVolumeGroupCleanUp(cmd *cobra.Command, args []string) error { + rook.SetLogLevel() + rook.LogStartupInfo(cleanUpSubVolumeGroupCmd.Flags()) + + ctx := cmd.Context() + context := createContext() + namespace := os.Getenv(k8sutil.PodNamespaceEnvVar) + clusterInfo := client.AdminClusterInfo(ctx, namespace, "") + + fsName := os.Getenv(opcontroller.CephFSNameEnv) + if fsName == "" { + 
rook.TerminateFatal(fmt.Errorf("ceph filesystem name is not available in the pod environment variables")) + } + subVolumeGroupName := os.Getenv(opcontroller.CephFSSubVolumeGroupNameEnv) + if subVolumeGroupName == "" { + rook.TerminateFatal(fmt.Errorf("cephFS SubVolumeGroup name is not available in the pod environment variables")) + } + csiNamespace := os.Getenv(opcontroller.CSICephFSRadosNamesaceEnv) + if csiNamespace == "" { + rook.TerminateFatal(fmt.Errorf("CSI rados namespace name is not available in the pod environment variables")) + } + poolName := os.Getenv(opcontroller.CephFSMetaDataPoolNameEnv) + if poolName == "" { + rook.TerminateFatal(fmt.Errorf("cephFS metadata pool name is not available in the pod environment variables")) + } + + err := cleanup.SubVolumeGroupCleanup(context, clusterInfo, fsName, subVolumeGroupName, poolName, csiNamespace) + if err != nil { + rook.TerminateFatal(fmt.Errorf("failed to cleanup cephFS %q SubVolumeGroup %q in the namespace %q. %v", fsName, subVolumeGroupName, namespace, err)) + } + + return nil +} diff --git a/deploy/charts/rook-ceph-cluster/values.yaml b/deploy/charts/rook-ceph-cluster/values.yaml index ed0ce51daaa5..6d04631afcec 100644 --- a/deploy/charts/rook-ceph-cluster/values.yaml +++ b/deploy/charts/rook-ceph-cluster/values.yaml @@ -121,6 +121,11 @@ cephClusterSpec: # The default wait timeout is 10 minutes. waitTimeoutForHealthyOSDInMinutes: 10 + # Whether or not requires PGs are clean before an OSD upgrade. If set to `true` OSD upgrade process won't start until PGs are healthy. + # This configuration will be ignored if `skipUpgradeChecks` is `true`. + # Default is false. + upgradeOSDRequiresHealthyPGs: false + mon: # Set the number of mons to be started. Generally recommended to be 3. # For highest availability, an odd number of mons should be specified. diff --git a/deploy/charts/rook-ceph/templates/resources.yaml b/deploy/charts/rook-ceph/templates/resources.yaml index d02d81c0d8bd..f61b5f9ad121 100644 --- a/deploy/charts/rook-ceph/templates/resources.yaml +++ b/deploy/charts/rook-ceph/templates/resources.yaml @@ -5004,6 +5004,12 @@ spec: type: object type: array type: object + upgradeOSDRequiresHealthyPGs: + description: |- + UpgradeOSDRequiresHealthyPGs defines if OSD upgrade requires PGs are clean. If set to `true` OSD upgrade process won't start until PGs are healthy. + This configuration will be ignored if `skipUpgradeChecks` is `true`. + Default is false. + type: boolean waitTimeoutForHealthyOSDInMinutes: description: |- WaitTimeoutForHealthyOSDInMinutes defines the time the operator would wait before an OSD can be stopped for upgrade or restart. @@ -7977,6 +7983,9 @@ spec: spec: description: Spec represents the specification of a Ceph Filesystem SubVolumeGroup properties: + dataPoolName: + description: The data pool name for the Ceph Filesystem subvolume group layout, if the default CephFS pool is not desired. + type: string filesystemName: description: |- FilesystemName is the name of Ceph Filesystem SubVolumeGroup volume name. 
Typically it's the name of @@ -8018,6 +8027,13 @@ spec: x-kubernetes-validations: - message: only one pinning type should be set rule: (has(self.export) && !has(self.distributed) && !has(self.random)) || (!has(self.export) && has(self.distributed) && !has(self.random)) || (!has(self.export) && !has(self.distributed) && has(self.random)) || (!has(self.export) && !has(self.distributed) && !has(self.random)) + quota: + anyOf: + - type: integer + - type: string + description: Quota size of the Ceph Filesystem subvolume group. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true required: - filesystemName type: object diff --git a/deploy/examples/cluster.yaml b/deploy/examples/cluster.yaml index 6292cb3037cf..1c0c68434075 100644 --- a/deploy/examples/cluster.yaml +++ b/deploy/examples/cluster.yaml @@ -43,6 +43,10 @@ spec: # continue with the upgrade of an OSD even if its not ok to stop after the timeout. This timeout won't be applied if `skipUpgradeChecks` is `true`. # The default wait timeout is 10 minutes. waitTimeoutForHealthyOSDInMinutes: 10 + # Whether or not requires PGs are clean before an OSD upgrade. If set to `true` OSD upgrade process won't start until PGs are healthy. + # This configuration will be ignored if `skipUpgradeChecks` is `true`. + # Default is false. + upgradeOSDRequiresHealthyPGs: false mon: # Set the number of mons to be started. Generally recommended to be 3. # For highest availability, an odd number of mons should be specified. diff --git a/deploy/examples/crds.yaml b/deploy/examples/crds.yaml index 942b083342ef..511facee7854 100644 --- a/deploy/examples/crds.yaml +++ b/deploy/examples/crds.yaml @@ -5002,6 +5002,12 @@ spec: type: object type: array type: object + upgradeOSDRequiresHealthyPGs: + description: |- + UpgradeOSDRequiresHealthyPGs defines if OSD upgrade requires PGs are clean. If set to `true` OSD upgrade process won't start until PGs are healthy. + This configuration will be ignored if `skipUpgradeChecks` is `true`. + Default is false. + type: boolean waitTimeoutForHealthyOSDInMinutes: description: |- WaitTimeoutForHealthyOSDInMinutes defines the time the operator would wait before an OSD can be stopped for upgrade or restart. @@ -7971,6 +7977,9 @@ spec: spec: description: Spec represents the specification of a Ceph Filesystem SubVolumeGroup properties: + dataPoolName: + description: The data pool name for the Ceph Filesystem subvolume group layout, if the default CephFS pool is not desired. + type: string filesystemName: description: |- FilesystemName is the name of Ceph Filesystem SubVolumeGroup volume name. Typically it's the name of @@ -8012,6 +8021,13 @@ spec: x-kubernetes-validations: - message: only one pinning type should be set rule: (has(self.export) && !has(self.distributed) && !has(self.random)) || (!has(self.export) && has(self.distributed) && !has(self.random)) || (!has(self.export) && !has(self.distributed) && has(self.random)) || (!has(self.export) && !has(self.distributed) && !has(self.random)) + quota: + anyOf: + - type: integer + - type: string + description: Quota size of the Ceph Filesystem subvolume group. 
+ pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true required: - filesystemName type: object diff --git a/deploy/examples/subvolumegroup.yaml b/deploy/examples/subvolumegroup.yaml index 629cbe4b8a76..50345142a222 100644 --- a/deploy/examples/subvolumegroup.yaml +++ b/deploy/examples/subvolumegroup.yaml @@ -17,3 +17,7 @@ spec: distributed: 1 # distributed=<0, 1> (disabled=0) # export: # export=<0-256> (disabled=-1) # random: # random=[0.0, 1.0](disabled=0.0) + # Quota size of the subvolume group. + #quota: 10G + # data pool name for the subvolume group layout instead of the default data pool. + #dataPoolName: myfs-replicated diff --git a/pkg/apis/ceph.rook.io/v1/types.go b/pkg/apis/ceph.rook.io/v1/types.go index c7995cfc5cde..9babbeeca60e 100755 --- a/pkg/apis/ceph.rook.io/v1/types.go +++ b/pkg/apis/ceph.rook.io/v1/types.go @@ -162,6 +162,12 @@ type ClusterSpec struct { // +optional WaitTimeoutForHealthyOSDInMinutes time.Duration `json:"waitTimeoutForHealthyOSDInMinutes,omitempty"` + // UpgradeOSDRequiresHealthyPGs defines if OSD upgrade requires PGs are clean. If set to `true` OSD upgrade process won't start until PGs are healthy. + // This configuration will be ignored if `skipUpgradeChecks` is `true`. + // Default is false. + // +optional + UpgradeOSDRequiresHealthyPGs bool `json:"upgradeOSDRequiresHealthyPGs,omitempty"` + // A spec for configuring disruption management. // +nullable // +optional @@ -3016,6 +3022,12 @@ type CephFilesystemSubVolumeGroupSpec struct { // only one out of (export, distributed, random) can be set at a time // +optional Pinning CephFilesystemSubVolumeGroupSpecPinning `json:"pinning,omitempty"` + // Quota size of the Ceph Filesystem subvolume group. + // +optional + Quota *resource.Quantity `json:"quota,omitempty"` + // The data pool name for the Ceph Filesystem subvolume group layout, if the default CephFS pool is not desired. + // +optional + DataPoolName string `json:"dataPoolName"` } // CephFilesystemSubVolumeGroupSpecPinning represents the pinning configuration of SubVolumeGroup diff --git a/pkg/daemon/ceph/cleanup/subvolumegroups.go b/pkg/daemon/ceph/cleanup/subvolumegroups.go new file mode 100644 index 000000000000..6559a58c05f4 --- /dev/null +++ b/pkg/daemon/ceph/cleanup/subvolumegroups.go @@ -0,0 +1,143 @@ +/* +Copyright 2024 The Rook Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cleanup + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" + "github.com/rook/rook/pkg/clusterd" + "github.com/rook/rook/pkg/daemon/ceph/client" + cephclient "github.com/rook/rook/pkg/daemon/ceph/client" +) + +func SubVolumeGroupCleanup(context *clusterd.Context, clusterInfo *client.ClusterInfo, fsName, svg, poolName, csiNamespace string) error { + logger.Infof("starting clean up cephFS subVolumeGroup resource %q", svg) + + subVolumeList, err := cephclient.ListSubvolumesInGroup(context, clusterInfo, fsName, svg) + if err != nil { + return errors.Wrapf(err, "failed to list cephFS subVolumes in subVolumeGroup %q", svg) + } + + if len(subVolumeList) == 0 { + logger.Infof("no subvolumes found in cephFS subVolumeGroup %q", svg) + return nil + } + + var retErr error + for _, subVolume := range subVolumeList { + logger.Infof("starting clean up of subvolume %q", subVolume.Name) + err := CleanUpOMAPDetails(context, clusterInfo, subVolume.Name, poolName, csiNamespace) + if err != nil { + retErr = errors.Wrapf(err, "failed to clean up OMAP details for the subvolume %q.", subVolume.Name) + logger.Error(retErr) + } + subVolumeSnapshots, err := cephclient.ListSubVolumeSnapshots(context, clusterInfo, fsName, subVolume.Name, svg) + if err != nil { + retErr = errors.Wrapf(err, "failed to list snapshots for subvolume %q in group %q.", subVolume.Name, svg) + logger.Error(retErr) + } else { + err := CancelPendingClones(context, clusterInfo, subVolumeSnapshots, fsName, subVolume.Name, svg) + if err != nil { + retErr = errors.Wrapf(err, "failed to cancel pending clones for subvolume %q in group %q", subVolume.Name, svg) + logger.Error(retErr) + } + err = DeleteSubVolumeSnapshots(context, clusterInfo, subVolumeSnapshots, fsName, subVolume.Name, svg) + if err != nil { + retErr = errors.Wrapf(err, "failed to delete snapshots for subvolume %q in group %q", subVolume.Name, svg) + logger.Error(retErr) + } + } + err = cephclient.DeleteSubVolume(context, clusterInfo, fsName, subVolume.Name, svg) + if err != nil { + retErr = errors.Wrapf(err, "failed to delete subvolume group %q.", subVolume.Name) + logger.Error(retErr) + } + } + + if retErr != nil { + return errors.Wrapf(err, "clean up for cephFS subVolumeGroup %q didn't complete successfully.", svg) + } + logger.Infof("successfully cleaned up cephFS subVolumeGroup %q", svg) + return nil +} + +func CancelPendingClones(context *clusterd.Context, clusterInfo *client.ClusterInfo, snapshots cephclient.SubVolumeSnapshots, fsName, subvol, svg string) error { + for _, snapshot := range snapshots { + logger.Infof("deleting any pending clones of snapshot %q of subvolume %q of group %q", snapshot.Name, subvol, svg) + pendingClones, err := cephclient.ListSubVolumeSnapshotPendingClones(context, clusterInfo, fsName, subvol, snapshot.Name, svg) + if err != nil { + return errors.Wrapf(err, "failed to list all the pending clones for snapshot %q", snapshot.Name) + } + for _, pendingClone := range pendingClones.Clones { + err := cephclient.CancelSnapshotClone(context, clusterInfo, fsName, svg, pendingClone.Name) + if err != nil { + return errors.Wrapf(err, "failed to cancel the pending clone %q for snapshot %q", pendingClone.Name, snapshot.Name) + } + } + } + return nil +} + +func DeleteSubVolumeSnapshots(context *clusterd.Context, clusterInfo *client.ClusterInfo, snapshots cephclient.SubVolumeSnapshots, fsName, subvol, svg string) error { + for _, snapshot := range snapshots { + logger.Infof("deleting snapshot %q for subvolume %q in group %q", snapshot.Name, 
subvol, svg) + err := cephclient.DeleteSubvolumeSnapshot(context, clusterInfo, fsName, subvol, svg, snapshot.Name) + if err != nil { + return errors.Wrapf(err, "failed to delete snapshot %q for subvolume %q in group %q. %v", snapshot.Name, subvol, svg, err) + } + logger.Infof("successfully deleted snapshot %q for subvolume %q in group %q", snapshot.Name, subvol, svg) + } + return nil +} + +func CleanUpOMAPDetails(context *clusterd.Context, clusterInfo *client.ClusterInfo, objName, poolName, namespace string) error { + omapValue := getOMAPValue(objName) + if omapValue == "" { + return errors.New(fmt.Sprintf("failed to get OMAP value for object %q", objName)) + } + logger.Infof("OMAP value for the object %q is %q", objName, omapValue) + omapKey, err := cephclient.GetOMAPKey(context, clusterInfo, omapValue, poolName, namespace) + if err != nil { + return errors.Wrapf(err, "failed to get OMAP key for omapObj %q. %v", omapValue, err) + } + logger.Infof("OMAP key for the OIMAP value %q is %q", omapValue, omapKey) + + // delete OMAP details + err = cephclient.DeleteOmapValue(context, clusterInfo, omapValue, poolName, namespace) + if err != nil { + return errors.Wrapf(err, "failed to delete OMAP value %q. %v", omapValue, err) + } + if omapKey != "" { + err = cephclient.DeleteOmapKey(context, clusterInfo, omapKey, poolName, namespace) + if err != nil { + return errors.Wrapf(err, "failed to delete OMAP key %q. %v", omapKey, err) + } + } + return nil +} + +func getOMAPValue(subVol string) string { + splitSubvol := strings.SplitAfterN(subVol, "-", 3) + if len(splitSubvol) < 3 { + return "" + } + subvol_id := splitSubvol[len(splitSubvol)-1] + omapval := "csi.volume." + subvol_id + return omapval +} diff --git a/pkg/daemon/ceph/cleanup/subvolumegroups_test.go b/pkg/daemon/ceph/cleanup/subvolumegroups_test.go new file mode 100644 index 000000000000..151133298e5a --- /dev/null +++ b/pkg/daemon/ceph/cleanup/subvolumegroups_test.go @@ -0,0 +1,151 @@ +/* +Copyright 2024 The Rook Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cleanup + +import ( + "errors" + "testing" + "time" + + "github.com/rook/rook/pkg/clusterd" + cephclient "github.com/rook/rook/pkg/daemon/ceph/client" + exectest "github.com/rook/rook/pkg/util/exec/test" + "github.com/stretchr/testify/assert" +) + +const ( + mockSubVolumeListResp = `[{"name":"csi-vol-9d942d45-6f48-4d9e-a707-edfcd602bd89"}]` + mockGetOmapValResp = `Writing to /dev/stdout +pvc-e2907819-b005-4582-97ec-8fa1b277f46fsh` + mockSubvolumeSnapshotsResp = `[{"name":"snap0"}]` + mockSubvolumeSnapshotInfoResp = `{"created_at":"2024-04-0808:46:36.267888","data_pool":"myfs-replicated","has_pending_clones":"yes","pending_clones":[{"name":"clone3"}]}` +) + +func TestSubVolumeGroupCleanup(t *testing.T) { + clusterInfo := cephclient.AdminTestClusterInfo("mycluster") + fsName := "myfs" + subVolumeGroupName := "csi" + poolName := "myfs-metadata" + csiNamespace := "csi" + + t.Run("no subvolumes in subvolumegroup", func(t *testing.T) { + executor := &exectest.MockExecutor{} + executor.MockExecuteCommandWithTimeout = func(timeout time.Duration, command string, args ...string) (string, error) { + logger.Infof("Command: %s %v", command, args) + if args[0] == "fs" && args[1] == "subvolume" && args[2] == "ls" { + assert.Equal(t, fsName, args[3]) + assert.Equal(t, subVolumeGroupName, args[4]) + return "[]", nil + } + return "", errors.New("unknown command") + } + context := &clusterd.Context{Executor: executor} + err := SubVolumeGroupCleanup(context, clusterInfo, fsName, subVolumeGroupName, poolName, csiNamespace) + assert.NoError(t, err) + }) + + t.Run("subvolumes with snapshots and pending clones", func(t *testing.T) { + executor := &exectest.MockExecutor{} + executor.MockExecuteCommandWithTimeout = func(timeout time.Duration, command string, args ...string) (string, error) { + logger.Infof("Command: %s %v", command, args) + // list all subvolumes in subvolumegroup + if args[0] == "fs" && args[1] == "subvolume" && args[2] == "ls" { + assert.Equal(t, fsName, args[3]) + assert.Equal(t, subVolumeGroupName, args[4]) + return mockSubVolumeListResp, nil + } + if args[0] == "getomapval" { + assert.Equal(t, "csi.volume.9d942d45-6f48-4d9e-a707-edfcd602bd89", args[1]) + assert.Equal(t, "csi.volname", args[2]) + assert.Equal(t, poolName, args[4]) + assert.Equal(t, csiNamespace, args[6]) + return mockGetOmapValResp, nil + } + // delete OMAP value + if args[0] == "rm" && args[1] == "csi.volume.9d942d45-6f48-4d9e-a707-edfcd602bd89" { + assert.Equal(t, "-p", args[2]) + assert.Equal(t, poolName, args[3]) + assert.Equal(t, "--namespace", args[4]) + assert.Equal(t, csiNamespace, args[5]) + return "", nil + } + // delete OMAP key + if args[0] == "rmomapkey" && args[1] == "csi.volumes.default" { + assert.Equal(t, "ceph.volume.pvc-e2907819-b005-4582-97ec-8fa1b277f46fsh", args[2]) + assert.Equal(t, "-p", args[3]) + assert.Equal(t, poolName, args[4]) + assert.Equal(t, "--namespace", args[5]) + assert.Equal(t, csiNamespace, args[6]) + return "", nil + } + // list all snapshots in a subvolume + if args[0] == "fs" && args[1] == "subvolume" && args[2] == "snapshot" && args[3] == "ls" { + assert.Equal(t, fsName, args[4]) + assert.Equal(t, "csi-vol-9d942d45-6f48-4d9e-a707-edfcd602bd89", args[5]) + assert.Equal(t, "--group_name", args[6]) + assert.Equal(t, subVolumeGroupName, args[7]) + return mockSubvolumeSnapshotsResp, nil + } + // list all pending clones in a subvolume snapshot + if args[0] == "fs" && args[1] == "subvolume" && args[2] == "snapshot" && args[3] == "info" { + assert.Equal(t, fsName, args[4]) + 
assert.Equal(t, "csi-vol-9d942d45-6f48-4d9e-a707-edfcd602bd89", args[5]) + assert.Equal(t, "snap0", args[6]) + assert.Equal(t, "--group_name", args[7]) + assert.Equal(t, subVolumeGroupName, args[8]) + return mockSubvolumeSnapshotInfoResp, nil + } + // cancel pending clones + if args[0] == "fs" && args[1] == "clone" && args[2] == "cancel" { + assert.Equal(t, fsName, args[3]) + assert.Equal(t, "clone3", args[4]) + assert.Equal(t, "--group_name", args[5]) + assert.Equal(t, subVolumeGroupName, args[6]) + return "", nil + } + // delete snapshots + if args[0] == "fs" && args[1] == "subvolume" && args[2] == "snapshot" && args[3] == "rm" { + assert.Equal(t, fsName, args[4]) + assert.Equal(t, "csi-vol-9d942d45-6f48-4d9e-a707-edfcd602bd89", args[5]) + assert.Equal(t, "snap0", args[6]) + assert.Equal(t, "--group_name", args[7]) + assert.Equal(t, subVolumeGroupName, args[8]) + return "", nil + } + // delete subvolume + if args[0] == "fs" && args[1] == "subvolume" && args[2] == "rm" { + assert.Equal(t, fsName, args[3]) + assert.Equal(t, "csi-vol-9d942d45-6f48-4d9e-a707-edfcd602bd89", args[4]) + assert.Equal(t, subVolumeGroupName, args[5]) + assert.Equal(t, "--force", args[6]) + return "", nil + } + return "", errors.New("unknown command") + } + context := &clusterd.Context{Executor: executor} + err := SubVolumeGroupCleanup(context, clusterInfo, fsName, subVolumeGroupName, poolName, csiNamespace) + assert.NoError(t, err) + }) +} + +func TestGetOmapValue(t *testing.T) { + result := getOMAPValue("csi-vol-3a41b367-9566-4dbb-8884-39e1fa306ea7") + assert.Equal(t, "csi.volume.3a41b367-9566-4dbb-8884-39e1fa306ea7", result) + + result = getOMAPValue("invalidSubVolume") + assert.Equal(t, "", result) +} diff --git a/pkg/daemon/ceph/client/filesystem.go b/pkg/daemon/ceph/client/filesystem.go index f7785ff98ee5..048f7ce1bd0d 100644 --- a/pkg/daemon/ceph/client/filesystem.go +++ b/pkg/daemon/ceph/client/filesystem.go @@ -362,8 +362,8 @@ func deleteFSPool(context *clusterd.Context, clusterInfo *ClusterInfo, poolNames } // WaitForNoStandbys waits for all standbys go away -func WaitForNoStandbys(context *clusterd.Context, clusterInfo *ClusterInfo, timeout time.Duration) error { - err := wait.PollUntilContextTimeout(clusterInfo.Context, 3*time.Second, timeout, true, func(ctx ctx.Context) (bool, error) { +func WaitForNoStandbys(context *clusterd.Context, clusterInfo *ClusterInfo, retryInterval, timeout time.Duration) error { + err := wait.PollUntilContextTimeout(clusterInfo.Context, retryInterval, timeout, true, func(ctx ctx.Context) (bool, error) { mdsDump, err := GetMDSDump(context, clusterInfo) if err != nil { logger.Errorf("failed to get fs dump. %v", err) @@ -435,7 +435,7 @@ type SubvolumeList []Subvolume const NoSubvolumeGroup = "" // ListSubvolumesInGroup lists all subvolumes present in the given filesystem's subvolume group by -// name. If groupName is empty, list subvolumes that are not in any group. Times out after 5 seconds. +// name. If groupName is empty, list subvolumes that are not in any group. 
var ListSubvolumesInGroup = listSubvolumesInGroup // with above, allow this to be overridden for unit testing @@ -453,10 +453,61 @@ func listSubvolumesInGroup(context *clusterd.Context, clusterInfo *ClusterInfo, if err != nil { return svs, errors.Wrapf(err, "failed to list subvolumes in filesystem %q subvolume group %q", fsName, groupName) } - if err := json.Unmarshal(buf, &svs); err != nil { return svs, errors.Wrapf(err, "failed to unmarshal subvolume list for filesystem %q subvolume group %q", fsName, groupName) } + return svs, nil +} + +// SubVolumeSnapshot represents snapshot of a cephFS subvolume +type SubVolumeSnapshot struct { + Name string `json:"name"` +} + +// SubVolumeSnapshots is the list of snapshots in a CephFS subvolume +type SubVolumeSnapshots []SubVolumeSnapshot + +// ListSubVolumeSnaphots lists all the subvolume snapshots present in the subvolume in the given filesystem's subvolume group. +var ListSubVolumeSnapshots = listSubVolumeSnapshots + +func listSubVolumeSnapshots(context *clusterd.Context, clusterInfo *ClusterInfo, fsName, subVolumeName, groupName string) (SubVolumeSnapshots, error) { + svs := SubVolumeSnapshots{} + args := []string{"fs", "subvolume", "snapshot", "ls", fsName, subVolumeName, "--group_name", groupName} + cmd := NewCephCommand(context, clusterInfo, args) + buf, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return svs, errors.Wrapf(err, "failed to list subvolumes in filesystem %q subvolume group %q", fsName, groupName) + } + + if err := json.Unmarshal(buf, &svs); err != nil { + return svs, errors.Wrapf(err, "failed to unmarshal snapshots for subvolume %q for filesystem %q subvolume group %q", subVolumeName, fsName, groupName) + } return svs, nil } + +// SubVolumeSnapshotPendingClones refers to all the pending clones available in a cephFS subvolume snapshot +type SubVolumeSnapshotPendingClones struct { + Clones []struct { + Name string `json:"name"` + } `json:"pending_clones"` +} + +var ListSubVolumeSnapshotPendingClones = listSubVolumeSnapshotPendingClones + +// listSubVolumeSnapshotPendingClones lists all the pending clones available in a cephFS subvolume snapshot +func listSubVolumeSnapshotPendingClones(context *clusterd.Context, clusterInfo *ClusterInfo, fsName, subVolumeName, snap, groupName string) (SubVolumeSnapshotPendingClones, error) { + pendingClones := SubVolumeSnapshotPendingClones{} + args := []string{"fs", "subvolume", "snapshot", "info", fsName, subVolumeName, snap, "--group_name", groupName} + cmd := NewCephCommand(context, clusterInfo, args) + buf, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return pendingClones, errors.Wrapf(err, "failed to list pending clones available for snapshot %q in filesystem %q in subvolume group %q", snap, fsName, groupName) + } + + if err := json.Unmarshal(buf, &pendingClones); err != nil { + return pendingClones, errors.Wrapf(err, "failed to unmarshal pending clones list for for snapshot %q in filesystem %q subvolume group %q", snap, fsName, groupName) + } + + return pendingClones, nil +} diff --git a/pkg/daemon/ceph/client/filesystem_test.go b/pkg/daemon/ceph/client/filesystem_test.go index 5df458464330..e23a8412891a 100644 --- a/pkg/daemon/ceph/client/filesystem_test.go +++ b/pkg/daemon/ceph/client/filesystem_test.go @@ -542,7 +542,7 @@ func TestWaitForNoStandbys(t *testing.T) { return "", errors.Errorf("unexpected ceph command %q", args) } - err := WaitForNoStandbys(context, AdminTestClusterInfo("mycluster"), 6*time.Second) + err := 
WaitForNoStandbys(context, AdminTestClusterInfo("mycluster"), time.Millisecond, 5*time.Millisecond) assert.Error(t, err) executor.MockExecuteCommandWithOutput = func(command string, args ...string) (string, error) { @@ -555,7 +555,7 @@ func TestWaitForNoStandbys(t *testing.T) { return "", errors.Errorf("unexpected ceph command %q", args) } - err = WaitForNoStandbys(context, AdminTestClusterInfo("mycluster"), 6*time.Second) + err = WaitForNoStandbys(context, AdminTestClusterInfo("mycluster"), time.Millisecond, 5*time.Millisecond) assert.Error(t, err) firstCall := true @@ -582,9 +582,8 @@ func TestWaitForNoStandbys(t *testing.T) { } return "", errors.Errorf("unexpected ceph command %q", args) } - err = WaitForNoStandbys(context, AdminTestClusterInfo("mycluster"), 6*time.Second) + err = WaitForNoStandbys(context, AdminTestClusterInfo("mycluster"), time.Millisecond, 5*time.Millisecond) assert.NoError(t, err) - } func TestListSubvolumeGroups(t *testing.T) { diff --git a/pkg/daemon/ceph/client/subvolumegroup.go b/pkg/daemon/ceph/client/subvolumegroup.go index 933fa4075551..bfe991a378ba 100644 --- a/pkg/daemon/ceph/client/subvolumegroup.go +++ b/pkg/daemon/ceph/client/subvolumegroup.go @@ -17,32 +17,104 @@ limitations under the License. package client import ( + "encoding/json" "fmt" "strconv" + "strings" + "syscall" "github.com/pkg/errors" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" "github.com/rook/rook/pkg/clusterd" + "github.com/rook/rook/pkg/util/exec" "k8s.io/apimachinery/pkg/types" ) // CreateCephFSSubVolumeGroup create a CephFS subvolume group. // volName is the name of the Ceph FS volume, the same as the CephFilesystem CR name. -func CreateCephFSSubVolumeGroup(context *clusterd.Context, clusterInfo *ClusterInfo, volName, groupName string) error { +func CreateCephFSSubVolumeGroup(context *clusterd.Context, clusterInfo *ClusterInfo, volName, groupName string, svgSpec *cephv1.CephFilesystemSubVolumeGroupSpec) error { logger.Infof("creating cephfs %q subvolume group %q", volName, groupName) - // [--pool_layout ] [--uid ] [--gid ] [--mode ] + // [] [--pool_layout ] [--uid ] [--gid ] [--mode ] args := []string{"fs", "subvolumegroup", "create", volName, groupName} + if svgSpec != nil { + if svgSpec.Quota != nil { + // convert the size to bytes as ceph expect the size in bytes + args = append(args, fmt.Sprintf("--size=%d", svgSpec.Quota.Value())) + } + if svgSpec.DataPoolName != "" { + args = append(args, fmt.Sprintf("--pool_layout=%s", svgSpec.DataPoolName)) + } + } + + svgInfo, err := getCephFSSubVolumeGroupInfo(context, clusterInfo, volName, groupName) + if err != nil { + // return error other than not found. + if code, ok := exec.ExitStatus(err); ok && code != int(syscall.ENOENT) { + return errors.Wrapf(err, "failed to create subvolume group %q in filesystem %q", groupName, volName) + } + } + + // if the subvolumegroup exists, resize the subvolumegroup + if err == nil && svgSpec != nil && svgSpec.Quota != nil && svgSpec.Quota.CmpInt64(svgInfo.BytesQuota) != 0 { + err = resizeCephFSSubVolumeGroup(context, clusterInfo, volName, groupName, svgSpec) + if err != nil { + return errors.Wrapf(err, "failed to create subvolume group %q in filesystem %q", groupName, volName) + } + } + + cmd := NewCephCommand(context, clusterInfo, args) + cmd.JsonOutput = false + output, err := cmd.Run() + if err != nil { + return errors.Wrapf(err, "failed to create subvolume group %q in filesystem %q. 
%s", groupName, volName, output) + } + + logger.Infof("successfully created subvolume group %q in filesystem %q", groupName, volName) + return nil +} + +// resizeCephFSSubVolumeGroup resize a CephFS subvolume group. +// volName is the name of the Ceph FS volume, the same as the CephFilesystem CR name. +func resizeCephFSSubVolumeGroup(context *clusterd.Context, clusterInfo *ClusterInfo, volName, groupName string, svgSpec *cephv1.CephFilesystemSubVolumeGroupSpec) error { + logger.Infof("resizing cephfs %q subvolume group %q", volName, groupName) + // [--no-shrink] + args := []string{"fs", "subvolumegroup", "resize", volName, groupName, "--no-shrink", fmt.Sprintf("%d", svgSpec.Quota.Value())} cmd := NewCephCommand(context, clusterInfo, args) cmd.JsonOutput = false output, err := cmd.Run() if err != nil { - return errors.Wrapf(err, "failed to create subvolume group %q. %s", volName, output) + return errors.Wrapf(err, "failed to resize subvolume group %q in filesystem %q. %s", groupName, volName, output) } - logger.Infof("successfully created cephfs %q subvolume group %q", volName, groupName) + logger.Infof("successfully resized subvolume group %q in filesystem %q to %s", groupName, volName, svgSpec.Quota) return nil } +type subvolumeGroupInfo struct { + BytesQuota int64 `json:"bytes_quota"` + BytesUsed int64 `json:"bytes_used"` + DataPool string `json:"data_pool"` +} + +// getCephFSSubVolumeGroupInfo get subvolumegroup info of the group name. +// volName is the name of the Ceph FS volume, the same as the CephFilesystem CR name. +func getCephFSSubVolumeGroupInfo(context *clusterd.Context, clusterInfo *ClusterInfo, volName, groupName string) (*subvolumeGroupInfo, error) { + args := []string{"fs", "subvolumegroup", "info", volName, groupName} + cmd := NewCephCommand(context, clusterInfo, args) + cmd.JsonOutput = true + output, err := cmd.Run() + if err != nil { + return nil, errors.Wrapf(err, "failed to get subvolume group %q in filesystem %q. %s", groupName, volName, output) + } + + svgInfo := subvolumeGroupInfo{} + err = json.Unmarshal(output, &svgInfo) + if err != nil { + return nil, errors.Wrapf(err, "failed to unmarshal into subvolumeGroupInfo") + } + return &svgInfo, nil +} + // DeleteCephFSSubVolumeGroup delete a CephFS subvolume group. func DeleteCephFSSubVolumeGroup(context *clusterd.Context, clusterInfo *ClusterInfo, volName, groupName string) error { logger.Infof("deleting cephfs %q subvolume group %q", volName, groupName) @@ -129,3 +201,81 @@ func validatePinningValues(pinning cephv1.CephFilesystemSubVolumeGroupSpecPinnin } return err } + +func GetOMAPKey(context *clusterd.Context, clusterInfo *ClusterInfo, omapObj, poolName, namespace string) (string, error) { + args := []string{"getomapval", omapObj, "csi.volname", "-p", poolName, "--namespace", namespace, "/dev/stdout"} + cmd := NewRadosCommand(context, clusterInfo, args) + buf, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return "", errors.Wrapf(err, "failed to list omapKeys for omapObj %q", omapObj) + } + + // Todo: is there a way to avoid this parsing? 
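+ // The `rados getomapval <obj> csi.volname ... /dev/stdout` output is expected to contain a
+ // "Writing to /dev/stdout" header line followed by the stored value, so the volume (PVC)
+ // name recorded under the csi.volname key is read from the second line of the response.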
+ respStr := string(buf) + var pvcName string + if len(respStr) != 0 { + resp := strings.Split(respStr, "\n") + if len(resp) == 2 { + pvcName = resp[1] + } + } + + if pvcName == "" { + return "", nil + } + + omapKey := fmt.Sprintf("ceph.volume.%s", pvcName) + return omapKey, nil +} + +func DeleteOmapValue(context *clusterd.Context, clusterInfo *ClusterInfo, omapValue, poolName, namespace string) error { + args := []string{"rm", omapValue, "-p", poolName, "--namespace", namespace} + cmd := NewRadosCommand(context, clusterInfo, args) + _, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return errors.Wrapf(err, "failed to delete omap value %q in pool %q", omapValue, poolName) + } + logger.Infof("successfully deleted omap value %q for pool %q", omapValue, poolName) + return nil +} + +func DeleteOmapKey(context *clusterd.Context, clusterInfo *ClusterInfo, omapKey, poolName, namespace string) error { + args := []string{"rmomapkey", "csi.volumes.default", omapKey, "-p", poolName, "--namespace", namespace} + cmd := NewRadosCommand(context, clusterInfo, args) + _, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return errors.Wrapf(err, "failed to delete omapKey %q in pool %q", omapKey, poolName) + } + logger.Infof("successfully deleted omap key %q for pool %q", omapKey, poolName) + return nil +} + +func DeleteSubVolume(context *clusterd.Context, clusterInfo *ClusterInfo, fs, subvol, svg string) error { + args := []string{"fs", "subvolume", "rm", fs, subvol, svg, "--force"} + cmd := NewCephCommand(context, clusterInfo, args) + _, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return errors.Wrapf(err, "failed to delete subvolume %q in filesystem %q", subvol, fs) + } + return nil +} + +func DeleteSubvolumeSnapshot(context *clusterd.Context, clusterInfo *ClusterInfo, fs, subvol, svg, snap string) error { + args := []string{"fs", "subvolume", "snapshot", "rm", fs, subvol, snap, "--group_name", svg} + cmd := NewCephCommand(context, clusterInfo, args) + _, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return errors.Wrapf(err, "failed to delete subvolume %q in filesystem %q", subvol, fs) + } + return nil +} + +func CancelSnapshotClone(context *clusterd.Context, clusterInfo *ClusterInfo, fs, svg, clone string) error { + args := []string{"fs", "clone", "cancel", fs, clone, "--group_name", svg} + cmd := NewCephCommand(context, clusterInfo, args) + _, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return errors.Wrapf(err, "failed to cancel clone %q in filesystem %q in group %q", clone, fs, svg) + } + return nil +} diff --git a/pkg/operator/ceph/cluster/cleanup.go b/pkg/operator/ceph/cluster/cleanup.go index 6c87353b09c4..92b2933591ab 100644 --- a/pkg/operator/ceph/cluster/cleanup.go +++ b/pkg/operator/ceph/cluster/cleanup.go @@ -138,7 +138,7 @@ func (c *ClusterController) cleanUpJobContainer(cluster *cephv1.CephCluster, mon SecurityContext: securityContext, VolumeMounts: volumeMounts, Env: envVars, - Args: []string{"ceph", "clean"}, + Args: []string{"ceph", "clean", "host"}, Resources: cephv1.GetCleanupResources(cluster.Spec.Resources), } } diff --git a/pkg/operator/ceph/cluster/osd/update.go b/pkg/operator/ceph/cluster/osd/update.go index 06b78a465bdd..2bfcbb87afda 100644 --- a/pkg/operator/ceph/cluster/osd/update.go +++ b/pkg/operator/ceph/cluster/osd/update.go @@ -79,6 +79,19 @@ func (c *updateConfig) updateExistingOSDs(errs *provisionErrors) { if c.doneUpdating() { return // no more 
OSDs to update } + if !c.cluster.spec.SkipUpgradeChecks && c.cluster.spec.UpgradeOSDRequiresHealthyPGs { + pgHealthMsg, pgClean, err := cephclient.IsClusterClean(c.cluster.context, c.cluster.clusterInfo, c.cluster.spec.DisruptionManagement.PGHealthyRegex) + if err != nil { + logger.Warningf("failed to check PGs status to update OSDs, will try updating it again later. %v", err) + return + } + if !pgClean { + logger.Infof("PGs are not healthy to update OSDs, will try updating it again later. PGs status: %q", pgHealthMsg) + return + } + logger.Infof("PGs are healthy to proceed updating OSDs. %v", pgHealthMsg) + } + osdIDQuery, _ := c.queue.Pop() var osdIDs []int diff --git a/pkg/operator/ceph/cluster/osd/update_test.go b/pkg/operator/ceph/cluster/osd/update_test.go index 61ca4425b5be..4c7bfc786b41 100644 --- a/pkg/operator/ceph/cluster/osd/update_test.go +++ b/pkg/operator/ceph/cluster/osd/update_test.go @@ -75,6 +75,8 @@ func Test_updateExistingOSDs(t *testing.T) { updateInjectFailures k8sutil.Failures // return failures from mocked updateDeploymentAndWaitFunc returnOkToStopIDs []int // return these IDs are ok-to-stop (or not ok to stop if empty) forceUpgradeIfUnhealthy bool + requiresHealthyPGs bool + cephStatus string ) // intermediates (created from inputs) @@ -108,6 +110,7 @@ func Test_updateExistingOSDs(t *testing.T) { clusterInfo.OwnerInfo = cephclient.NewMinimumOwnerInfo(t) spec := cephv1.ClusterSpec{ ContinueUpgradeAfterChecksEvenIfNotHealthy: forceUpgradeIfUnhealthy, + UpgradeOSDRequiresHealthyPGs: requiresHealthyPGs, } c = New(ctx, clusterInfo, spec, "rook/rook:master") config := c.newProvisionConfig() @@ -164,6 +167,9 @@ func Test_updateExistingOSDs(t *testing.T) { return cephclientfake.OSDDeviceClassOutput(args[3]), nil } } + if args[0] == "status" { + return cephStatus, nil + } panic(fmt.Sprintf("unexpected command %q with args %v", command, args)) }, } @@ -361,6 +367,43 @@ func Test_updateExistingOSDs(t *testing.T) { assert.Equal(t, 0, updateQueue.Len()) // the OSD should now have been removed from the queue }) + t.Run("PGs not clean to upgrade OSD", func(t *testing.T) { + clientset = fake.NewSimpleClientset() + updateQueue = newUpdateQueueWithIDs(2) + existingDeployments = newExistenceListWithIDs(2) + requiresHealthyPGs = true + cephStatus = unHealthyCephStatus + updateInjectFailures = k8sutil.Failures{} + doSetup() + + osdToBeQueried = 2 + updateConfig.updateExistingOSDs(errs) + assert.Zero(t, errs.len()) + assert.ElementsMatch(t, deploymentsUpdated, []string{}) + assert.Equal(t, 1, updateQueue.Len()) // the OSD should remain + + }) + + t.Run("PGs clean to upgrade OSD", func(t *testing.T) { + clientset = fake.NewSimpleClientset() + updateQueue = newUpdateQueueWithIDs(0) + existingDeployments = newExistenceListWithIDs(0) + requiresHealthyPGs = true + cephStatus = healthyCephStatus + forceUpgradeIfUnhealthy = true // FORCE UPDATES + updateInjectFailures = k8sutil.Failures{} + doSetup() + addDeploymentOnNode("node0", 0) + + osdToBeQueried = 0 + returnOkToStopIDs = []int{0} + updateConfig.updateExistingOSDs(errs) + assert.Zero(t, errs.len()) + assert.ElementsMatch(t, deploymentsUpdated, []string{deploymentName(0)}) + assert.Equal(t, 0, updateQueue.Len()) // should be done with updates + + }) + t.Run("continueUpgradesAfterChecksEvenIfUnhealthy = true", func(t *testing.T) { clientset = fake.NewSimpleClientset() updateQueue = newUpdateQueueWithIDs(2) diff --git a/pkg/operator/ceph/controller/cleanup.go b/pkg/operator/ceph/controller/cleanup.go new file mode 100644 index 
000000000000..72234271901f --- /dev/null +++ b/pkg/operator/ceph/controller/cleanup.go @@ -0,0 +1,144 @@ +/* +Copyright 2024 The Rook Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package controller + +import ( + "context" + "strings" + + "github.com/pkg/errors" + cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" + "github.com/rook/rook/pkg/operator/k8sutil" + batch "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + k8sClient "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + volumeName = "cleanup-volume" + dataDirHostPath = "ROOK_DATA_DIR_HOST_PATH" + CleanupAppName = "resource-cleanup" + RESOURCE_CLEANUP_ANNOTATION = "rook.io/force-deletion" + + // CephFSSubVolumeGroup env resources + CephFSSubVolumeGroupNameEnv = "SUB_VOLUME_GROUP_NAME" + CephFSNameEnv = "FILESYSTEM_NAME" + CSICephFSRadosNamesaceEnv = "CSI_CEPHFS_RADOS_NAMESPACE" + CephFSMetaDataPoolNameEnv = "METADATA_POOL_NAME" +) + +// ResourceCleanup defines an rook ceph resource to be cleaned up +type ResourceCleanup struct { + resource k8sClient.Object + cluster *cephv1.CephCluster + rookImage string + // config defines the attributes of the custom resource to passed in as environment variables in the clean up job + config map[string]string +} + +func NewResourceCleanup(obj k8sClient.Object, cluster *cephv1.CephCluster, rookImage string, config map[string]string) *ResourceCleanup { + return &ResourceCleanup{ + resource: obj, + rookImage: rookImage, + cluster: cluster, + config: config, + } +} + +// Start a new job to perform clean up of the ceph resources. It returns true if the cleanup job has succeeded +func (c *ResourceCleanup) StartJob(ctx context.Context, clientset kubernetes.Interface, jobName string) error { + podSpec := c.jobTemplateSpec() + job := &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + Namespace: c.resource.GetNamespace(), + OwnerReferences: c.resource.GetOwnerReferences(), + }, + Spec: batch.JobSpec{ + Template: podSpec, + }, + } + + if err := k8sutil.RunReplaceableJob(ctx, clientset, job, false); err != nil { + return errors.Wrapf(err, "failed to run clean up job for %q resource named %q in namespace %q", + c.resource.GetObjectKind().GroupVersionKind().Kind, c.resource.GetName(), c.resource.GetNamespace()) + } + + return nil +} + +func (c *ResourceCleanup) jobContainer() v1.Container { + volumeMounts := []v1.VolumeMount{} + envVars := []v1.EnvVar{} + if c.cluster.Spec.DataDirHostPath != "" { + hostPathVolumeMount := v1.VolumeMount{Name: volumeName, MountPath: c.cluster.Spec.DataDirHostPath} + volumeMounts = append(volumeMounts, hostPathVolumeMount) + envVars = append(envVars, []v1.EnvVar{ + {Name: dataDirHostPath, Value: c.cluster.Spec.DataDirHostPath}, + {Name: "ROOK_LOG_LEVEL", Value: "DEBUG"}, + {Name: k8sutil.PodNamespaceEnvVar, Value: c.resource.GetNamespace()}, + }...) + } + // append all the resource attributes as env variables. 
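+ // For the CephFilesystemSubVolumeGroup cleanup these are FILESYSTEM_NAME, SUB_VOLUME_GROUP_NAME,
+ // CSI_CEPHFS_RADOS_NAMESPACE and METADATA_POOL_NAME, which the `rook ceph clean` subcommand reads back.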
+ for k, v := range c.config { + envVars = append(envVars, v1.EnvVar{Name: k, Value: v}) + } + securityContext := PrivilegedContext(true) + return v1.Container{ + Name: "resource-cleanup", + Image: c.rookImage, + SecurityContext: securityContext, + VolumeMounts: volumeMounts, + Env: envVars, + Args: []string{"ceph", "clean", c.resource.GetObjectKind().GroupVersionKind().Kind}, + Resources: cephv1.GetCleanupResources(c.cluster.Spec.Resources), + } +} + +func (c *ResourceCleanup) jobTemplateSpec() v1.PodTemplateSpec { + volumes := []v1.Volume{} + hostPathVolume := v1.Volume{Name: volumeName, VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: c.cluster.Spec.DataDirHostPath}}} + volumes = append(volumes, hostPathVolume) + + podSpec := v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: CleanupAppName, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + c.jobContainer(), + }, + Volumes: volumes, + RestartPolicy: v1.RestartPolicyOnFailure, + PriorityClassName: cephv1.GetCleanupPriorityClassName(c.cluster.Spec.PriorityClassNames), + ServiceAccountName: k8sutil.DefaultServiceAccount, + }, + } + + return podSpec +} + +// ForceDeleteRequested returns true if `rook.io/force-deletion:true` annotation is available on the resource +func ForceDeleteRequested(annotations map[string]string) bool { + if value, found := annotations[RESOURCE_CLEANUP_ANNOTATION]; found { + if strings.EqualFold(value, "true") { + return true + } + } + return false +} diff --git a/pkg/operator/ceph/controller/cleanup_test.go b/pkg/operator/ceph/controller/cleanup_test.go new file mode 100644 index 000000000000..632977b71e12 --- /dev/null +++ b/pkg/operator/ceph/controller/cleanup_test.go @@ -0,0 +1,82 @@ +/* +Copyright 2024 The Rook Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "testing" + + cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestJobTemplateSpec(t *testing.T) { + expectedHostPath := "var/lib/rook" + expectedNamespace := "test-rook-ceph" + rookImage := "test" + cluster := &cephv1.CephCluster{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: expectedNamespace, + }, + Spec: cephv1.ClusterSpec{ + DataDirHostPath: expectedHostPath, + CleanupPolicy: cephv1.CleanupPolicySpec{ + Confirmation: "yes-really-destroy-data", + }, + }, + } + svgObj := &cephv1.CephFilesystemSubVolumeGroup{ + TypeMeta: metav1.TypeMeta{ + Kind: "CephFSSubvolumeGroup", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-svg", + Namespace: expectedNamespace, + }, + } + testConfig := map[string]string{ + "config1": "value1", + "config2": "value2", + } + cleanup := NewResourceCleanup(svgObj, cluster, rookImage, testConfig) + podTemplateSpec := cleanup.jobTemplateSpec() + assert.Equal(t, "CephFSSubvolumeGroup", podTemplateSpec.Spec.Containers[0].Args[2]) + assert.Equal(t, "config1", podTemplateSpec.Spec.Containers[0].Env[3].Name) + assert.Equal(t, "value1", podTemplateSpec.Spec.Containers[0].Env[3].Value) + assert.Equal(t, "config2", podTemplateSpec.Spec.Containers[0].Env[4].Name) + assert.Equal(t, "value2", podTemplateSpec.Spec.Containers[0].Env[4].Value) +} + +func TestForceDeleteRequested(t *testing.T) { + svgObj := &cephv1.CephFilesystemSubVolumeGroup{ + TypeMeta: metav1.TypeMeta{ + Kind: "CephFSSubvolumeGroup", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-svg", + Namespace: "test", + Annotations: map[string]string{}, + }, + } + + result := ForceDeleteRequested(svgObj.Annotations) + assert.False(t, result) + + svgObj.Annotations[RESOURCE_CLEANUP_ANNOTATION] = "true" + result = ForceDeleteRequested(svgObj.Annotations) + assert.True(t, result) +} diff --git a/pkg/operator/ceph/file/filesystem.go b/pkg/operator/ceph/file/filesystem.go index 1db7da060731..898875c6b7e6 100644 --- a/pkg/operator/ceph/file/filesystem.go +++ b/pkg/operator/ceph/file/filesystem.go @@ -74,7 +74,7 @@ func createFilesystem( } } - err := cephclient.CreateCephFSSubVolumeGroup(context, clusterInfo, fs.Name, defaultCSISubvolumeGroup) + err := cephclient.CreateCephFSSubVolumeGroup(context, clusterInfo, fs.Name, defaultCSISubvolumeGroup, nil) if err != nil { return errors.Wrapf(err, "failed to create subvolume group %q", defaultCSISubvolumeGroup) } @@ -169,7 +169,7 @@ func newFS(name, namespace string) *Filesystem { func createOrUpdatePools(f *Filesystem, context *clusterd.Context, clusterInfo *cephclient.ClusterInfo, clusterSpec *cephv1.ClusterSpec, spec cephv1.FilesystemSpec) error { // generating the metadata pool's name metadataPool := cephv1.NamedPoolSpec{ - Name: generateMetaDataPoolName(f), + Name: GenerateMetaDataPoolName(f.Name), PoolSpec: spec.MetadataPool, } metadataPool.Application = cephfsApplication @@ -242,7 +242,7 @@ func (f *Filesystem) doFilesystemCreate(context *clusterd.Context, clusterInfo * spec.MetadataPool.Application = cephfsApplication metadataPool := cephv1.NamedPoolSpec{ - Name: generateMetaDataPoolName(f), + Name: GenerateMetaDataPoolName(f.Name), PoolSpec: spec.MetadataPool, } if _, poolFound := reversedPoolMap[metadataPool.Name]; !poolFound { @@ -307,7 +307,7 @@ func generateDataPoolNames(f *Filesystem, spec cephv1.FilesystemSpec) []string { return dataPoolNames } -// generateMetaDataPoolName generates MetaDataPool name by prefixing the 
filesystem name to the constant metaDataPoolSuffix -func generateMetaDataPoolName(f *Filesystem) string { - return fmt.Sprintf("%s-%s", f.Name, metaDataPoolSuffix) +// GenerateMetaDataPoolName generates MetaDataPool name by prefixing the filesystem name to the constant metaDataPoolSuffix +func GenerateMetaDataPoolName(fsName string) string { + return fmt.Sprintf("%s-%s", fsName, metaDataPoolSuffix) } diff --git a/pkg/operator/ceph/file/filesystem_test.go b/pkg/operator/ceph/file/filesystem_test.go index 637bc4563069..ffe8aaf5dccd 100644 --- a/pkg/operator/ceph/file/filesystem_test.go +++ b/pkg/operator/ceph/file/filesystem_test.go @@ -233,6 +233,8 @@ func fsExecutor(t *testing.T, fsName, configDir string, multiFS bool, createData return `{"standbys":[], "filesystems":[]}`, nil } else if reflect.DeepEqual(args[0:5], []string{"fs", "subvolumegroup", "create", fsName, defaultCSISubvolumeGroup}) { return "", nil + } else if reflect.DeepEqual(args[0:5], []string{"fs", "subvolumegroup", "info", fsName, defaultCSISubvolumeGroup}) { + return "", nil } else if contains(args, "osd") && contains(args, "lspools") { return "[]", nil } else if contains(args, "mds") && contains(args, "fail") { diff --git a/pkg/operator/ceph/file/mds/mds.go b/pkg/operator/ceph/file/mds/mds.go index 776a730a5bd4..a315101777d3 100644 --- a/pkg/operator/ceph/file/mds/mds.go +++ b/pkg/operator/ceph/file/mds/mds.go @@ -290,7 +290,7 @@ func (c *Cluster) upgradeMDS() error { return errors.Wrap(err, "failed to scale down deployments during upgrade") } logger.Debugf("waiting for all standbys gone") - if err := cephclient.WaitForNoStandbys(c.context, c.clusterInfo, 120*time.Second); err != nil { + if err := cephclient.WaitForNoStandbys(c.context, c.clusterInfo, 3*time.Second, 120*time.Second); err != nil { return errors.Wrap(err, "failed to wait for stopping all standbys") } diff --git a/pkg/operator/ceph/file/subvolumegroup/controller.go b/pkg/operator/ceph/file/subvolumegroup/controller.go index 0bf5db43f404..a291a8e040eb 100644 --- a/pkg/operator/ceph/file/subvolumegroup/controller.go +++ b/pkg/operator/ceph/file/subvolumegroup/controller.go @@ -40,6 +40,7 @@ import ( "github.com/rook/rook/pkg/clusterd" opcontroller "github.com/rook/rook/pkg/operator/ceph/controller" "github.com/rook/rook/pkg/operator/ceph/csi" + "github.com/rook/rook/pkg/operator/ceph/file" "github.com/rook/rook/pkg/operator/ceph/reporting" "github.com/rook/rook/pkg/operator/k8sutil" kerrors "k8s.io/apimachinery/pkg/api/errors" @@ -187,7 +188,7 @@ func (r *ReconcileCephFilesystemSubVolumeGroup) reconcile(request reconcile.Requ if cephCluster.Spec.External.Enable { logger.Warningf("external subvolume group %q deletion is not supported, delete it manually", namespacedName) } else { - err := r.deleteSubVolumeGroup(cephFilesystemSubVolumeGroup) + err = r.deleteSubVolumeGroup(cephFilesystemSubVolumeGroup, &cephCluster) if err != nil { if strings.Contains(err.Error(), opcontroller.UninitializedCephConfigError) { logger.Info(opcontroller.OperatorNotInitializedMessage) @@ -317,7 +318,7 @@ func (r *ReconcileCephFilesystemSubVolumeGroup) updateClusterConfig(cephFilesyst func (r *ReconcileCephFilesystemSubVolumeGroup) createOrUpdateSubVolumeGroup(cephFilesystemSubVolumeGroup *cephv1.CephFilesystemSubVolumeGroup) error { logger.Infof("creating ceph filesystem subvolume group %s in namespace %s", cephFilesystemSubVolumeGroup.Name, cephFilesystemSubVolumeGroup.Namespace) - err := cephclient.CreateCephFSSubVolumeGroup(r.context, r.clusterInfo, 
cephFilesystemSubVolumeGroup.Spec.FilesystemName, getSubvolumeGroupName(cephFilesystemSubVolumeGroup))
+	err := cephclient.CreateCephFSSubVolumeGroup(r.context, r.clusterInfo, cephFilesystemSubVolumeGroup.Spec.FilesystemName, getSubvolumeGroupName(cephFilesystemSubVolumeGroup), &cephFilesystemSubVolumeGroup.Spec)
 	if err != nil {
 		return errors.Wrapf(err, "failed to create ceph filesystem subvolume group %q", cephFilesystemSubVolumeGroup.Name)
 	}
@@ -326,7 +327,8 @@ func (r *ReconcileCephFilesystemSubVolumeGroup) createOrUpdateSubVolumeGroup(cep
 }
 
 // Delete the ceph filesystem subvolume group
-func (r *ReconcileCephFilesystemSubVolumeGroup) deleteSubVolumeGroup(cephFilesystemSubVolumeGroup *cephv1.CephFilesystemSubVolumeGroup) error {
+func (r *ReconcileCephFilesystemSubVolumeGroup) deleteSubVolumeGroup(cephFilesystemSubVolumeGroup *cephv1.CephFilesystemSubVolumeGroup,
+	cephCluster *cephv1.CephCluster) error {
 	namespacedName := fmt.Sprintf("%s/%s", cephFilesystemSubVolumeGroup.Namespace, cephFilesystemSubVolumeGroup.Name)
 	logger.Infof("deleting ceph filesystem subvolume group object %q", namespacedName)
 	if err := cephclient.DeleteCephFSSubVolumeGroup(r.context, r.clusterInfo, cephFilesystemSubVolumeGroup.Spec.FilesystemName, getSubvolumeGroupName(cephFilesystemSubVolumeGroup)); err != nil {
@@ -339,7 +341,17 @@ func (r *ReconcileCephFilesystemSubVolumeGroup) deleteSubVolumeGroup(cephFilesys
 		// If the subvolume group has subvolumes the command will fail with:
 		// Error ENOTEMPTY: error in rmdir /volumes/csi
 		if ok && (code == int(syscall.ENOTEMPTY)) {
-			return errors.Wrapf(err, "failed to delete ceph filesystem subvolume group %q, remove the subvolumes first", cephFilesystemSubVolumeGroup.Name)
+			msg := fmt.Sprintf("failed to delete ceph filesystem subvolume group %q, remove the subvolumes first", cephFilesystemSubVolumeGroup.Name)
+			if opcontroller.ForceDeleteRequested(cephFilesystemSubVolumeGroup.GetAnnotations()) {
+				// force deletion requested via the rook.io/force-deletion annotation: clean up the leftover cephFS subvolumes with a job
+				cleanupErr := r.cleanup(cephFilesystemSubVolumeGroup, cephCluster)
+				if cleanupErr != nil {
+					return errors.Wrapf(cleanupErr, "failed to clean up all the ceph resources created by subVolumeGroup %q", namespacedName)
+				}
+				msg = fmt.Sprintf("failed to delete ceph filesystem subvolume group %q, started clean up job to delete the subvolumes", cephFilesystemSubVolumeGroup.Name)
+			}
+
+			return errors.Wrap(err, msg)
 		}
 
 		return errors.Wrapf(err, "failed to delete ceph filesystem subvolume group %q", cephFilesystemSubVolumeGroup.Name)
@@ -380,3 +392,20 @@ func buildClusterID(cephFilesystemSubVolumeGroup *cephv1.CephFilesystemSubVolume
 	clusterID := fmt.Sprintf("%s-%s-file-%s", cephFilesystemSubVolumeGroup.Namespace, cephFilesystemSubVolumeGroup.Spec.FilesystemName, getSubvolumeGroupName(cephFilesystemSubVolumeGroup))
 	return k8sutil.Hash(clusterID)
 }
+
+func (r *ReconcileCephFilesystemSubVolumeGroup) cleanup(svg *cephv1.CephFilesystemSubVolumeGroup, cephCluster *cephv1.CephCluster) error {
+	logger.Infof("starting cleanup of the ceph resources for subVolumeGroup %q in namespace %q", svg.Name, svg.Namespace)
+	cleanupConfig := map[string]string{
+		opcontroller.CephFSSubVolumeGroupNameEnv: svg.Spec.Name,
+		opcontroller.CephFSNameEnv:               svg.Spec.FilesystemName,
+		opcontroller.CSICephFSRadosNamesaceEnv:   "csi",
+		opcontroller.CephFSMetaDataPoolNameEnv:   file.GenerateMetaDataPoolName(svg.Spec.FilesystemName),
+	}
+	cleanup := opcontroller.NewResourceCleanup(svg, cephCluster, r.opConfig.Image, cleanupConfig)
+	jobName := k8sutil.TruncateNodeNameForJob("cleanup-svg-%s", 
fmt.Sprintf("%s-%s", svg.Spec.FilesystemName, svg.Name)) + err := cleanup.StartJob(r.clusterInfo.Context, r.context.Clientset, jobName) + if err != nil { + return errors.Wrapf(err, "failed to run clean up job to clean the ceph resources in cephFS subVolumeGroup %q", svg.Name) + } + return nil +} diff --git a/tests/framework/installer/ceph_manifests.go b/tests/framework/installer/ceph_manifests.go index 471d79e763ac..bb8fb0175ae2 100644 --- a/tests/framework/installer/ceph_manifests.go +++ b/tests/framework/installer/ceph_manifests.go @@ -652,7 +652,9 @@ metadata: name: ` + groupName + ` namespace: ` + m.settings.Namespace + ` spec: - filesystemName: ` + fsName + filesystemName: ` + fsName + ` + quota: 10G + dataPoolName: ` + fsName + "-data0" } func (m *CephManifestsMaster) GetCOSIDriver() string {