From ef00fdac538634cd3c12eb8dc26cba5a2762a065 Mon Sep 17 00:00:00 2001 From: sp98 Date: Tue, 2 Apr 2024 19:26:45 +0530 Subject: [PATCH 1/5] core: subvolumegroup clean up Cleanup the resources created by subvolumegroup when its deleted. Following resources will be cleaned up: - OMAP value - OMAP keys - Clones - Snapshots - Subvolumes Signed-off-by: sp98 --- .../Storage-Configuration/ceph-teardown.md | 24 +++ cmd/rook/ceph/cleanup.go | 71 ++++++-- pkg/daemon/ceph/cleanup/subvolumegroups.go | 143 +++++++++++++++++ .../ceph/cleanup/subvolumegroups_test.go | 151 ++++++++++++++++++ pkg/daemon/ceph/client/filesystem.go | 55 ++++++- pkg/daemon/ceph/client/subvolumegroup.go | 80 ++++++++++ pkg/operator/ceph/cluster/cleanup.go | 2 +- pkg/operator/ceph/controller/cleanup.go | 144 +++++++++++++++++ pkg/operator/ceph/controller/cleanup_test.go | 82 ++++++++++ pkg/operator/ceph/file/filesystem.go | 10 +- .../ceph/file/subvolumegroup/controller.go | 35 +++- 11 files changed, 777 insertions(+), 20 deletions(-) create mode 100644 pkg/daemon/ceph/cleanup/subvolumegroups.go create mode 100644 pkg/daemon/ceph/cleanup/subvolumegroups_test.go create mode 100644 pkg/operator/ceph/controller/cleanup.go create mode 100644 pkg/operator/ceph/controller/cleanup_test.go diff --git a/Documentation/Storage-Configuration/ceph-teardown.md b/Documentation/Storage-Configuration/ceph-teardown.md index 3e4e7fd8f9a1..4b8917ecdff4 100644 --- a/Documentation/Storage-Configuration/ceph-teardown.md +++ b/Documentation/Storage-Configuration/ceph-teardown.md @@ -2,6 +2,11 @@ title: Cleanup --- +Rook provides the following clean up options: + +1. [Uninstall: Clean up the entire cluster and delete all data](#cleaning-up-a-cluster) +1. [Force delete individual resources](#force-delete-resources) + ## Cleaning up a Cluster To tear down the cluster, the following resources need to be cleaned up: @@ -179,3 +184,22 @@ If the operator is not able to remove the finalizers (i.e., the operator is not kubectl -n rook-ceph patch configmap rook-ceph-mon-endpoints --type merge -p '{"metadata":{"finalizers": []}}' kubectl -n rook-ceph patch secrets rook-ceph-mon --type merge -p '{"metadata":{"finalizers": []}}' ``` + +## Force Delete Resources + +To keep your data safe in the cluster, Rook disallows deleting critical cluster resources by default. To override this behavior and force delete a specific custom resource, add the annotation `rook.io/force-deletion="true"` to the resource and then delete it. Rook will start a cleanup job that will delete all the related ceph resources created by that custom resource. + +For example, run the following commands to clean the `CephFilesystemSubVolumeGroup` resource named `my-subvolumegroup` + +``` console +kubectl -n rook-ceph annotate cephfilesystemsubvolumegroups.ceph.rook.io my-subvolumegroup rook.io/force-deletion="true" +kubectl -n rook-ceph delete cephfilesystemsubvolumegroups.ceph.rook.io my-subvolumegroup +``` + +Once the cleanup job is completed successfully, Rook will remove the finalizers from the deleted custom resource. + +This cleanup is supported only for the following custom resources: + +| Custom Resource | Ceph Resources to be cleaned up | +| -------- | ------- | +| CephFilesystemSubVolumeGroup | CSI stored RADOS OMAP details for pvc/volumesnapshots, subvolume snapshots, subvolume clones, subvolumes | diff --git a/cmd/rook/ceph/cleanup.go b/cmd/rook/ceph/cleanup.go index e13e92a3067e..de3bf72838de 100644 --- a/cmd/rook/ceph/cleanup.go +++ b/cmd/rook/ceph/cleanup.go @@ -17,12 +17,14 @@ limitations under the License. package ceph import ( + "fmt" "os" "github.com/rook/rook/cmd/rook/rook" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" cleanup "github.com/rook/rook/pkg/daemon/ceph/cleanup" "github.com/rook/rook/pkg/daemon/ceph/client" + opcontroller "github.com/rook/rook/pkg/operator/ceph/controller" "github.com/rook/rook/pkg/operator/k8sutil" "github.com/rook/rook/pkg/util/flags" "github.com/spf13/cobra" @@ -40,22 +42,39 @@ var ( var cleanUpCmd = &cobra.Command{ Use: "clean", - Short: "Starts the cleanup process on the disks after ceph cluster is deleted", + Short: "Starts the cleanup process", +} + +var cleanUpHostCmd = &cobra.Command{ + Use: "host", + Short: "Starts the cleanup process on a host after the ceph cluster is deleted", +} + +var cleanUpSubVolumeGroupCmd = &cobra.Command{ + // the subcommand matches CRD kind of the custom resource to be cleaned up + Use: "CephFilesystemSubVolumeGroup", + Short: "Starts the cleanup process of a CephFilesystemSubVolumeGroup", } func init() { cleanUpCmd.Flags().StringVar(&dataDirHostPath, "data-dir-host-path", "", "dataDirHostPath on the node") cleanUpCmd.Flags().StringVar(&namespaceDir, "namespace-dir", "", "dataDirHostPath on the node") - cleanUpCmd.Flags().StringVar(&monSecret, "mon-secret", "", "monitor secret from the keyring") - cleanUpCmd.Flags().StringVar(&clusterFSID, "cluster-fsid", "", "ceph cluster fsid") - cleanUpCmd.Flags().StringVar(&sanitizeMethod, "sanitize-method", string(cephv1.SanitizeMethodQuick), "sanitize method to use (metadata or data)") - cleanUpCmd.Flags().StringVar(&sanitizeDataSource, "sanitize-data-source", string(cephv1.SanitizeDataSourceZero), "data source to sanitize the disk (zero or random)") - cleanUpCmd.Flags().Int32Var(&sanitizeIteration, "sanitize-iteration", 1, "overwrite N times the disk") - flags.SetFlagsFromEnv(cleanUpCmd.Flags(), rook.RookEnvVarPrefix) - cleanUpCmd.RunE = startCleanUp + cleanUpHostCmd.Flags().StringVar(&monSecret, "mon-secret", "", "monitor secret from the keyring") + cleanUpHostCmd.Flags().StringVar(&clusterFSID, "cluster-fsid", "", "ceph cluster fsid") + cleanUpHostCmd.Flags().StringVar(&sanitizeMethod, "sanitize-method", string(cephv1.SanitizeMethodQuick), "sanitize method to use (metadata or data)") + cleanUpHostCmd.Flags().StringVar(&sanitizeDataSource, "sanitize-data-source", string(cephv1.SanitizeDataSourceZero), "data source to sanitize the disk (zero or random)") + cleanUpHostCmd.Flags().Int32Var(&sanitizeIteration, "sanitize-iteration", 1, "overwrite N times the disk") + flags.SetFlagsFromEnv(cleanUpHostCmd.Flags(), rook.RookEnvVarPrefix) + + flags.SetFlagsFromEnv(cleanUpSubVolumeGroupCmd.Flags(), rook.RookEnvVarPrefix) + + cleanUpCmd.AddCommand(cleanUpHostCmd, cleanUpSubVolumeGroupCmd) + + cleanUpHostCmd.RunE = startHostCleanUp + cleanUpSubVolumeGroupCmd.RunE = startSubVolumeGroupCleanUp } -func startCleanUp(cmd *cobra.Command, args []string) error { +func startHostCleanUp(cmd *cobra.Command, args []string) error { rook.SetLogLevel() rook.LogStartupInfo(cleanUpCmd.Flags()) @@ -87,3 +106,37 @@ func startCleanUp(cmd *cobra.Command, args []string) error { return nil } + +func startSubVolumeGroupCleanUp(cmd *cobra.Command, args []string) error { + rook.SetLogLevel() + rook.LogStartupInfo(cleanUpSubVolumeGroupCmd.Flags()) + + ctx := cmd.Context() + context := createContext() + namespace := os.Getenv(k8sutil.PodNamespaceEnvVar) + clusterInfo := client.AdminClusterInfo(ctx, namespace, "") + + fsName := os.Getenv(opcontroller.CephFSNameEnv) + if fsName == "" { + rook.TerminateFatal(fmt.Errorf("ceph filesystem name is not available in the pod environment variables")) + } + subVolumeGroupName := os.Getenv(opcontroller.CephFSSubVolumeGroupNameEnv) + if subVolumeGroupName == "" { + rook.TerminateFatal(fmt.Errorf("cephFS SubVolumeGroup name is not available in the pod environment variables")) + } + csiNamespace := os.Getenv(opcontroller.CSICephFSRadosNamesaceEnv) + if csiNamespace == "" { + rook.TerminateFatal(fmt.Errorf("CSI rados namespace name is not available in the pod environment variables")) + } + poolName := os.Getenv(opcontroller.CephFSMetaDataPoolNameEnv) + if poolName == "" { + rook.TerminateFatal(fmt.Errorf("cephFS metadata pool name is not available in the pod environment variables")) + } + + err := cleanup.SubVolumeGroupCleanup(context, clusterInfo, fsName, subVolumeGroupName, poolName, csiNamespace) + if err != nil { + rook.TerminateFatal(fmt.Errorf("failed to cleanup cephFS %q SubVolumeGroup %q in the namespace %q. %v", fsName, subVolumeGroupName, namespace, err)) + } + + return nil +} diff --git a/pkg/daemon/ceph/cleanup/subvolumegroups.go b/pkg/daemon/ceph/cleanup/subvolumegroups.go new file mode 100644 index 000000000000..6559a58c05f4 --- /dev/null +++ b/pkg/daemon/ceph/cleanup/subvolumegroups.go @@ -0,0 +1,143 @@ +/* +Copyright 2024 The Rook Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cleanup + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" + "github.com/rook/rook/pkg/clusterd" + "github.com/rook/rook/pkg/daemon/ceph/client" + cephclient "github.com/rook/rook/pkg/daemon/ceph/client" +) + +func SubVolumeGroupCleanup(context *clusterd.Context, clusterInfo *client.ClusterInfo, fsName, svg, poolName, csiNamespace string) error { + logger.Infof("starting clean up cephFS subVolumeGroup resource %q", svg) + + subVolumeList, err := cephclient.ListSubvolumesInGroup(context, clusterInfo, fsName, svg) + if err != nil { + return errors.Wrapf(err, "failed to list cephFS subVolumes in subVolumeGroup %q", svg) + } + + if len(subVolumeList) == 0 { + logger.Infof("no subvolumes found in cephFS subVolumeGroup %q", svg) + return nil + } + + var retErr error + for _, subVolume := range subVolumeList { + logger.Infof("starting clean up of subvolume %q", subVolume.Name) + err := CleanUpOMAPDetails(context, clusterInfo, subVolume.Name, poolName, csiNamespace) + if err != nil { + retErr = errors.Wrapf(err, "failed to clean up OMAP details for the subvolume %q.", subVolume.Name) + logger.Error(retErr) + } + subVolumeSnapshots, err := cephclient.ListSubVolumeSnapshots(context, clusterInfo, fsName, subVolume.Name, svg) + if err != nil { + retErr = errors.Wrapf(err, "failed to list snapshots for subvolume %q in group %q.", subVolume.Name, svg) + logger.Error(retErr) + } else { + err := CancelPendingClones(context, clusterInfo, subVolumeSnapshots, fsName, subVolume.Name, svg) + if err != nil { + retErr = errors.Wrapf(err, "failed to cancel pending clones for subvolume %q in group %q", subVolume.Name, svg) + logger.Error(retErr) + } + err = DeleteSubVolumeSnapshots(context, clusterInfo, subVolumeSnapshots, fsName, subVolume.Name, svg) + if err != nil { + retErr = errors.Wrapf(err, "failed to delete snapshots for subvolume %q in group %q", subVolume.Name, svg) + logger.Error(retErr) + } + } + err = cephclient.DeleteSubVolume(context, clusterInfo, fsName, subVolume.Name, svg) + if err != nil { + retErr = errors.Wrapf(err, "failed to delete subvolume group %q.", subVolume.Name) + logger.Error(retErr) + } + } + + if retErr != nil { + return errors.Wrapf(err, "clean up for cephFS subVolumeGroup %q didn't complete successfully.", svg) + } + logger.Infof("successfully cleaned up cephFS subVolumeGroup %q", svg) + return nil +} + +func CancelPendingClones(context *clusterd.Context, clusterInfo *client.ClusterInfo, snapshots cephclient.SubVolumeSnapshots, fsName, subvol, svg string) error { + for _, snapshot := range snapshots { + logger.Infof("deleting any pending clones of snapshot %q of subvolume %q of group %q", snapshot.Name, subvol, svg) + pendingClones, err := cephclient.ListSubVolumeSnapshotPendingClones(context, clusterInfo, fsName, subvol, snapshot.Name, svg) + if err != nil { + return errors.Wrapf(err, "failed to list all the pending clones for snapshot %q", snapshot.Name) + } + for _, pendingClone := range pendingClones.Clones { + err := cephclient.CancelSnapshotClone(context, clusterInfo, fsName, svg, pendingClone.Name) + if err != nil { + return errors.Wrapf(err, "failed to cancel the pending clone %q for snapshot %q", pendingClone.Name, snapshot.Name) + } + } + } + return nil +} + +func DeleteSubVolumeSnapshots(context *clusterd.Context, clusterInfo *client.ClusterInfo, snapshots cephclient.SubVolumeSnapshots, fsName, subvol, svg string) error { + for _, snapshot := range snapshots { + logger.Infof("deleting snapshot %q for subvolume %q in group %q", snapshot.Name, subvol, svg) + err := cephclient.DeleteSubvolumeSnapshot(context, clusterInfo, fsName, subvol, svg, snapshot.Name) + if err != nil { + return errors.Wrapf(err, "failed to delete snapshot %q for subvolume %q in group %q. %v", snapshot.Name, subvol, svg, err) + } + logger.Infof("successfully deleted snapshot %q for subvolume %q in group %q", snapshot.Name, subvol, svg) + } + return nil +} + +func CleanUpOMAPDetails(context *clusterd.Context, clusterInfo *client.ClusterInfo, objName, poolName, namespace string) error { + omapValue := getOMAPValue(objName) + if omapValue == "" { + return errors.New(fmt.Sprintf("failed to get OMAP value for object %q", objName)) + } + logger.Infof("OMAP value for the object %q is %q", objName, omapValue) + omapKey, err := cephclient.GetOMAPKey(context, clusterInfo, omapValue, poolName, namespace) + if err != nil { + return errors.Wrapf(err, "failed to get OMAP key for omapObj %q. %v", omapValue, err) + } + logger.Infof("OMAP key for the OIMAP value %q is %q", omapValue, omapKey) + + // delete OMAP details + err = cephclient.DeleteOmapValue(context, clusterInfo, omapValue, poolName, namespace) + if err != nil { + return errors.Wrapf(err, "failed to delete OMAP value %q. %v", omapValue, err) + } + if omapKey != "" { + err = cephclient.DeleteOmapKey(context, clusterInfo, omapKey, poolName, namespace) + if err != nil { + return errors.Wrapf(err, "failed to delete OMAP key %q. %v", omapKey, err) + } + } + return nil +} + +func getOMAPValue(subVol string) string { + splitSubvol := strings.SplitAfterN(subVol, "-", 3) + if len(splitSubvol) < 3 { + return "" + } + subvol_id := splitSubvol[len(splitSubvol)-1] + omapval := "csi.volume." + subvol_id + return omapval +} diff --git a/pkg/daemon/ceph/cleanup/subvolumegroups_test.go b/pkg/daemon/ceph/cleanup/subvolumegroups_test.go new file mode 100644 index 000000000000..151133298e5a --- /dev/null +++ b/pkg/daemon/ceph/cleanup/subvolumegroups_test.go @@ -0,0 +1,151 @@ +/* +Copyright 2024 The Rook Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cleanup + +import ( + "errors" + "testing" + "time" + + "github.com/rook/rook/pkg/clusterd" + cephclient "github.com/rook/rook/pkg/daemon/ceph/client" + exectest "github.com/rook/rook/pkg/util/exec/test" + "github.com/stretchr/testify/assert" +) + +const ( + mockSubVolumeListResp = `[{"name":"csi-vol-9d942d45-6f48-4d9e-a707-edfcd602bd89"}]` + mockGetOmapValResp = `Writing to /dev/stdout +pvc-e2907819-b005-4582-97ec-8fa1b277f46fsh` + mockSubvolumeSnapshotsResp = `[{"name":"snap0"}]` + mockSubvolumeSnapshotInfoResp = `{"created_at":"2024-04-0808:46:36.267888","data_pool":"myfs-replicated","has_pending_clones":"yes","pending_clones":[{"name":"clone3"}]}` +) + +func TestSubVolumeGroupCleanup(t *testing.T) { + clusterInfo := cephclient.AdminTestClusterInfo("mycluster") + fsName := "myfs" + subVolumeGroupName := "csi" + poolName := "myfs-metadata" + csiNamespace := "csi" + + t.Run("no subvolumes in subvolumegroup", func(t *testing.T) { + executor := &exectest.MockExecutor{} + executor.MockExecuteCommandWithTimeout = func(timeout time.Duration, command string, args ...string) (string, error) { + logger.Infof("Command: %s %v", command, args) + if args[0] == "fs" && args[1] == "subvolume" && args[2] == "ls" { + assert.Equal(t, fsName, args[3]) + assert.Equal(t, subVolumeGroupName, args[4]) + return "[]", nil + } + return "", errors.New("unknown command") + } + context := &clusterd.Context{Executor: executor} + err := SubVolumeGroupCleanup(context, clusterInfo, fsName, subVolumeGroupName, poolName, csiNamespace) + assert.NoError(t, err) + }) + + t.Run("subvolumes with snapshots and pending clones", func(t *testing.T) { + executor := &exectest.MockExecutor{} + executor.MockExecuteCommandWithTimeout = func(timeout time.Duration, command string, args ...string) (string, error) { + logger.Infof("Command: %s %v", command, args) + // list all subvolumes in subvolumegroup + if args[0] == "fs" && args[1] == "subvolume" && args[2] == "ls" { + assert.Equal(t, fsName, args[3]) + assert.Equal(t, subVolumeGroupName, args[4]) + return mockSubVolumeListResp, nil + } + if args[0] == "getomapval" { + assert.Equal(t, "csi.volume.9d942d45-6f48-4d9e-a707-edfcd602bd89", args[1]) + assert.Equal(t, "csi.volname", args[2]) + assert.Equal(t, poolName, args[4]) + assert.Equal(t, csiNamespace, args[6]) + return mockGetOmapValResp, nil + } + // delete OMAP value + if args[0] == "rm" && args[1] == "csi.volume.9d942d45-6f48-4d9e-a707-edfcd602bd89" { + assert.Equal(t, "-p", args[2]) + assert.Equal(t, poolName, args[3]) + assert.Equal(t, "--namespace", args[4]) + assert.Equal(t, csiNamespace, args[5]) + return "", nil + } + // delete OMAP key + if args[0] == "rmomapkey" && args[1] == "csi.volumes.default" { + assert.Equal(t, "ceph.volume.pvc-e2907819-b005-4582-97ec-8fa1b277f46fsh", args[2]) + assert.Equal(t, "-p", args[3]) + assert.Equal(t, poolName, args[4]) + assert.Equal(t, "--namespace", args[5]) + assert.Equal(t, csiNamespace, args[6]) + return "", nil + } + // list all snapshots in a subvolume + if args[0] == "fs" && args[1] == "subvolume" && args[2] == "snapshot" && args[3] == "ls" { + assert.Equal(t, fsName, args[4]) + assert.Equal(t, "csi-vol-9d942d45-6f48-4d9e-a707-edfcd602bd89", args[5]) + assert.Equal(t, "--group_name", args[6]) + assert.Equal(t, subVolumeGroupName, args[7]) + return mockSubvolumeSnapshotsResp, nil + } + // list all pending clones in a subvolume snapshot + if args[0] == "fs" && args[1] == "subvolume" && args[2] == "snapshot" && args[3] == "info" { + assert.Equal(t, fsName, args[4]) + assert.Equal(t, "csi-vol-9d942d45-6f48-4d9e-a707-edfcd602bd89", args[5]) + assert.Equal(t, "snap0", args[6]) + assert.Equal(t, "--group_name", args[7]) + assert.Equal(t, subVolumeGroupName, args[8]) + return mockSubvolumeSnapshotInfoResp, nil + } + // cancel pending clones + if args[0] == "fs" && args[1] == "clone" && args[2] == "cancel" { + assert.Equal(t, fsName, args[3]) + assert.Equal(t, "clone3", args[4]) + assert.Equal(t, "--group_name", args[5]) + assert.Equal(t, subVolumeGroupName, args[6]) + return "", nil + } + // delete snapshots + if args[0] == "fs" && args[1] == "subvolume" && args[2] == "snapshot" && args[3] == "rm" { + assert.Equal(t, fsName, args[4]) + assert.Equal(t, "csi-vol-9d942d45-6f48-4d9e-a707-edfcd602bd89", args[5]) + assert.Equal(t, "snap0", args[6]) + assert.Equal(t, "--group_name", args[7]) + assert.Equal(t, subVolumeGroupName, args[8]) + return "", nil + } + // delete subvolume + if args[0] == "fs" && args[1] == "subvolume" && args[2] == "rm" { + assert.Equal(t, fsName, args[3]) + assert.Equal(t, "csi-vol-9d942d45-6f48-4d9e-a707-edfcd602bd89", args[4]) + assert.Equal(t, subVolumeGroupName, args[5]) + assert.Equal(t, "--force", args[6]) + return "", nil + } + return "", errors.New("unknown command") + } + context := &clusterd.Context{Executor: executor} + err := SubVolumeGroupCleanup(context, clusterInfo, fsName, subVolumeGroupName, poolName, csiNamespace) + assert.NoError(t, err) + }) +} + +func TestGetOmapValue(t *testing.T) { + result := getOMAPValue("csi-vol-3a41b367-9566-4dbb-8884-39e1fa306ea7") + assert.Equal(t, "csi.volume.3a41b367-9566-4dbb-8884-39e1fa306ea7", result) + + result = getOMAPValue("invalidSubVolume") + assert.Equal(t, "", result) +} diff --git a/pkg/daemon/ceph/client/filesystem.go b/pkg/daemon/ceph/client/filesystem.go index f7785ff98ee5..a3a8c884593b 100644 --- a/pkg/daemon/ceph/client/filesystem.go +++ b/pkg/daemon/ceph/client/filesystem.go @@ -435,7 +435,7 @@ type SubvolumeList []Subvolume const NoSubvolumeGroup = "" // ListSubvolumesInGroup lists all subvolumes present in the given filesystem's subvolume group by -// name. If groupName is empty, list subvolumes that are not in any group. Times out after 5 seconds. +// name. If groupName is empty, list subvolumes that are not in any group. var ListSubvolumesInGroup = listSubvolumesInGroup // with above, allow this to be overridden for unit testing @@ -453,10 +453,61 @@ func listSubvolumesInGroup(context *clusterd.Context, clusterInfo *ClusterInfo, if err != nil { return svs, errors.Wrapf(err, "failed to list subvolumes in filesystem %q subvolume group %q", fsName, groupName) } - if err := json.Unmarshal(buf, &svs); err != nil { return svs, errors.Wrapf(err, "failed to unmarshal subvolume list for filesystem %q subvolume group %q", fsName, groupName) } + return svs, nil +} + +// SubVolumeSnapshot represents snapshot of a cephFS subvolume +type SubVolumeSnapshot struct { + Name string `json:"name"` +} + +// SubVolumeSnapshots is the list of snapshots in a CephFS subvolume +type SubVolumeSnapshots []SubVolumeSnapshot + +// ListSubVolumeSnaphots lists all the subvolume snapshots present in the subvolume in the given filesystem's subvolume group. +var ListSubVolumeSnapshots = listSubVolumeSnapshots + +func listSubVolumeSnapshots(context *clusterd.Context, clusterInfo *ClusterInfo, fsName, subVolumeName, groupName string) (SubVolumeSnapshots, error) { + svs := SubVolumeSnapshots{} + args := []string{"fs", "subvolume", "snapshot", "ls", fsName, subVolumeName, "--group_name", groupName} + cmd := NewCephCommand(context, clusterInfo, args) + buf, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return svs, errors.Wrapf(err, "failed to list subvolumes in filesystem %q subvolume group %q", fsName, groupName) + } + + if err := json.Unmarshal(buf, &svs); err != nil { + return svs, errors.Wrapf(err, "failed to unmarshal snapshots for subvolume %q for filesystem %q subvolume group %q", subVolumeName, fsName, groupName) + } return svs, nil } + +// SubVolumeSnapshotPendingClones refers to all the pending clones available in a cephFS subvolume snapshot +type SubVolumeSnapshotPendingClones struct { + Clones []struct { + Name string `json:"name"` + } `json:"pending_clones"` +} + +var ListSubVolumeSnapshotPendingClones = listSubVolumeSnapshotPendingClones + +// listSubVolumeSnapshotPendingClones lists all the pending clones available in a cephFS subvolume snapshot +func listSubVolumeSnapshotPendingClones(context *clusterd.Context, clusterInfo *ClusterInfo, fsName, subVolumeName, snap, groupName string) (SubVolumeSnapshotPendingClones, error) { + pendingClones := SubVolumeSnapshotPendingClones{} + args := []string{"fs", "subvolume", "snapshot", "info", fsName, subVolumeName, snap, "--group_name", groupName} + cmd := NewCephCommand(context, clusterInfo, args) + buf, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return pendingClones, errors.Wrapf(err, "failed to list pending clones available for snapshot %q in filesystem %q in subvolume group %q", snap, fsName, groupName) + } + + if err := json.Unmarshal(buf, &pendingClones); err != nil { + return pendingClones, errors.Wrapf(err, "failed to unmarshal pending clones list for for snapshot %q in filesystem %q subvolume group %q", snap, fsName, groupName) + } + + return pendingClones, nil +} diff --git a/pkg/daemon/ceph/client/subvolumegroup.go b/pkg/daemon/ceph/client/subvolumegroup.go index 933fa4075551..671eb384a422 100644 --- a/pkg/daemon/ceph/client/subvolumegroup.go +++ b/pkg/daemon/ceph/client/subvolumegroup.go @@ -19,10 +19,12 @@ package client import ( "fmt" "strconv" + "strings" "github.com/pkg/errors" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" "github.com/rook/rook/pkg/clusterd" + "github.com/rook/rook/pkg/util/exec" "k8s.io/apimachinery/pkg/types" ) @@ -129,3 +131,81 @@ func validatePinningValues(pinning cephv1.CephFilesystemSubVolumeGroupSpecPinnin } return err } + +func GetOMAPKey(context *clusterd.Context, clusterInfo *ClusterInfo, omapObj, poolName, namespace string) (string, error) { + args := []string{"getomapval", omapObj, "csi.volname", "-p", poolName, "--namespace", namespace, "/dev/stdout"} + cmd := NewRadosCommand(context, clusterInfo, args) + buf, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return "", errors.Wrapf(err, "failed to list omapKeys for omapObj %q", omapObj) + } + + // Todo: is there a way to avoid this parsing? + respStr := string(buf) + var pvcName string + if len(respStr) != 0 { + resp := strings.Split(respStr, "\n") + if len(resp) == 2 { + pvcName = resp[1] + } + } + + if pvcName == "" { + return "", nil + } + + omapKey := fmt.Sprintf("ceph.volume.%s", pvcName) + return omapKey, nil +} + +func DeleteOmapValue(context *clusterd.Context, clusterInfo *ClusterInfo, omapValue, poolName, namespace string) error { + args := []string{"rm", omapValue, "-p", poolName, "--namespace", namespace} + cmd := NewRadosCommand(context, clusterInfo, args) + _, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return errors.Wrapf(err, "failed to delete omap value %q in pool %q", omapValue, poolName) + } + logger.Infof("successfully deleted omap value %q for pool %q", omapValue, poolName) + return nil +} + +func DeleteOmapKey(context *clusterd.Context, clusterInfo *ClusterInfo, omapKey, poolName, namespace string) error { + args := []string{"rmomapkey", "csi.volumes.default", omapKey, "-p", poolName, "--namespace", namespace} + cmd := NewRadosCommand(context, clusterInfo, args) + _, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return errors.Wrapf(err, "failed to delete omapKey %q in pool %q", omapKey, poolName) + } + logger.Infof("successfully deleted omap key %q for pool %q", omapKey, poolName) + return nil +} + +func DeleteSubVolume(context *clusterd.Context, clusterInfo *ClusterInfo, fs, subvol, svg string) error { + args := []string{"fs", "subvolume", "rm", fs, subvol, svg, "--force"} + cmd := NewCephCommand(context, clusterInfo, args) + _, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return errors.Wrapf(err, "failed to delete subvolume %q in filesystem %q", subvol, fs) + } + return nil +} + +func DeleteSubvolumeSnapshot(context *clusterd.Context, clusterInfo *ClusterInfo, fs, subvol, svg, snap string) error { + args := []string{"fs", "subvolume", "snapshot", "rm", fs, subvol, snap, "--group_name", svg} + cmd := NewCephCommand(context, clusterInfo, args) + _, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return errors.Wrapf(err, "failed to delete subvolume %q in filesystem %q", subvol, fs) + } + return nil +} + +func CancelSnapshotClone(context *clusterd.Context, clusterInfo *ClusterInfo, fs, svg, clone string) error { + args := []string{"fs", "clone", "cancel", fs, clone, "--group_name", svg} + cmd := NewCephCommand(context, clusterInfo, args) + _, err := cmd.RunWithTimeout(exec.CephCommandsTimeout) + if err != nil { + return errors.Wrapf(err, "failed to cancel clone %q in filesystem %q in group %q", clone, fs, svg) + } + return nil +} diff --git a/pkg/operator/ceph/cluster/cleanup.go b/pkg/operator/ceph/cluster/cleanup.go index 6c87353b09c4..92b2933591ab 100644 --- a/pkg/operator/ceph/cluster/cleanup.go +++ b/pkg/operator/ceph/cluster/cleanup.go @@ -138,7 +138,7 @@ func (c *ClusterController) cleanUpJobContainer(cluster *cephv1.CephCluster, mon SecurityContext: securityContext, VolumeMounts: volumeMounts, Env: envVars, - Args: []string{"ceph", "clean"}, + Args: []string{"ceph", "clean", "host"}, Resources: cephv1.GetCleanupResources(cluster.Spec.Resources), } } diff --git a/pkg/operator/ceph/controller/cleanup.go b/pkg/operator/ceph/controller/cleanup.go new file mode 100644 index 000000000000..72234271901f --- /dev/null +++ b/pkg/operator/ceph/controller/cleanup.go @@ -0,0 +1,144 @@ +/* +Copyright 2024 The Rook Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package controller + +import ( + "context" + "strings" + + "github.com/pkg/errors" + cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" + "github.com/rook/rook/pkg/operator/k8sutil" + batch "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + k8sClient "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + volumeName = "cleanup-volume" + dataDirHostPath = "ROOK_DATA_DIR_HOST_PATH" + CleanupAppName = "resource-cleanup" + RESOURCE_CLEANUP_ANNOTATION = "rook.io/force-deletion" + + // CephFSSubVolumeGroup env resources + CephFSSubVolumeGroupNameEnv = "SUB_VOLUME_GROUP_NAME" + CephFSNameEnv = "FILESYSTEM_NAME" + CSICephFSRadosNamesaceEnv = "CSI_CEPHFS_RADOS_NAMESPACE" + CephFSMetaDataPoolNameEnv = "METADATA_POOL_NAME" +) + +// ResourceCleanup defines an rook ceph resource to be cleaned up +type ResourceCleanup struct { + resource k8sClient.Object + cluster *cephv1.CephCluster + rookImage string + // config defines the attributes of the custom resource to passed in as environment variables in the clean up job + config map[string]string +} + +func NewResourceCleanup(obj k8sClient.Object, cluster *cephv1.CephCluster, rookImage string, config map[string]string) *ResourceCleanup { + return &ResourceCleanup{ + resource: obj, + rookImage: rookImage, + cluster: cluster, + config: config, + } +} + +// Start a new job to perform clean up of the ceph resources. It returns true if the cleanup job has succeeded +func (c *ResourceCleanup) StartJob(ctx context.Context, clientset kubernetes.Interface, jobName string) error { + podSpec := c.jobTemplateSpec() + job := &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + Namespace: c.resource.GetNamespace(), + OwnerReferences: c.resource.GetOwnerReferences(), + }, + Spec: batch.JobSpec{ + Template: podSpec, + }, + } + + if err := k8sutil.RunReplaceableJob(ctx, clientset, job, false); err != nil { + return errors.Wrapf(err, "failed to run clean up job for %q resource named %q in namespace %q", + c.resource.GetObjectKind().GroupVersionKind().Kind, c.resource.GetName(), c.resource.GetNamespace()) + } + + return nil +} + +func (c *ResourceCleanup) jobContainer() v1.Container { + volumeMounts := []v1.VolumeMount{} + envVars := []v1.EnvVar{} + if c.cluster.Spec.DataDirHostPath != "" { + hostPathVolumeMount := v1.VolumeMount{Name: volumeName, MountPath: c.cluster.Spec.DataDirHostPath} + volumeMounts = append(volumeMounts, hostPathVolumeMount) + envVars = append(envVars, []v1.EnvVar{ + {Name: dataDirHostPath, Value: c.cluster.Spec.DataDirHostPath}, + {Name: "ROOK_LOG_LEVEL", Value: "DEBUG"}, + {Name: k8sutil.PodNamespaceEnvVar, Value: c.resource.GetNamespace()}, + }...) + } + // append all the resource attributes as env variables. + for k, v := range c.config { + envVars = append(envVars, v1.EnvVar{Name: k, Value: v}) + } + securityContext := PrivilegedContext(true) + return v1.Container{ + Name: "resource-cleanup", + Image: c.rookImage, + SecurityContext: securityContext, + VolumeMounts: volumeMounts, + Env: envVars, + Args: []string{"ceph", "clean", c.resource.GetObjectKind().GroupVersionKind().Kind}, + Resources: cephv1.GetCleanupResources(c.cluster.Spec.Resources), + } +} + +func (c *ResourceCleanup) jobTemplateSpec() v1.PodTemplateSpec { + volumes := []v1.Volume{} + hostPathVolume := v1.Volume{Name: volumeName, VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: c.cluster.Spec.DataDirHostPath}}} + volumes = append(volumes, hostPathVolume) + + podSpec := v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: CleanupAppName, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + c.jobContainer(), + }, + Volumes: volumes, + RestartPolicy: v1.RestartPolicyOnFailure, + PriorityClassName: cephv1.GetCleanupPriorityClassName(c.cluster.Spec.PriorityClassNames), + ServiceAccountName: k8sutil.DefaultServiceAccount, + }, + } + + return podSpec +} + +// ForceDeleteRequested returns true if `rook.io/force-deletion:true` annotation is available on the resource +func ForceDeleteRequested(annotations map[string]string) bool { + if value, found := annotations[RESOURCE_CLEANUP_ANNOTATION]; found { + if strings.EqualFold(value, "true") { + return true + } + } + return false +} diff --git a/pkg/operator/ceph/controller/cleanup_test.go b/pkg/operator/ceph/controller/cleanup_test.go new file mode 100644 index 000000000000..632977b71e12 --- /dev/null +++ b/pkg/operator/ceph/controller/cleanup_test.go @@ -0,0 +1,82 @@ +/* +Copyright 2024 The Rook Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "testing" + + cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestJobTemplateSpec(t *testing.T) { + expectedHostPath := "var/lib/rook" + expectedNamespace := "test-rook-ceph" + rookImage := "test" + cluster := &cephv1.CephCluster{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: expectedNamespace, + }, + Spec: cephv1.ClusterSpec{ + DataDirHostPath: expectedHostPath, + CleanupPolicy: cephv1.CleanupPolicySpec{ + Confirmation: "yes-really-destroy-data", + }, + }, + } + svgObj := &cephv1.CephFilesystemSubVolumeGroup{ + TypeMeta: metav1.TypeMeta{ + Kind: "CephFSSubvolumeGroup", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-svg", + Namespace: expectedNamespace, + }, + } + testConfig := map[string]string{ + "config1": "value1", + "config2": "value2", + } + cleanup := NewResourceCleanup(svgObj, cluster, rookImage, testConfig) + podTemplateSpec := cleanup.jobTemplateSpec() + assert.Equal(t, "CephFSSubvolumeGroup", podTemplateSpec.Spec.Containers[0].Args[2]) + assert.Equal(t, "config1", podTemplateSpec.Spec.Containers[0].Env[3].Name) + assert.Equal(t, "value1", podTemplateSpec.Spec.Containers[0].Env[3].Value) + assert.Equal(t, "config2", podTemplateSpec.Spec.Containers[0].Env[4].Name) + assert.Equal(t, "value2", podTemplateSpec.Spec.Containers[0].Env[4].Value) +} + +func TestForceDeleteRequested(t *testing.T) { + svgObj := &cephv1.CephFilesystemSubVolumeGroup{ + TypeMeta: metav1.TypeMeta{ + Kind: "CephFSSubvolumeGroup", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-svg", + Namespace: "test", + Annotations: map[string]string{}, + }, + } + + result := ForceDeleteRequested(svgObj.Annotations) + assert.False(t, result) + + svgObj.Annotations[RESOURCE_CLEANUP_ANNOTATION] = "true" + result = ForceDeleteRequested(svgObj.Annotations) + assert.True(t, result) +} diff --git a/pkg/operator/ceph/file/filesystem.go b/pkg/operator/ceph/file/filesystem.go index 1db7da060731..56cf9a0468c9 100644 --- a/pkg/operator/ceph/file/filesystem.go +++ b/pkg/operator/ceph/file/filesystem.go @@ -169,7 +169,7 @@ func newFS(name, namespace string) *Filesystem { func createOrUpdatePools(f *Filesystem, context *clusterd.Context, clusterInfo *cephclient.ClusterInfo, clusterSpec *cephv1.ClusterSpec, spec cephv1.FilesystemSpec) error { // generating the metadata pool's name metadataPool := cephv1.NamedPoolSpec{ - Name: generateMetaDataPoolName(f), + Name: GenerateMetaDataPoolName(f.Name), PoolSpec: spec.MetadataPool, } metadataPool.Application = cephfsApplication @@ -242,7 +242,7 @@ func (f *Filesystem) doFilesystemCreate(context *clusterd.Context, clusterInfo * spec.MetadataPool.Application = cephfsApplication metadataPool := cephv1.NamedPoolSpec{ - Name: generateMetaDataPoolName(f), + Name: GenerateMetaDataPoolName(f.Name), PoolSpec: spec.MetadataPool, } if _, poolFound := reversedPoolMap[metadataPool.Name]; !poolFound { @@ -307,7 +307,7 @@ func generateDataPoolNames(f *Filesystem, spec cephv1.FilesystemSpec) []string { return dataPoolNames } -// generateMetaDataPoolName generates MetaDataPool name by prefixing the filesystem name to the constant metaDataPoolSuffix -func generateMetaDataPoolName(f *Filesystem) string { - return fmt.Sprintf("%s-%s", f.Name, metaDataPoolSuffix) +// GenerateMetaDataPoolName generates MetaDataPool name by prefixing the filesystem name to the constant metaDataPoolSuffix +func GenerateMetaDataPoolName(fsName string) string { + return fmt.Sprintf("%s-%s", fsName, metaDataPoolSuffix) } diff --git a/pkg/operator/ceph/file/subvolumegroup/controller.go b/pkg/operator/ceph/file/subvolumegroup/controller.go index 0bf5db43f404..c77c16bafc9c 100644 --- a/pkg/operator/ceph/file/subvolumegroup/controller.go +++ b/pkg/operator/ceph/file/subvolumegroup/controller.go @@ -40,6 +40,7 @@ import ( "github.com/rook/rook/pkg/clusterd" opcontroller "github.com/rook/rook/pkg/operator/ceph/controller" "github.com/rook/rook/pkg/operator/ceph/csi" + "github.com/rook/rook/pkg/operator/ceph/file" "github.com/rook/rook/pkg/operator/ceph/reporting" "github.com/rook/rook/pkg/operator/k8sutil" kerrors "k8s.io/apimachinery/pkg/api/errors" @@ -187,7 +188,7 @@ func (r *ReconcileCephFilesystemSubVolumeGroup) reconcile(request reconcile.Requ if cephCluster.Spec.External.Enable { logger.Warningf("external subvolume group %q deletion is not supported, delete it manually", namespacedName) } else { - err := r.deleteSubVolumeGroup(cephFilesystemSubVolumeGroup) + err = r.deleteSubVolumeGroup(cephFilesystemSubVolumeGroup, &cephCluster) if err != nil { if strings.Contains(err.Error(), opcontroller.UninitializedCephConfigError) { logger.Info(opcontroller.OperatorNotInitializedMessage) @@ -326,7 +327,8 @@ func (r *ReconcileCephFilesystemSubVolumeGroup) createOrUpdateSubVolumeGroup(cep } // Delete the ceph filesystem subvolume group -func (r *ReconcileCephFilesystemSubVolumeGroup) deleteSubVolumeGroup(cephFilesystemSubVolumeGroup *cephv1.CephFilesystemSubVolumeGroup) error { +func (r *ReconcileCephFilesystemSubVolumeGroup) deleteSubVolumeGroup(cephFilesystemSubVolumeGroup *cephv1.CephFilesystemSubVolumeGroup, + cephCluster *cephv1.CephCluster) error { namespacedName := fmt.Sprintf("%s/%s", cephFilesystemSubVolumeGroup.Namespace, cephFilesystemSubVolumeGroup.Name) logger.Infof("deleting ceph filesystem subvolume group object %q", namespacedName) if err := cephclient.DeleteCephFSSubVolumeGroup(r.context, r.clusterInfo, cephFilesystemSubVolumeGroup.Spec.FilesystemName, getSubvolumeGroupName(cephFilesystemSubVolumeGroup)); err != nil { @@ -339,7 +341,17 @@ func (r *ReconcileCephFilesystemSubVolumeGroup) deleteSubVolumeGroup(cephFilesys // If the subvolume group has subvolumes the command will fail with: // Error ENOTEMPTY: error in rmdir /volumes/csi if ok && (code == int(syscall.ENOTEMPTY)) { - return errors.Wrapf(err, "failed to delete ceph filesystem subvolume group %q, remove the subvolumes first", cephFilesystemSubVolumeGroup.Name) + msg := fmt.Sprintf("failed to delete ceph filesystem subvolume group %q, remove the subvolumes first", cephFilesystemSubVolumeGroup.Name) + if opcontroller.ForceDeleteRequested(cephFilesystemSubVolumeGroup.GetAnnotations()) { + // cleanup cephFS subvolumes + cleanupErr := r.cleanup(cephFilesystemSubVolumeGroup, cephCluster) + if cleanupErr != nil { + return errors.Wrapf(cleanupErr, "failed to clean up all the ceph resources created by subVolumeGroup %q", namespacedName) + } + msg = fmt.Sprintf("failed to delete ceph filesystem subvolume group %q, started clean up job to delete the subvolumes", cephFilesystemSubVolumeGroup.Name) + } + + return errors.Wrapf(err, msg) } return errors.Wrapf(err, "failed to delete ceph filesystem subvolume group %q", cephFilesystemSubVolumeGroup.Name) @@ -380,3 +392,20 @@ func buildClusterID(cephFilesystemSubVolumeGroup *cephv1.CephFilesystemSubVolume clusterID := fmt.Sprintf("%s-%s-file-%s", cephFilesystemSubVolumeGroup.Namespace, cephFilesystemSubVolumeGroup.Spec.FilesystemName, getSubvolumeGroupName(cephFilesystemSubVolumeGroup)) return k8sutil.Hash(clusterID) } + +func (r *ReconcileCephFilesystemSubVolumeGroup) cleanup(svg *cephv1.CephFilesystemSubVolumeGroup, cephCluster *cephv1.CephCluster) error { + logger.Infof("starting cleanup of the ceph resources for subVolumeGroup %q in namespace %q", svg.Name, svg.Namespace) + cleanupConfig := map[string]string{ + opcontroller.CephFSSubVolumeGroupNameEnv: svg.Spec.Name, + opcontroller.CephFSNameEnv: svg.Spec.FilesystemName, + opcontroller.CSICephFSRadosNamesaceEnv: "csi", + opcontroller.CephFSMetaDataPoolNameEnv: file.GenerateMetaDataPoolName(svg.Spec.FilesystemName), + } + cleanup := opcontroller.NewResourceCleanup(svg, cephCluster, r.opConfig.Image, cleanupConfig) + jobName := k8sutil.TruncateNodeNameForJob("cleanup-svg-%s", fmt.Sprintf("%s-%s", svg.Spec.FilesystemName, svg.Name)) + err := cleanup.StartJob(r.clusterInfo.Context, r.context.Clientset, jobName) + if err != nil { + return errors.Wrapf(err, "failed to run clean up job to clean the ceph resources in cephFS subVolumeGroup %q", svg.Name) + } + return nil +} From f57a8b6bbe8211bb1bfba7e55653acc5c6bebdc9 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Fri, 5 Apr 2024 11:49:26 +0200 Subject: [PATCH 2/5] subvolumegroup: add support for size and datapool cephfs subvolumegroup supports creating svg with quota and the datapool, This PR adds the support for the same. Signed-off-by: Madhu Rajanna --- .../ceph-fs-subvolumegroup-crd.md | 10 ++- Documentation/CRDs/specification.md | 48 ++++++++++++ .../charts/rook-ceph/templates/resources.yaml | 10 +++ deploy/examples/crds.yaml | 10 +++ deploy/examples/subvolumegroup.yaml | 4 + pkg/apis/ceph.rook.io/v1/types.go | 6 ++ pkg/daemon/ceph/client/subvolumegroup.go | 78 ++++++++++++++++++- pkg/operator/ceph/file/filesystem.go | 2 +- pkg/operator/ceph/file/filesystem_test.go | 2 + .../ceph/file/subvolumegroup/controller.go | 2 +- tests/framework/installer/ceph_manifests.go | 4 +- 11 files changed, 168 insertions(+), 8 deletions(-) diff --git a/Documentation/CRDs/Shared-Filesystem/ceph-fs-subvolumegroup-crd.md b/Documentation/CRDs/Shared-Filesystem/ceph-fs-subvolumegroup-crd.md index 8f8941c4f49b..4016ee495517 100644 --- a/Documentation/CRDs/Shared-Filesystem/ceph-fs-subvolumegroup-crd.md +++ b/Documentation/CRDs/Shared-Filesystem/ceph-fs-subvolumegroup-crd.md @@ -32,6 +32,10 @@ spec: distributed: 1 # distributed=<0, 1> (disabled=0) # export: # export=<0-256> (disabled=-1) # random: # random=[0.0, 1.0](disabled=0.0) + # Quota size of the subvolume group. + #quota: 10G + # data pool name for the subvolume group layout instead of the default data pool. + #dataPoolName: myfs-replicated ``` ## Settings @@ -48,7 +52,11 @@ If any setting is unspecified, a suitable default will be used automatically. * `filesystemName`: The metadata name of the CephFilesystem CR where the subvolume group will be created. -* `pinning`: To distribute load across MDS ranks in predictable and stable ways. Reference: https://docs.ceph.com/en/latest/cephfs/fs-volumes/#pinning-subvolumes-and-subvolume-groups. +* `quota`: Quota size of the Ceph Filesystem subvolume group. + +* `dataPoolName`: The data pool name for the subvolume group layout instead of the default data pool. + +* `pinning`: To distribute load across MDS ranks in predictable and stable ways. See the Ceph doc for [Pinning subvolume groups](https://docs.ceph.com/en/latest/cephfs/fs-volumes/#pinning-subvolumes-and-subvolume-groups). * `distributed`: Range: <0, 1>, for disabling it set to 0 * `export`: Range: <0-256>, for disabling it set to -1 * `random`: Range: [0.0, 1.0], for disabling it set to 0.0 diff --git a/Documentation/CRDs/specification.md b/Documentation/CRDs/specification.md index b84be542f70d..47743d26044b 100644 --- a/Documentation/CRDs/specification.md +++ b/Documentation/CRDs/specification.md @@ -1562,6 +1562,30 @@ reference + + +quota
+ +k8s.io/apimachinery/pkg/api/resource.Quantity + + + +(Optional) +

Quota size of the Ceph Filesystem subvolume group.

+ + + + +dataPoolName
+ +string + + + +(Optional) +

The data pool name for the Ceph Filesystem subvolume group layout, if the default CephFS pool is not desired.

+ +

CephFilesystemSubVolumeGroupSpecPinning diff --git a/deploy/charts/rook-ceph/templates/resources.yaml b/deploy/charts/rook-ceph/templates/resources.yaml index d02d81c0d8bd..385b0abf492b 100644 --- a/deploy/charts/rook-ceph/templates/resources.yaml +++ b/deploy/charts/rook-ceph/templates/resources.yaml @@ -7977,6 +7977,9 @@ spec: spec: description: Spec represents the specification of a Ceph Filesystem SubVolumeGroup properties: + dataPoolName: + description: The data pool name for the Ceph Filesystem subvolume group layout, if the default CephFS pool is not desired. + type: string filesystemName: description: |- FilesystemName is the name of Ceph Filesystem SubVolumeGroup volume name. Typically it's the name of @@ -8018,6 +8021,13 @@ spec: x-kubernetes-validations: - message: only one pinning type should be set rule: (has(self.export) && !has(self.distributed) && !has(self.random)) || (!has(self.export) && has(self.distributed) && !has(self.random)) || (!has(self.export) && !has(self.distributed) && has(self.random)) || (!has(self.export) && !has(self.distributed) && !has(self.random)) + quota: + anyOf: + - type: integer + - type: string + description: Quota size of the Ceph Filesystem subvolume group. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true required: - filesystemName type: object diff --git a/deploy/examples/crds.yaml b/deploy/examples/crds.yaml index 942b083342ef..fc868138cd8e 100644 --- a/deploy/examples/crds.yaml +++ b/deploy/examples/crds.yaml @@ -7971,6 +7971,9 @@ spec: spec: description: Spec represents the specification of a Ceph Filesystem SubVolumeGroup properties: + dataPoolName: + description: The data pool name for the Ceph Filesystem subvolume group layout, if the default CephFS pool is not desired. + type: string filesystemName: description: |- FilesystemName is the name of Ceph Filesystem SubVolumeGroup volume name. Typically it's the name of @@ -8012,6 +8015,13 @@ spec: x-kubernetes-validations: - message: only one pinning type should be set rule: (has(self.export) && !has(self.distributed) && !has(self.random)) || (!has(self.export) && has(self.distributed) && !has(self.random)) || (!has(self.export) && !has(self.distributed) && has(self.random)) || (!has(self.export) && !has(self.distributed) && !has(self.random)) + quota: + anyOf: + - type: integer + - type: string + description: Quota size of the Ceph Filesystem subvolume group. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true required: - filesystemName type: object diff --git a/deploy/examples/subvolumegroup.yaml b/deploy/examples/subvolumegroup.yaml index 629cbe4b8a76..50345142a222 100644 --- a/deploy/examples/subvolumegroup.yaml +++ b/deploy/examples/subvolumegroup.yaml @@ -17,3 +17,7 @@ spec: distributed: 1 # distributed=<0, 1> (disabled=0) # export: # export=<0-256> (disabled=-1) # random: # random=[0.0, 1.0](disabled=0.0) + # Quota size of the subvolume group. + #quota: 10G + # data pool name for the subvolume group layout instead of the default data pool. + #dataPoolName: myfs-replicated diff --git a/pkg/apis/ceph.rook.io/v1/types.go b/pkg/apis/ceph.rook.io/v1/types.go index c7995cfc5cde..7923a18fd915 100755 --- a/pkg/apis/ceph.rook.io/v1/types.go +++ b/pkg/apis/ceph.rook.io/v1/types.go @@ -3016,6 +3016,12 @@ type CephFilesystemSubVolumeGroupSpec struct { // only one out of (export, distributed, random) can be set at a time // +optional Pinning CephFilesystemSubVolumeGroupSpecPinning `json:"pinning,omitempty"` + // Quota size of the Ceph Filesystem subvolume group. + // +optional + Quota *resource.Quantity `json:"quota,omitempty"` + // The data pool name for the Ceph Filesystem subvolume group layout, if the default CephFS pool is not desired. + // +optional + DataPoolName string `json:"dataPoolName"` } // CephFilesystemSubVolumeGroupSpecPinning represents the pinning configuration of SubVolumeGroup diff --git a/pkg/daemon/ceph/client/subvolumegroup.go b/pkg/daemon/ceph/client/subvolumegroup.go index 671eb384a422..bfe991a378ba 100644 --- a/pkg/daemon/ceph/client/subvolumegroup.go +++ b/pkg/daemon/ceph/client/subvolumegroup.go @@ -17,9 +17,11 @@ limitations under the License. package client import ( + "encoding/json" "fmt" "strconv" "strings" + "syscall" "github.com/pkg/errors" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" @@ -30,21 +32,89 @@ import ( // CreateCephFSSubVolumeGroup create a CephFS subvolume group. // volName is the name of the Ceph FS volume, the same as the CephFilesystem CR name. -func CreateCephFSSubVolumeGroup(context *clusterd.Context, clusterInfo *ClusterInfo, volName, groupName string) error { +func CreateCephFSSubVolumeGroup(context *clusterd.Context, clusterInfo *ClusterInfo, volName, groupName string, svgSpec *cephv1.CephFilesystemSubVolumeGroupSpec) error { logger.Infof("creating cephfs %q subvolume group %q", volName, groupName) - // [--pool_layout ] [--uid ] [--gid ] [--mode ] + // [] [--pool_layout ] [--uid ] [--gid ] [--mode ] args := []string{"fs", "subvolumegroup", "create", volName, groupName} + if svgSpec != nil { + if svgSpec.Quota != nil { + // convert the size to bytes as ceph expect the size in bytes + args = append(args, fmt.Sprintf("--size=%d", svgSpec.Quota.Value())) + } + if svgSpec.DataPoolName != "" { + args = append(args, fmt.Sprintf("--pool_layout=%s", svgSpec.DataPoolName)) + } + } + + svgInfo, err := getCephFSSubVolumeGroupInfo(context, clusterInfo, volName, groupName) + if err != nil { + // return error other than not found. + if code, ok := exec.ExitStatus(err); ok && code != int(syscall.ENOENT) { + return errors.Wrapf(err, "failed to create subvolume group %q in filesystem %q", groupName, volName) + } + } + + // if the subvolumegroup exists, resize the subvolumegroup + if err == nil && svgSpec != nil && svgSpec.Quota != nil && svgSpec.Quota.CmpInt64(svgInfo.BytesQuota) != 0 { + err = resizeCephFSSubVolumeGroup(context, clusterInfo, volName, groupName, svgSpec) + if err != nil { + return errors.Wrapf(err, "failed to create subvolume group %q in filesystem %q", groupName, volName) + } + } + cmd := NewCephCommand(context, clusterInfo, args) cmd.JsonOutput = false output, err := cmd.Run() if err != nil { - return errors.Wrapf(err, "failed to create subvolume group %q. %s", volName, output) + return errors.Wrapf(err, "failed to create subvolume group %q in filesystem %q. %s", groupName, volName, output) } - logger.Infof("successfully created cephfs %q subvolume group %q", volName, groupName) + logger.Infof("successfully created subvolume group %q in filesystem %q", groupName, volName) return nil } +// resizeCephFSSubVolumeGroup resize a CephFS subvolume group. +// volName is the name of the Ceph FS volume, the same as the CephFilesystem CR name. +func resizeCephFSSubVolumeGroup(context *clusterd.Context, clusterInfo *ClusterInfo, volName, groupName string, svgSpec *cephv1.CephFilesystemSubVolumeGroupSpec) error { + logger.Infof("resizing cephfs %q subvolume group %q", volName, groupName) + // [--no-shrink] + args := []string{"fs", "subvolumegroup", "resize", volName, groupName, "--no-shrink", fmt.Sprintf("%d", svgSpec.Quota.Value())} + cmd := NewCephCommand(context, clusterInfo, args) + cmd.JsonOutput = false + output, err := cmd.Run() + if err != nil { + return errors.Wrapf(err, "failed to resize subvolume group %q in filesystem %q. %s", groupName, volName, output) + } + + logger.Infof("successfully resized subvolume group %q in filesystem %q to %s", groupName, volName, svgSpec.Quota) + return nil +} + +type subvolumeGroupInfo struct { + BytesQuota int64 `json:"bytes_quota"` + BytesUsed int64 `json:"bytes_used"` + DataPool string `json:"data_pool"` +} + +// getCephFSSubVolumeGroupInfo get subvolumegroup info of the group name. +// volName is the name of the Ceph FS volume, the same as the CephFilesystem CR name. +func getCephFSSubVolumeGroupInfo(context *clusterd.Context, clusterInfo *ClusterInfo, volName, groupName string) (*subvolumeGroupInfo, error) { + args := []string{"fs", "subvolumegroup", "info", volName, groupName} + cmd := NewCephCommand(context, clusterInfo, args) + cmd.JsonOutput = true + output, err := cmd.Run() + if err != nil { + return nil, errors.Wrapf(err, "failed to get subvolume group %q in filesystem %q. %s", groupName, volName, output) + } + + svgInfo := subvolumeGroupInfo{} + err = json.Unmarshal(output, &svgInfo) + if err != nil { + return nil, errors.Wrapf(err, "failed to unmarshal into subvolumeGroupInfo") + } + return &svgInfo, nil +} + // DeleteCephFSSubVolumeGroup delete a CephFS subvolume group. func DeleteCephFSSubVolumeGroup(context *clusterd.Context, clusterInfo *ClusterInfo, volName, groupName string) error { logger.Infof("deleting cephfs %q subvolume group %q", volName, groupName) diff --git a/pkg/operator/ceph/file/filesystem.go b/pkg/operator/ceph/file/filesystem.go index 56cf9a0468c9..898875c6b7e6 100644 --- a/pkg/operator/ceph/file/filesystem.go +++ b/pkg/operator/ceph/file/filesystem.go @@ -74,7 +74,7 @@ func createFilesystem( } } - err := cephclient.CreateCephFSSubVolumeGroup(context, clusterInfo, fs.Name, defaultCSISubvolumeGroup) + err := cephclient.CreateCephFSSubVolumeGroup(context, clusterInfo, fs.Name, defaultCSISubvolumeGroup, nil) if err != nil { return errors.Wrapf(err, "failed to create subvolume group %q", defaultCSISubvolumeGroup) } diff --git a/pkg/operator/ceph/file/filesystem_test.go b/pkg/operator/ceph/file/filesystem_test.go index 637bc4563069..ffe8aaf5dccd 100644 --- a/pkg/operator/ceph/file/filesystem_test.go +++ b/pkg/operator/ceph/file/filesystem_test.go @@ -233,6 +233,8 @@ func fsExecutor(t *testing.T, fsName, configDir string, multiFS bool, createData return `{"standbys":[], "filesystems":[]}`, nil } else if reflect.DeepEqual(args[0:5], []string{"fs", "subvolumegroup", "create", fsName, defaultCSISubvolumeGroup}) { return "", nil + } else if reflect.DeepEqual(args[0:5], []string{"fs", "subvolumegroup", "info", fsName, defaultCSISubvolumeGroup}) { + return "", nil } else if contains(args, "osd") && contains(args, "lspools") { return "[]", nil } else if contains(args, "mds") && contains(args, "fail") { diff --git a/pkg/operator/ceph/file/subvolumegroup/controller.go b/pkg/operator/ceph/file/subvolumegroup/controller.go index c77c16bafc9c..a291a8e040eb 100644 --- a/pkg/operator/ceph/file/subvolumegroup/controller.go +++ b/pkg/operator/ceph/file/subvolumegroup/controller.go @@ -318,7 +318,7 @@ func (r *ReconcileCephFilesystemSubVolumeGroup) updateClusterConfig(cephFilesyst func (r *ReconcileCephFilesystemSubVolumeGroup) createOrUpdateSubVolumeGroup(cephFilesystemSubVolumeGroup *cephv1.CephFilesystemSubVolumeGroup) error { logger.Infof("creating ceph filesystem subvolume group %s in namespace %s", cephFilesystemSubVolumeGroup.Name, cephFilesystemSubVolumeGroup.Namespace) - err := cephclient.CreateCephFSSubVolumeGroup(r.context, r.clusterInfo, cephFilesystemSubVolumeGroup.Spec.FilesystemName, getSubvolumeGroupName(cephFilesystemSubVolumeGroup)) + err := cephclient.CreateCephFSSubVolumeGroup(r.context, r.clusterInfo, cephFilesystemSubVolumeGroup.Spec.FilesystemName, getSubvolumeGroupName(cephFilesystemSubVolumeGroup), &cephFilesystemSubVolumeGroup.Spec) if err != nil { return errors.Wrapf(err, "failed to create ceph filesystem subvolume group %q", cephFilesystemSubVolumeGroup.Name) } diff --git a/tests/framework/installer/ceph_manifests.go b/tests/framework/installer/ceph_manifests.go index 471d79e763ac..bb8fb0175ae2 100644 --- a/tests/framework/installer/ceph_manifests.go +++ b/tests/framework/installer/ceph_manifests.go @@ -652,7 +652,9 @@ metadata: name: ` + groupName + ` namespace: ` + m.settings.Namespace + ` spec: - filesystemName: ` + fsName + filesystemName: ` + fsName + ` + quota: 10G + dataPoolName: ` + fsName + "-data0" } func (m *CephManifestsMaster) GetCOSIDriver() string { From 6d4afe23e5e6ee1ed70b505ab6863150d9266af8 Mon Sep 17 00:00:00 2001 From: mmaoyu Date: Thu, 28 Mar 2024 18:45:04 +0800 Subject: [PATCH 3/5] osd: add option upgradeOSDRequiresHealthyPGs For check if cluster PGs are healthy before osd updates Which helps gainning more confidence in the osd upgrades Signed-off-by: mmaoyu --- .../CRDs/Cluster/ceph-cluster-crd.md | 1 + Documentation/CRDs/specification.md | 28 ++++++++++++ deploy/charts/rook-ceph-cluster/values.yaml | 5 +++ .../charts/rook-ceph/templates/resources.yaml | 6 +++ deploy/examples/cluster.yaml | 4 ++ deploy/examples/crds.yaml | 6 +++ pkg/apis/ceph.rook.io/v1/types.go | 6 +++ pkg/operator/ceph/cluster/osd/update.go | 13 ++++++ pkg/operator/ceph/cluster/osd/update_test.go | 43 +++++++++++++++++++ 9 files changed, 112 insertions(+) diff --git a/Documentation/CRDs/Cluster/ceph-cluster-crd.md b/Documentation/CRDs/Cluster/ceph-cluster-crd.md index ae3c2deed90a..206696dafdd0 100755 --- a/Documentation/CRDs/Cluster/ceph-cluster-crd.md +++ b/Documentation/CRDs/Cluster/ceph-cluster-crd.md @@ -38,6 +38,7 @@ Settings can be specified at the global level to apply to the cluster as a whole If this value is empty, each pod will get an ephemeral directory to store their config files that is tied to the lifetime of the pod running on that node. More details can be found in the Kubernetes [empty dir docs](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir). * `skipUpgradeChecks`: if set to true Rook won't perform any upgrade checks on Ceph daemons during an upgrade. Use this at **YOUR OWN RISK**, only if you know what you're doing. To understand Rook's upgrade process of Ceph, read the [upgrade doc](../../Upgrade/rook-upgrade.md#ceph-version-upgrades). * `continueUpgradeAfterChecksEvenIfNotHealthy`: if set to true Rook will continue the OSD daemon upgrade process even if the PGs are not clean, or continue with the MDS upgrade even the file system is not healthy. +* `upgradeOSDRequiresHealthyPGs`: if set to true OSD upgrade process won't start until PGs are healthy. * `dashboard`: Settings for the Ceph dashboard. To view the dashboard in your browser see the [dashboard guide](../../Storage-Configuration/Monitoring/ceph-dashboard.md). * `enabled`: Whether to enable the dashboard to view cluster status * `urlPrefix`: Allows to serve the dashboard under a subpath (useful when you are accessing the dashboard via a reverse proxy) diff --git a/Documentation/CRDs/specification.md b/Documentation/CRDs/specification.md index 47743d26044b..64197c517dcf 100644 --- a/Documentation/CRDs/specification.md +++ b/Documentation/CRDs/specification.md @@ -945,6 +945,20 @@ The default wait timeout is 10 minutes.

+upgradeOSDRequiresHealthyPGs
+ +bool + + + +(Optional) +

UpgradeOSDRequiresHealthyPGs defines if OSD upgrade requires PGs are clean. If set to true OSD upgrade process won’t start until PGs are healthy. +This configuration will be ignored if skipUpgradeChecks is true. +Default is false.

+ + + + disruptionManagement
@@ -4418,6 +4432,20 @@ The default wait timeout is 10 minutes.

+upgradeOSDRequiresHealthyPGs
+ +bool + + + +(Optional) +

UpgradeOSDRequiresHealthyPGs defines if OSD upgrade requires PGs are clean. If set to true OSD upgrade process won’t start until PGs are healthy. +This configuration will be ignored if skipUpgradeChecks is true. +Default is false.

+ + + + disruptionManagement
diff --git a/deploy/charts/rook-ceph-cluster/values.yaml b/deploy/charts/rook-ceph-cluster/values.yaml index ed0ce51daaa5..6d04631afcec 100644 --- a/deploy/charts/rook-ceph-cluster/values.yaml +++ b/deploy/charts/rook-ceph-cluster/values.yaml @@ -121,6 +121,11 @@ cephClusterSpec: # The default wait timeout is 10 minutes. waitTimeoutForHealthyOSDInMinutes: 10 + # Whether or not requires PGs are clean before an OSD upgrade. If set to `true` OSD upgrade process won't start until PGs are healthy. + # This configuration will be ignored if `skipUpgradeChecks` is `true`. + # Default is false. + upgradeOSDRequiresHealthyPGs: false + mon: # Set the number of mons to be started. Generally recommended to be 3. # For highest availability, an odd number of mons should be specified. diff --git a/deploy/charts/rook-ceph/templates/resources.yaml b/deploy/charts/rook-ceph/templates/resources.yaml index 385b0abf492b..f61b5f9ad121 100644 --- a/deploy/charts/rook-ceph/templates/resources.yaml +++ b/deploy/charts/rook-ceph/templates/resources.yaml @@ -5004,6 +5004,12 @@ spec: type: object type: array type: object + upgradeOSDRequiresHealthyPGs: + description: |- + UpgradeOSDRequiresHealthyPGs defines if OSD upgrade requires PGs are clean. If set to `true` OSD upgrade process won't start until PGs are healthy. + This configuration will be ignored if `skipUpgradeChecks` is `true`. + Default is false. + type: boolean waitTimeoutForHealthyOSDInMinutes: description: |- WaitTimeoutForHealthyOSDInMinutes defines the time the operator would wait before an OSD can be stopped for upgrade or restart. diff --git a/deploy/examples/cluster.yaml b/deploy/examples/cluster.yaml index 6292cb3037cf..1c0c68434075 100644 --- a/deploy/examples/cluster.yaml +++ b/deploy/examples/cluster.yaml @@ -43,6 +43,10 @@ spec: # continue with the upgrade of an OSD even if its not ok to stop after the timeout. This timeout won't be applied if `skipUpgradeChecks` is `true`. # The default wait timeout is 10 minutes. waitTimeoutForHealthyOSDInMinutes: 10 + # Whether or not requires PGs are clean before an OSD upgrade. If set to `true` OSD upgrade process won't start until PGs are healthy. + # This configuration will be ignored if `skipUpgradeChecks` is `true`. + # Default is false. + upgradeOSDRequiresHealthyPGs: false mon: # Set the number of mons to be started. Generally recommended to be 3. # For highest availability, an odd number of mons should be specified. diff --git a/deploy/examples/crds.yaml b/deploy/examples/crds.yaml index fc868138cd8e..511facee7854 100644 --- a/deploy/examples/crds.yaml +++ b/deploy/examples/crds.yaml @@ -5002,6 +5002,12 @@ spec: type: object type: array type: object + upgradeOSDRequiresHealthyPGs: + description: |- + UpgradeOSDRequiresHealthyPGs defines if OSD upgrade requires PGs are clean. If set to `true` OSD upgrade process won't start until PGs are healthy. + This configuration will be ignored if `skipUpgradeChecks` is `true`. + Default is false. + type: boolean waitTimeoutForHealthyOSDInMinutes: description: |- WaitTimeoutForHealthyOSDInMinutes defines the time the operator would wait before an OSD can be stopped for upgrade or restart. diff --git a/pkg/apis/ceph.rook.io/v1/types.go b/pkg/apis/ceph.rook.io/v1/types.go index 7923a18fd915..9babbeeca60e 100755 --- a/pkg/apis/ceph.rook.io/v1/types.go +++ b/pkg/apis/ceph.rook.io/v1/types.go @@ -162,6 +162,12 @@ type ClusterSpec struct { // +optional WaitTimeoutForHealthyOSDInMinutes time.Duration `json:"waitTimeoutForHealthyOSDInMinutes,omitempty"` + // UpgradeOSDRequiresHealthyPGs defines if OSD upgrade requires PGs are clean. If set to `true` OSD upgrade process won't start until PGs are healthy. + // This configuration will be ignored if `skipUpgradeChecks` is `true`. + // Default is false. + // +optional + UpgradeOSDRequiresHealthyPGs bool `json:"upgradeOSDRequiresHealthyPGs,omitempty"` + // A spec for configuring disruption management. // +nullable // +optional diff --git a/pkg/operator/ceph/cluster/osd/update.go b/pkg/operator/ceph/cluster/osd/update.go index 06b78a465bdd..2bfcbb87afda 100644 --- a/pkg/operator/ceph/cluster/osd/update.go +++ b/pkg/operator/ceph/cluster/osd/update.go @@ -79,6 +79,19 @@ func (c *updateConfig) updateExistingOSDs(errs *provisionErrors) { if c.doneUpdating() { return // no more OSDs to update } + if !c.cluster.spec.SkipUpgradeChecks && c.cluster.spec.UpgradeOSDRequiresHealthyPGs { + pgHealthMsg, pgClean, err := cephclient.IsClusterClean(c.cluster.context, c.cluster.clusterInfo, c.cluster.spec.DisruptionManagement.PGHealthyRegex) + if err != nil { + logger.Warningf("failed to check PGs status to update OSDs, will try updating it again later. %v", err) + return + } + if !pgClean { + logger.Infof("PGs are not healthy to update OSDs, will try updating it again later. PGs status: %q", pgHealthMsg) + return + } + logger.Infof("PGs are healthy to proceed updating OSDs. %v", pgHealthMsg) + } + osdIDQuery, _ := c.queue.Pop() var osdIDs []int diff --git a/pkg/operator/ceph/cluster/osd/update_test.go b/pkg/operator/ceph/cluster/osd/update_test.go index 61ca4425b5be..4c7bfc786b41 100644 --- a/pkg/operator/ceph/cluster/osd/update_test.go +++ b/pkg/operator/ceph/cluster/osd/update_test.go @@ -75,6 +75,8 @@ func Test_updateExistingOSDs(t *testing.T) { updateInjectFailures k8sutil.Failures // return failures from mocked updateDeploymentAndWaitFunc returnOkToStopIDs []int // return these IDs are ok-to-stop (or not ok to stop if empty) forceUpgradeIfUnhealthy bool + requiresHealthyPGs bool + cephStatus string ) // intermediates (created from inputs) @@ -108,6 +110,7 @@ func Test_updateExistingOSDs(t *testing.T) { clusterInfo.OwnerInfo = cephclient.NewMinimumOwnerInfo(t) spec := cephv1.ClusterSpec{ ContinueUpgradeAfterChecksEvenIfNotHealthy: forceUpgradeIfUnhealthy, + UpgradeOSDRequiresHealthyPGs: requiresHealthyPGs, } c = New(ctx, clusterInfo, spec, "rook/rook:master") config := c.newProvisionConfig() @@ -164,6 +167,9 @@ func Test_updateExistingOSDs(t *testing.T) { return cephclientfake.OSDDeviceClassOutput(args[3]), nil } } + if args[0] == "status" { + return cephStatus, nil + } panic(fmt.Sprintf("unexpected command %q with args %v", command, args)) }, } @@ -361,6 +367,43 @@ func Test_updateExistingOSDs(t *testing.T) { assert.Equal(t, 0, updateQueue.Len()) // the OSD should now have been removed from the queue }) + t.Run("PGs not clean to upgrade OSD", func(t *testing.T) { + clientset = fake.NewSimpleClientset() + updateQueue = newUpdateQueueWithIDs(2) + existingDeployments = newExistenceListWithIDs(2) + requiresHealthyPGs = true + cephStatus = unHealthyCephStatus + updateInjectFailures = k8sutil.Failures{} + doSetup() + + osdToBeQueried = 2 + updateConfig.updateExistingOSDs(errs) + assert.Zero(t, errs.len()) + assert.ElementsMatch(t, deploymentsUpdated, []string{}) + assert.Equal(t, 1, updateQueue.Len()) // the OSD should remain + + }) + + t.Run("PGs clean to upgrade OSD", func(t *testing.T) { + clientset = fake.NewSimpleClientset() + updateQueue = newUpdateQueueWithIDs(0) + existingDeployments = newExistenceListWithIDs(0) + requiresHealthyPGs = true + cephStatus = healthyCephStatus + forceUpgradeIfUnhealthy = true // FORCE UPDATES + updateInjectFailures = k8sutil.Failures{} + doSetup() + addDeploymentOnNode("node0", 0) + + osdToBeQueried = 0 + returnOkToStopIDs = []int{0} + updateConfig.updateExistingOSDs(errs) + assert.Zero(t, errs.len()) + assert.ElementsMatch(t, deploymentsUpdated, []string{deploymentName(0)}) + assert.Equal(t, 0, updateQueue.Len()) // should be done with updates + + }) + t.Run("continueUpgradesAfterChecksEvenIfUnhealthy = true", func(t *testing.T) { clientset = fake.NewSimpleClientset() updateQueue = newUpdateQueueWithIDs(2) From 309b164c496cd4aaa29890220a5acd841a0969a8 Mon Sep 17 00:00:00 2001 From: Travis Nielsen Date: Thu, 11 Apr 2024 11:06:29 -0600 Subject: [PATCH 4/5] test: reduce wait time for mds standby test The mds standby test was timing out after six seconds on three different attempts, causing a single unit test to take almost 20 seconds. Now we reduce the wait time to a total of three seconds. Signed-off-by: Travis Nielsen --- pkg/daemon/ceph/client/filesystem.go | 4 ++-- pkg/daemon/ceph/client/filesystem_test.go | 7 +++---- pkg/operator/ceph/file/mds/mds.go | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/daemon/ceph/client/filesystem.go b/pkg/daemon/ceph/client/filesystem.go index a3a8c884593b..048f7ce1bd0d 100644 --- a/pkg/daemon/ceph/client/filesystem.go +++ b/pkg/daemon/ceph/client/filesystem.go @@ -362,8 +362,8 @@ func deleteFSPool(context *clusterd.Context, clusterInfo *ClusterInfo, poolNames } // WaitForNoStandbys waits for all standbys go away -func WaitForNoStandbys(context *clusterd.Context, clusterInfo *ClusterInfo, timeout time.Duration) error { - err := wait.PollUntilContextTimeout(clusterInfo.Context, 3*time.Second, timeout, true, func(ctx ctx.Context) (bool, error) { +func WaitForNoStandbys(context *clusterd.Context, clusterInfo *ClusterInfo, retryInterval, timeout time.Duration) error { + err := wait.PollUntilContextTimeout(clusterInfo.Context, retryInterval, timeout, true, func(ctx ctx.Context) (bool, error) { mdsDump, err := GetMDSDump(context, clusterInfo) if err != nil { logger.Errorf("failed to get fs dump. %v", err) diff --git a/pkg/daemon/ceph/client/filesystem_test.go b/pkg/daemon/ceph/client/filesystem_test.go index 5df458464330..e23a8412891a 100644 --- a/pkg/daemon/ceph/client/filesystem_test.go +++ b/pkg/daemon/ceph/client/filesystem_test.go @@ -542,7 +542,7 @@ func TestWaitForNoStandbys(t *testing.T) { return "", errors.Errorf("unexpected ceph command %q", args) } - err := WaitForNoStandbys(context, AdminTestClusterInfo("mycluster"), 6*time.Second) + err := WaitForNoStandbys(context, AdminTestClusterInfo("mycluster"), time.Millisecond, 5*time.Millisecond) assert.Error(t, err) executor.MockExecuteCommandWithOutput = func(command string, args ...string) (string, error) { @@ -555,7 +555,7 @@ func TestWaitForNoStandbys(t *testing.T) { return "", errors.Errorf("unexpected ceph command %q", args) } - err = WaitForNoStandbys(context, AdminTestClusterInfo("mycluster"), 6*time.Second) + err = WaitForNoStandbys(context, AdminTestClusterInfo("mycluster"), time.Millisecond, 5*time.Millisecond) assert.Error(t, err) firstCall := true @@ -582,9 +582,8 @@ func TestWaitForNoStandbys(t *testing.T) { } return "", errors.Errorf("unexpected ceph command %q", args) } - err = WaitForNoStandbys(context, AdminTestClusterInfo("mycluster"), 6*time.Second) + err = WaitForNoStandbys(context, AdminTestClusterInfo("mycluster"), time.Millisecond, 5*time.Millisecond) assert.NoError(t, err) - } func TestListSubvolumeGroups(t *testing.T) { diff --git a/pkg/operator/ceph/file/mds/mds.go b/pkg/operator/ceph/file/mds/mds.go index 776a730a5bd4..a315101777d3 100644 --- a/pkg/operator/ceph/file/mds/mds.go +++ b/pkg/operator/ceph/file/mds/mds.go @@ -290,7 +290,7 @@ func (c *Cluster) upgradeMDS() error { return errors.Wrap(err, "failed to scale down deployments during upgrade") } logger.Debugf("waiting for all standbys gone") - if err := cephclient.WaitForNoStandbys(c.context, c.clusterInfo, 120*time.Second); err != nil { + if err := cephclient.WaitForNoStandbys(c.context, c.clusterInfo, 3*time.Second, 120*time.Second); err != nil { return errors.Wrap(err, "failed to wait for stopping all standbys") } From 5d2c92b6731c0b0642259de6e873f3e7d18d1bac Mon Sep 17 00:00:00 2001 From: Ceph Jenkins Date: Fri, 12 Apr 2024 04:02:38 -0400 Subject: [PATCH 5/5] csv: add additional csv changes that other commits bring add generated csv changes Signed-off-by: Ceph Jenkins --- build/csv/ceph/ceph.rook.io_cephclusters.yaml | 2 ++ .../ceph/ceph.rook.io_cephfilesystemsubvolumegroups.yaml | 8 ++++++++ 2 files changed, 10 insertions(+) diff --git a/build/csv/ceph/ceph.rook.io_cephclusters.yaml b/build/csv/ceph/ceph.rook.io_cephclusters.yaml index 5ae53cab3430..9e69a9015d68 100644 --- a/build/csv/ceph/ceph.rook.io_cephclusters.yaml +++ b/build/csv/ceph/ceph.rook.io_cephclusters.yaml @@ -2951,6 +2951,8 @@ spec: type: object type: array type: object + upgradeOSDRequiresHealthyPGs: + type: boolean waitTimeoutForHealthyOSDInMinutes: format: int64 type: integer diff --git a/build/csv/ceph/ceph.rook.io_cephfilesystemsubvolumegroups.yaml b/build/csv/ceph/ceph.rook.io_cephfilesystemsubvolumegroups.yaml index 3039e73b4e9b..497e168a8d92 100644 --- a/build/csv/ceph/ceph.rook.io_cephfilesystemsubvolumegroups.yaml +++ b/build/csv/ceph/ceph.rook.io_cephfilesystemsubvolumegroups.yaml @@ -33,6 +33,8 @@ spec: type: object spec: properties: + dataPoolName: + type: string filesystemName: type: string x-kubernetes-validations: @@ -67,6 +69,12 @@ spec: || (!has(self.export) && has(self.distributed) && !has(self.random)) || (!has(self.export) && !has(self.distributed) && has(self.random)) || (!has(self.export) && !has(self.distributed) && !has(self.random)) + quota: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true required: - filesystemName type: object