Skip to content

Commit

Permalink
Merge pull request rook#15311 from jhoblitt/feature/obc-bucket-owner
Browse files Browse the repository at this point in the history
object: add obc bucketOwner
  • Loading branch information
BlaineEXE authored Feb 7, 2025
2 parents 8ca9186 + 9d21fe0 commit 1cf0a83
Show file tree
Hide file tree
Showing 8 changed files with 1,186 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ spec:
}
]
}
bucketOwner: "rgw-user"
```
1. `name` of the `ObjectBucketClaim`. This name becomes the name of the Secret and ConfigMap.
Expand All @@ -94,6 +95,7 @@ If both `bucketName` and `generateBucketName` are blank or omitted then the stor
* `bucketMaxSize`: The maximum size of the bucket as an individual bucket quota.
* `bucketPolicy`: A raw JSON format string that defines an AWS S3 format the bucket policy. If set, the policy string will override any existing policy set on the bucket and any default bucket policy that the bucket provisioner potentially would have automatically generated.
* `bucketLifecycle`: A raw JSON format string that defines an AWS S3 format bucket lifecycle configuration. Note that the rules must be sorted by `ID` in order to be idempotent.
* `bucketOwner`: The name of a pre-existing ceph rgw user account that will own the bucket. A `CephObjectStoreUser` resource may be used to create an ceph rgw user account. If the bucket already exists and is owned by a different user, the bucket will be re-linked to the specified user.

### OBC Custom Resource after Bucket Provisioning

Expand Down
124 changes: 80 additions & 44 deletions pkg/operator/ceph/object/bucket/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type additionalConfigSpec struct {
bucketMaxSize *int64
bucketPolicy *string
bucketLifecycle *string
bucketOwner *string
}

var _ apibkt.Provisioner = &Provisioner{}
Expand All @@ -89,15 +90,22 @@ func (p Provisioner) GenerateUserID(obc *bktv1alpha1.ObjectBucketClaim, ob *bktv
func (p Provisioner) Provision(options *apibkt.BucketOptions) (*bktv1alpha1.ObjectBucket, error) {
logger.Debugf("Provision event for OB options: %+v", options)

err := p.initializeCreateOrGrant(options)
additionalConfig, err := additionalConfigSpecFromMap(options.ObjectBucketClaim.Spec.AdditionalConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to process additionalConfig")
}

bucket := &bucket{provisioner: &p, options: options, additionalConfig: additionalConfig}

err = p.initializeCreateOrGrant(bucket)
if err != nil {
return nil, err
}
logger.Infof("Provision: creating bucket %q for OBC %q", p.bucketName, options.ObjectBucketClaim.Name)

p.accessKeyID, p.secretAccessKey, err = p.createCephUser(options.UserID)
p.accessKeyID, p.secretAccessKey, err = bucket.getUserCreds()
if err != nil {
return nil, errors.Wrap(err, "Provision: can't create ceph user")
return nil, errors.Wrapf(err, "unable to get user %q creds", p.cephUserName)
}

err = p.setS3Agent()
Expand All @@ -115,44 +123,55 @@ func (p Provisioner) Provision(options *apibkt.BucketOptions) (*bktv1alpha1.Obje
if !bucketExists {
// if bucket already exists, this returns error: TooManyBuckets because we set the quota
// below. If it already exists, assume we are good to go
logger.Debugf("creating bucket %q", p.bucketName)
logger.Debugf("creating bucket %q owned by user %q", p.bucketName, p.cephUserName)
err = p.s3Agent.CreateBucket(p.bucketName)
if err != nil {
return nil, errors.Wrapf(err, "error creating bucket %q", p.bucketName)
}
} else if owner != options.UserID {
return nil, errors.Errorf("bucket %q already exists and is owned by %q for different OBC", p.bucketName, owner)
} else if owner != p.cephUserName {
logger.Debugf("bucket %q already exists and is owned by user %q instead of user %q, relinking...", p.bucketName, owner, p.cephUserName)

err = p.adminOpsClient.LinkBucket(p.clusterInfo.Context, admin.BucketLinkInput{Bucket: p.bucketName, UID: p.cephUserName})
if err != nil {
return nil, errors.Wrapf(err, "failed to link bucket %q to user %q", p.bucketName, p.cephUserName)
}
} else {
logger.Debugf("bucket %q already exists", p.bucketName)
}

singleBucketQuota := 1
_, err = p.adminOpsClient.ModifyUser(p.clusterInfo.Context, admin.User{ID: p.cephUserName, MaxBuckets: &singleBucketQuota})
if err != nil {
return nil, errors.Wrapf(err, "failed to set user %q bucket quota to %d", p.cephUserName, singleBucketQuota)
}
logger.Infof("set user %q bucket max to %d", p.cephUserName, singleBucketQuota)

additionalConfig, err := additionalConfigSpecFromMap(options.ObjectBucketClaim.Spec.AdditionalConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to process additionalConfig")
// is the bucket owner a provisioner generated user?
if p.isObcGeneratedUser(p.cephUserName, options.ObjectBucketClaim) {
// set user quota
singleBucketQuota := 1
_, err = p.adminOpsClient.ModifyUser(p.clusterInfo.Context, admin.User{ID: p.cephUserName, MaxBuckets: &singleBucketQuota})
if err != nil {
return nil, errors.Wrapf(err, "failed to set user %q bucket quota to %d", p.cephUserName, singleBucketQuota)
}
logger.Infof("set user %q bucket max to %d", p.cephUserName, singleBucketQuota)
}

err = p.setAdditionalSettings(additionalConfig)
if err != nil {
return nil, errors.Wrapf(err, "failed to set additional settings for OBC %q in NS %q associated with CephObjectStore %q in NS %q", options.ObjectBucketClaim.Name, options.ObjectBucketClaim.Namespace, p.objectStoreName, p.clusterInfo.Namespace)
}

return p.composeObjectBucket(), nil
return p.composeObjectBucket(bucket), nil
}

// Grant attaches to an existing rgw bucket and returns a connection info
// representing the bucket's endpoint and user access credentials.
func (p Provisioner) Grant(options *apibkt.BucketOptions) (*bktv1alpha1.ObjectBucket, error) {
logger.Debugf("Grant event for OB options: %+v", options)

additionalConfig, err := additionalConfigSpecFromMap(options.ObjectBucketClaim.Spec.AdditionalConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to process additionalConfig")
}

bucket := &bucket{provisioner: &p, options: options, additionalConfig: additionalConfig}

// initialize and set the AWS services and commonly used variables
err := p.initializeCreateOrGrant(options)
err = p.initializeCreateOrGrant(bucket)
if err != nil {
return nil, err
}
Expand All @@ -164,29 +183,26 @@ func (p Provisioner) Grant(options *apibkt.BucketOptions) (*bktv1alpha1.ObjectBu
return nil, errors.Wrapf(err, "bucket %s does not exist", p.bucketName)
}

// get or create ceph user
p.accessKeyID, p.secretAccessKey, err = p.createCephUser(options.UserID)
p.accessKeyID, p.secretAccessKey, err = bucket.getUserCreds()
if err != nil {
return nil, errors.Wrap(err, "Provision: can't create ceph user")
return nil, errors.Wrapf(err, "unable to get user %q creds", p.cephUserName)
}

// restrict creation of new buckets in rgw
restrictBucketCreation := 0
_, err = p.adminOpsClient.ModifyUser(p.clusterInfo.Context, admin.User{ID: p.cephUserName, MaxBuckets: &restrictBucketCreation})
if err != nil {
return nil, err
// is the bucket owner a provisioner generated user?
if p.isObcGeneratedUser(p.cephUserName, options.ObjectBucketClaim) {
// restrict creation of new buckets in rgw
restrictBucketCreation := 0
_, err = p.adminOpsClient.ModifyUser(p.clusterInfo.Context, admin.User{ID: p.cephUserName, MaxBuckets: &restrictBucketCreation})
if err != nil {
return nil, err
}
}

err = p.setS3Agent()
if err != nil {
return nil, err
}

additionalConfig, err := additionalConfigSpecFromMap(options.ObjectBucketClaim.Spec.AdditionalConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to process additionalConfig")
}

// setting quota limit if it is enabled
err = p.setAdditionalSettings(additionalConfig)
if err != nil {
Expand All @@ -195,7 +211,7 @@ func (p Provisioner) Grant(options *apibkt.BucketOptions) (*bktv1alpha1.ObjectBu

if additionalConfig.bucketPolicy != nil {
// if the user is managing the bucket policy, there's nothing else to do
return p.composeObjectBucket(), nil
return p.composeObjectBucket(bucket), nil
}

// generate the bucket policy if it isn't managed by the user
Expand Down Expand Up @@ -230,7 +246,7 @@ func (p Provisioner) Grant(options *apibkt.BucketOptions) (*bktv1alpha1.ObjectBu
}

// returned ob with connection info
return p.composeObjectBucket(), nil
return p.composeObjectBucket(bucket), nil
}

// Delete is called when the ObjectBucketClaim (OBC) is deleted and the associated
Expand All @@ -245,8 +261,13 @@ func (p Provisioner) Delete(ob *bktv1alpha1.ObjectBucket) error {
}
logger.Infof("Delete: deleting bucket %q for OB %q", p.bucketName, ob.Name)

if err := p.deleteOBCResource(p.bucketName); err != nil {
return errors.Wrapf(err, "failed to delete OBCResource bucket %q", p.bucketName)
if err := p.deleteBucket(p.bucketName); err != nil {
return errors.Wrapf(err, "failed to delete bucket %q", p.bucketName)
}

logger.Infof("Delete: deleting user %q for OB %q", p.bucketName, ob.Name)
if err := p.deleteOBUser(ob); err != nil {
return errors.Wrapf(err, "failed to delete user %q", p.cephUserName)
}
return nil
}
Expand Down Expand Up @@ -337,7 +358,7 @@ func (p Provisioner) Revoke(ob *bktv1alpha1.ObjectBucket) error {
}

// finally, delete the user
err = p.deleteOBCResource("")
err = p.deleteOBUser(ob)
if err != nil {
return errors.Wrapf(err, "failed to delete user %q", p.cephUserName)
}
Expand All @@ -348,12 +369,12 @@ func (p Provisioner) Revoke(ob *bktv1alpha1.ObjectBucket) error {
// Return the OB struct with minimal fields filled in.
// initializeCreateOrGrant sets common provisioner receiver fields and
// the services and sessions needed to provision.
func (p *Provisioner) initializeCreateOrGrant(options *apibkt.BucketOptions) error {
func (p *Provisioner) initializeCreateOrGrant(bucket *bucket) error {
logger.Info("initializing and setting CreateOrGrant services")

// set the bucket name
obc := options.ObjectBucketClaim
scName := options.ObjectBucketClaim.Spec.StorageClassName
obc := bucket.options.ObjectBucketClaim
scName := bucket.options.ObjectBucketClaim.Spec.StorageClassName
sc, err := p.getStorageClassWithBackoff(scName)
if err != nil {
logger.Errorf("failed to get storage class for OBC %q in namespace %q. %v", obc.Name, obc.Namespace, err)
Expand All @@ -364,7 +385,7 @@ func (p *Provisioner) initializeCreateOrGrant(options *apibkt.BucketOptions) err
// defines the bucket in the parameters, it's assumed to be a request to connect to a statically
// created bucket. In these cases, we forego generating a bucket. Instead we connect a newly generated
// user to the existing bucket.
p.setBucketName(options.BucketName)
p.setBucketName(bucket.options.BucketName)
if bucketName, isStatic := isStaticBucket(sc); isStatic {
p.setBucketName(bucketName)
}
Expand Down Expand Up @@ -398,10 +419,17 @@ func (p *Provisioner) initializeCreateOrGrant(options *apibkt.BucketOptions) err
return errors.Wrap(err, "failed to set admin ops api client")
}

if len(options.UserID) == 0 {
if len(bucket.options.UserID) == 0 {
return errors.Errorf("user ID for OBC %q is empty", obc.Name)
}
p.cephUserName = options.UserID

// override generated bucket owner name if an explicit name is set via additionalConfig["bucketOwner"]
if bucketOwner := bucket.additionalConfig.bucketOwner; bucketOwner != nil {
p.cephUserName = *bucketOwner
} else {
p.cephUserName = bucket.options.UserID
}
logger.Debugf("Using user %q for OBC %q", p.cephUserName, obc.Name)

return nil
}
Expand Down Expand Up @@ -448,7 +476,7 @@ func (p *Provisioner) initializeDeleteOrRevoke(ob *bktv1alpha1.ObjectBucket) err
}

// Return the OB struct with minimal fields filled in.
func (p *Provisioner) composeObjectBucket() *bktv1alpha1.ObjectBucket {
func (p *Provisioner) composeObjectBucket(bucket *bucket) *bktv1alpha1.ObjectBucket {

conn := &bktv1alpha1.Connection{
Endpoint: &bktv1alpha1.Endpoint{
Expand All @@ -472,6 +500,15 @@ func (p *Provisioner) composeObjectBucket() *bktv1alpha1.ObjectBucket {
},
}

// bucketOwner will either match CephUser, indicating that it is an
// explicitly set name, or the key will be unset, indicating that either the
// provisioner created the user or a grant was made on a pre-existing bucket
// linked to a pre-existing user. Due to the semantics of lib-bucket, it
// isn't possible to determine if it was a pre-existing bucket.
if bucket.additionalConfig.bucketOwner != nil {
conn.AdditionalState["bucketOwner"] = *bucket.additionalConfig.bucketOwner
}

return &bktv1alpha1.ObjectBucket{
Spec: bktv1alpha1.ObjectBucketSpec{
Connection: conn,
Expand Down Expand Up @@ -831,7 +868,6 @@ func (p *Provisioner) setTlsCaCert() error {
return err
}
}

return nil
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/operator/ceph/object/bucket/provisioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,12 @@ func TestProvisioner_additionalConfigSpecFromMap(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, additionalConfigSpec{bucketLifecycle: &(&struct{ s string }{"foo"}).s}, *spec)
})

t.Run("bucketOwner field should be set", func(t *testing.T) {
spec, err := additionalConfigSpecFromMap(map[string]string{"bucketOwner": "foo"})
assert.NoError(t, err)
assert.Equal(t, additionalConfigSpec{bucketOwner: &(&struct{ s string }{"foo"}).s}, *spec)
})
}

func numberOfCallsWithValue(substr string, strs []string) int {
Expand Down
Loading

0 comments on commit 1cf0a83

Please sign in to comment.