diff --git a/Documentation/Storage-Configuration/Object-Storage-RGW/ceph-object-bucket-claim.md b/Documentation/Storage-Configuration/Object-Storage-RGW/ceph-object-bucket-claim.md
index 66de3d99ae93..284b1dba3b0f 100644
--- a/Documentation/Storage-Configuration/Object-Storage-RGW/ceph-object-bucket-claim.md
+++ b/Documentation/Storage-Configuration/Object-Storage-RGW/ceph-object-bucket-claim.md
@@ -75,6 +75,7 @@ spec:
       }
     ]
   }
+  bucketOwner: "rgw-user"
 ```
 
 1. `name` of the `ObjectBucketClaim`. This name becomes the name of the Secret and ConfigMap.
@@ -94,6 +95,7 @@ If both `bucketName` and `generateBucketName` are blank or omitted then the stor
  * `bucketMaxSize`: The maximum size of the bucket as an individual bucket quota.
  * `bucketPolicy`: A raw JSON format string that defines an AWS S3 format the bucket policy. If set, the policy string will override any existing policy set on the bucket and any default bucket policy that the bucket provisioner potentially would have automatically generated.
  * `bucketLifecycle`: A raw JSON format string that defines an AWS S3 format bucket lifecycle configuration. Note that the rules must be sorted by `ID` in order to be idempotent.
+ * `bucketOwner`: The name of a pre-existing Ceph RGW user account that will own the bucket. A `CephObjectStoreUser` resource may be used to create a Ceph RGW user account. If the bucket already exists and is owned by a different user, the bucket will be re-linked to the specified user.
 
 ### OBC Custom Resource after Bucket Provisioning
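For context, a complete claim using the new field might look like the following; a minimal sketch assuming a `rook-ceph-bucket` storage class and an existing RGW user named `rgw-user`:

```yaml
apiVersion: objectbucket.io/v1alpha1
kind: ObjectBucketClaim
metadata:
  name: ceph-bucket
spec:
  bucketName: ceph-bucket
  storageClassName: rook-ceph-bucket   # assumed to point at the CephObjectStore
  additionalConfig:
    # pre-existing RGW user, e.g. created via a CephObjectStoreUser resource
    bucketOwner: "rgw-user"
```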
diff --git a/pkg/operator/ceph/object/bucket/provisioner.go b/pkg/operator/ceph/object/bucket/provisioner.go
index 9bd271b17a54..811c694856d6 100644
--- a/pkg/operator/ceph/object/bucket/provisioner.go
+++ b/pkg/operator/ceph/object/bucket/provisioner.go
@@ -66,6 +66,7 @@ type additionalConfigSpec struct {
 	bucketMaxSize   *int64
 	bucketPolicy    *string
 	bucketLifecycle *string
+	bucketOwner     *string
 }
 
 var _ apibkt.Provisioner = &Provisioner{}
@@ -89,15 +90,22 @@ func (p Provisioner) GenerateUserID(obc *bktv1alpha1.ObjectBucketClaim, ob *bktv
 func (p Provisioner) Provision(options *apibkt.BucketOptions) (*bktv1alpha1.ObjectBucket, error) {
 	logger.Debugf("Provision event for OB options: %+v", options)
 
-	err := p.initializeCreateOrGrant(options)
+	additionalConfig, err := additionalConfigSpecFromMap(options.ObjectBucketClaim.Spec.AdditionalConfig)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to process additionalConfig")
+	}
+
+	bucket := &bucket{provisioner: &p, options: options, additionalConfig: additionalConfig}
+
+	err = p.initializeCreateOrGrant(bucket)
 	if err != nil {
 		return nil, err
 	}
 	logger.Infof("Provision: creating bucket %q for OBC %q", p.bucketName, options.ObjectBucketClaim.Name)
 
-	p.accessKeyID, p.secretAccessKey, err = p.createCephUser(options.UserID)
+	p.accessKeyID, p.secretAccessKey, err = bucket.getUserCreds()
 	if err != nil {
-		return nil, errors.Wrap(err, "Provision: can't create ceph user")
+		return nil, errors.Wrapf(err, "unable to get user %q creds", p.cephUserName)
 	}
 
 	err = p.setS3Agent()
@@ -115,27 +123,31 @@ func (p Provisioner) Provision(options *apibkt.BucketOptions) (*bktv1alpha1.Obje
 	if !bucketExists {
 		// if bucket already exists, this returns error: TooManyBuckets because we set the quota
 		// below. If it already exists, assume we are good to go
-		logger.Debugf("creating bucket %q", p.bucketName)
+		logger.Debugf("creating bucket %q owned by user %q", p.bucketName, p.cephUserName)
 		err = p.s3Agent.CreateBucket(p.bucketName)
 		if err != nil {
 			return nil, errors.Wrapf(err, "error creating bucket %q", p.bucketName)
 		}
-	} else if owner != options.UserID {
-		return nil, errors.Errorf("bucket %q already exists and is owned by %q for different OBC", p.bucketName, owner)
+	} else if owner != p.cephUserName {
+		logger.Debugf("bucket %q already exists and is owned by user %q instead of user %q, relinking...", p.bucketName, owner, p.cephUserName)
+
+		err = p.adminOpsClient.LinkBucket(p.clusterInfo.Context, admin.BucketLinkInput{Bucket: p.bucketName, UID: p.cephUserName})
+		if err != nil {
+			return nil, errors.Wrapf(err, "failed to link bucket %q to user %q", p.bucketName, p.cephUserName)
+		}
 	} else {
 		logger.Debugf("bucket %q already exists", p.bucketName)
 	}
 
-	singleBucketQuota := 1
-	_, err = p.adminOpsClient.ModifyUser(p.clusterInfo.Context, admin.User{ID: p.cephUserName, MaxBuckets: &singleBucketQuota})
-	if err != nil {
-		return nil, errors.Wrapf(err, "failed to set user %q bucket quota to %d", p.cephUserName, singleBucketQuota)
-	}
-	logger.Infof("set user %q bucket max to %d", p.cephUserName, singleBucketQuota)
-
-	additionalConfig, err := additionalConfigSpecFromMap(options.ObjectBucketClaim.Spec.AdditionalConfig)
-	if err != nil {
-		return nil, errors.Wrap(err, "failed to process additionalConfig")
+	// is the bucket owner a provisioner generated user?
+	if p.isObcGeneratedUser(p.cephUserName, options.ObjectBucketClaim) {
+		// set user quota
+		singleBucketQuota := 1
+		_, err = p.adminOpsClient.ModifyUser(p.clusterInfo.Context, admin.User{ID: p.cephUserName, MaxBuckets: &singleBucketQuota})
+		if err != nil {
+			return nil, errors.Wrapf(err, "failed to set user %q bucket quota to %d", p.cephUserName, singleBucketQuota)
+		}
+		logger.Infof("set user %q bucket max to %d", p.cephUserName, singleBucketQuota)
 	}
 
 	err = p.setAdditionalSettings(additionalConfig)
@@ -143,7 +155,7 @@ func (p Provisioner) Provision(options *apibkt.BucketOptions) (*bktv1alpha1.Obje
 		return nil, errors.Wrapf(err, "failed to set additional settings for OBC %q in NS %q associated with CephObjectStore %q in NS %q", options.ObjectBucketClaim.Name, options.ObjectBucketClaim.Namespace, p.objectStoreName, p.clusterInfo.Namespace)
 	}
 
-	return p.composeObjectBucket(), nil
+	return p.composeObjectBucket(bucket), nil
 }
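The relink path relies on the RGW admin-ops "bucket link" operation, which changes a bucket's owner without copying data. In isolation, the same call through go-ceph looks roughly like this; the endpoint and credentials are placeholders:

```go
package main

import (
	"context"
	"log"
	"net/http"

	"github.com/ceph/go-ceph/rgw/admin"
)

func main() {
	// Placeholder endpoint/credentials; in Rook these come from the
	// CephObjectStore service and the admin-ops user secret.
	api, err := admin.New("http://rook-ceph-rgw-my-store:80", "ACCESS_KEY", "SECRET_KEY", &http.Client{})
	if err != nil {
		log.Fatal(err)
	}

	// Re-link the bucket to the desired owner, as Provision does above.
	err = api.LinkBucket(context.TODO(), admin.BucketLinkInput{Bucket: "ceph-bucket", UID: "rgw-user"})
	if err != nil {
		log.Fatal(err)
	}
}
```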
 
 // Grant attaches to an existing rgw bucket and returns a connection info
@@ -151,8 +163,15 @@ func (p Provisioner) Provision(options *apibkt.BucketOptions) (*bktv1alpha1.Obje
 func (p Provisioner) Grant(options *apibkt.BucketOptions) (*bktv1alpha1.ObjectBucket, error) {
 	logger.Debugf("Grant event for OB options: %+v", options)
 
+	additionalConfig, err := additionalConfigSpecFromMap(options.ObjectBucketClaim.Spec.AdditionalConfig)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to process additionalConfig")
+	}
+
+	bucket := &bucket{provisioner: &p, options: options, additionalConfig: additionalConfig}
+
 	// initialize and set the AWS services and commonly used variables
-	err := p.initializeCreateOrGrant(options)
+	err = p.initializeCreateOrGrant(bucket)
 	if err != nil {
 		return nil, err
 	}
@@ -164,17 +183,19 @@ func (p Provisioner) Grant(options *apibkt.BucketOptions) (*bktv1alpha1.ObjectBu
 		return nil, errors.Wrapf(err, "bucket %s does not exist", p.bucketName)
 	}
 
-	// get or create ceph user
-	p.accessKeyID, p.secretAccessKey, err = p.createCephUser(options.UserID)
+	p.accessKeyID, p.secretAccessKey, err = bucket.getUserCreds()
 	if err != nil {
-		return nil, errors.Wrap(err, "Provision: can't create ceph user")
+		return nil, errors.Wrapf(err, "unable to get user %q creds", p.cephUserName)
 	}
 
-	// restrict creation of new buckets in rgw
-	restrictBucketCreation := 0
-	_, err = p.adminOpsClient.ModifyUser(p.clusterInfo.Context, admin.User{ID: p.cephUserName, MaxBuckets: &restrictBucketCreation})
-	if err != nil {
-		return nil, err
+	// is the bucket owner a provisioner generated user?
+	if p.isObcGeneratedUser(p.cephUserName, options.ObjectBucketClaim) {
+		// restrict creation of new buckets in rgw
+		restrictBucketCreation := 0
+		_, err = p.adminOpsClient.ModifyUser(p.clusterInfo.Context, admin.User{ID: p.cephUserName, MaxBuckets: &restrictBucketCreation})
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	err = p.setS3Agent()
@@ -182,11 +203,6 @@ func (p Provisioner) Grant(options *apibkt.BucketOptions) (*bktv1alpha1.ObjectBu
 		return nil, err
 	}
 
-	additionalConfig, err := additionalConfigSpecFromMap(options.ObjectBucketClaim.Spec.AdditionalConfig)
-	if err != nil {
-		return nil, errors.Wrap(err, "failed to process additionalConfig")
-	}
-
 	// setting quota limit if it is enabled
 	err = p.setAdditionalSettings(additionalConfig)
 	if err != nil {
@@ -195,7 +211,7 @@ func (p Provisioner) Grant(options *apibkt.BucketOptions) (*bktv1alpha1.ObjectBu
 
 	if additionalConfig.bucketPolicy != nil {
 		// if the user is managing the bucket policy, there's nothing else to do
-		return p.composeObjectBucket(), nil
+		return p.composeObjectBucket(bucket), nil
 	}
 
 	// generate the bucket policy if it isn't managed by the user
@@ -230,7 +246,7 @@ func (p Provisioner) Grant(options *apibkt.BucketOptions) (*bktv1alpha1.ObjectBu
 	}
 
 	// returned ob with connection info
-	return p.composeObjectBucket(), nil
+	return p.composeObjectBucket(bucket), nil
 }
 
 // Delete is called when the ObjectBucketClaim (OBC) is deleted and the associated
@@ -245,8 +261,13 @@ func (p Provisioner) Delete(ob *bktv1alpha1.ObjectBucket) error {
 	}
 	logger.Infof("Delete: deleting bucket %q for OB %q", p.bucketName, ob.Name)
 
-	if err := p.deleteOBCResource(p.bucketName); err != nil {
-		return errors.Wrapf(err, "failed to delete OBCResource bucket %q", p.bucketName)
+	if err := p.deleteBucket(p.bucketName); err != nil {
+		return errors.Wrapf(err, "failed to delete bucket %q", p.bucketName)
+	}
+
+	logger.Infof("Delete: deleting user %q for OB %q", p.cephUserName, ob.Name)
+	if err := p.deleteOBUser(ob); err != nil {
+		return errors.Wrapf(err, "failed to delete user %q", p.cephUserName)
 	}
 	return nil
 }
@@ -337,7 +358,7 @@ func (p Provisioner) Revoke(ob *bktv1alpha1.ObjectBucket) error {
 	}
 
 	// finally, delete the user
-	err = p.deleteOBCResource("")
+	err = p.deleteOBUser(ob)
 	if err != nil {
 		return errors.Wrapf(err, "failed to delete user %q", p.cephUserName)
 	}
@@ -348,12 +369,12 @@ func (p Provisioner) Revoke(ob *bktv1alpha1.ObjectBucket) error {
 
 // Return the OB struct with minimal fields filled in.
 // initializeCreateOrGrant sets common provisioner receiver fields and
 // the services and sessions needed to provision.
-func (p *Provisioner) initializeCreateOrGrant(options *apibkt.BucketOptions) error {
+func (p *Provisioner) initializeCreateOrGrant(bucket *bucket) error {
 	logger.Info("initializing and setting CreateOrGrant services")
 
 	// set the bucket name
-	obc := options.ObjectBucketClaim
-	scName := options.ObjectBucketClaim.Spec.StorageClassName
+	obc := bucket.options.ObjectBucketClaim
+	scName := bucket.options.ObjectBucketClaim.Spec.StorageClassName
 	sc, err := p.getStorageClassWithBackoff(scName)
 	if err != nil {
 		logger.Errorf("failed to get storage class for OBC %q in namespace %q. %v", obc.Name, obc.Namespace, err)
@@ -364,7 +385,7 @@ func (p *Provisioner) initializeCreateOrGrant(options *apibkt.BucketOptions) err
 	// defines the bucket in the parameters, it's assumed to be a request to connect to a statically
 	// created bucket. In these cases, we forego generating a bucket. Instead we connect a newly generated
 	// user to the existing bucket.
-	p.setBucketName(options.BucketName)
+	p.setBucketName(bucket.options.BucketName)
 	if bucketName, isStatic := isStaticBucket(sc); isStatic {
 		p.setBucketName(bucketName)
 	}
@@ -398,10 +419,17 @@ func (p *Provisioner) initializeCreateOrGrant(options *apibkt.BucketOptions) err
 		return errors.Wrap(err, "failed to set admin ops api client")
 	}
 
-	if len(options.UserID) == 0 {
+	if len(bucket.options.UserID) == 0 {
 		return errors.Errorf("user ID for OBC %q is empty", obc.Name)
 	}
-	p.cephUserName = options.UserID
+
+	// override generated bucket owner name if an explicit name is set via additionalConfig["bucketOwner"]
+	if bucketOwner := bucket.additionalConfig.bucketOwner; bucketOwner != nil {
+		p.cephUserName = *bucketOwner
+	} else {
+		p.cephUserName = bucket.options.UserID
+	}
+	logger.Debugf("Using user %q for OBC %q", p.cephUserName, obc.Name)
 
 	return nil
 }
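The resolution above is a straightforward precedence rule: an explicit `bucketOwner` always wins over the `UserID` generated by lib-bucket-provisioner. Factored out, it amounts to this (hypothetical helper, shown only to summarize the rule):

```go
// effectiveOwner returns the rgw user that should own the bucket: the
// explicit additionalConfig["bucketOwner"] value when present, otherwise
// the user ID generated by lib-bucket-provisioner.
func effectiveOwner(bucketOwner *string, generatedUserID string) string {
	if bucketOwner != nil {
		return *bucketOwner
	}
	return generatedUserID
}
```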
 
@@ -448,7 +476,7 @@ func (p *Provisioner) initializeDeleteOrRevoke(ob *bktv1alpha1.ObjectBucket) err
 }
 
 // Return the OB struct with minimal fields filled in.
-func (p *Provisioner) composeObjectBucket() *bktv1alpha1.ObjectBucket {
+func (p *Provisioner) composeObjectBucket(bucket *bucket) *bktv1alpha1.ObjectBucket {
 
 	conn := &bktv1alpha1.Connection{
 		Endpoint: &bktv1alpha1.Endpoint{
@@ -472,6 +500,15 @@ func (p *Provisioner) composeObjectBucket() *bktv1alpha1.ObjectBucket {
 		},
 	}
 
+	// bucketOwner will either match CephUser, indicating that it is an
+	// explicitly set name, or the key will be unset, indicating that either the
+	// provisioner created the user or a grant was made on a pre-existing bucket
+	// linked to a pre-existing user. Due to the semantics of lib-bucket, it
+	// isn't possible to determine if it was a pre-existing bucket.
+	if bucket.additionalConfig.bucketOwner != nil {
+		conn.AdditionalState["bucketOwner"] = *bucket.additionalConfig.bucketOwner
+	}
+
 	return &bktv1alpha1.ObjectBucket{
 		Spec: bktv1alpha1.ObjectBucketSpec{
 			Connection: conn,
@@ -831,7 +868,6 @@ func (p *Provisioner) setTlsCaCert() error {
 			return err
 		}
 	}
-
 	return nil
 }
 
diff --git a/pkg/operator/ceph/object/bucket/provisioner_test.go b/pkg/operator/ceph/object/bucket/provisioner_test.go
index 23812f1ab653..5aad6a9402cd 100644
--- a/pkg/operator/ceph/object/bucket/provisioner_test.go
+++ b/pkg/operator/ceph/object/bucket/provisioner_test.go
@@ -536,6 +536,12 @@ func TestProvisioner_additionalConfigSpecFromMap(t *testing.T) {
 		assert.NoError(t, err)
 		assert.Equal(t, additionalConfigSpec{bucketLifecycle: &(&struct{ s string }{"foo"}).s}, *spec)
 	})
+
+	t.Run("bucketOwner field should be set", func(t *testing.T) {
+		spec, err := additionalConfigSpecFromMap(map[string]string{"bucketOwner": "foo"})
+		assert.NoError(t, err)
+		assert.Equal(t, additionalConfigSpec{bucketOwner: &(&struct{ s string }{"foo"}).s}, *spec)
+	})
 }
 
 func numberOfCallsWithValue(substr string, strs []string) int {
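A side note on the test idiom: `&(&struct{ s string }{"foo"}).s` is just an inline way to take the address of a string literal. A tiny helper would read more clearly (a sketch; `strPtr` is not defined in the package):

```go
// strPtr returns a pointer to s, for building expected
// additionalConfigSpec values inline.
func strPtr(s string) *string { return &s }

// usage: assert.Equal(t, additionalConfigSpec{bucketOwner: strPtr("foo")}, *spec)
```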
diff --git a/pkg/operator/ceph/object/bucket/rgw-handlers.go b/pkg/operator/ceph/object/bucket/rgw-handlers.go
index 193f24e48487..e786be1bcdc4 100644
--- a/pkg/operator/ceph/object/bucket/rgw-handlers.go
+++ b/pkg/operator/ceph/object/bucket/rgw-handlers.go
@@ -1,11 +1,55 @@
 package bucket
 
 import (
+	"regexp"
+	"strings"
+
 	"github.com/ceph/go-ceph/rgw/admin"
 	"github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1"
+	bktv1alpha1 "github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1"
+	apibkt "github.com/kube-object-storage/lib-bucket-provisioner/pkg/provisioner/api"
 	"github.com/pkg/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+// The Provisioner struct is a mix of "long lived" fields for the provisioner
+// object instantiated by lib-bucket-provisioner and "short lived" fields used
+// for each Provision/Grant/Delete/Revoke call. The intent for bucket struct is
+// to incrementally migrate "short lived" request fields to the bucket struct
+// and eventually to migrate methods as appropriate.
+type bucket struct {
+	provisioner      *Provisioner
+	options          *apibkt.BucketOptions
+	additionalConfig *additionalConfigSpec
+}
+
+// Retrieve the s3 access credentials for the rgw user. The rgw user will be
+// created if appropriate.
+func (b *bucket) getUserCreds() (accessKeyID, secretAccessKey string, err error) {
+	p := b.provisioner
+
+	if b.additionalConfig.bucketOwner == nil {
+		// get or create user
+		accessKeyID, secretAccessKey, err = p.createCephUser(p.cephUserName)
+		if err != nil {
+			err = errors.Wrapf(err, "unable to create Ceph object user %q", p.cephUserName)
+			return
+		}
+	} else {
+		// only get an existing user
+		var user admin.User
+		user, err = p.adminOpsClient.GetUser(p.clusterInfo.Context, admin.User{ID: p.cephUserName})
+		if err != nil {
+			err = errors.Wrapf(err, "Ceph object user %q not found", p.cephUserName)
+			return
+		}
+		accessKeyID = user.Keys[0].AccessKey
+		secretAccessKey = user.Keys[0].SecretKey
+	}
+
+	return
+}
+
 func (p *Provisioner) bucketExists(name string) (bool, string, error) {
 	bucket, err := p.adminOpsClient.GetBucketInfo(p.clusterInfo.Context, admin.Bucket{Bucket: name})
 	if err != nil {
@@ -23,12 +67,12 @@ func (p *Provisioner) createCephUser(username string) (accKey string, secKey str
 	if len(username) == 0 {
 		return "", "", errors.Wrap(err, "no user name provided")
 	}
-	p.cephUserName = username
 
-	logger.Infof("creating Ceph user %q", username)
+	logger.Infof("creating Ceph object user %q", username)
+
 	userConfig := admin.User{
 		ID:          username,
-		DisplayName: p.cephUserName,
+		DisplayName: username,
 	}
 
 	var u admin.User
@@ -37,14 +81,16 @@ func (p *Provisioner) createCephUser(username string) (accKey string, secKey str
 	if errors.Is(err, admin.ErrNoSuchUser) {
 		u, err = p.adminOpsClient.CreateUser(p.clusterInfo.Context, userConfig)
 		if err != nil {
-			return "", "", errors.Wrapf(err, "failed to create ceph object user %v", userConfig.ID)
+			return "", "", errors.Wrapf(err, "failed to create Ceph object user %v", userConfig.ID)
 		}
 	} else {
-		return "", "", errors.Wrapf(err, "failed to get ceph user %q", username)
+		return "", "", errors.Wrapf(err, "failed to get Ceph object user %q", username)
 	}
+	} else {
+		logger.Infof("Ceph object user %q already exists", username)
 	}
 
-	logger.Infof("successfully created Ceph user %q with access keys", username)
+	logger.Infof("successfully created Ceph object user %q with access keys", username)
 	return u.Keys[0].AccessKey, u.Keys[0].SecretKey, nil
 }
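One caveat in `getUserCreds` and `createCephUser` above: both index `Keys[0]` without checking that the user actually has S3 keys, so a pre-existing user created without keys would panic the provisioner. A defensive variant of the lookup might guard it like this (a sketch, not part of the change):

```go
user, err := p.adminOpsClient.GetUser(p.clusterInfo.Context, admin.User{ID: p.cephUserName})
if err != nil {
	return "", "", errors.Wrapf(err, "Ceph object user %q not found", p.cephUserName)
}
// guard against a user that exists but carries no S3 credentials
if len(user.Keys) == 0 {
	return "", "", errors.Errorf("Ceph object user %q has no S3 credentials", p.cephUserName)
}
return user.Keys[0].AccessKey, user.Keys[0].SecretKey, nil
```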
@@ -62,33 +108,57 @@ func (p *Provisioner) genUserName(obc *v1alpha1.ObjectBucketClaim) string {
 // If delete user failed, error is no longer returned since its permission is
 // already revoked and hence user is no longer able to access the bucket
 // Empty string is passed for bucketName only if user needs to be removed, ex Revoke()
-func (p *Provisioner) deleteOBCResource(bucketName string) error {
-
-	logger.Infof("deleting Ceph user %q and bucket %q", p.cephUserName, bucketName)
-	if len(bucketName) > 0 {
-		// delete bucket with purge option to remove all objects
-		thePurge := true
-		err := p.adminOpsClient.RemoveBucket(p.clusterInfo.Context, admin.Bucket{Bucket: bucketName, PurgeObject: &thePurge})
-		if err == nil {
-			logger.Infof("bucket %q successfully deleted", bucketName)
-		} else if errors.Is(err, admin.ErrNoSuchBucket) {
-			// opinion: "not found" is not an error
-			logger.Infof("bucket %q does not exist", bucketName)
-		} else if errors.Is(err, admin.ErrNoSuchKey) {
-			// ceph might return NoSuchKey than NoSuchBucket when the target bucket does not exist.
-			// then we can use GetBucketInfo() to judge the existence of the bucket.
-			// see: https://github.com/ceph/ceph/pull/44413
-			_, err2 := p.adminOpsClient.GetBucketInfo(p.clusterInfo.Context, admin.Bucket{Bucket: bucketName, PurgeObject: &thePurge})
-			if errors.Is(err2, admin.ErrNoSuchBucket) {
-				logger.Infof("bucket info %q does not exist", bucketName)
-			} else {
-				return errors.Wrapf(err, "failed to delete bucket %q (could not get bucket info)", bucketName)
-			}
-		} else {
-			return errors.Wrapf(err, "failed to delete bucket %q", bucketName)
-		}
+func (p *Provisioner) deleteBucket(bucketName string) error {
+	logger.Infof("deleting Ceph bucket %q", bucketName)
+	// delete bucket with purge option to remove all objects
+	thePurge := true
+	err := p.adminOpsClient.RemoveBucket(p.clusterInfo.Context, admin.Bucket{Bucket: bucketName, PurgeObject: &thePurge})
+	if err == nil {
+		logger.Infof("bucket %q successfully deleted", bucketName)
+	} else if errors.Is(err, admin.ErrNoSuchBucket) {
+		// opinion: "not found" is not an error
+		logger.Infof("bucket %q does not exist", bucketName)
+	} else if errors.Is(err, admin.ErrNoSuchKey) {
+		// ceph might return NoSuchKey than NoSuchBucket when the target bucket does not exist.
+		// then we can use GetBucketInfo() to judge the existence of the bucket.
+		// see: https://github.com/ceph/ceph/pull/44413
+		_, err2 := p.adminOpsClient.GetBucketInfo(p.clusterInfo.Context, admin.Bucket{Bucket: bucketName, PurgeObject: &thePurge})
+		if errors.Is(err2, admin.ErrNoSuchBucket) {
+			logger.Infof("bucket info %q does not exist", bucketName)
+		} else {
+			return errors.Wrapf(err, "failed to delete bucket %q (could not get bucket info)", bucketName)
+		}
+	} else {
+		return errors.Wrapf(err, "failed to delete bucket %q", bucketName)
 	}
-	if len(p.cephUserName) > 0 {
+
+	return nil
+}
+
+// Delete the user *if it was created by OBC*. Will not delete externally
+// managed users / users not created by obc.
+func (p *Provisioner) deleteOBUser(ob *bktv1alpha1.ObjectBucket) error {
+	// construct a partial obc object with only the fields set needed by
+	// isObcGeneratedUser() & genUserName()
+	obc := &bktv1alpha1.ObjectBucketClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      ob.Spec.ClaimRef.Name,
+			Namespace: ob.Spec.ClaimRef.Namespace,
+			UID:       ob.Spec.ClaimRef.UID,
+		},
+	}
+
+	if ob.Spec.Connection.AdditionalState != nil && ob.Spec.Connection.AdditionalState["bucketOwner"] != "" {
+		obc.Spec.AdditionalConfig = map[string]string{
+			"bucketOwner": ob.Spec.Connection.AdditionalState["bucketOwner"],
+		}
+	}
+
+	// is the bucket owner a provisioner generated user?
+	if p.isObcGeneratedUser(p.cephUserName, obc) {
+		// delete the user
+		logger.Infof("deleting Ceph user %q", p.cephUserName)
+
 		err := p.adminOpsClient.RemoveUser(p.clusterInfo.Context, admin.User{ID: p.cephUserName})
 		if err != nil {
 			if errors.Is(err, admin.ErrNoSuchUser) {
@@ -98,6 +168,36 @@ func (p *Provisioner) deleteOBCResource(bucketName string) error {
 		} else {
 			logger.Infof("user %q successfully deleted", p.cephUserName)
 		}
+	} else {
+		// do not remove externally managed users
+		logger.Infof("Ceph user %q does not look like an obc generated user and will not be removed", p.cephUserName)
 	}
 
 	return nil
 }
+
+// test a string to determine if it is likely to be a user name generated by the provisioner
+func (p *Provisioner) isObcGeneratedUser(userName string, obc *v1alpha1.ObjectBucketClaim) bool {
+	// If the user name string is the same as the explicitly set bucketOwner we will
+	// assume it is not a machine generated user name.
+	if obc.Spec.AdditionalConfig != nil &&
+		obc.Spec.AdditionalConfig["bucketOwner"] == userName {
+		return false
+	}
+
+	// current format
+	if userName == p.genUserName(obc) {
+		return true
+	}
+
+	// historical format(s)
+	if strings.HasPrefix(userName, "obc-"+obc.Namespace+"-"+obc.Name) {
+		return true
+	}
+
+	matched, err := regexp.MatchString("^ceph-user-[0-9A-Za-z]{8}", userName)
+	if err != nil {
+		logger.Errorf("regex match failed. %v", err)
+	}
+	return matched
+}
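To make the matching concrete: the helper recognizes three generations of machine-generated names, and the `ceph-user-` regex is anchored only at the start, so any name beginning with that prefix plus eight alphanumerics matches. Illustrative values (the UID and hash below are placeholders, not real cluster data):

```go
// Hypothetical examples of the user name formats isObcGeneratedUser accepts.
var generatedNameExamples = []string{
	"obc-bar-foo-6e7c4d3f-3494-4dc1-90dc-58527fdf05d7", // current: obc-<namespace>-<name>-<obc UID>
	"obc-bar-foo",        // historical: obc-<namespace>-<name>
	"ceph-user-a1b2c3d4", // oldest: ceph-user-<8 alphanumerics>
}
```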
diff --git a/pkg/operator/ceph/object/bucket/rgw-handlers_test.go b/pkg/operator/ceph/object/bucket/rgw-handlers_test.go
index a4292520f102..786160a1fe30 100644
--- a/pkg/operator/ceph/object/bucket/rgw-handlers_test.go
+++ b/pkg/operator/ceph/object/bucket/rgw-handlers_test.go
@@ -25,12 +25,14 @@ import (
 	"testing"
 
 	"github.com/ceph/go-ceph/rgw/admin"
+	bktv1alpha1 "github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1"
 	rookclient "github.com/rook/rook/pkg/client/clientset/versioned/fake"
 	"github.com/rook/rook/pkg/clusterd"
 	"github.com/rook/rook/pkg/daemon/ceph/client"
 	cephobject "github.com/rook/rook/pkg/operator/ceph/object"
 	"github.com/rook/rook/pkg/operator/test"
 	"github.com/stretchr/testify/assert"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 type statusError struct {
@@ -39,7 +41,7 @@ type statusError struct {
 	HostID string `json:"HostId,omitempty"`
 }
 
-func TestDeleteOBCResource(t *testing.T) {
+func TestDeleteBucket(t *testing.T) {
 	clusterInfo := client.AdminTestClusterInfo("ns")
 	p := NewProvisioner(&clusterd.Context{RookClientset: rookclient.NewSimpleClientset(), Clientset: test.New(t, 1)}, clusterInfo)
 	mockClient := func(errCodeRemoveBucket string, errCodeGetBucketInfo string) *cephobject.MockClient {
@@ -70,7 +72,7 @@ func TestDeleteOBCResource(t *testing.T) {
 		adminClient, err := admin.New("rook-ceph-rgw-my-store.mycluster.svc", "53S6B9S809NUP19IJ2K3", "1bXPegzsGClvoGAiJdHQD1uOW2sQBLAZM9j9VtXR", mockClient("NoSuchBucket", ""))
 		assert.NoError(t, err)
 		p.adminOpsClient = adminClient
-		err = p.deleteOBCResource("bucket")
+		err = p.deleteBucket("bucket")
 		assert.NoError(t, err)
 	})
 
@@ -78,7 +80,7 @@ func TestDeleteOBCResource(t *testing.T) {
 		adminClient, err := admin.New("rook-ceph-rgw-my-store.mycluster.svc", "53S6B9S809NUP19IJ2K3", "1bXPegzsGClvoGAiJdHQD1uOW2sQBLAZM9j9VtXR", mockClient("NoSuchKey", "NoSuchBucket"))
 		assert.NoError(t, err)
 		p.adminOpsClient = adminClient
-		err = p.deleteOBCResource("bucket")
+		err = p.deleteBucket("bucket")
 		assert.NoError(t, err)
 	})
 
@@ -86,7 +88,127 @@ func TestDeleteOBCResource(t *testing.T) {
 		adminClient, err := admin.New("rook-ceph-rgw-my-store.mycluster.svc", "53S6B9S809NUP19IJ2K3", "1bXPegzsGClvoGAiJdHQD1uOW2sQBLAZM9j9VtXR", mockClient("NoSuchKey", "NoSuchKey"))
 		assert.NoError(t, err)
 		p.adminOpsClient = adminClient
-		err = p.deleteOBCResource("bucket")
+		err = p.deleteBucket("bucket")
 		assert.Error(t, err)
 	})
 }
+
+func TestIsObcGeneratedUser(t *testing.T) {
+	clusterInfo := client.AdminTestClusterInfo("ns")
+	p := NewProvisioner(&clusterd.Context{RookClientset: rookclient.NewSimpleClientset(), Clientset: test.New(t, 1)}, clusterInfo)
+
+	t.Run("does not match any format", func(t *testing.T) {
+		assert.False(t, p.isObcGeneratedUser("quix", &bktv1alpha1.ObjectBucketClaim{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "foo",
+				Namespace: "bar",
+				UID:       "6e7c4d3f-3494-4dc1-90dc-58527fdf05d7",
+			},
+		}))
+	})
+
+	t.Run("does not match any format or bucketOwner", func(t *testing.T) {
+		assert.False(t, p.isObcGeneratedUser("quix", &bktv1alpha1.ObjectBucketClaim{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "foo",
+				Namespace: "bar",
+				UID:       "6e7c4d3f-3494-4dc1-90dc-58527fdf05d7",
+			},
+			Spec: bktv1alpha1.ObjectBucketClaimSpec{
+				AdditionalConfig: map[string]string{
+					"bucketOwner": "baz",
+				},
+			},
+		}))
+	})
+
+	t.Run("matches current format", func(t *testing.T) {
+		assert.True(t, p.isObcGeneratedUser("obc-bar-foo-6e7c4d3f-3494-4dc1-90dc-58527fdf05d7", &bktv1alpha1.ObjectBucketClaim{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "foo",
+				Namespace: "bar",
+				UID:       "6e7c4d3f-3494-4dc1-90dc-58527fdf05d7",
+			},
+		}))
+	})
+
+	t.Run("matches old format", func(t *testing.T) {
+		assert.True(t, p.isObcGeneratedUser("obc-bar-foo", &bktv1alpha1.ObjectBucketClaim{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "foo",
+				Namespace: "bar",
+				UID:       "6e7c4d3f-3494-4dc1-90dc-58527fdf05d7",
+			},
+		}))
+	})
+
+	t.Run("matches really old format", func(t *testing.T) {
+		assert.True(t, p.isObcGeneratedUser("ceph-user-12345678", &bktv1alpha1.ObjectBucketClaim{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "foo",
+				Namespace: "bar",
+				UID:       "6e7c4d3f-3494-4dc1-90dc-58527fdf05d7",
+			},
+		}))
+	})
+
+	t.Run("matches bucketOwner", func(t *testing.T) {
+		assert.False(t, p.isObcGeneratedUser("quix", &bktv1alpha1.ObjectBucketClaim{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "foo",
+				Namespace: "bar",
+				UID:       "6e7c4d3f-3494-4dc1-90dc-58527fdf05d7",
+			},
+			Spec: bktv1alpha1.ObjectBucketClaimSpec{
+				AdditionalConfig: map[string]string{
+					"bucketOwner": "quix",
+				},
+			},
+		}))
+	})
+
+	t.Run("matches bucketOwner and current format", func(t *testing.T) {
+		assert.False(t, p.isObcGeneratedUser("obc-bar-foo-6e7c4d3f-3494-4dc1-90dc-58527fdf05d7", &bktv1alpha1.ObjectBucketClaim{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "foo",
+				Namespace: "bar",
+				UID:       "6e7c4d3f-3494-4dc1-90dc-58527fdf05d7",
+			},
+			Spec: bktv1alpha1.ObjectBucketClaimSpec{
+				AdditionalConfig: map[string]string{
+					"bucketOwner": "obc-bar-foo-6e7c4d3f-3494-4dc1-90dc-58527fdf05d7",
+				},
+			},
+		}))
+	})
+
+	t.Run("matches bucketOwner and old format", func(t *testing.T) {
+		assert.False(t, p.isObcGeneratedUser("obc-bar-foo", &bktv1alpha1.ObjectBucketClaim{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "foo",
+				Namespace: "bar",
+				UID:       "6e7c4d3f-3494-4dc1-90dc-58527fdf05d7",
+			},
+			Spec: bktv1alpha1.ObjectBucketClaimSpec{
+				AdditionalConfig: map[string]string{
+					"bucketOwner": "obc-bar-foo",
+				},
+			},
+		}))
+	})
+
+	t.Run("matches bucketOwner and really old format", func(t *testing.T) {
+		assert.False(t, p.isObcGeneratedUser("ceph-user-12345678", &bktv1alpha1.ObjectBucketClaim{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "foo",
+				Namespace: "bar",
+				UID:       "6e7c4d3f-3494-4dc1-90dc-58527fdf05d7",
+			},
+			Spec: bktv1alpha1.ObjectBucketClaimSpec{
+				AdditionalConfig: map[string]string{
+					"bucketOwner": "ceph-user-12345678",
+				},
+			},
+		}))
+	})
+}
diff --git a/pkg/operator/ceph/object/bucket/util.go b/pkg/operator/ceph/object/bucket/util.go
index 28a268e02871..28953500c038 100644
--- a/pkg/operator/ceph/object/bucket/util.go
+++ b/pkg/operator/ceph/object/bucket/util.go
@@ -130,6 +130,11 @@ func additionalConfigSpecFromMap(config map[string]string) (*additionalConfigSpe
 		spec.bucketLifecycle = &lifecycle
 	}
 
+	if _, ok := config["bucketOwner"]; ok {
+		bucketOwner := config["bucketOwner"]
+		spec.bucketOwner = &bucketOwner
+	}
+
 	return &spec, nil
 }
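Since the parser needs the value anyway, the two-step lookup above could be collapsed into the comma-ok form; a behavior-identical sketch:

```go
if bucketOwner, ok := config["bucketOwner"]; ok {
	spec.bucketOwner = &bucketOwner
}
```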
diff --git a/tests/integration/ceph_object_test.go b/tests/integration/ceph_object_test.go
index 39b40e2b3cd6..edd212ccd61d 100644
--- a/tests/integration/ceph_object_test.go
+++ b/tests/integration/ceph_object_test.go
@@ -33,6 +33,7 @@ import (
 	"github.com/rook/rook/tests/framework/clients"
 	"github.com/rook/rook/tests/framework/installer"
 	"github.com/rook/rook/tests/framework/utils"
+	"github.com/rook/rook/tests/integration/object/bucketowner"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
@@ -152,6 +153,8 @@ func runObjectE2ETest(helper *clients.TestClient, k8sh *utils.K8sHelper, install
 	// now test operation of the first object store
 	testObjectStoreOperations(s, helper, k8sh, namespace, storeName, swiftAndKeystone)
 
+	bucketowner.TestObjectBucketClaimBucketOwner(s.T(), k8sh, installer, logger, tlsEnable)
+
 	bucketNotificationTestStoreName := "bucket-notification-" + storeName
 	createCephObjectStore(s.T(), helper, k8sh, installer, namespace, bucketNotificationTestStoreName, 1, tlsEnable, swiftAndKeystone)
 	testBucketNotifications(s, helper, k8sh, namespace, bucketNotificationTestStoreName)
diff --git a/tests/integration/object/bucketowner/bucketowner.go b/tests/integration/object/bucketowner/bucketowner.go
new file mode 100644
index 000000000000..4422e83eb242
--- /dev/null
+++ b/tests/integration/object/bucketowner/bucketowner.go
@@ -0,0 +1,834 @@
+/*
+Copyright 2025 The Rook Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+	http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package bucketowner + +import ( + "bufio" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "log" + "net/http" + "strings" + "testing" + "time" + + "github.com/ceph/go-ceph/rgw/admin" + "github.com/coreos/pkg/capnslog" + "github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1" + bktv1alpha1 "github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1" + cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" + "github.com/rook/rook/tests/framework/installer" + "github.com/rook/rook/tests/framework/utils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" +) + +var ( + defaultName = "test-bucket-owner" + + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: defaultName, + }, + } + + objectStore = &cephv1.CephObjectStore{ + ObjectMeta: metav1.ObjectMeta{ + Name: defaultName, + // the CephObjectstore must be in the same ns as the CephCluster + Namespace: "object-ns", + }, + Spec: cephv1.ObjectStoreSpec{ + MetadataPool: cephv1.PoolSpec{ + Replicated: cephv1.ReplicatedSpec{ + Size: 1, + RequireSafeReplicaSize: false, + }, + }, + DataPool: cephv1.PoolSpec{ + Replicated: cephv1.ReplicatedSpec{ + Size: 1, + RequireSafeReplicaSize: false, + }, + }, + Gateway: cephv1.GatewaySpec{ + Port: 80, + Instances: 1, + }, + AllowUsersInNamespaces: []string{ns.Name}, + }, + } + + objectStoreSvc = &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: objectStore.Name, + Namespace: objectStore.Namespace, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "app": "rook-ceph-rgw", + "rook_cluster": objectStore.Namespace, + "rook_object_store": objectStore.Name, + }, + Ports: []corev1.ServicePort{ + { + Name: "http", + Port: 80, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt(8080), + }, + }, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeNodePort, + }, + } + + storageClass = &storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: defaultName, + }, + Provisioner: objectStore.Namespace + ".ceph.rook.io/bucket", + Parameters: map[string]string{ + "objectStoreName": objectStore.Name, + "objectStoreNamespace": objectStore.Namespace, + }, + } + + osu1 = cephv1.CephObjectStoreUser{ + ObjectMeta: metav1.ObjectMeta{ + Name: defaultName + "-user1", + Namespace: ns.Name, + }, + Spec: cephv1.ObjectStoreUserSpec{ + Store: objectStore.Name, + ClusterNamespace: objectStore.Namespace, + }, + } + + osu2 = cephv1.CephObjectStoreUser{ + ObjectMeta: metav1.ObjectMeta{ + Name: defaultName + "-user2", + Namespace: ns.Name, + }, + Spec: cephv1.ObjectStoreUserSpec{ + Store: objectStore.Name, + ClusterNamespace: objectStore.Namespace, + }, + } + + obc1 = bktv1alpha1.ObjectBucketClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: defaultName + "-obc1", + Namespace: ns.Name, + }, + Spec: bktv1alpha1.ObjectBucketClaimSpec{ + BucketName: defaultName + "-obc1", + StorageClassName: storageClass.Name, + AdditionalConfig: map[string]string{ + "bucketOwner": osu1.Name, + }, + }, + } + + obc2 = bktv1alpha1.ObjectBucketClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: defaultName + "-obc2", + Namespace: ns.Name, + }, + Spec: bktv1alpha1.ObjectBucketClaimSpec{ + BucketName: defaultName + "-obc2", + StorageClassName: storageClass.Name, + AdditionalConfig: map[string]string{ + "bucketOwner": 
+
+	obc2 = bktv1alpha1.ObjectBucketClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      defaultName + "-obc2",
+			Namespace: ns.Name,
+		},
+		Spec: bktv1alpha1.ObjectBucketClaimSpec{
+			BucketName:       defaultName + "-obc2",
+			StorageClassName: storageClass.Name,
+			AdditionalConfig: map[string]string{
+				"bucketOwner": osu1.Name,
+			},
+		},
+	}
+
+	obcBogusOwner = bktv1alpha1.ObjectBucketClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      defaultName + "-bogus-owner",
+			Namespace: ns.Name,
+		},
+		Spec: bktv1alpha1.ObjectBucketClaimSpec{
+			BucketName:       defaultName,
+			StorageClassName: storageClass.Name,
+			AdditionalConfig: map[string]string{
+				"bucketOwner": defaultName + "-bogus-user",
+			},
+		},
+	}
+)
+
+func WaitForPodLogContainingText(k8sh *utils.K8sHelper, namespace string, selector *labels.Selector, text string, timeout time.Duration) error {
+	pods, err := k8sh.Clientset.CoreV1().Pods(namespace).List(
+		context.Background(),
+		metav1.ListOptions{
+			LabelSelector: (*selector).String(),
+		},
+	)
+	if err != nil {
+		return fmt.Errorf("failed to list pods: %v", err)
+	}
+
+	if len(pods.Items) == 0 {
+		return fmt.Errorf("no pods found with labels %v in namespace %q", selector, namespace)
+	}
+
+	// if there are multiple pods, just pick the first one
+	selectedPod := pods.Items[0]
+	log.Printf("Found pod %q (first match) with labels %v\n", selectedPod.Name, *selector)
+
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	req := k8sh.Clientset.CoreV1().Pods(namespace).GetLogs(selectedPod.Name, &corev1.PodLogOptions{})
+
+	logStream, err := req.Stream(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to stream logs from pod %q: %v", selectedPod.Name, err)
+	}
+	defer logStream.Close()
+
+	scanner := bufio.NewScanner(logStream)
+
+	found := false
+	for scanner.Scan() {
+		line := scanner.Text()
+		if strings.Contains(line, text) {
+			found = true
+			break
+		}
+	}
+	// Check for scanner error (could be context timeout, etc.)
+	if err := scanner.Err(); err != nil {
+		return fmt.Errorf("error reading log stream: %v", err)
+	}
+	// the stream can end cleanly without the text ever appearing
+	if !found {
+		return fmt.Errorf("text %q not found in logs of pod %q", text, selectedPod.Name)
+	}
+
+	return nil
+}
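Later in the test the helper is pointed at the operator log; a typical call looks like this (the namespace and label are the ones this test assumes):

```go
selector := labels.SelectorFromSet(labels.Set{"app": "rook-ceph-operator"})
err := WaitForPodLogContainingText(k8sh, "object-ns-system", &selector,
	"error provisioning bucket", 10*time.Second)
if err != nil {
	// the expected log line never appeared within the timeout
	t.Fatal(err)
}
```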
+
+func TestObjectBucketClaimBucketOwner(t *testing.T, k8sh *utils.K8sHelper, installer *installer.CephInstaller, logger *capnslog.PackageLogger, tlsEnable bool) {
+	t.Run("OBC bucketOwner", func(t *testing.T) {
+		if tlsEnable {
+			// There is lots of coverage of rgw working with tls enabled; skipping to save time.
+			// If tls is to be enabled, cert generation needs to be added and a
+			// different CephObjectStore name needs to be set for with/without tls as
+			// CephObjectStore does not currently cleanup the rgw realm.
+			t.Skip("skipping test for TLS enabled clusters")
+		}
+
+		var adminClient *admin.API
+		ctx := context.TODO()
+
+		t.Run(fmt.Sprintf("create ns %q", ns.Name), func(t *testing.T) {
+			_, err := k8sh.Clientset.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
+			require.NoError(t, err)
+		})
+
+		t.Run(fmt.Sprintf("create CephObjectStore %q", objectStore.Name), func(t *testing.T) {
+			objectStore, err := k8sh.RookClientset.CephV1().CephObjectStores(objectStore.Namespace).Create(ctx, objectStore, metav1.CreateOptions{})
+			require.NoError(t, err)
+
+			osReady := utils.Retry(180, time.Second, "CephObjectStore is Ready", func() bool {
+				liveOs, err := k8sh.RookClientset.CephV1().CephObjectStores(objectStore.Namespace).Get(ctx, objectStore.Name, metav1.GetOptions{})
+				if err != nil {
+					return false
+				}
+
+				if liveOs.Status == nil {
+					return false
+				}
+
+				return liveOs.Status.Phase == cephv1.ConditionReady
+			})
+			require.True(t, osReady)
+		})
+
+		t.Run(fmt.Sprintf("create svc %q", objectStoreSvc.Name), func(t *testing.T) {
+			_, err := k8sh.Clientset.CoreV1().Services(objectStore.Namespace).Create(ctx, objectStoreSvc, metav1.CreateOptions{})
+			require.NoError(t, err)
+		})
+
+		t.Run(fmt.Sprintf("create sc %q", storageClass.Name), func(t *testing.T) {
+			_, err := k8sh.Clientset.StorageV1().StorageClasses().Create(ctx, storageClass, metav1.CreateOptions{})
+			require.NoError(t, err)
+		})
+
+		// since this is an obc specific subtest we assume that CephObjectStoreUser
+		// is working and the rgw service state does not need to be inspected to
+		// confirm user creation.
+		t.Run(fmt.Sprintf("create CephObjectStoreUser %q", osu1.Name), func(t *testing.T) {
+			_, err := k8sh.RookClientset.CephV1().CephObjectStoreUsers(ns.Name).Create(ctx, &osu1, metav1.CreateOptions{})
+			require.NoError(t, err)
+
+			// user creation may be slow right after rgw start up
+			osuReady := utils.Retry(120, time.Second, "CephObjectStoreUser is Ready", func() bool {
+				liveOsu, err := k8sh.RookClientset.CephV1().CephObjectStoreUsers(ns.Name).Get(ctx, osu1.Name, metav1.GetOptions{})
+				if err != nil {
+					return false
+				}
+
+				if liveOsu.Status == nil {
+					return false
+				}
+
+				return liveOsu.Status.Phase == "Ready"
+			})
+			require.True(t, osuReady)
+		})
+
+		t.Run(fmt.Sprintf("create obc %q with bucketOwner %q", obc1.Name, obc1.Spec.AdditionalConfig["bucketOwner"]), func(t *testing.T) {
+			_, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Create(ctx, &obc1, metav1.CreateOptions{})
+			require.NoError(t, err)
+		})
+
+		t.Run(fmt.Sprintf("obc %q has bucketOwner %q set", obc1.Name, obc1.Spec.AdditionalConfig["bucketOwner"]), func(t *testing.T) {
+			var liveObc *bktv1alpha1.ObjectBucketClaim
+			obcBound := utils.Retry(40, time.Second, "OBC is Bound", func() bool {
+				var err error
+				liveObc, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc1.Name, metav1.GetOptions{})
+				if err != nil {
+					return false
+				}
+
+				return liveObc.Status.Phase == bktv1alpha1.ObjectBucketClaimStatusPhaseBound
+			})
+			require.True(t, obcBound)
+
+			// verify that bucketOwner is set on the live obc
+			assert.Equal(t, obc1.Spec.AdditionalConfig["bucketOwner"], liveObc.Spec.AdditionalConfig["bucketOwner"])
+		})
+
+		t.Run(fmt.Sprintf("ob for obc %q has bucketOwner %q set", obc1.Name, obc1.Spec.AdditionalConfig["bucketOwner"]), func(t *testing.T) {
+			liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc1.Name, metav1.GetOptions{})
+			require.NoError(t, err)
+			obName := liveObc.Spec.ObjectBucketName
+
+			var liveOb *bktv1alpha1.ObjectBucket
+			obBound := utils.Retry(40, time.Second, "OB is Bound", func() bool {
+				var err error
+				liveOb, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBuckets().Get(ctx, obName, metav1.GetOptions{})
+				if err != nil {
+					return false
+				}
+
+				return liveOb.Status.Phase == bktv1alpha1.ObjectBucketStatusPhaseBound
+			})
+			require.True(t, obBound)
+
+			// verify that bucketOwner is set on the live ob
+			assert.Equal(t, obc1.Spec.AdditionalConfig["bucketOwner"], liveOb.Spec.Connection.AdditionalState["bucketOwner"])
+		})
+
+		// the rgw admin api is used to verify bucket ownership
+		t.Run("setup rgw admin api client", func(t *testing.T) {
+			err, output := installer.Execute("radosgw-admin", []string{"user", "info", "--uid=dashboard-admin", fmt.Sprintf("--rgw-realm=%s", objectStore.Name)}, objectStore.Namespace)
+			require.NoError(t, err)
+
+			// extract api creds from json output
+			var userInfo map[string]interface{}
+			err = json.Unmarshal([]byte(output), &userInfo)
+			require.NoError(t, err)
+
+			s3AccessKey, ok := userInfo["keys"].([]interface{})[0].(map[string]interface{})["access_key"].(string)
+			require.True(t, ok)
+			require.NotEmpty(t, s3AccessKey)
+
+			s3SecretKey, ok := userInfo["keys"].([]interface{})[0].(map[string]interface{})["secret_key"].(string)
+			require.True(t, ok)
+			require.NotEmpty(t, s3SecretKey)
+
+			// extract rgw endpoint from k8s svc
+			svc, err := k8sh.Clientset.CoreV1().Services(objectStore.Namespace).Get(ctx, objectStore.Name, metav1.GetOptions{})
+			require.NoError(t, err)
+
+			schema := "http://"
+			httpClient := &http.Client{}
+
+			if tlsEnable {
+				schema = "https://"
+				httpClient.Transport = &http.Transport{
+					TLSClientConfig: &tls.Config{
+						// nolint:gosec // skip TLS verification as this is a test
+						InsecureSkipVerify: true,
+					},
+				}
+			}
+
+			s3Endpoint := schema + svc.Spec.ClusterIP + ":80"
+
+			logger.Infof("endpoint (%s) Accesskey (%s) secret (%s)", s3Endpoint, s3AccessKey, s3SecretKey)
+
+			adminClient, err = admin.New(s3Endpoint, s3AccessKey, s3SecretKey, httpClient)
+			require.NoError(t, err)
+
+			// verify that admin api is working
+			_, err = adminClient.GetInfo(ctx)
+			require.NoError(t, err)
+		})
+
+		t.Run(fmt.Sprintf("bucket created with owner %q", obc1.Spec.AdditionalConfig["bucketOwner"]), func(t *testing.T) {
+			bucket, err := adminClient.GetBucketInfo(ctx, admin.Bucket{Bucket: obc1.Spec.BucketName})
+			require.NoError(t, err)
+
+			assert.Equal(t, obc1.Spec.AdditionalConfig["bucketOwner"], bucket.Owner)
+		})
+
+		// obc should not modify pre-existing users
+		t.Run(fmt.Sprintf("no user quota was set on %q", osu1.Name), func(t *testing.T) {
+			liveQuota, err := adminClient.GetUserQuota(ctx, admin.QuotaSpec{UID: osu1.Name})
+			require.NoError(t, err)
+
+			assert.False(t, *liveQuota.Enabled)
+			assert.Equal(t, int64(-1), *liveQuota.MaxSize)
+			assert.Equal(t, int64(-1), *liveQuota.MaxObjects)
+		})
== "Ready" + }) + require.True(t, osuReady) + }) + + t.Run(fmt.Sprintf("update obc %q to bucketOwner %q", obc1.Name, osu2.Name), func(t *testing.T) { + // update obc bucketOwner + liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc1.Name, metav1.GetOptions{}) + require.NoError(t, err) + + liveObc.Spec.AdditionalConfig["bucketOwner"] = osu2.Name + + _, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Update(ctx, liveObc, metav1.UpdateOptions{}) + require.NoError(t, err) + }) + + t.Run(fmt.Sprintf("obc %q has bucketOwner %q set", obc1.Name, osu2.Name), func(t *testing.T) { + // obc .Status.Phase does not appear to change when updating the obc + liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc1.Name, metav1.GetOptions{}) + require.NoError(t, err) + + // verify that bucketOwner is set on the live obc + assert.Equal(t, osu2.Name, liveObc.Spec.AdditionalConfig["bucketOwner"]) + }) + + t.Run(fmt.Sprintf("ob for obc %q has bucketOwner %q set", obc1.Name, osu2.Name), func(t *testing.T) { + // ob .Status.Phase does not appear to change when updating the obc + liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc1.Name, metav1.GetOptions{}) + require.NoError(t, err) + obName := liveObc.Spec.ObjectBucketName + + var liveOb *bktv1alpha1.ObjectBucket + inSync := utils.Retry(40, time.Second, "OB is Bound", func() bool { + var err error + liveOb, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBuckets().Get(ctx, obName, metav1.GetOptions{}) + if err != nil { + return false + } + + return osu2.Name == liveOb.Spec.Connection.AdditionalState["bucketOwner"] + }) + require.True(t, inSync) + + // verify that bucketOwner is set on the live ob + assert.Equal(t, osu2.Name, liveOb.Spec.Connection.AdditionalState["bucketOwner"]) + }) + + t.Run(fmt.Sprintf("bucket owner changed to %q", osu2.Name), func(t *testing.T) { + var bucket admin.Bucket + ownerSync := utils.Retry(40, time.Second, "bucket owner in sync", func() bool { + var err error + bucket, err = adminClient.GetBucketInfo(ctx, admin.Bucket{Bucket: obc1.Name}) + if err != nil { + return false + } + + return bucket.Owner == osu2.Name + }) + assert.True(t, ownerSync) + assert.Equal(t, osu2.Name, bucket.Owner) + }) + + // obc should not modify pre-existing users + t.Run(fmt.Sprintf("no user quota was set on %q", osu2.Name), func(t *testing.T) { + liveQuota, err := adminClient.GetUserQuota(ctx, admin.QuotaSpec{UID: osu2.Name}) + require.NoError(t, err) + + assert.False(t, *liveQuota.Enabled) + assert.Equal(t, int64(-1), *liveQuota.MaxSize) + assert.Equal(t, int64(-1), *liveQuota.MaxObjects) + }) + + t.Run(fmt.Sprintf("remove obc %q bucketOwner", obc1.Name), func(t *testing.T) { + // update/remove obc bucketOwner + liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc1.Name, metav1.GetOptions{}) + require.NoError(t, err) + + liveObc.Spec.AdditionalConfig = map[string]string{} + + _, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Update(ctx, liveObc, metav1.UpdateOptions{}) + require.NoError(t, err) + }) + + t.Run(fmt.Sprintf("obc %q has no bucketOwner", obc1.Name), func(t *testing.T) { + // verify that bucketOwner is unset on the live obc + notSet := utils.Retry(40, time.Second, "bucketOwner not set", func() bool { + liveObc, err := 
+
+		t.Run(fmt.Sprintf("obc %q has no bucketOwner", obc1.Name), func(t *testing.T) {
+			// verify that bucketOwner is unset on the live obc
+			notSet := utils.Retry(40, time.Second, "bucketOwner not set", func() bool {
+				liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc1.Name, metav1.GetOptions{})
+				if err != nil {
+					return false
+				}
+
+				_, ok := liveObc.Spec.AdditionalConfig["bucketOwner"]
+				return !ok
+			})
+			assert.True(t, notSet)
+		})
+
+		t.Run(fmt.Sprintf("ob for obc %q has no bucketOwner", obc1.Name), func(t *testing.T) {
+			// ob .Status.Phase does not appear to change when updating the obc
+			liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc1.Name, metav1.GetOptions{})
+			require.NoError(t, err)
+			obName := liveObc.Spec.ObjectBucketName
+
+			notSet := utils.Retry(40, time.Second, "bucketOwner not set", func() bool {
+				liveOb, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBuckets().Get(ctx, obName, metav1.GetOptions{})
+				if err != nil {
+					return false
+				}
+
+				_, ok := liveOb.Spec.Connection.AdditionalState["bucketOwner"]
+				return !ok
+			})
+			assert.True(t, notSet)
+		})
+
+		// the ob should retain the existing owner and not revert to a generated user
+		t.Run(fmt.Sprintf("bucket owner is still %q", osu2.Name), func(t *testing.T) {
+			var bucket admin.Bucket
+			ownerSync := utils.Retry(40, time.Second, "bucket owner in sync", func() bool {
+				var err error
+				bucket, err = adminClient.GetBucketInfo(ctx, admin.Bucket{Bucket: obc1.Name})
+				if err != nil {
+					return false
+				}
+
+				return bucket.Owner == osu2.Name
+			})
+			assert.True(t, ownerSync)
+			assert.Equal(t, osu2.Name, bucket.Owner)
+		})
+
+		// this covers setting bucketOwner on an obc initially created without an explicit owner
+		t.Run(fmt.Sprintf("update obc %q to bucketOwner %q", obc1.Name, osu1.Name), func(t *testing.T) {
+			// update obc bucketOwner
+			liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc1.Name, metav1.GetOptions{})
+			require.NoError(t, err)
+
+			liveObc.Spec.AdditionalConfig = map[string]string{"bucketOwner": osu1.Name}
+
+			_, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Update(ctx, liveObc, metav1.UpdateOptions{})
+			require.NoError(t, err)
+		})
+
+		t.Run(fmt.Sprintf("obc %q has bucketOwner %q set", obc1.Name, osu1.Name), func(t *testing.T) {
+			// obc .Status.Phase does not appear to change when updating the obc
+			liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc1.Name, metav1.GetOptions{})
+			require.NoError(t, err)
+
+			// verify that bucketOwner is set on the live obc
+			assert.Equal(t, osu1.Name, liveObc.Spec.AdditionalConfig["bucketOwner"])
+		})
+
+		t.Run(fmt.Sprintf("ob for obc %q has bucketOwner %q set", obc1.Name, osu1.Name), func(t *testing.T) {
+			// ob .Status.Phase does not appear to change when updating the obc
+			liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc1.Name, metav1.GetOptions{})
+			require.NoError(t, err)
+			obName := liveObc.Spec.ObjectBucketName
+
+			var liveOb *v1alpha1.ObjectBucket
+			inSync := utils.Retry(40, time.Second, "OB is Bound", func() bool {
+				var err error
+				liveOb, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBuckets().Get(ctx, obName, metav1.GetOptions{})
+				if err != nil {
+					return false
+				}
+
+				return osu1.Name == liveOb.Spec.Connection.AdditionalState["bucketOwner"]
+			})
+			require.True(t, inSync)
+
+			// verify that bucketOwner is set on the live ob
+			assert.Equal(t, osu1.Name, liveOb.Spec.Connection.AdditionalState["bucketOwner"])
+		})
+
+		t.Run(fmt.Sprintf("bucket owner changed to %q", osu1.Name), func(t *testing.T) {
+			var bucket admin.Bucket
+			ownerSync := utils.Retry(40, time.Second, "bucket owner in sync", func() bool {
+				var err error
+				bucket, err = adminClient.GetBucketInfo(ctx, admin.Bucket{Bucket: obc1.Name})
+				if err != nil {
+					return false
+				}
+
+				return bucket.Owner == osu1.Name
+			})
+			assert.True(t, ownerSync)
+			assert.Equal(t, osu1.Name, bucket.Owner)
+		})
+
+		t.Run(fmt.Sprintf("create obc %q with bucketOwner %q", obc2.Name, obc2.Spec.AdditionalConfig["bucketOwner"]), func(t *testing.T) {
+			_, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Create(ctx, &obc2, metav1.CreateOptions{})
+			require.NoError(t, err)
+		})
+
+		t.Run(fmt.Sprintf("obc %q has bucketOwner %q set", obc2.Name, obc2.Spec.AdditionalConfig["bucketOwner"]), func(t *testing.T) {
+			var liveObc *v1alpha1.ObjectBucketClaim
+			obcBound := utils.Retry(40, time.Second, "OBC is Bound", func() bool {
+				var err error
+				liveObc, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc2.Name, metav1.GetOptions{})
+				if err != nil {
+					return false
+				}
+
+				return liveObc.Status.Phase == v1alpha1.ObjectBucketClaimStatusPhaseBound
+			})
+			require.True(t, obcBound)
+
+			// verify that bucketOwner is set on the live obc
+			assert.Equal(t, obc2.Spec.AdditionalConfig["bucketOwner"], liveObc.Spec.AdditionalConfig["bucketOwner"])
+		})
+
+		t.Run(fmt.Sprintf("ob for obc %q has bucketOwner %q set", obc2.Name, obc2.Spec.AdditionalConfig["bucketOwner"]), func(t *testing.T) {
+			liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc2.Name, metav1.GetOptions{})
+			require.NoError(t, err)
+			obName := liveObc.Spec.ObjectBucketName
+
+			var liveOb *v1alpha1.ObjectBucket
+			obBound := utils.Retry(40, time.Second, "OB is Bound", func() bool {
+				var err error
+				liveOb, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBuckets().Get(ctx, obName, metav1.GetOptions{})
+				if err != nil {
+					return false
+				}
+
+				return liveOb.Status.Phase == v1alpha1.ObjectBucketStatusPhaseBound
+			})
+			require.True(t, obBound)
+
+			// verify that bucketOwner is set on the live ob
+			assert.Equal(t, obc2.Spec.AdditionalConfig["bucketOwner"], liveOb.Spec.Connection.AdditionalState["bucketOwner"])
+		})
+
+		t.Run(fmt.Sprintf("bucket %q and %q share the same owner", obc1.Spec.BucketName, obc2.Spec.BucketName), func(t *testing.T) {
+			bucket1, err := adminClient.GetBucketInfo(ctx, admin.Bucket{Bucket: obc1.Spec.BucketName})
+			require.NoError(t, err)
+
+			bucket2, err := adminClient.GetBucketInfo(ctx, admin.Bucket{Bucket: obc2.Spec.BucketName})
+			require.NoError(t, err)
+
+			assert.Equal(t, obc1.Spec.AdditionalConfig["bucketOwner"], bucket1.Owner)
+			assert.Equal(t, obc1.Spec.AdditionalConfig["bucketOwner"], bucket2.Owner)
+		})
+
+		t.Run(fmt.Sprintf("delete obc %q", obc1.Name), func(t *testing.T) {
+			// lookup ob name
+			liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc1.Name, metav1.GetOptions{})
+			require.NoError(t, err)
+			obName := liveObc.Spec.ObjectBucketName
+
+			// delete obc
+			err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Delete(ctx, obc1.Name, metav1.DeleteOptions{})
+			require.NoError(t, err)
+
+			absent := utils.Retry(40, time.Second, "OBC is absent", func() bool {
+				_, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc1.Name, metav1.GetOptions{})
+				return err != nil
+			})
+			assert.True(t, absent)
+
+			absent = utils.Retry(40, time.Second, "OB is absent", func() bool {
+				_, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBuckets().Get(ctx, obName, metav1.GetOptions{})
+				return err != nil
+			})
+			assert.True(t, absent)
+		})
+
+		t.Run(fmt.Sprintf("delete obc %q", obc2.Name), func(t *testing.T) {
+			// lookup ob name
+			liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc2.Name, metav1.GetOptions{})
+			require.NoError(t, err)
+			obName := liveObc.Spec.ObjectBucketName
+
+			// delete obc
+			err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Delete(ctx, obc2.Name, metav1.DeleteOptions{})
+			require.NoError(t, err)
+
+			absent := utils.Retry(40, time.Second, "OBC is absent", func() bool {
+				_, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obc2.Name, metav1.GetOptions{})
+				return err != nil
+			})
+			assert.True(t, absent)
+
+			absent = utils.Retry(40, time.Second, "OB is absent", func() bool {
+				_, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBuckets().Get(ctx, obName, metav1.GetOptions{})
+				return err != nil
+			})
+			assert.True(t, absent)
+		})
+
+		t.Run(fmt.Sprintf("user %q was not deleted by obc %q", osu1.Name, obc1.Name), func(t *testing.T) {
+			user, err := adminClient.GetUser(ctx, admin.User{ID: osu1.Name})
+			require.NoError(t, err)
+
+			assert.Equal(t, osu1.Name, user.ID)
+		})
+
+		t.Run(fmt.Sprintf("user %q was not deleted by obc %q", osu2.Name, obc1.Name), func(t *testing.T) {
+			user, err := adminClient.GetUser(ctx, admin.User{ID: osu2.Name})
+			require.NoError(t, err)
+
+			assert.Equal(t, osu2.Name, user.ID)
+		})
+
+		// test obc creation with bucketOwner set to a non-existent user, which should fail
+		// "failure" means the obc remains in Pending state
+		t.Run(fmt.Sprintf("create obc %q with non-existent bucketOwner %q", obcBogusOwner.Name, obcBogusOwner.Spec.AdditionalConfig["bucketOwner"]), func(t *testing.T) {
+			_, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Create(ctx, &obcBogusOwner, metav1.CreateOptions{})
+			require.NoError(t, err)
+		})
+
+		t.Run(fmt.Sprintf("operator logs failed lookup for user %q", obcBogusOwner.Spec.AdditionalConfig["bucketOwner"]), func(t *testing.T) {
+			selector := labels.SelectorFromSet(labels.Set{
+				"app": "rook-ceph-operator",
+			})
+			text := `error provisioning bucket: unable to get user \"test-bucket-owner-bogus-user\" creds: Ceph object user \"test-bucket-owner-bogus-user\" not found: NoSuchUser`
+
+			err := WaitForPodLogContainingText(k8sh, "object-ns-system", &selector, text, 10*time.Second)
+			require.NoError(t, err)
+		})
+
+		t.Run(fmt.Sprintf("obc %q stays Pending", obcBogusOwner.Name), func(t *testing.T) {
+			liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcBogusOwner.Namespace).Get(ctx, obcBogusOwner.Name, metav1.GetOptions{})
+			require.NoError(t, err)
+
+			assert.Equal(t, v1alpha1.ObjectBucketClaimStatusPhasePending, liveObc.Status.Phase)
+		})
+
+		t.Run(fmt.Sprintf("user %q does not exist", obcBogusOwner.Spec.AdditionalConfig["bucketOwner"]), func(t *testing.T) {
+			_, err := adminClient.GetUser(ctx, admin.User{ID: obcBogusOwner.Spec.AdditionalConfig["bucketOwner"]})
+			require.ErrorIs(t, err, admin.ErrNoSuchUser)
+		})
+
+		t.Run(fmt.Sprintf("delete obc %q", obcBogusOwner.Name), func(t *testing.T) {
+			// lookup ob name
+			liveObc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obcBogusOwner.Name, metav1.GetOptions{})
+			require.NoError(t, err)
+			obName := liveObc.Spec.ObjectBucketName
+
+			// delete obc
+			err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Delete(ctx, obcBogusOwner.Name, metav1.DeleteOptions{})
+			require.NoError(t, err)
+
+			absent := utils.Retry(40, time.Second, "OBC is absent", func() bool {
+				_, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(ns.Name).Get(ctx, obcBogusOwner.Name, metav1.GetOptions{})
+				return err != nil
+			})
+			assert.True(t, absent)
+
+			absent = utils.Retry(40, time.Second, "OB is absent", func() bool {
+				_, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBuckets().Get(ctx, obName, metav1.GetOptions{})
+				return err != nil
+			})
+			assert.True(t, absent)
+		})
+
+		t.Run(fmt.Sprintf("delete CephObjectStoreUser %q", osu2.Name), func(t *testing.T) {
+			err := k8sh.RookClientset.CephV1().CephObjectStoreUsers(ns.Name).Delete(ctx, osu2.Name, metav1.DeleteOptions{})
+			require.NoError(t, err)
+
+			absent := utils.Retry(40, time.Second, "CephObjectStoreUser is absent", func() bool {
+				_, err := k8sh.RookClientset.CephV1().CephObjectStoreUsers(ns.Name).Get(ctx, osu2.Name, metav1.GetOptions{})
+				return err != nil
+			})
+			assert.True(t, absent)
+		})
+
+		t.Run(fmt.Sprintf("delete CephObjectStoreUser %q", osu1.Name), func(t *testing.T) {
+			err := k8sh.RookClientset.CephV1().CephObjectStoreUsers(ns.Name).Delete(ctx, osu1.Name, metav1.DeleteOptions{})
+			require.NoError(t, err)
+
+			absent := utils.Retry(40, time.Second, "CephObjectStoreUser is absent", func() bool {
+				_, err := k8sh.RookClientset.CephV1().CephObjectStoreUsers(ns.Name).Get(ctx, osu1.Name, metav1.GetOptions{})
+				return err != nil
+			})
+			assert.True(t, absent)
+		})
+
+		t.Run(fmt.Sprintf("delete sc %q", storageClass.Name), func(t *testing.T) {
+			err := k8sh.Clientset.StorageV1().StorageClasses().Delete(ctx, storageClass.Name, metav1.DeleteOptions{})
+			require.NoError(t, err)
+		})
+
+		t.Run(fmt.Sprintf("delete svc %q", objectStoreSvc.Name), func(t *testing.T) {
+			err := k8sh.Clientset.CoreV1().Services(objectStore.Namespace).Delete(ctx, objectStoreSvc.Name, metav1.DeleteOptions{})
+			require.NoError(t, err)
+		})
+
+		t.Run(fmt.Sprintf("delete CephObjectStore %q", objectStore.Name), func(t *testing.T) {
+			err := k8sh.RookClientset.CephV1().CephObjectStores(objectStore.Namespace).Delete(ctx, objectStore.Name, metav1.DeleteOptions{})
+			require.NoError(t, err)
+		})
+
+		t.Run(fmt.Sprintf("delete ns %q", ns.Name), func(t *testing.T) {
+			err := k8sh.Clientset.CoreV1().Namespaces().Delete(ctx, ns.Name, metav1.DeleteOptions{})
+			require.NoError(t, err)
+		})
+	})
+}